X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=0e0b0edeeca2c01b11a820a2b977b3ae56b9b077;hb=f7573c26c1935719413e79cb82068da35a27b16b;hp=421623914d6c41ac79461c83dee34020064b9f6e;hpb=5d6e18436eb0f3711d9fe4027c881833b0d9490d;p=inn-innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index 4216239..0e0b0ed 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,3 +1,9 @@ +/* + * TODO + * - close idle connections + * - -k kill mode ? + */ + /* * Newsfeeds file entries should look like this: * host.name.of.site[/exclude,exclude,...]\ @@ -246,7 +252,7 @@ DEFLIST(Article); /*----- function predeclarations -----*/ -static void conn_check_work(Conn *conn); +static void conn_maybe_write(Conn *conn); static void conn_make_some_xmits(Conn *conn); static void *conn_write_some_xmits(Conn *conn); @@ -259,6 +265,7 @@ static void filemon_callback(void); static void statemc_setstate(StateMachineState newsms, int periods, const char *forlog, const char *why); static void check_master_queue(void); +static void queue_check_input_done(void); static void postfork(const char *what); static void postfork_inputfile(InputFile *ipf); @@ -312,7 +319,7 @@ typedef enum { /* in queue in conn->sent */ , counts[art_Unsolicited] x typedef enum { -#define RC_INDEX(x) RCI_##x, +#define RC_INDEX(x) RC_##x, RESULT_COUNTS(RC_INDEX, RC_INDEX) RCI_max } ResultCountIndex; @@ -386,8 +393,9 @@ static const char *sms_names[]= { struct Conn { ISNODE(Conn); - int fd, max_queue, stream; - ArticleList queue; /* not yet told peer, or CHECK said send it */ + int fd, max_queue, stream, quitting; + ArticleList waiting; /* not yet told peer */ + ArticleList priority; /* peer says send it now */ ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ struct iovec xmit[CONNIOVS]; XmitDetails xmitd[CONNIOVS]; @@ -613,6 +621,13 @@ static char *sanitise(const char *input) { /*========== making new connections ==========*/ +static void conn_dispose(Conn *conn) { + if (!conn) return; + perhaps_close(&conn->fd); + free(conn); + until_connect= reconnect_delay_periods; +} + static int connecting_sockets[2]= {-1,-1}; static pid_t connecting_child; @@ -719,13 +734,16 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { return 0; x: - if (conn) { - perhaps_close(&conn->fd); - free(conn); - } + conn_dispose(conn); connect_attempt_discard(); } +static int allow_connect_start(void) { + return conns.count < max_connections + && !connecting_child + && !until_connect; +} + static void connect_start(void) { assert(!connecting_sockets[0]); assert(!connecting_sockets[1]); @@ -826,8 +844,16 @@ static void check_master_queue(void) { Conn *walk, *use=0; int spare; + + /* Find a connection to offer this article. We prefer a busy + * connection to an idle one, provided it's not full. We take the + * first (oldest) and since that's stable, it will mean we fill up + * connections in order. That way if we have too many + * connections, the spare ones will go away eventually. + */ for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) { - int inqueue= walk->sent.count + walk->queue.count; + int inqueue= walk->sent.count + walk->priority.count + + walk->waiting.count; spare= walk->max_queue - inqueue; assert(inqueue <= max_queue_per_conn); assert(spare >= 0); @@ -837,12 +863,11 @@ static void check_master_queue(void) { if (use) { while (spare>0) { Article *art= LIST_REMHEAD(queue); - LIST_ADDTAIL(use->queue, art); + LIST_ADDTAIL(use->waiting, art); spare--; } - conn_check_work(use); - } else if (conns.count < max_connections && - !connecting_child && !until_connect) { + conn_maybe_write(use); + } else if (allow_connect_start()) { until_connect= reconnect_delay_periods; connect_start(); break; @@ -853,11 +878,11 @@ static void check_master_queue(void) { } static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) { - conn_check_work(u); + conn_maybe_write(u); return OOP_CONTINUE; } -static void conn_check_work(Conn *conn) { +static void conn_maybe_write(Conn *conn) { void *rp= 0; for (;;) { conn_make_some_xmits(conn); @@ -887,7 +912,8 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { int requeue[art_MaxState]; Article *art; - while ((art= LIST_REMHEAD(conn->queue))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); while ((art= LIST_REMHEAD(conn->sent))) { requeue[art->state]++; if (art->state==art_Unsolicited) art->state= art_Unchecked; @@ -904,11 +930,8 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); free(m); - close(conn->fd); LIST_REMOVE(conns,conn); - free(conn); - - until_connect= reconnect_delay_periods; + conn_dispose(conn); check_master_queue(); } @@ -995,7 +1018,8 @@ static void conn_make_some_xmits(Conn *conn) { if (conn->xmitu+5 > CONNIOVS) break; - Article *art= LIST_REMHEAD(queue); + Article *art= LIST_REMHEAD(priority); + if (!art) art= LIST_REMHEAD(waiting); if (!art) break; if (art->state >= art_Wanted || (conn->stream && nocheck)) { @@ -1023,7 +1047,7 @@ static void conn_make_some_xmits(Conn *conn) { art->state == art_Unchecked ? art_Unsolicited : art->state == art_Wanted ? art_Wanted : (abort(),-1); - art->ipf->counts[art->state][RCI_sent]++; + art->ipf->counts[art->state][RC_sent]++; LIST_ADDTAIL(conn->sent, art); } else { @@ -1121,8 +1145,8 @@ static Article *article_reply_check(Conn *conn, const char *response, } static void update_nocheck(int accepted) { - accept_proportion *= accept_decay; - accept_proportion += accepted; + accept_proportion *= nocheck_decay; + accept_proportion += accepted * (1.0 - nocheck_decay); int new_nocheck= accept_proportion >= nocheck_thresh; if (new_nocheck && !nocheck_reported) { notice("entering nocheck mode for the first time"); @@ -1135,8 +1159,8 @@ static void update_nocheck(int accepted) { static void article_done(Conn *conn, Article *art, int whichcount) { art->ipf->counts[art->state][whichcount]++; - if (whichcount == RCI_accepted) update_nocheck(1); - else if (whichcount == RCI_unwanted) update_nocheck(0); + if (whichcount == RC_accepted) update_nocheck(1); + else if (whichcount == RC_unwanted) update_nocheck(0); InputFile *ipf= art->ipf; while (art->blanklen) { @@ -1177,7 +1201,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } assert(ev == OOP_RD_OK); - char *sani= sanitise(data, recsz); + char *sani= sanitise(data); char *ep; unsigned long code= strtoul(data, &ep, 10); @@ -1187,12 +1211,18 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } if (conn->quitting) { - if (code!=205) { - connfail(conn, "peer gave failure response to QUIT: %s", sani); - return OOP_CONTINUE; + if (code!=205 && code!=503) { + connfail(conn, "peer gave unexpected response to QUIT: %s", sani); + } else { + notice("#%d idle connection closed\n"); + assert(!conn->waiting.count); + assert(!conn->priority.count); + assert(!conn->sent.count); + assert(!conn->xmitu); + LIST_REMOVE(conns,conn); + conn_dispose(conn); } - conn close ok; - return; + return OOP_CONTINUE; } Article *art; @@ -1202,7 +1232,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */ #define ARTICLE_DEALTWITH(streaming,musthavesent,how) \ - code_streaming= (streaming) \ + code_streaming= (streaming); \ GET_ARTICLE(musthavesent); \ article_done(conn, art, RC_##how); break; @@ -1230,9 +1260,9 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, case 335: /* IHAVE says send it */ GET_ARTICLE(-1); assert(art->state == art_Unchecked); - art->ipf->counts[art->state].accepted++; + art->ipf->counts[art->state][RC_accepted]++; art->state= art_Wanted; - LIST_ADDTAIL(conn->queue); + LIST_ADDTAIL(conn->priority, art); break; case 431: /* CHECK or TAKETHIS says try later */ @@ -1248,7 +1278,8 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } - check_check_work(conn); + conn_maybe_write(sendon); + check_master_queue(); return OOP_CONTINUE; }