chiark / gitweb /
check responses are in right phase for article; implement notice_processed
authorIan Jackson <ian@liberator.relativity.greenend.org.uk>
Wed, 21 Apr 2010 00:49:58 +0000 (01:49 +0100)
committerIan Jackson <ian@liberator.relativity.greenend.org.uk>
Wed, 21 Apr 2010 00:49:58 +0000 (01:49 +0100)
backends/innduct.c

index 9f032f41571857618f6172889e9aa56a7d69a6ad..46b84c6eccf39932dc262b57f0a255dcabc464e7 100644 (file)
@@ -200,24 +200,27 @@ static int nocheck, nocheck_reported;
 
 /*----- statistics -----*/
 
-#define RESULT_COUNTS                          \
-  RC(offered)                                  \
-  RC(sent)                                     \
-  RC(unwanted)                                 \
-  RC(accepted)                                 \
-  RC(rejected)                                 \
-  RC(deferred)
+typedef enum {      /* in queue                 in conn->sent             */
+  art_Unchecked,    /*   not checked, not sent    checking                */
+  art_Wanted,       /*   checked, wanted          sent body as requested  */
+  art_Unsolicited,  /*   -                        sent body without check */
+  art_MaxState
+} ArtState;
+
+#define RESULT_COUNTS(RCS,RCN)                 \
+  RCS(sent)                                    \
+  RCS(accepted)                                        \
+  RCN(unwanted)                                        \
+  RCN(rejected)                                        \
+  RCN(deferred)                                        \
+  RCN(connretry)
 
 typedef enum {
 #define RC_INDEX(x) RCI_##x
-  RESULT_COUNTS
+  RESULT_COUNTS(RC_INDEX, RC_INDEX)
   RCI_max
 } ResultCountIndex;
 
-typedef struct {
-  int articles[2 /* checked */][RCI_max];
-} Counts;
-
 
 /*----- transmission buffers -----*/
 
@@ -239,8 +242,8 @@ typedef struct {
 /*----- core operational data structure types -----*/
 
 struct Article {
+  ArtState state;
   int midlen;
-  int checked;
   InputFile *ipf;
   TOKEN token;
   off_t offset;
@@ -261,7 +264,7 @@ typedef struct InputFile {
   long inprogress; /* no. of articles read but not processed */
   off_t offset;
 
-  Counts counts;
+  int counts[art_MaxState][RCI_max];
   char path[];
 } InputFile;
 
@@ -842,7 +845,7 @@ static void conn_make_some_xmits(Conn *conn) {
     Article *art= LIST_REMHEAD(queue);
     if (!art) break;
 
-    if (art->checked || (conn->stream && nocheck)) {
+    if (art->state >= art_Wanted || (conn->stream && nocheck)) {
       /* actually send it */
 
       ARTHANDLE *artdata= SMretrieve();
@@ -863,11 +866,13 @@ static void conn_make_some_xmits(Conn *conn) {
        }
       }
 
-      art->sent= 1;
+      art->state=
+       art->state == art_Unchecked ? art_Unsolicited :
+       art->state == art_Wanted    ? art_Wanted      :
+       abort();
+      art->ipf->counts[art->state].sent++;
       LIST_ADDTAIL(conn->sent, art);
 
-      art->ipf->counts[art->checked].sent++;
-
     } else {
       /* check it */
 
@@ -878,8 +883,9 @@ static void conn_make_some_xmits(Conn *conn) {
       xmit_noalloc(art->mid, art->midlen);
       XMIT_LITERAL("\r\n");
 
+      assert(art->state == art_Unchecked);
+      art->ipf->counts[art->state].sent++;
       LIST_ADDTAIL(conn->sent, art);
-      art->ipf->counts[art->checked].offered++;
     }
   }
 }
@@ -903,8 +909,10 @@ static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev,
 
 static Article *article_reply_check(Connection *conn, const char *response,
                                    int code_indicates_streaming,
+                                   int must_have_sent
+                                       /* 1:yes, -1:no, 0:dontcare */,
                                    const char *sanitised_response) {
-  Article *art= LIST_REMHEAD(conn->sent);
+  Article *art= conn->sent.head;
 
   if (!art) {
     connfail(conn,
@@ -943,6 +951,19 @@ static Article *article_reply_check(Connection *conn, const char *response,
     }
   }
 
+  if (must_have_sent>0 && art->state < art_Wanted) {
+    connfail("peer says article accepted but we had not sent the body: %s",
+            sanitised_response);
+    return 0;
+  }
+  if (must_have_sent<0 && art->state >= art_Wanted) {
+    connfail("peer says please sent the article but we just did: %s",
+            sanitised_response);
+    return 0;
+  }
+
+  Article *art_again= LIST_REMHEAD(conn->sent);
+  assert(art_again == art);
   return art;
 }
 
@@ -953,14 +974,14 @@ static void update_nocheck(int accepted) {
   if (new_nocheck && !nocheck_reported) {
     notice("entering nocheck mode for the first time");
     nocheck_reported= 1;
-  } else if (new_nocheck != nockech) {
+  } else if (new_nocheck != nocheck) {
     debug("nocheck mode %s", new_nocheck ? "start" : "stop");
   }
   nocheck= new_nocheck;
 }
 
 static void article_done(Connection *conn, Article *art, int whichcount) {
-  art->ipf->counts.articles[art->checked][whichcount]++;
+  art->ipf->counts[art->state][whichcount]++;
   if (whichcount == RC_accepted) update_nocheck(1);
   else if (whichcount == RC_unwanted) update_nocheck(0);
 
@@ -1033,13 +1054,13 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
 
   Article *art;
 
-#define GET_ARTICLE                                                        \
-  art= article_reply_check(conn, data, code_streaming, sani);              \
+#define GET_ARTICLE(musthavesent)                                          \
+  art= article_reply_check(conn, data, musthavesent, code_streaming, sani); \
   if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */
 
-#define ARTICLE_DEALTWITH(streaming,how)                       \
+#define ARTICLE_DEALTWITH(streaming,musthavesent,how)          \
   code_streaming= (streaming)                                  \
-  GET_ARTICLE;                                                 \
+  GET_ARTICLE(musthavesent);                                   \
   article_done(conn, art, RC_##how);  break;
 
 #define PEERBADMSG(m) connfail(conn, m ": %s", sani);  return OOP_CONTINUE
@@ -1052,32 +1073,29 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
   case 503: PEERBADMSG("peer timed us out");
   default:  PEERBADMSG("peer sent unexpected message");
 
-  case 435: ARTICLE_DEALTWITH(0,unwanted); /* IHAVE says they have it */
-  case 438: ARTICLE_DEALTWITH(1,unwanted); /* CHECK/TAKETHIS: they have it */
+  case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
+  case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
 
-  case 235: ARTICLE_DEALTWITH(0,accepted); /* IHAVE says thanks */
-  case 239: ARTICLE_DEALTWITH(1,accepted); /* TAKETHIS says thanks */
+  case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
+  case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
 
-  case 437: ARTICLE_DEALTWITH(0,rejected); /* IHAVE says rejected */
-  case 439: ARTICLE_DEALTWITH(1,rejected); /* TAKETHIS says rejected */
+  case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
+  case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
 
   case 238: /* CHECK says send it */
     code_streaming= 1;
   case 335: /* IHAVE says send it */
-    GET_ARTICLE;
-    count_checkedwanted++;
+    GET_ARTICLE(-1);
+    assert(art->state == art_Unchecked);
+    art->ipf->counts[art->state].accepted++;
+    art->state= art_Wanted;
     LIST_ADDTAIL(conn->queue);
-    if (art->checked) {
-      connfail("peer gave %d response to article body: %s",code, sani);
-      return OOP_CONTINUE;
-    }
-    art->checked= 1;
     break;
 
   case 431: /* CHECK or TAKETHIS says try later */
     code_streaming= 1;
   case 436: /* IHAVE says try later */
-    GET_ARTICLE;
+    GET_ARTICLE(0);
     open_defer();
     if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
        || fflush(defer))
@@ -1172,7 +1190,7 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd,
     art->offset= ipf->offset;
     art->blanklen= recsz;
     art->midlen= midlen;
-    art->checked= 0;
+    art->state= art_Unchecked;
     art->ipf= ipf;  ipf->inprogress++;
     art->token= TextToToken(tokentextbuf);
     strcpy(art->messageid, space+1);
@@ -1641,8 +1659,26 @@ static int inputfile_is_done(InputFile *ipf) {
 
 static void notice_processed(InputFile *ipf, const char *what,
                             const char *spec) {
-  info("processed %s%s checked=%d unchecked=%d sent=%d");
-  wip;
+#define RCI_NOTHING(x) /* nothing */
+#define RCI_TRIPLE_FMT(x) " " #x "=%d(id%d+bd%d+nc%d)"
+#define RCI_TRIPLE_VALS(x)                     \
+       , ipf->counts[art_Unchecked].sent       \
+       + ipf->counts[art_Wanted].sent          \
+       + ipf->counts[art_Unsolicited].sent,    \
+       ipf->counts[art_Unchecked].sent         \
+       , ipf->counts[art_Wanted].sent          \
+       , ipf->counts[art_Unsolicited].sent
+
+  info("processed %s%s offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)"
+       RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
+       ,
+       what,spec,
+       ipf->counts[art_Unchecked].sent + ipf->counts[art_Unsolicited].sent
+       , ipf->counts[art_Unchecked].sent, ipf->counts[art_Unsolicited].sent,
+       ipf->counts[art_Wanted].accepted + ipf->counts[art_Unsolicited].accepted
+       ,ipf->counts[art_Wanted].accepted,ipf->counts[art_Unsolicited].accepted
+       RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
+       );
 }
 
 static void statemc_check_backlog_done(void) {