chiark
/
gitweb
/
~ian
/
innduct.git
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
5d6e184
)
wip make it compile
author
Ian Jackson
<ian@liberator.(none)>
Sun, 25 Apr 2010 15:48:12 +0000
(16:48 +0100)
committer
Ian Jackson
<ian@liberator.(none)>
Sun, 25 Apr 2010 15:48:12 +0000
(16:48 +0100)
backends/innduct.c
patch
|
blob
|
history
diff --git
a/backends/innduct.c
b/backends/innduct.c
index 421623914d6c41ac79461c83dee34020064b9f6e..1104ef4a7244ea160d890e85657265781a1e291e 100644
(file)
--- a/
backends/innduct.c
+++ b/
backends/innduct.c
@@
-1,3
+1,9
@@
+/*
+ * TODO
+ * - close idle connections
+ * - -k kill mode ?
+ */
+
/*
* Newsfeeds file entries should look like this:
* host.name.of.site[/exclude,exclude,...]\
/*
* Newsfeeds file entries should look like this:
* host.name.of.site[/exclude,exclude,...]\
@@
-259,6
+265,7
@@
static void filemon_callback(void);
static void statemc_setstate(StateMachineState newsms, int periods,
const char *forlog, const char *why);
static void check_master_queue(void);
static void statemc_setstate(StateMachineState newsms, int periods,
const char *forlog, const char *why);
static void check_master_queue(void);
+static void queue_check_input_done(void);
static void postfork(const char *what);
static void postfork_inputfile(InputFile *ipf);
static void postfork(const char *what);
static void postfork_inputfile(InputFile *ipf);
@@
-312,7
+319,7
@@
typedef enum { /* in queue in conn->sent */
, counts[art_Unsolicited] x
typedef enum {
, counts[art_Unsolicited] x
typedef enum {
-#define RC_INDEX(x) RC
I
_##x,
+#define RC_INDEX(x) RC_##x,
RESULT_COUNTS(RC_INDEX, RC_INDEX)
RCI_max
} ResultCountIndex;
RESULT_COUNTS(RC_INDEX, RC_INDEX)
RCI_max
} ResultCountIndex;
@@
-386,7
+393,7
@@
static const char *sms_names[]= {
struct Conn {
ISNODE(Conn);
struct Conn {
ISNODE(Conn);
- int fd, max_queue, stream;
+ int fd, max_queue, stream
, quitting
;
ArticleList queue; /* not yet told peer, or CHECK said send it */
ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
struct iovec xmit[CONNIOVS];
ArticleList queue; /* not yet told peer, or CHECK said send it */
ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
struct iovec xmit[CONNIOVS];
@@
-613,6
+620,13
@@
static char *sanitise(const char *input) {
/*========== making new connections ==========*/
/*========== making new connections ==========*/
+static void conn_dispose(Conn *conn) {
+ if (!conn) return;
+ perhaps_close(&conn->fd);
+ free(conn);
+ until_connect= reconnect_delay_periods;
+}
+
static int connecting_sockets[2]= {-1,-1};
static pid_t connecting_child;
static int connecting_sockets[2]= {-1,-1};
static pid_t connecting_child;
@@
-719,10
+733,7
@@
static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
return 0;
x:
return 0;
x:
- if (conn) {
- perhaps_close(&conn->fd);
- free(conn);
- }
+ conn_dispose(conn);
connect_attempt_discard();
}
connect_attempt_discard();
}
@@
-904,11
+915,8
@@
static void vconnfail(Conn *conn, const char *fmt, va_list al) {
conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
free(m);
conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
free(m);
- close(conn->fd);
LIST_REMOVE(conns,conn);
LIST_REMOVE(conns,conn);
- free(conn);
-
- until_connect= reconnect_delay_periods;
+ conn_dispose(conn);
check_master_queue();
}
check_master_queue();
}
@@
-1023,7
+1031,7
@@
static void conn_make_some_xmits(Conn *conn) {
art->state == art_Unchecked ? art_Unsolicited :
art->state == art_Wanted ? art_Wanted :
(abort(),-1);
art->state == art_Unchecked ? art_Unsolicited :
art->state == art_Wanted ? art_Wanted :
(abort(),-1);
- art->ipf->counts[art->state][RC
I
_sent]++;
+ art->ipf->counts[art->state][RC_sent]++;
LIST_ADDTAIL(conn->sent, art);
} else {
LIST_ADDTAIL(conn->sent, art);
} else {
@@
-1121,8
+1129,8
@@
static Article *article_reply_check(Conn *conn, const char *response,
}
static void update_nocheck(int accepted) {
}
static void update_nocheck(int accepted) {
- accept_proportion *=
accept
_decay;
- accept_proportion += accepted;
+ accept_proportion *=
nocheck
_decay;
+ accept_proportion += accepted
* (1.0 - nocheck_decay)
;
int new_nocheck= accept_proportion >= nocheck_thresh;
if (new_nocheck && !nocheck_reported) {
notice("entering nocheck mode for the first time");
int new_nocheck= accept_proportion >= nocheck_thresh;
if (new_nocheck && !nocheck_reported) {
notice("entering nocheck mode for the first time");
@@
-1135,8
+1143,8
@@
static void update_nocheck(int accepted) {
static void article_done(Conn *conn, Article *art, int whichcount) {
art->ipf->counts[art->state][whichcount]++;
static void article_done(Conn *conn, Article *art, int whichcount) {
art->ipf->counts[art->state][whichcount]++;
- if (whichcount == RC
I
_accepted) update_nocheck(1);
- else if (whichcount == RC
I
_unwanted) update_nocheck(0);
+ if (whichcount == RC_accepted) update_nocheck(1);
+ else if (whichcount == RC_unwanted) update_nocheck(0);
InputFile *ipf= art->ipf;
while (art->blanklen) {
InputFile *ipf= art->ipf;
while (art->blanklen) {
@@
-1177,7
+1185,7
@@
static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
}
assert(ev == OOP_RD_OK);
}
assert(ev == OOP_RD_OK);
- char *sani= sanitise(data
, recsz
);
+ char *sani= sanitise(data);
char *ep;
unsigned long code= strtoul(data, &ep, 10);
char *ep;
unsigned long code= strtoul(data, &ep, 10);
@@
-1187,12
+1195,17
@@
static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
}
if (conn->quitting) {
}
if (conn->quitting) {
- if (code!=205) {
- connfail(conn, "peer gave failure response to QUIT: %s", sani);
- return OOP_CONTINUE;
+ if (code!=205 && code!=503) {
+ connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
+ } else {
+ notice("#%d idle connection closed\n");
+ assert(!conn->queue.count);
+ assert(!conn->sent.count);
+ assert(!conn->xmitu);
+ LIST_REMOVE(conns,conn);
+ conn_dispose(conn);
}
}
- conn close ok;
- return;
+ return OOP_CONTINUE;
}
Article *art;
}
Article *art;
@@
-1202,7
+1215,7
@@
static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */
#define ARTICLE_DEALTWITH(streaming,musthavesent,how) \
if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */
#define ARTICLE_DEALTWITH(streaming,musthavesent,how) \
- code_streaming= (streaming)
\
+ code_streaming= (streaming)
;
\
GET_ARTICLE(musthavesent); \
article_done(conn, art, RC_##how); break;
GET_ARTICLE(musthavesent); \
article_done(conn, art, RC_##how); break;
@@
-1230,9
+1243,9
@@
static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
case 335: /* IHAVE says send it */
GET_ARTICLE(-1);
assert(art->state == art_Unchecked);
case 335: /* IHAVE says send it */
GET_ARTICLE(-1);
assert(art->state == art_Unchecked);
- art->ipf->counts[art->state]
.accepted
++;
+ art->ipf->counts[art->state]
[RC_accepted]
++;
art->state= art_Wanted;
art->state= art_Wanted;
- LIST_ADDTAIL(conn->queue);
+ LIST_ADDTAIL(conn->queue
, art
);
break;
case 431: /* CHECK or TAKETHIS says try later */
break;
case 431: /* CHECK or TAKETHIS says try later */