X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=5452cd831012a3ee9a8471397001fb2daded7a8c;hp=b9460ed598a62a205ef865608cbab2151815bc49;hb=c48917ea3acd06da8ce73a9894a092cd495303d9;hpb=2092e752db3c70a7f576390ce8744d0618df088b diff --git a/backends/innduct.c b/backends/innduct.c index b9460ed..5452cd8 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -200,24 +200,36 @@ static int nocheck, nocheck_reported; /*----- statistics -----*/ -#define RESULT_COUNTS \ - RC(offered) \ - RC(sent) \ - RC(unwanted) \ - RC(accepted) \ - RC(rejected) \ - RC(deferred) +typedef enum { /* in queue in conn->sent */ + art_Unchecked, /* not checked, not sent checking */ + art_Wanted, /* checked, wanted sent body as requested */ + art_Unsolicited, /* - sent body without check */ + art_MaxState +} ArtState; + +#define RESULT_COUNTS(RCS,RCN) \ + RCS(sent) \ + RCS(accepted) \ + RCN(unwanted) \ + RCN(rejected) \ + RCN(deferred) \ + RCN(connretry) + +#define RCI_TRIPLE_FMT_BASE "%d(id%d+bd%d+nc%d)" +#define RCI_TRIPLE_VALS_BASE(counts,x) \ + , counts[art_Unchecked] x \ + + counts[art_Wanted] x \ + + counts[art_Unsolicited] x, \ + counts[art_Unchecked] x \ + , counts[art_Wanted] x \ + , counts[art_Unsolicited] x typedef enum { #define RC_INDEX(x) RCI_##x - RESULT_COUNTS + RESULT_COUNTS(RC_INDEX, RC_INDEX) RCI_max } ResultCountIndex; -typedef struct { - int articles[2 /* checked */][RCI_max]; -} Counts; - /*----- transmission buffers -----*/ @@ -239,8 +251,8 @@ typedef struct { /*----- core operational data structure types -----*/ struct Article { + ArtState state; int midlen; - int checked, sentbody; InputFile *ipf; TOKEN token; off_t offset; @@ -261,7 +273,7 @@ typedef struct InputFile { long inprogress; /* no. of articles read but not processed */ off_t offset; - Counts counts; + int counts[art_MaxState][RCI_max]; char path[]; } InputFile; @@ -333,6 +345,7 @@ static pid_t xfork(const char *what) { child= fork(); if (child==-1) sysdie("cannot fork for %s",what); if (!child) postfork(what); + debug("forked %s %ld", what, (unsigned long)child); return child; } @@ -432,6 +445,7 @@ static void logcore(int sysloglevel, const char *fmt, ...) { vfprintf(stderr,fmt,al); putc('\n',stderr); } + va_end(al); } static void logv(int sysloglevel, const char *pfx, int errnoval, @@ -488,6 +502,8 @@ static void connect_attempt_discard(void) { perhaps_close(&connecting_sockets[1]); if (connecting_child) { + r= kill(connecting_child, SIGTERM); + if (r) syswarn("failed to kill connecting child"); int status= xwaitpid(&connecting_child, "connect"); if (!(WIFEXITED(status) || @@ -574,7 +590,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { /* Phew! */ LIST_ADDHEAD(idle, conn); - notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain"); + notice("#%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); connect_attempt_discard(); check_master_queue(); return 0; @@ -587,7 +603,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { connect_attempt_discard(); } -static void connect_start() { +static void connect_start(void) { assert(!connecting_sockets[0]); assert(!connecting_sockets[1]); assert(!connecting_child); @@ -765,6 +781,45 @@ static void conn_check_work(Conn *conn) { } } +static void vconnfail(Conn *conn, const char *fmt, va_list al) + __attribute__((printf,2,0)); + +static void vconnfail(Conn *conn, const char *fmt, va_list al) { + int requeue[art_MaxState]; + + Article *art; + while ((art= LIST_REMHEAD(conn->queue))) LIST_ADDTAIL(queue); + while ((art= LIST_REMHEAD(conn->sent))) { + counts[art->state]++; + if (art->state==art_Unsolicited) art->state= art_Unchecked; + LIST_ADDTAIL(queue); + } + + int i; + XmitDetails *xd; + for (i=0, dp=&conn->xmitd; ixmitu; i++, dp++) + xmit_free(dp); + + char *m= xvasprintf(fmt,al); + warn("#%d connection failed, requeueing " RCI_TRIPLE_FMT_BASE ": %s", + conn->fd, RCI_TRIPLE_FMT_VALS(requeue, /*nothing*/), m); + free(m); + + close(conn->fd); + free(conn); + + connect_delay= reconnect_delay_periods; + check_master_queue(); +} + +static void connfail(Connection *conn, const char *fmt, ...) + __attribute__((printf,2,3)); +static void connfail(Connection *conn, const char *fmt, ...) { + va_list al; + va_start(al,fmt); + vconnfail(fmt,al); + va_end(al); +} /*========== article transmission ==========*/ @@ -842,7 +897,7 @@ static void conn_make_some_xmits(Conn *conn) { Article *art= LIST_REMHEAD(queue); if (!art) break; - if (art->checked || (conn->stream && nocheck)) { + if (art->state >= art_Wanted || (conn->stream && nocheck)) { /* actually send it */ ARTHANDLE *artdata= SMretrieve(); @@ -863,11 +918,13 @@ static void conn_make_some_xmits(Conn *conn) { } } - art->sent= 1; + art->state= + art->state == art_Unchecked ? art_Unsolicited : + art->state == art_Wanted ? art_Wanted : + abort(); + art->ipf->counts[art->state].sent++; LIST_ADDTAIL(conn->sent, art); - art->ipf->counts[art->checked].sent++; - } else { /* check it */ @@ -878,8 +935,9 @@ static void conn_make_some_xmits(Conn *conn) { xmit_noalloc(art->mid, art->midlen); XMIT_LITERAL("\r\n"); + assert(art->state == art_Unchecked); + art->ipf->counts[art->state].sent++; LIST_ADDTAIL(conn->sent, art); - art->ipf->counts[art->checked].offered++; } } } @@ -903,8 +961,10 @@ static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev, static Article *article_reply_check(Connection *conn, const char *response, int code_indicates_streaming, + int must_have_sent + /* 1:yes, -1:no, 0:dontcare */, const char *sanitised_response) { - Article *art= LIST_REMHEAD(conn->sent); + Article *art= conn->sent.head; if (!art) { connfail(conn, @@ -943,6 +1003,19 @@ static Article *article_reply_check(Connection *conn, const char *response, } } + if (must_have_sent>0 && art->state < art_Wanted) { + connfail("peer says article accepted but we had not sent the body: %s", + sanitised_response); + return 0; + } + if (must_have_sent<0 && art->state >= art_Wanted) { + connfail("peer says please sent the article but we just did: %s", + sanitised_response); + return 0; + } + + Article *art_again= LIST_REMHEAD(conn->sent); + assert(art_again == art); return art; } @@ -953,15 +1026,14 @@ static void update_nocheck(int accepted) { if (new_nocheck && !nocheck_reported) { notice("entering nocheck mode for the first time"); nocheck_reported= 1; - } else if (new_nocheck != nockech) { + } else if (new_nocheck != nocheck) { debug("nocheck mode %s", new_nocheck ? "start" : "stop"); } nocheck= new_nocheck; } static void article_done(Connection *conn, Article *art, int whichcount) { - *count++; - art->ipf->counts.articles[art->checked][whichcount]++; + art->ipf->counts[art->state][whichcount]++; if (whichcount == RC_accepted) update_nocheck(1); else if (whichcount == RC_unwanted) update_nocheck(0); @@ -1034,13 +1106,13 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, Article *art; -#define GET_ARTICLE \ - art= article_reply_check(conn, data, code_streaming, sani); \ +#define GET_ARTICLE(musthavesent) \ + art= article_reply_check(conn, data, musthavesent, code_streaming, sani); \ if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */ -#define ARTICLE_DEALTWITH(streaming,how) \ +#define ARTICLE_DEALTWITH(streaming,musthavesent,how) \ code_streaming= (streaming) \ - GET_ARTICLE; \ + GET_ARTICLE(musthavesent); \ article_done(conn, art, RC_##how); break; #define PEERBADMSG(m) connfail(conn, m ": %s", sani); return OOP_CONTINUE @@ -1053,32 +1125,29 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, case 503: PEERBADMSG("peer timed us out"); default: PEERBADMSG("peer sent unexpected message"); - case 435: ARTICLE_DEALTWITH(0,unwanted); /* IHAVE says they have it */ - case 438: ARTICLE_DEALTWITH(1,unwanted); /* CHECK/TAKETHIS: they have it */ + case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */ + case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */ - case 235: ARTICLE_DEALTWITH(0,accepted); /* IHAVE says thanks */ - case 239: ARTICLE_DEALTWITH(1,accepted); /* TAKETHIS says thanks */ + case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */ + case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */ - case 437: ARTICLE_DEALTWITH(0,rejected); /* IHAVE says rejected */ - case 439: ARTICLE_DEALTWITH(1,rejected); /* TAKETHIS says rejected */ + case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */ + case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */ case 238: /* CHECK says send it */ code_streaming= 1; case 335: /* IHAVE says send it */ - GET_ARTICLE; - count_checkedwanted++; + GET_ARTICLE(-1); + assert(art->state == art_Unchecked); + art->ipf->counts[art->state].accepted++; + art->state= art_Wanted; LIST_ADDTAIL(conn->queue); - if (art->checked) { - connfail("peer gave %d response to article body: %s",code, sani); - return OOP_CONTINUE; - } - art->checked= 1; break; case 431: /* CHECK or TAKETHIS says try later */ code_streaming= 1; case 436: /* IHAVE says try later */ - GET_ARTICLE; + GET_ARTICLE(0); open_defer(); if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0 || fflush(defer)) @@ -1173,7 +1242,7 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, art->offset= ipf->offset; art->blanklen= recsz; art->midlen= midlen; - art->checked= art->sentbody= 0; + art->state= art_Unchecked; art->ipf= ipf; ipf->inprogress++; art->token= TextToToken(tokentextbuf); strcpy(art->messageid, space+1); @@ -1639,12 +1708,36 @@ static int inputfile_is_done(InputFile *ipf) { if (ipf->fd >= 0); return 0; /* not had EOF */ return 1; } - + +static void notice_processed(InputFile *ipf, const char *what, + const char *spec) { +#define RCI_NOTHING(x) /* nothing */ +#define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE +#define RCI_TRIPLE_VALS(x) RCI_TRIPLE_VALS_BASE(ipf->counts, .x) + + info("processed %s%s offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)" + RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) + , + what,spec, + ipf->counts[art_Unchecked].sent + ipf->counts[art_Unsolicited].sent + , ipf->counts[art_Unchecked].sent, ipf->counts[art_Unsolicited].sent, + ipf->counts[art_Wanted].accepted + ipf->counts[art_Unsolicited].accepted + ,ipf->counts[art_Wanted].accepted,ipf->counts[art_Unsolicited].accepted + RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS) + ); +} + static void statemc_check_backlog_done(void) { InputFile *ipf= backlog_input_file(); if (!inputfile_is_done(ipf)) return; - notice_processed(ipf,"backlog file",ipf->path); + const char *slash= strrchr(ipf->path, "/"); + const char *leaf= slash ? slash+1 : ipf->path; + const char *under= strchr(slash, "_"); + const char *rest= under ? under+1 : leaf; + if (!strncmp(rest,"backlog",7)) rest += 7; + notice_processed(ipf,"backlog:",rest); + close_input_file(ipf); if (unlink(ipf->path)) { if (errno != ENOENT) @@ -1665,7 +1758,7 @@ static void statemc_check_flushing_done(void) { assert(sms==sm_SEPARATED || sms==sm_DROPPING); - notice_processed(ipf,"feed file",0); + notice_processed(ipf,"feedfile",0); close_defer();