#define RCI_TRIPLE_FMT_BASE "%d(id%d+bd%d+nc%d)"
#define RCI_TRIPLE_VALS_BASE(counts,x) \
- , counts[art_Unchecked] x \
+ counts[art_Unchecked] x \
+ counts[art_Wanted] x \
+ counts[art_Unsolicited] x, \
counts[art_Unchecked] x \
char *m= xvasprintf(fmt,al);
warn("#%d connection failed, requeueing " RCI_TRIPLE_FMT_BASE ": %s",
- conn->fd, RCI_TRIPLE_FMT_VALS(requeue, /*nothing*/), m);
+ conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
free(m);
close(conn->fd);
- fixme remove conn from the appropriate list;
+ LIST_REMOVE(conns,conn);
free(conn);
until_connect= reconnect_delay_periods;
check_master_queue();
}
-static void connfail(Connection *conn, const char *fmt, ...)
+static void connfail(Conn *conn, const char *fmt, ...)
__attribute__((__format__(printf,2,3)));
-static void connfail(Connection *conn, const char *fmt, ...) {
+static void connfail(Conn *conn, const char *fmt, ...) {
va_list al;
va_start(al,fmt);
- vconnfail(fmt,al);
+ vconnfail(conn,fmt,al);
va_end(al);
}
XmitKind kind) { /* caller must then fill in details */
struct iovec *v= &conn->xmit[conn->xmitu];
XmitDetails *d= &conn->xmitd[conn->xmitu++];
- v->iov_base= data;
+ v->iov_base= (char*)data;
v->iov_len= len;
d->kind= kind;
return d;
#define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
- XmitDetails *d= xmit_core(conn, ah->data, ah->len, sk_Artdata);
+ XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
d->info.sm_art= ah;
}
}
assert(rs > 0);
- for (done=0; rs && done<xmitu; done++) {
+ int done;
+ for (done=0; rs && done<conn->xmitu; done++) {
struct iovec *vp= &conn->xmit[done];
XmitDetails *dp= &conn->xmitd[done];
if (rs > vp->iov_len) {
if (art->state >= art_Wanted || (conn->stream && nocheck)) {
/* actually send it */
- ARTHANDLE *artdata= SMretrieve();
+ ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
if (conn->stream) {
if (artdata) {
XMIT_LITERAL("TAKETHIS ");
- xmit_noalloc(conn, art->mid, art->midlen);
+ xmit_noalloc(conn, art->messageid, art->midlen);
XMIT_LITERAL("\r\n");
xmit_artbody(conn, artdata);
}
art->state=
art->state == art_Unchecked ? art_Unsolicited :
art->state == art_Wanted ? art_Wanted :
- abort();
- art->ipf->counts[art->state].sent++;
+ (abort(),-1);
+ art->ipf->counts[art->state][RCI_sent]++;
LIST_ADDTAIL(conn->sent, art);
} else {
XMIT_LITERAL("IHAVE ");
else
XMIT_LITERAL("CHECK ");
- xmit_noalloc(art->mid, art->midlen);
+ xmit_noalloc(conn, art->messageid, art->midlen);
XMIT_LITERAL("\r\n");
assert(art->state == art_Unchecked);
- art->ipf->counts[art->state].sent++;
+ art->ipf->counts[art->state][RCI_sent]++;
LIST_ADDTAIL(conn->sent, art);
}
}
return OOP_CONTINUE;
}
-static Article *article_reply_check(Connection *conn, const char *response,
+static Article *article_reply_check(Conn *conn, const char *response,
int code_indicates_streaming,
int must_have_sent
/* 1:yes, -1:no, 0:dontcare */,
const char *sanitised_response) {
- Article *art= conn->sent.head;
+ Article *art= LIST_HEAD(conn->sent);
if (!art) {
connfail(conn,
if (code_indicates_streaming) {
assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
if (!conn->stream) {
- connfail("peer gave streaming response code "
+ connfail(conn, "peer gave streaming response code "
" to IHAVE or subsequent body: %s", sanitised_response);
return 0;
}
const char *got_mid= response+4;
int got_midlen= strcspn(got_mid, " \n\r");
if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
- connfail("peer gave streaming response with syntactically invalid"
+ connfail(conn, "peer gave streaming response with syntactically invalid"
" messageid: %s", sanitised_response);
return 0;
}
if (got_midlen != art->midlen ||
memcmp(got_mid, art->messageid, got_midlen)) {
- connfail("peer gave streaming response code to wrong article -"
+ connfail(conn, "peer gave streaming response code to wrong article -"
" probable synchronisation problem; we offered: %s;"
" peer said: %s",
art->messageid, sanitised_response);
}
} else {
if (conn->stream) {
- connfail("peer gave non-streaming response code to CHECK/TAKETHIS: %s",
- sanitised_response);
+ connfail(conn, "peer gave non-streaming response code to"
+ " CHECK/TAKETHIS: %s", sanitised_response);
return 0;
}
}
if (must_have_sent>0 && art->state < art_Wanted) {
- connfail("peer says article accepted but we had not sent the body: %s",
- sanitised_response);
+ connfail(conn, "peer says article accepted but"
+ " we had not sent the body: %s", sanitised_response);
return 0;
}
if (must_have_sent<0 && art->state >= art_Wanted) {
- connfail("peer says please sent the article but we just did: %s",
+ connfail(conn, "peer says please sent the article but we just did: %s",
sanitised_response);
return 0;
}
nocheck= new_nocheck;
}
-static void article_done(Connection *conn, Article *art, int whichcount) {
+static void article_done(Conn *conn, Article *art, int whichcount) {
art->ipf->counts[art->state][whichcount]++;
- if (whichcount == RC_accepted) update_nocheck(1);
- else if (whichcount == RC_unwanted) update_nocheck(0);
+ if (whichcount == RCI_accepted) update_nocheck(1);
+ else if (whichcount == RCI_unwanted) update_nocheck(0);
InputFile *ipf= art->ipf;
while (art->blanklen) {
const char *spec) {
#define RCI_NOTHING(x) /* nothing */
#define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
-#define RCI_TRIPLE_VALS(x) RCI_TRIPLE_VALS_BASE(ipf->counts, .x)
+#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, .x)
info("processed %s%s offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)"
RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)