/*----- statistics -----*/
-#define RESULT_COUNTS \
- RC(offered) \
- RC(sent) \
- RC(unwanted) \
- RC(accepted) \
- RC(rejected) \
- RC(deferred)
+typedef enum { /* in queue in conn->sent */
+ art_Unchecked, /* not checked, not sent checking */
+ art_Wanted, /* checked, wanted sent body as requested */
+ art_Unsolicited, /* - sent body without check */
+ art_MaxState
+} ArtState;
+
+#define RESULT_COUNTS(RCS,RCN) \
+ RCS(sent) \
+ RCS(accepted) \
+ RCN(unwanted) \
+ RCN(rejected) \
+ RCN(deferred) \
+ RCN(connretry)
+
+#define RCI_TRIPLE_FMT_BASE "%d(id%d+bd%d+nc%d)"
+#define RCI_TRIPLE_VALS_BASE(counts,x) \
+ , counts[art_Unchecked] x \
+ + counts[art_Wanted] x \
+ + counts[art_Unsolicited] x, \
+ counts[art_Unchecked] x \
+ , counts[art_Wanted] x \
+ , counts[art_Unsolicited] x
typedef enum {
#define RC_INDEX(x) RCI_##x
- RESULT_COUNTS
+ RESULT_COUNTS(RC_INDEX, RC_INDEX)
RCI_max
} ResultCountIndex;
-typedef struct {
- int articles[2 /* checked */][RCI_max];
-} Counts;
-
/*----- transmission buffers -----*/
/*----- core operational data structure types -----*/
struct Article {
+ ArtState state;
int midlen;
- int checked;
InputFile *ipf;
TOKEN token;
off_t offset;
long inprogress; /* no. of articles read but not processed */
off_t offset;
- Counts counts;
+ int counts[art_MaxState][RCI_max];
char path[];
} InputFile;
child= fork();
if (child==-1) sysdie("cannot fork for %s",what);
if (!child) postfork(what);
+ debug("forked %s %ld", what, (unsigned long)child);
return child;
}
vfprintf(stderr,fmt,al);
putc('\n',stderr);
}
+ va_end(al);
}
static void logv(int sysloglevel, const char *pfx, int errnoval,
perhaps_close(&connecting_sockets[1]);
if (connecting_child) {
+ r= kill(connecting_child, SIGTERM);
+ if (r) syswarn("failed to kill connecting child");
int status= xwaitpid(&connecting_child, "connect");
if (!(WIFEXITED(status) ||
/* Phew! */
LIST_ADDHEAD(idle, conn);
- notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
+ notice("#%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
connect_attempt_discard();
check_master_queue();
return 0;
connect_attempt_discard();
}
-static void connect_start() {
+static void connect_start(void) {
assert(!connecting_sockets[0]);
assert(!connecting_sockets[1]);
assert(!connecting_child);
}
}
+static void vconnfail(Conn *conn, const char *fmt, va_list al)
+ __attribute__((printf,2,0));
+
+static void vconnfail(Conn *conn, const char *fmt, va_list al) {
+ int requeue[art_MaxState];
+
+ Article *art;
+ while ((art= LIST_REMHEAD(conn->queue))) LIST_ADDTAIL(queue);
+ while ((art= LIST_REMHEAD(conn->sent))) {
+ counts[art->state]++;
+ if (art->state==art_Unsolicited) art->state= art_Unchecked;
+ LIST_ADDTAIL(queue);
+ }
+
+ int i;
+ XmitDetails *xd;
+ for (i=0, dp=&conn->xmitd; i<conn->xmitu; i++, dp++)
+ xmit_free(dp);
+
+ char *m= xvasprintf(fmt,al);
+ warn("#%d connection failed, requeueing " RCI_TRIPLE_FMT_BASE ": %s",
+ conn->fd, RCI_TRIPLE_FMT_VALS(requeue, /*nothing*/), m);
+ free(m);
+
+ close(conn->fd);
+ free(conn);
+
+ connect_delay= reconnect_delay_periods;
+ check_master_queue();
+}
+
+static void connfail(Connection *conn, const char *fmt, ...)
+ __attribute__((printf,2,3));
+static void connfail(Connection *conn, const char *fmt, ...) {
+ va_list al;
+ va_start(al,fmt);
+ vconnfail(fmt,al);
+ va_end(al);
+}
/*========== article transmission ==========*/
Article *art= LIST_REMHEAD(queue);
if (!art) break;
- if (art->checked || (conn->stream && nocheck)) {
+ if (art->state >= art_Wanted || (conn->stream && nocheck)) {
/* actually send it */
ARTHANDLE *artdata= SMretrieve();
}
}
- art->sent= 1;
+ art->state=
+ art->state == art_Unchecked ? art_Unsolicited :
+ art->state == art_Wanted ? art_Wanted :
+ abort();
+ art->ipf->counts[art->state].sent++;
LIST_ADDTAIL(conn->sent, art);
- art->ipf->counts[art->checked].sent++;
-
} else {
/* check it */
xmit_noalloc(art->mid, art->midlen);
XMIT_LITERAL("\r\n");
+ assert(art->state == art_Unchecked);
+ art->ipf->counts[art->state].sent++;
LIST_ADDTAIL(conn->sent, art);
- art->ipf->counts[art->checked].offered++;
}
}
}
static Article *article_reply_check(Connection *conn, const char *response,
int code_indicates_streaming,
+ int must_have_sent
+ /* 1:yes, -1:no, 0:dontcare */,
const char *sanitised_response) {
- Article *art= LIST_REMHEAD(conn->sent);
+ Article *art= conn->sent.head;
if (!art) {
connfail(conn,
}
}
+ if (must_have_sent>0 && art->state < art_Wanted) {
+ connfail("peer says article accepted but we had not sent the body: %s",
+ sanitised_response);
+ return 0;
+ }
+ if (must_have_sent<0 && art->state >= art_Wanted) {
+ connfail("peer says please sent the article but we just did: %s",
+ sanitised_response);
+ return 0;
+ }
+
+ Article *art_again= LIST_REMHEAD(conn->sent);
+ assert(art_again == art);
return art;
}
if (new_nocheck && !nocheck_reported) {
notice("entering nocheck mode for the first time");
nocheck_reported= 1;
- } else if (new_nocheck != nockech) {
+ } else if (new_nocheck != nocheck) {
debug("nocheck mode %s", new_nocheck ? "start" : "stop");
}
nocheck= new_nocheck;
}
static void article_done(Connection *conn, Article *art, int whichcount) {
- art->ipf->counts.articles[art->checked][whichcount]++;
+ art->ipf->counts[art->state][whichcount]++;
if (whichcount == RC_accepted) update_nocheck(1);
else if (whichcount == RC_unwanted) update_nocheck(0);
Article *art;
-#define GET_ARTICLE \
- art= article_reply_check(conn, data, code_streaming, sani); \
+#define GET_ARTICLE(musthavesent) \
+ art= article_reply_check(conn, data, musthavesent, code_streaming, sani); \
if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */
-#define ARTICLE_DEALTWITH(streaming,how) \
+#define ARTICLE_DEALTWITH(streaming,musthavesent,how) \
code_streaming= (streaming) \
- GET_ARTICLE; \
+ GET_ARTICLE(musthavesent); \
article_done(conn, art, RC_##how); break;
#define PEERBADMSG(m) connfail(conn, m ": %s", sani); return OOP_CONTINUE
case 503: PEERBADMSG("peer timed us out");
default: PEERBADMSG("peer sent unexpected message");
- case 435: ARTICLE_DEALTWITH(0,unwanted); /* IHAVE says they have it */
- case 438: ARTICLE_DEALTWITH(1,unwanted); /* CHECK/TAKETHIS: they have it */
+ case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
+ case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
- case 235: ARTICLE_DEALTWITH(0,accepted); /* IHAVE says thanks */
- case 239: ARTICLE_DEALTWITH(1,accepted); /* TAKETHIS says thanks */
+ case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
+ case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
- case 437: ARTICLE_DEALTWITH(0,rejected); /* IHAVE says rejected */
- case 439: ARTICLE_DEALTWITH(1,rejected); /* TAKETHIS says rejected */
+ case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
+ case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
case 238: /* CHECK says send it */
code_streaming= 1;
case 335: /* IHAVE says send it */
- GET_ARTICLE;
- count_checkedwanted++;
+ GET_ARTICLE(-1);
+ assert(art->state == art_Unchecked);
+ art->ipf->counts[art->state].accepted++;
+ art->state= art_Wanted;
LIST_ADDTAIL(conn->queue);
- if (art->checked) {
- connfail("peer gave %d response to article body: %s",code, sani);
- return OOP_CONTINUE;
- }
- art->checked= 1;
break;
case 431: /* CHECK or TAKETHIS says try later */
code_streaming= 1;
case 436: /* IHAVE says try later */
- GET_ARTICLE;
+ GET_ARTICLE(0);
open_defer();
if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
|| fflush(defer))
art->offset= ipf->offset;
art->blanklen= recsz;
art->midlen= midlen;
- art->checked= 0;
+ art->state= art_Unchecked;
art->ipf= ipf; ipf->inprogress++;
art->token= TextToToken(tokentextbuf);
strcpy(art->messageid, space+1);
static void notice_processed(InputFile *ipf, const char *what,
const char *spec) {
- info("processed %s%s checked=%d unchecked=%d sent=%d");
- wip;
+#define RCI_NOTHING(x) /* nothing */
+#define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
+#define RCI_TRIPLE_VALS(x) RCI_TRIPLE_VALS_BASE(ipf->counts, .x)
+
+ info("processed %s%s offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)"
+ RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
+ ,
+ what,spec,
+ ipf->counts[art_Unchecked].sent + ipf->counts[art_Unsolicited].sent
+ , ipf->counts[art_Unchecked].sent, ipf->counts[art_Unsolicited].sent,
+ ipf->counts[art_Wanted].accepted + ipf->counts[art_Unsolicited].accepted
+ ,ipf->counts[art_Wanted].accepted,ipf->counts[art_Unsolicited].accepted
+ RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
+ );
}
static void statemc_check_backlog_done(void) {