X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=0e0b0edeeca2c01b11a820a2b977b3ae56b9b077;hp=9ee13bc7d372cfd774a67f086ada6c5af9e88f54;hb=f7573c26c1935719413e79cb82068da35a27b16b;hpb=f885f7d165d73beb199e8c048ece4a6f07f5e2cc diff --git a/backends/innduct.c b/backends/innduct.c index 9ee13bc..0e0b0ed 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,3 +1,9 @@ +/* + * TODO + * - close idle connections + * - -k kill mode ? + */ + /* * Newsfeeds file entries should look like this: * host.name.of.site[/exclude,exclude,...]\ @@ -216,7 +222,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) -#define LIST_HEAD(l) ((typeof((l)->hd))(list_head((struct list*)&(l)))) +#define LIST_HEAD(l) ((typeof((l).hd))(list_head((struct list*)&(l)))) #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) @@ -246,7 +252,7 @@ DEFLIST(Article); /*----- function predeclarations -----*/ -static void conn_check_work(Conn *conn); +static void conn_maybe_write(Conn *conn); static void conn_make_some_xmits(Conn *conn); static void *conn_write_some_xmits(Conn *conn); @@ -256,11 +262,10 @@ static int filemon_init(void); static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path); static void filemon_callback(void); -static ConnList *conn_determine_right_list(Conn *conn); -static void conn_assign_one_article(ConnList *connlist, Conn **last_assigned); static void statemc_setstate(StateMachineState newsms, int periods, const char *forlog, const char *why); static void check_master_queue(void); +static void queue_check_input_done(void); static void postfork(const char *what); static void postfork_inputfile(InputFile *ipf); @@ -306,7 +311,7 @@ typedef enum { /* in queue in conn->sent */ #define RCI_TRIPLE_FMT_BASE "%d(id%d+bd%d+nc%d)" #define RCI_TRIPLE_VALS_BASE(counts,x) \ - , counts[art_Unchecked] x \ + counts[art_Unchecked] x \ + counts[art_Wanted] x \ + counts[art_Unsolicited] x, \ counts[art_Unchecked] x \ @@ -314,7 +319,7 @@ typedef enum { /* in queue in conn->sent */ , counts[art_Unsolicited] x typedef enum { -#define RC_INDEX(x) RCI_##x, +#define RC_INDEX(x) RC_##x, RESULT_COUNTS(RC_INDEX, RC_INDEX) RCI_max } ResultCountIndex; @@ -388,8 +393,9 @@ static const char *sms_names[]= { struct Conn { ISNODE(Conn); - int fd, max_queue, stream; - ArticleList queue; /* not yet told peer, or CHECK said send it */ + int fd, max_queue, stream, quitting; + ArticleList waiting; /* not yet told peer */ + ArticleList priority; /* peer says send it now */ ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ struct iovec xmit[CONNIOVS]; XmitDetails xmitd[CONNIOVS]; @@ -615,6 +621,13 @@ static char *sanitise(const char *input) { /*========== making new connections ==========*/ +static void conn_dispose(Conn *conn) { + if (!conn) return; + perhaps_close(&conn->fd); + free(conn); + until_connect= reconnect_delay_periods; +} + static int connecting_sockets[2]= {-1,-1}; static pid_t connecting_child; @@ -721,13 +734,16 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { return 0; x: - if (conn) { - perhaps_close(&conn->fd); - free(conn); - } + conn_dispose(conn); connect_attempt_discard(); } +static int allow_connect_start(void) { + return conns.count < max_connections + && !connecting_child + && !until_connect; +} + static void connect_start(void) { assert(!connecting_sockets[0]); assert(!connecting_sockets[1]); @@ -821,61 +837,52 @@ static void connect_start(void) { /*========== overall control of article flow ==========*/ -static int conn_owned_articles(Conn *conn) { - return conn->sent.count + conn->queue.count; -} - static void check_master_queue(void) { - if (!queue.count) - return; - - Conn *last_assigned=0; for (;;) { - if (working.head) { - conn_assign_one_article(&working, &last_assigned); - } else if (idle.head) { - conn_assign_one_article(&idle, &last_assigned); - } else if (full.count < max_connections && - !connecting_child && !until_connect) { + if (!queue.count) + break; + + Conn *walk, *use=0; + int spare; + + /* Find a connection to offer this article. We prefer a busy + * connection to an idle one, provided it's not full. We take the + * first (oldest) and since that's stable, it will mean we fill up + * connections in order. That way if we have too many + * connections, the spare ones will go away eventually. + */ + for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) { + int inqueue= walk->sent.count + walk->priority.count + + walk->waiting.count; + spare= walk->max_queue - inqueue; + assert(inqueue <= max_queue_per_conn); + assert(spare >= 0); + if (inqueue==0) /*idle*/ { if (!use) use= walk; } + else if (spare>0) /*working*/ { use= walk; break; } + } + if (use) { + while (spare>0) { + Article *art= LIST_REMHEAD(queue); + LIST_ADDTAIL(use->waiting, art); + spare--; + } + conn_maybe_write(use); + } else if (allow_connect_start()) { until_connect= reconnect_delay_periods; connect_start(); + break; } else { break; } } - conn_check_work(last_assigned); -} - -static void conn_assign_one_article(ConnList *connlist, Conn **last_assigned) { - Conn *conn= connlist->head; - - LIST_REMOVE(*connlist, conn); - Article *art= LIST_REMHEAD(queue); - LIST_ADDTAIL(conn->queue, art); - LIST_ADDTAIL(*conn_determine_right_list(conn), conn); - - /* This slightly odd arrangement is so that we call conn_check_work - * once after filling the queue for a new connection in - * check_master_queue, rather than for each article. */ - if (conn != *last_assigned && *last_assigned) - conn_check_work(*last_assigned); - *last_assigned= conn; -} - -static ConnList *conn_determine_right_list(Conn *conn) { - int inqueue= conn_owned_articles(conn); - assert(inqueue <= max_queue_per_conn); - if (inqueue == 0) return &idle; - if (inqueue == conn->max_queue) return &full; - return &working; } static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) { - conn_check_work(u); + conn_maybe_write(u); return OOP_CONTINUE; } -static void conn_check_work(Conn *conn) { +static void conn_maybe_write(Conn *conn) { void *rp= 0; for (;;) { conn_make_some_xmits(conn); @@ -905,7 +912,8 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { int requeue[art_MaxState]; Article *art; - while ((art= LIST_REMHEAD(conn->queue))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); while ((art= LIST_REMHEAD(conn->sent))) { requeue[art->state]++; if (art->state==art_Unsolicited) art->state= art_Unchecked; @@ -919,23 +927,20 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { char *m= xvasprintf(fmt,al); warn("#%d connection failed, requeueing " RCI_TRIPLE_FMT_BASE ": %s", - conn->fd, RCI_TRIPLE_FMT_VALS(requeue, /*nothing*/), m); + conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); free(m); - close(conn->fd); - fixme remove conn from the appropriate list; - free(conn); - - until_connect= reconnect_delay_periods; + LIST_REMOVE(conns,conn); + conn_dispose(conn); check_master_queue(); } -static void connfail(Connection *conn, const char *fmt, ...) +static void connfail(Conn *conn, const char *fmt, ...) __attribute__((__format__(printf,2,3))); -static void connfail(Connection *conn, const char *fmt, ...) { +static void connfail(Conn *conn, const char *fmt, ...) { va_list al; va_start(al,fmt); - vconnfail(fmt,al); + vconnfail(conn,fmt,al); va_end(al); } @@ -945,7 +950,7 @@ static XmitDetails *xmit_core(Conn *conn, const char *data, int len, XmitKind kind) { /* caller must then fill in details */ struct iovec *v= &conn->xmit[conn->xmitu]; XmitDetails *d= &conn->xmitd[conn->xmitu++]; - v->iov_base= data; + v->iov_base= (char*)data; v->iov_len= len; d->kind= kind; return d; @@ -957,7 +962,7 @@ static void xmit_noalloc(Conn *conn, const char *data, int len) { #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1)) static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) { - XmitDetails *d= xmit_core(conn, ah->data, ah->len, sk_Artdata); + XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata); d->info.sm_art= ah; } @@ -989,7 +994,8 @@ static void *conn_write_some_xmits(Conn *conn) { } assert(rs > 0); - for (done=0; rs && donexmitu; done++) { struct iovec *vp= &conn->xmit[done]; XmitDetails *dp= &conn->xmitd[done]; if (rs > vp->iov_len) { @@ -1012,18 +1018,19 @@ static void conn_make_some_xmits(Conn *conn) { if (conn->xmitu+5 > CONNIOVS) break; - Article *art= LIST_REMHEAD(queue); + Article *art= LIST_REMHEAD(priority); + if (!art) art= LIST_REMHEAD(waiting); if (!art) break; if (art->state >= art_Wanted || (conn->stream && nocheck)) { /* actually send it */ - ARTHANDLE *artdata= SMretrieve(); + ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL); if (conn->stream) { if (artdata) { XMIT_LITERAL("TAKETHIS "); - xmit_noalloc(conn, art->mid, art->midlen); + xmit_noalloc(conn, art->messageid, art->midlen); XMIT_LITERAL("\r\n"); xmit_artbody(conn, artdata); } @@ -1039,8 +1046,8 @@ static void conn_make_some_xmits(Conn *conn) { art->state= art->state == art_Unchecked ? art_Unsolicited : art->state == art_Wanted ? art_Wanted : - abort(); - art->ipf->counts[art->state].sent++; + (abort(),-1); + art->ipf->counts[art->state][RC_sent]++; LIST_ADDTAIL(conn->sent, art); } else { @@ -1050,11 +1057,11 @@ static void conn_make_some_xmits(Conn *conn) { XMIT_LITERAL("IHAVE "); else XMIT_LITERAL("CHECK "); - xmit_noalloc(art->mid, art->midlen); + xmit_noalloc(conn, art->messageid, art->midlen); XMIT_LITERAL("\r\n"); assert(art->state == art_Unchecked); - art->ipf->counts[art->state].sent++; + art->ipf->counts[art->state][RCI_sent]++; LIST_ADDTAIL(conn->sent, art); } } @@ -1077,12 +1084,12 @@ static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev, return OOP_CONTINUE; } -static Article *article_reply_check(Connection *conn, const char *response, +static Article *article_reply_check(Conn *conn, const char *response, int code_indicates_streaming, int must_have_sent /* 1:yes, -1:no, 0:dontcare */, const char *sanitised_response) { - Article *art= conn->sent.head; + Article *art= LIST_HEAD(conn->sent); if (!art) { connfail(conn, @@ -1094,20 +1101,20 @@ static Article *article_reply_check(Connection *conn, const char *response, if (code_indicates_streaming) { assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */ if (!conn->stream) { - connfail("peer gave streaming response code " + connfail(conn, "peer gave streaming response code " " to IHAVE or subsequent body: %s", sanitised_response); return 0; } const char *got_mid= response+4; int got_midlen= strcspn(got_mid, " \n\r"); if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') { - connfail("peer gave streaming response with syntactically invalid" + connfail(conn, "peer gave streaming response with syntactically invalid" " messageid: %s", sanitised_response); return 0; } if (got_midlen != art->midlen || memcmp(got_mid, art->messageid, got_midlen)) { - connfail("peer gave streaming response code to wrong article -" + connfail(conn, "peer gave streaming response code to wrong article -" " probable synchronisation problem; we offered: %s;" " peer said: %s", art->messageid, sanitised_response); @@ -1115,19 +1122,19 @@ static Article *article_reply_check(Connection *conn, const char *response, } } else { if (conn->stream) { - connfail("peer gave non-streaming response code to CHECK/TAKETHIS: %s", - sanitised_response); + connfail(conn, "peer gave non-streaming response code to" + " CHECK/TAKETHIS: %s", sanitised_response); return 0; } } if (must_have_sent>0 && art->state < art_Wanted) { - connfail("peer says article accepted but we had not sent the body: %s", - sanitised_response); + connfail(conn, "peer says article accepted but" + " we had not sent the body: %s", sanitised_response); return 0; } if (must_have_sent<0 && art->state >= art_Wanted) { - connfail("peer says please sent the article but we just did: %s", + connfail(conn, "peer says please sent the article but we just did: %s", sanitised_response); return 0; } @@ -1138,8 +1145,8 @@ static Article *article_reply_check(Connection *conn, const char *response, } static void update_nocheck(int accepted) { - accept_proportion *= accept_decay; - accept_proportion += accepted; + accept_proportion *= nocheck_decay; + accept_proportion += accepted * (1.0 - nocheck_decay); int new_nocheck= accept_proportion >= nocheck_thresh; if (new_nocheck && !nocheck_reported) { notice("entering nocheck mode for the first time"); @@ -1150,7 +1157,7 @@ static void update_nocheck(int accepted) { nocheck= new_nocheck; } -static void article_done(Connection *conn, Article *art, int whichcount) { +static void article_done(Conn *conn, Article *art, int whichcount) { art->ipf->counts[art->state][whichcount]++; if (whichcount == RC_accepted) update_nocheck(1); else if (whichcount == RC_unwanted) update_nocheck(0); @@ -1194,7 +1201,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } assert(ev == OOP_RD_OK); - char *sani= sanitise(data, recsz); + char *sani= sanitise(data); char *ep; unsigned long code= strtoul(data, &ep, 10); @@ -1204,12 +1211,18 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } if (conn->quitting) { - if (code!=205) { - connfail(conn, "peer gave failure response to QUIT: %s", sani); - return OOP_CONTINUE; + if (code!=205 && code!=503) { + connfail(conn, "peer gave unexpected response to QUIT: %s", sani); + } else { + notice("#%d idle connection closed\n"); + assert(!conn->waiting.count); + assert(!conn->priority.count); + assert(!conn->sent.count); + assert(!conn->xmitu); + LIST_REMOVE(conns,conn); + conn_dispose(conn); } - conn close ok; - return; + return OOP_CONTINUE; } Article *art; @@ -1219,7 +1232,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */ #define ARTICLE_DEALTWITH(streaming,musthavesent,how) \ - code_streaming= (streaming) \ + code_streaming= (streaming); \ GET_ARTICLE(musthavesent); \ article_done(conn, art, RC_##how); break; @@ -1247,9 +1260,9 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, case 335: /* IHAVE says send it */ GET_ARTICLE(-1); assert(art->state == art_Unchecked); - art->ipf->counts[art->state].accepted++; + art->ipf->counts[art->state][RC_accepted]++; art->state= art_Wanted; - LIST_ADDTAIL(conn->queue); + LIST_ADDTAIL(conn->priority, art); break; case 431: /* CHECK or TAKETHIS says try later */ @@ -1265,7 +1278,8 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } - check_check_work(conn); + conn_maybe_write(sendon); + check_master_queue(); return OOP_CONTINUE; } @@ -1821,7 +1835,7 @@ static void notice_processed(InputFile *ipf, const char *what, const char *spec) { #define RCI_NOTHING(x) /* nothing */ #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE -#define RCI_TRIPLE_VALS(x) RCI_TRIPLE_VALS_BASE(ipf->counts, .x) +#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, .x) info("processed %s%s offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)" RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) @@ -2218,13 +2232,6 @@ static void postfork_inputfile(InputFile *ipf) { ipf->fd= -1; } -static void postfork_conns(Connection *conn) { - while (conn) { - close(conn->fd); - conn= conn->next; - } -} - static void postfork_stdio(FILE *f) { /* we have no stdio streams that are buffered long-term */ if (f) fclose(f); @@ -2236,9 +2243,11 @@ static void postfork(const char *what) { postfork_inputfile(main_input_file); postfork_inputfile(flushing_input_file); - postfork_conns(idle.head); - postfork_conns(working.head); - postfork_conns(full.head); + + Conn *conn; + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) + close(conn->fd); + postfork_stdio(defer); } @@ -2270,14 +2279,13 @@ static const char *debug_ipf_path(InputFile *ipf) { EVERY(period, {PERIOD_SECONDS,0}, { debug("PERIOD" - " sms=%s[%d] queue=%d until_connect=%d" + " sms=%s[%d] conns=%d queue=%d until_connect=%d" " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_FMT(flushing) - " conns idle=%d working=%d full=%d" " children connecting=%ld inndcomm_child" , - sms_names[sms], sm_period_counter, queue.count, until_connect, + sms_names[sms], sm_period_counter, + queue.count, conns.count, until_connect, DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing), - idle.count, working.count, full.count, (long)connecting_child, (long)inndcomm_child );