chiark / gitweb /
wip manpage
[innduct.git] / backends / innduct.c
index b9460ed598a62a205ef865608cbab2151815bc49..5452cd831012a3ee9a8471397001fb2daded7a8c 100644 (file)
@@ -200,24 +200,36 @@ static int nocheck, nocheck_reported;
 
 /*----- statistics -----*/
 
-#define RESULT_COUNTS                          \
-  RC(offered)                                  \
-  RC(sent)                                     \
-  RC(unwanted)                                 \
-  RC(accepted)                                 \
-  RC(rejected)                                 \
-  RC(deferred)
+typedef enum {      /* in queue                 in conn->sent             */
+  art_Unchecked,    /*   not checked, not sent    checking                */
+  art_Wanted,       /*   checked, wanted          sent body as requested  */
+  art_Unsolicited,  /*   -                        sent body without check */
+  art_MaxState
+} ArtState;
+
+#define RESULT_COUNTS(RCS,RCN)                 \
+  RCS(sent)                                    \
+  RCS(accepted)                                        \
+  RCN(unwanted)                                        \
+  RCN(rejected)                                        \
+  RCN(deferred)                                        \
+  RCN(connretry)
+
+#define RCI_TRIPLE_FMT_BASE "%d(id%d+bd%d+nc%d)"
+#define RCI_TRIPLE_VALS_BASE(counts,x)         \
+       , counts[art_Unchecked] x               \
+       + counts[art_Wanted] x                  \
+       + counts[art_Unsolicited] x,            \
+       counts[art_Unchecked] x                 \
+       , counts[art_Wanted] x                  \
+       , counts[art_Unsolicited] x
 
 typedef enum {
 #define RC_INDEX(x) RCI_##x
-  RESULT_COUNTS
+  RESULT_COUNTS(RC_INDEX, RC_INDEX)
   RCI_max
 } ResultCountIndex;
 
-typedef struct {
-  int articles[2 /* checked */][RCI_max];
-} Counts;
-
 
 /*----- transmission buffers -----*/
 
@@ -239,8 +251,8 @@ typedef struct {
 /*----- core operational data structure types -----*/
 
 struct Article {
+  ArtState state;
   int midlen;
-  int checked, sentbody;
   InputFile *ipf;
   TOKEN token;
   off_t offset;
@@ -261,7 +273,7 @@ typedef struct InputFile {
   long inprogress; /* no. of articles read but not processed */
   off_t offset;
 
-  Counts counts;
+  int counts[art_MaxState][RCI_max];
   char path[];
 } InputFile;
 
@@ -333,6 +345,7 @@ static pid_t xfork(const char *what) {
   child= fork();
   if (child==-1) sysdie("cannot fork for %s",what);
   if (!child) postfork(what);
+  debug("forked %s %ld", what, (unsigned long)child);
   return child;
 }
 
@@ -432,6 +445,7 @@ static void logcore(int sysloglevel, const char *fmt, ...) {
     vfprintf(stderr,fmt,al);
     putc('\n',stderr);
   }
+  va_end(al);
 }
 
 static void logv(int sysloglevel, const char *pfx, int errnoval,
@@ -488,6 +502,8 @@ static void connect_attempt_discard(void) {
   perhaps_close(&connecting_sockets[1]);
 
   if (connecting_child) {
+    r= kill(connecting_child, SIGTERM);
+    if (r) syswarn("failed to kill connecting child");
     int status= xwaitpid(&connecting_child, "connect");
 
     if (!(WIFEXITED(status) ||
@@ -574,7 +590,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
 
   /* Phew! */
   LIST_ADDHEAD(idle, conn);
-  notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
+  notice("#%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
   connect_attempt_discard();
   check_master_queue();
   return 0;
@@ -587,7 +603,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
   connect_attempt_discard();
 }
 
-static void connect_start() {
+static void connect_start(void) {
   assert(!connecting_sockets[0]);
   assert(!connecting_sockets[1]);
   assert(!connecting_child);
@@ -765,6 +781,45 @@ static void conn_check_work(Conn *conn)  {
   }
 }
 
+static void vconnfail(Conn *conn, const char *fmt, va_list al)
+     __attribute__((printf,2,0));
+
+static void vconnfail(Conn *conn, const char *fmt, va_list al) {
+  int requeue[art_MaxState];
+
+  Article *art;
+  while ((art= LIST_REMHEAD(conn->queue))) LIST_ADDTAIL(queue);
+  while ((art= LIST_REMHEAD(conn->sent))) {
+    counts[art->state]++;
+    if (art->state==art_Unsolicited) art->state= art_Unchecked;
+    LIST_ADDTAIL(queue);
+  }
+
+  int i;
+  XmitDetails *xd;
+  for (i=0, dp=&conn->xmitd; i<conn->xmitu; i++, dp++)
+    xmit_free(dp);
+
+  char *m= xvasprintf(fmt,al);
+  warn("#%d connection failed, requeueing " RCI_TRIPLE_FMT_BASE ": %s",
+       conn->fd, RCI_TRIPLE_FMT_VALS(requeue, /*nothing*/), m);
+  free(m);
+
+  close(conn->fd);
+  free(conn);
+
+  connect_delay= reconnect_delay_periods;
+  check_master_queue();
+}
+
+static void connfail(Connection *conn, const char *fmt, ...)
+     __attribute__((printf,2,3));
+static void connfail(Connection *conn, const char *fmt, ...) {
+  va_list al;
+  va_start(al,fmt);
+  vconnfail(fmt,al);
+  va_end(al);
+}
 
 /*========== article transmission ==========*/
 
@@ -842,7 +897,7 @@ static void conn_make_some_xmits(Conn *conn) {
     Article *art= LIST_REMHEAD(queue);
     if (!art) break;
 
-    if (art->checked || (conn->stream && nocheck)) {
+    if (art->state >= art_Wanted || (conn->stream && nocheck)) {
       /* actually send it */
 
       ARTHANDLE *artdata= SMretrieve();
@@ -863,11 +918,13 @@ static void conn_make_some_xmits(Conn *conn) {
        }
       }
 
-      art->sent= 1;
+      art->state=
+       art->state == art_Unchecked ? art_Unsolicited :
+       art->state == art_Wanted    ? art_Wanted      :
+       abort();
+      art->ipf->counts[art->state].sent++;
       LIST_ADDTAIL(conn->sent, art);
 
-      art->ipf->counts[art->checked].sent++;
-
     } else {
       /* check it */
 
@@ -878,8 +935,9 @@ static void conn_make_some_xmits(Conn *conn) {
       xmit_noalloc(art->mid, art->midlen);
       XMIT_LITERAL("\r\n");
 
+      assert(art->state == art_Unchecked);
+      art->ipf->counts[art->state].sent++;
       LIST_ADDTAIL(conn->sent, art);
-      art->ipf->counts[art->checked].offered++;
     }
   }
 }
@@ -903,8 +961,10 @@ static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev,
 
 static Article *article_reply_check(Connection *conn, const char *response,
                                    int code_indicates_streaming,
+                                   int must_have_sent
+                                       /* 1:yes, -1:no, 0:dontcare */,
                                    const char *sanitised_response) {
-  Article *art= LIST_REMHEAD(conn->sent);
+  Article *art= conn->sent.head;
 
   if (!art) {
     connfail(conn,
@@ -943,6 +1003,19 @@ static Article *article_reply_check(Connection *conn, const char *response,
     }
   }
 
+  if (must_have_sent>0 && art->state < art_Wanted) {
+    connfail("peer says article accepted but we had not sent the body: %s",
+            sanitised_response);
+    return 0;
+  }
+  if (must_have_sent<0 && art->state >= art_Wanted) {
+    connfail("peer says please sent the article but we just did: %s",
+            sanitised_response);
+    return 0;
+  }
+
+  Article *art_again= LIST_REMHEAD(conn->sent);
+  assert(art_again == art);
   return art;
 }
 
@@ -953,15 +1026,14 @@ static void update_nocheck(int accepted) {
   if (new_nocheck && !nocheck_reported) {
     notice("entering nocheck mode for the first time");
     nocheck_reported= 1;
-  } else if (new_nocheck != nockech) {
+  } else if (new_nocheck != nocheck) {
     debug("nocheck mode %s", new_nocheck ? "start" : "stop");
   }
   nocheck= new_nocheck;
 }
 
 static void article_done(Connection *conn, Article *art, int whichcount) {
-  *count++;
-  art->ipf->counts.articles[art->checked][whichcount]++;
+  art->ipf->counts[art->state][whichcount]++;
   if (whichcount == RC_accepted) update_nocheck(1);
   else if (whichcount == RC_unwanted) update_nocheck(0);
 
@@ -1034,13 +1106,13 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
 
   Article *art;
 
-#define GET_ARTICLE                                                        \
-  art= article_reply_check(conn, data, code_streaming, sani);              \
+#define GET_ARTICLE(musthavesent)                                          \
+  art= article_reply_check(conn, data, musthavesent, code_streaming, sani); \
   if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */
 
-#define ARTICLE_DEALTWITH(streaming,how)                       \
+#define ARTICLE_DEALTWITH(streaming,musthavesent,how)          \
   code_streaming= (streaming)                                  \
-  GET_ARTICLE;                                                 \
+  GET_ARTICLE(musthavesent);                                   \
   article_done(conn, art, RC_##how);  break;
 
 #define PEERBADMSG(m) connfail(conn, m ": %s", sani);  return OOP_CONTINUE
@@ -1053,32 +1125,29 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
   case 503: PEERBADMSG("peer timed us out");
   default:  PEERBADMSG("peer sent unexpected message");
 
-  case 435: ARTICLE_DEALTWITH(0,unwanted); /* IHAVE says they have it */
-  case 438: ARTICLE_DEALTWITH(1,unwanted); /* CHECK/TAKETHIS: they have it */
+  case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
+  case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
 
-  case 235: ARTICLE_DEALTWITH(0,accepted); /* IHAVE says thanks */
-  case 239: ARTICLE_DEALTWITH(1,accepted); /* TAKETHIS says thanks */
+  case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
+  case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
 
-  case 437: ARTICLE_DEALTWITH(0,rejected); /* IHAVE says rejected */
-  case 439: ARTICLE_DEALTWITH(1,rejected); /* TAKETHIS says rejected */
+  case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
+  case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
 
   case 238: /* CHECK says send it */
     code_streaming= 1;
   case 335: /* IHAVE says send it */
-    GET_ARTICLE;
-    count_checkedwanted++;
+    GET_ARTICLE(-1);
+    assert(art->state == art_Unchecked);
+    art->ipf->counts[art->state].accepted++;
+    art->state= art_Wanted;
     LIST_ADDTAIL(conn->queue);
-    if (art->checked) {
-      connfail("peer gave %d response to article body: %s",code, sani);
-      return OOP_CONTINUE;
-    }
-    art->checked= 1;
     break;
 
   case 431: /* CHECK or TAKETHIS says try later */
     code_streaming= 1;
   case 436: /* IHAVE says try later */
-    GET_ARTICLE;
+    GET_ARTICLE(0);
     open_defer();
     if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
        || fflush(defer))
@@ -1173,7 +1242,7 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd,
     art->offset= ipf->offset;
     art->blanklen= recsz;
     art->midlen= midlen;
-    art->checked= art->sentbody= 0;
+    art->state= art_Unchecked;
     art->ipf= ipf;  ipf->inprogress++;
     art->token= TextToToken(tokentextbuf);
     strcpy(art->messageid, space+1);
@@ -1639,12 +1708,36 @@ static int inputfile_is_done(InputFile *ipf) {
   if (ipf->fd >= 0); return 0; /* not had EOF */
   return 1;
 }
+
+static void notice_processed(InputFile *ipf, const char *what,
+                            const char *spec) {
+#define RCI_NOTHING(x) /* nothing */
+#define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
+#define RCI_TRIPLE_VALS(x) RCI_TRIPLE_VALS_BASE(ipf->counts, .x)
+
+  info("processed %s%s offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)"
+       RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
+       ,
+       what,spec,
+       ipf->counts[art_Unchecked].sent + ipf->counts[art_Unsolicited].sent
+       , ipf->counts[art_Unchecked].sent, ipf->counts[art_Unsolicited].sent,
+       ipf->counts[art_Wanted].accepted + ipf->counts[art_Unsolicited].accepted
+       ,ipf->counts[art_Wanted].accepted,ipf->counts[art_Unsolicited].accepted
+       RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
+       );
+}
+
 static void statemc_check_backlog_done(void) {
   InputFile *ipf= backlog_input_file();
   if (!inputfile_is_done(ipf)) return;
 
-  notice_processed(ipf,"backlog file",ipf->path);
+  const char *slash= strrchr(ipf->path, "/");
+  const char *leaf= slash ? slash+1 : ipf->path;
+  const char *under= strchr(slash, "_");
+  const char *rest= under ? under+1 : leaf;
+  if (!strncmp(rest,"backlog",7)) rest += 7;
+  notice_processed(ipf,"backlog:",rest);
+  
   close_input_file(ipf);
   if (unlink(ipf->path)) {
     if (errno != ENOENT)
@@ -1665,7 +1758,7 @@ static void statemc_check_flushing_done(void) {
 
   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
 
-  notice_processed(ipf,"feed file",0);
+  notice_processed(ipf,"feedfile",0);
 
   close_defer();