X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=421623914d6c41ac79461c83dee34020064b9f6e;hb=5d6e18436eb0f3711d9fe4027c881833b0d9490d;hp=a2b3742b47ef924ff6675780ea744926cd1a101a;hpb=7b80be094547bc246b651c8841c8ed3a837fe4cf;p=inn-innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index a2b3742..4216239 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -187,13 +187,13 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. /*----- doubly linked lists -----*/ #define ISNODE(T) struct { T *succ, *pred; } node /* must be at start */ -#define DEFLIST(T) typedef struct { T *head, *tail, *tp; int count; } T##List +#define DEFLIST(T) typedef struct { T *hd, *tl, *tp; int count; } T##List #define NODE(n) (assert((void*)&(n)->node == &(n)), \ (struct node*)&(n)->node) #define LIST_CHECKCANHAVENODE(l,n) \ - ((void)((n) == ((l).head))) /* just for the type check */ + ((void)((n) == ((l).hd))) /* just for the type check */ #define LIST_ADDSOMEHOW(l,n,list_addsomehow) \ ( LIST_CHECKCANHAVENODE(l,n), \ @@ -202,7 +202,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. ) #define LIST_REMSOMEHOW(l,list_remsomehow) \ - ( (typeof((l).head)) \ + ( (typeof((l).hd)) \ ( (l).count \ ? ( (l).count--, \ list_remsomehow((struct list*)&(l)) ) \ @@ -216,6 +216,10 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) +#define LIST_HEAD(l) ((typeof((l).hd))(list_head((struct list*)&(l)))) +#define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) +#define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) + #define LIST_REMOVE(l,n) \ ( LIST_CHECKCANHAVENODE(l,n), \ list_remove(NODE((n))), \ @@ -252,8 +256,6 @@ static int filemon_init(void); static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path); static void filemon_callback(void); -static ConnList *conn_determine_right_list(Conn *conn); -static void conn_assign_one_article(ConnList *connlist, Conn **last_assigned); static void statemc_setstate(StateMachineState newsms, int periods, const char *forlog, const char *why); static void check_master_queue(void); @@ -302,7 +304,7 @@ typedef enum { /* in queue in conn->sent */ #define RCI_TRIPLE_FMT_BASE "%d(id%d+bd%d+nc%d)" #define RCI_TRIPLE_VALS_BASE(counts,x) \ - , counts[art_Unchecked] x \ + counts[art_Unchecked] x \ + counts[art_Wanted] x \ + counts[art_Unsolicited] x, \ counts[art_Unchecked] x \ @@ -397,8 +399,8 @@ struct Conn { static oop_source *loop; -static int nconns, until_connect; -static ConnList idle, working, full; +static int until_connect; +static ConnList conns; static ArticleList queue; static char *path_lock, *path_flushing, *path_defer; @@ -710,7 +712,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { /* Phew! */ setnonblock(conn->fd, 1); conn->max_queue= conn->stream ? max_queue_per_conn : 1; - LIST_ADDHEAD(idle, conn); + LIST_ADDHEAD(conns, conn); notice("#%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); connect_attempt_discard(); check_master_queue(); @@ -817,53 +819,37 @@ static void connect_start(void) { /*========== overall control of article flow ==========*/ -static int conn_owned_articles(Conn *conn) { - return conn->sent.count + conn->queue.count; -} - static void check_master_queue(void) { - if (!queue.count) - return; - - Conn *last_assigned=0; for (;;) { - if (working.head) { - conn_assign_one_article(&working, &last_assigned); - } else if (idle.head) { - conn_assign_one_article(&idle, &last_assigned); - } else if (full.count < max_connections && - !connecting_child && !until_connect) { + if (!queue.count) + break; + + Conn *walk, *use=0; + int spare; + for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) { + int inqueue= walk->sent.count + walk->queue.count; + spare= walk->max_queue - inqueue; + assert(inqueue <= max_queue_per_conn); + assert(spare >= 0); + if (inqueue==0) /*idle*/ { if (!use) use= walk; } + else if (spare>0) /*working*/ { use= walk; break; } + } + if (use) { + while (spare>0) { + Article *art= LIST_REMHEAD(queue); + LIST_ADDTAIL(use->queue, art); + spare--; + } + conn_check_work(use); + } else if (conns.count < max_connections && + !connecting_child && !until_connect) { until_connect= reconnect_delay_periods; connect_start(); + break; } else { break; } } - conn_check_work(last_assigned); -} - -static void conn_assign_one_article(ConnList *connlist, Conn **last_assigned) { - Conn *conn= connlist->head; - - LIST_REMOVE(*connlist, conn); - Article *art= LIST_REMHEAD(queue); - LIST_ADDTAIL(conn->queue, art); - LIST_ADDTAIL(*conn_determine_right_list(conn), conn); - - /* This slightly odd arrangement is so that we call conn_check_work - * once after filling the queue for a new connection in - * check_master_queue, rather than for each article. */ - if (conn != *last_assigned && *last_assigned) - conn_check_work(*last_assigned); - *last_assigned= conn; -} - -static ConnList *conn_determine_right_list(Conn *conn) { - int inqueue= conn_owned_articles(conn); - assert(inqueue <= max_queue_per_conn); - if (inqueue == 0) return &idle; - if (inqueue == conn->max_queue) return &full; - return &working; } static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) { @@ -915,23 +901,23 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { char *m= xvasprintf(fmt,al); warn("#%d connection failed, requeueing " RCI_TRIPLE_FMT_BASE ": %s", - conn->fd, RCI_TRIPLE_FMT_VALS(requeue, /*nothing*/), m); + conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); free(m); close(conn->fd); - fixme remove conn from the appropriate list; + LIST_REMOVE(conns,conn); free(conn); until_connect= reconnect_delay_periods; check_master_queue(); } -static void connfail(Connection *conn, const char *fmt, ...) +static void connfail(Conn *conn, const char *fmt, ...) __attribute__((__format__(printf,2,3))); -static void connfail(Connection *conn, const char *fmt, ...) { +static void connfail(Conn *conn, const char *fmt, ...) { va_list al; va_start(al,fmt); - vconnfail(fmt,al); + vconnfail(conn,fmt,al); va_end(al); } @@ -941,7 +927,7 @@ static XmitDetails *xmit_core(Conn *conn, const char *data, int len, XmitKind kind) { /* caller must then fill in details */ struct iovec *v= &conn->xmit[conn->xmitu]; XmitDetails *d= &conn->xmitd[conn->xmitu++]; - v->iov_base= data; + v->iov_base= (char*)data; v->iov_len= len; d->kind= kind; return d; @@ -953,7 +939,7 @@ static void xmit_noalloc(Conn *conn, const char *data, int len) { #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1)) static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) { - XmitDetails *d= xmit_core(conn, ah->data, ah->len, sk_Artdata); + XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata); d->info.sm_art= ah; } @@ -985,7 +971,8 @@ static void *conn_write_some_xmits(Conn *conn) { } assert(rs > 0); - for (done=0; rs && donexmitu; done++) { struct iovec *vp= &conn->xmit[done]; XmitDetails *dp= &conn->xmitd[done]; if (rs > vp->iov_len) { @@ -1014,12 +1001,12 @@ static void conn_make_some_xmits(Conn *conn) { if (art->state >= art_Wanted || (conn->stream && nocheck)) { /* actually send it */ - ARTHANDLE *artdata= SMretrieve(); + ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL); if (conn->stream) { if (artdata) { XMIT_LITERAL("TAKETHIS "); - xmit_noalloc(conn, art->mid, art->midlen); + xmit_noalloc(conn, art->messageid, art->midlen); XMIT_LITERAL("\r\n"); xmit_artbody(conn, artdata); } @@ -1035,8 +1022,8 @@ static void conn_make_some_xmits(Conn *conn) { art->state= art->state == art_Unchecked ? art_Unsolicited : art->state == art_Wanted ? art_Wanted : - abort(); - art->ipf->counts[art->state].sent++; + (abort(),-1); + art->ipf->counts[art->state][RCI_sent]++; LIST_ADDTAIL(conn->sent, art); } else { @@ -1046,11 +1033,11 @@ static void conn_make_some_xmits(Conn *conn) { XMIT_LITERAL("IHAVE "); else XMIT_LITERAL("CHECK "); - xmit_noalloc(art->mid, art->midlen); + xmit_noalloc(conn, art->messageid, art->midlen); XMIT_LITERAL("\r\n"); assert(art->state == art_Unchecked); - art->ipf->counts[art->state].sent++; + art->ipf->counts[art->state][RCI_sent]++; LIST_ADDTAIL(conn->sent, art); } } @@ -1073,12 +1060,12 @@ static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev, return OOP_CONTINUE; } -static Article *article_reply_check(Connection *conn, const char *response, +static Article *article_reply_check(Conn *conn, const char *response, int code_indicates_streaming, int must_have_sent /* 1:yes, -1:no, 0:dontcare */, const char *sanitised_response) { - Article *art= conn->sent.head; + Article *art= LIST_HEAD(conn->sent); if (!art) { connfail(conn, @@ -1090,20 +1077,20 @@ static Article *article_reply_check(Connection *conn, const char *response, if (code_indicates_streaming) { assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */ if (!conn->stream) { - connfail("peer gave streaming response code " + connfail(conn, "peer gave streaming response code " " to IHAVE or subsequent body: %s", sanitised_response); return 0; } const char *got_mid= response+4; int got_midlen= strcspn(got_mid, " \n\r"); if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') { - connfail("peer gave streaming response with syntactically invalid" + connfail(conn, "peer gave streaming response with syntactically invalid" " messageid: %s", sanitised_response); return 0; } if (got_midlen != art->midlen || memcmp(got_mid, art->messageid, got_midlen)) { - connfail("peer gave streaming response code to wrong article -" + connfail(conn, "peer gave streaming response code to wrong article -" " probable synchronisation problem; we offered: %s;" " peer said: %s", art->messageid, sanitised_response); @@ -1111,19 +1098,19 @@ static Article *article_reply_check(Connection *conn, const char *response, } } else { if (conn->stream) { - connfail("peer gave non-streaming response code to CHECK/TAKETHIS: %s", - sanitised_response); + connfail(conn, "peer gave non-streaming response code to" + " CHECK/TAKETHIS: %s", sanitised_response); return 0; } } if (must_have_sent>0 && art->state < art_Wanted) { - connfail("peer says article accepted but we had not sent the body: %s", - sanitised_response); + connfail(conn, "peer says article accepted but" + " we had not sent the body: %s", sanitised_response); return 0; } if (must_have_sent<0 && art->state >= art_Wanted) { - connfail("peer says please sent the article but we just did: %s", + connfail(conn, "peer says please sent the article but we just did: %s", sanitised_response); return 0; } @@ -1146,10 +1133,10 @@ static void update_nocheck(int accepted) { nocheck= new_nocheck; } -static void article_done(Connection *conn, Article *art, int whichcount) { +static void article_done(Conn *conn, Article *art, int whichcount) { art->ipf->counts[art->state][whichcount]++; - if (whichcount == RC_accepted) update_nocheck(1); - else if (whichcount == RC_unwanted) update_nocheck(0); + if (whichcount == RCI_accepted) update_nocheck(1); + else if (whichcount == RCI_unwanted) update_nocheck(0); InputFile *ipf= art->ipf; while (art->blanklen) { @@ -1817,7 +1804,7 @@ static void notice_processed(InputFile *ipf, const char *what, const char *spec) { #define RCI_NOTHING(x) /* nothing */ #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE -#define RCI_TRIPLE_VALS(x) RCI_TRIPLE_VALS_BASE(ipf->counts, .x) +#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, .x) info("processed %s%s offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)" RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) @@ -2214,13 +2201,6 @@ static void postfork_inputfile(InputFile *ipf) { ipf->fd= -1; } -static void postfork_conns(Connection *conn) { - while (conn) { - close(conn->fd); - conn= conn->next; - } -} - static void postfork_stdio(FILE *f) { /* we have no stdio streams that are buffered long-term */ if (f) fclose(f); @@ -2232,9 +2212,11 @@ static void postfork(const char *what) { postfork_inputfile(main_input_file); postfork_inputfile(flushing_input_file); - postfork_conns(idle.head); - postfork_conns(working.head); - postfork_conns(full.head); + + Conn *conn; + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) + close(conn->fd); + postfork_stdio(defer); } @@ -2266,14 +2248,13 @@ static const char *debug_ipf_path(InputFile *ipf) { EVERY(period, {PERIOD_SECONDS,0}, { debug("PERIOD" - " sms=%s[%d] queue=%d until_connect=%d" + " sms=%s[%d] conns=%d queue=%d until_connect=%d" " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_FMT(flushing) - " conns idle=%d working=%d full=%d" " children connecting=%ld inndcomm_child" , - sms_names[sms], sm_period_counter, queue.count, until_connect, + sms_names[sms], sm_period_counter, + queue.count, conns.count, until_connect, DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing), - idle.count, working.count, full.count, (long)connecting_child, (long)inndcomm_child );