From: Ian Jackson Date: Wed, 21 Apr 2010 00:49:58 +0000 (+0100) Subject: check responses are in right phase for article; implement notice_processed X-Git-Tag: innduct-0.1~157 X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=innduct.git;a=commitdiff_plain;h=1739a5b98c2d8b6a9be18cfa9b8a3ae9faa51180 check responses are in right phase for article; implement notice_processed --- diff --git a/backends/innduct.c b/backends/innduct.c index 9f032f4..46b84c6 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -200,24 +200,27 @@ static int nocheck, nocheck_reported; /*----- statistics -----*/ -#define RESULT_COUNTS \ - RC(offered) \ - RC(sent) \ - RC(unwanted) \ - RC(accepted) \ - RC(rejected) \ - RC(deferred) +typedef enum { /* in queue in conn->sent */ + art_Unchecked, /* not checked, not sent checking */ + art_Wanted, /* checked, wanted sent body as requested */ + art_Unsolicited, /* - sent body without check */ + art_MaxState +} ArtState; + +#define RESULT_COUNTS(RCS,RCN) \ + RCS(sent) \ + RCS(accepted) \ + RCN(unwanted) \ + RCN(rejected) \ + RCN(deferred) \ + RCN(connretry) typedef enum { #define RC_INDEX(x) RCI_##x - RESULT_COUNTS + RESULT_COUNTS(RC_INDEX, RC_INDEX) RCI_max } ResultCountIndex; -typedef struct { - int articles[2 /* checked */][RCI_max]; -} Counts; - /*----- transmission buffers -----*/ @@ -239,8 +242,8 @@ typedef struct { /*----- core operational data structure types -----*/ struct Article { + ArtState state; int midlen; - int checked; InputFile *ipf; TOKEN token; off_t offset; @@ -261,7 +264,7 @@ typedef struct InputFile { long inprogress; /* no. of articles read but not processed */ off_t offset; - Counts counts; + int counts[art_MaxState][RCI_max]; char path[]; } InputFile; @@ -842,7 +845,7 @@ static void conn_make_some_xmits(Conn *conn) { Article *art= LIST_REMHEAD(queue); if (!art) break; - if (art->checked || (conn->stream && nocheck)) { + if (art->state >= art_Wanted || (conn->stream && nocheck)) { /* actually send it */ ARTHANDLE *artdata= SMretrieve(); @@ -863,11 +866,13 @@ static void conn_make_some_xmits(Conn *conn) { } } - art->sent= 1; + art->state= + art->state == art_Unchecked ? art_Unsolicited : + art->state == art_Wanted ? art_Wanted : + abort(); + art->ipf->counts[art->state].sent++; LIST_ADDTAIL(conn->sent, art); - art->ipf->counts[art->checked].sent++; - } else { /* check it */ @@ -878,8 +883,9 @@ static void conn_make_some_xmits(Conn *conn) { xmit_noalloc(art->mid, art->midlen); XMIT_LITERAL("\r\n"); + assert(art->state == art_Unchecked); + art->ipf->counts[art->state].sent++; LIST_ADDTAIL(conn->sent, art); - art->ipf->counts[art->checked].offered++; } } } @@ -903,8 +909,10 @@ static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev, static Article *article_reply_check(Connection *conn, const char *response, int code_indicates_streaming, + int must_have_sent + /* 1:yes, -1:no, 0:dontcare */, const char *sanitised_response) { - Article *art= LIST_REMHEAD(conn->sent); + Article *art= conn->sent.head; if (!art) { connfail(conn, @@ -943,6 +951,19 @@ static Article *article_reply_check(Connection *conn, const char *response, } } + if (must_have_sent>0 && art->state < art_Wanted) { + connfail("peer says article accepted but we had not sent the body: %s", + sanitised_response); + return 0; + } + if (must_have_sent<0 && art->state >= art_Wanted) { + connfail("peer says please sent the article but we just did: %s", + sanitised_response); + return 0; + } + + Article *art_again= LIST_REMHEAD(conn->sent); + assert(art_again == art); return art; } @@ -953,14 +974,14 @@ static void update_nocheck(int accepted) { if (new_nocheck && !nocheck_reported) { notice("entering nocheck mode for the first time"); nocheck_reported= 1; - } else if (new_nocheck != nockech) { + } else if (new_nocheck != nocheck) { debug("nocheck mode %s", new_nocheck ? "start" : "stop"); } nocheck= new_nocheck; } static void article_done(Connection *conn, Article *art, int whichcount) { - art->ipf->counts.articles[art->checked][whichcount]++; + art->ipf->counts[art->state][whichcount]++; if (whichcount == RC_accepted) update_nocheck(1); else if (whichcount == RC_unwanted) update_nocheck(0); @@ -1033,13 +1054,13 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, Article *art; -#define GET_ARTICLE \ - art= article_reply_check(conn, data, code_streaming, sani); \ +#define GET_ARTICLE(musthavesent) \ + art= article_reply_check(conn, data, musthavesent, code_streaming, sani); \ if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */ -#define ARTICLE_DEALTWITH(streaming,how) \ +#define ARTICLE_DEALTWITH(streaming,musthavesent,how) \ code_streaming= (streaming) \ - GET_ARTICLE; \ + GET_ARTICLE(musthavesent); \ article_done(conn, art, RC_##how); break; #define PEERBADMSG(m) connfail(conn, m ": %s", sani); return OOP_CONTINUE @@ -1052,32 +1073,29 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, case 503: PEERBADMSG("peer timed us out"); default: PEERBADMSG("peer sent unexpected message"); - case 435: ARTICLE_DEALTWITH(0,unwanted); /* IHAVE says they have it */ - case 438: ARTICLE_DEALTWITH(1,unwanted); /* CHECK/TAKETHIS: they have it */ + case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */ + case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */ - case 235: ARTICLE_DEALTWITH(0,accepted); /* IHAVE says thanks */ - case 239: ARTICLE_DEALTWITH(1,accepted); /* TAKETHIS says thanks */ + case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */ + case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */ - case 437: ARTICLE_DEALTWITH(0,rejected); /* IHAVE says rejected */ - case 439: ARTICLE_DEALTWITH(1,rejected); /* TAKETHIS says rejected */ + case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */ + case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */ case 238: /* CHECK says send it */ code_streaming= 1; case 335: /* IHAVE says send it */ - GET_ARTICLE; - count_checkedwanted++; + GET_ARTICLE(-1); + assert(art->state == art_Unchecked); + art->ipf->counts[art->state].accepted++; + art->state= art_Wanted; LIST_ADDTAIL(conn->queue); - if (art->checked) { - connfail("peer gave %d response to article body: %s",code, sani); - return OOP_CONTINUE; - } - art->checked= 1; break; case 431: /* CHECK or TAKETHIS says try later */ code_streaming= 1; case 436: /* IHAVE says try later */ - GET_ARTICLE; + GET_ARTICLE(0); open_defer(); if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0 || fflush(defer)) @@ -1172,7 +1190,7 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, art->offset= ipf->offset; art->blanklen= recsz; art->midlen= midlen; - art->checked= 0; + art->state= art_Unchecked; art->ipf= ipf; ipf->inprogress++; art->token= TextToToken(tokentextbuf); strcpy(art->messageid, space+1); @@ -1641,8 +1659,26 @@ static int inputfile_is_done(InputFile *ipf) { static void notice_processed(InputFile *ipf, const char *what, const char *spec) { - info("processed %s%s checked=%d unchecked=%d sent=%d"); - wip; +#define RCI_NOTHING(x) /* nothing */ +#define RCI_TRIPLE_FMT(x) " " #x "=%d(id%d+bd%d+nc%d)" +#define RCI_TRIPLE_VALS(x) \ + , ipf->counts[art_Unchecked].sent \ + + ipf->counts[art_Wanted].sent \ + + ipf->counts[art_Unsolicited].sent, \ + ipf->counts[art_Unchecked].sent \ + , ipf->counts[art_Wanted].sent \ + , ipf->counts[art_Unsolicited].sent + + info("processed %s%s offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)" + RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) + , + what,spec, + ipf->counts[art_Unchecked].sent + ipf->counts[art_Unsolicited].sent + , ipf->counts[art_Unchecked].sent, ipf->counts[art_Unsolicited].sent, + ipf->counts[art_Wanted].accepted + ipf->counts[art_Unsolicited].accepted + ,ipf->counts[art_Wanted].accepted,ipf->counts[art_Unsolicited].accepted + RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS) + ); } static void statemc_check_backlog_done(void) {