X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=421623914d6c41ac79461c83dee34020064b9f6e;hb=5d6e18436eb0f3711d9fe4027c881833b0d9490d;hp=b8124704b71b69de5ab752d361a97e9446c55b88;hpb=4e748bf495b4c5c0582b877aee531dec31fc2a36;p=inn-innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index b812470..4216239 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -145,12 +145,15 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. * */ +/*============================== PROGRAM ==============================*/ + #define _GNU_SOURCE +#include "inn/list.h" #include "config.h" #include "storage.h" -#include "oop.h" -#include "oop-read.h" +#include "nntp.h" +#include "libinn.h" #include #include @@ -163,6 +166,13 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #include #include #include +#include +#include +#include +#include + +#include +#include /*----- general definitions, probably best not changed -----*/ @@ -174,8 +184,84 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define INNDCOMMCHILD_ESTATUS_FAIL 6 #define INNDCOMMCHILD_ESTATUS_NONESUCH 7 +/*----- doubly linked lists -----*/ + +#define ISNODE(T) struct { T *succ, *pred; } node /* must be at start */ +#define DEFLIST(T) typedef struct { T *hd, *tl, *tp; int count; } T##List + +#define NODE(n) (assert((void*)&(n)->node == &(n)), \ + (struct node*)&(n)->node) + +#define LIST_CHECKCANHAVENODE(l,n) \ + ((void)((n) == ((l).hd))) /* just for the type check */ + +#define LIST_ADDSOMEHOW(l,n,list_addsomehow) \ + ( LIST_CHECKCANHAVENODE(l,n), \ + list_addsomehow((struct list*)&(l), NODE((n))), \ + (void)(l).count++ \ + ) + +#define LIST_REMSOMEHOW(l,list_remsomehow) \ + ( (typeof((l).hd)) \ + ( (l).count \ + ? ( (l).count--, \ + list_remsomehow((struct list*)&(l)) ) \ + : 0 \ + ) \ + ) + + +#define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead) +#define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail) +#define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) +#define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) + +#define LIST_HEAD(l) ((typeof((l).hd))(list_head((struct list*)&(l)))) +#define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) +#define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) + +#define LIST_REMOVE(l,n) \ + ( LIST_CHECKCANHAVENODE(l,n), \ + list_remove(NODE((n))), \ + (void)(l).count-- \ + ) + +#define LIST_INSERT(l,n,pred) \ + ( LIST_CHECKCANHAVENODE(l,n), \ + LIST_CHECKCANHAVENODE(l,pred), \ + list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \ + (void)(l).count++ \ + ) + +/*----- type predeclarations -----*/ + typedef struct Conn Conn; typedef struct Article Article; +typedef struct InputFile InputFile; +typedef struct XmitDetails XmitDetails; +typedef enum StateMachineState StateMachineState; + +DEFLIST(Conn); +DEFLIST(Article); + +/*----- function predeclarations -----*/ + +static void conn_check_work(Conn *conn); +static void conn_make_some_xmits(Conn *conn); +static void *conn_write_some_xmits(Conn *conn); + +static void xmit_free(XmitDetails *d); + +static int filemon_init(void); +static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path); +static void filemon_callback(void); + +static void statemc_setstate(StateMachineState newsms, int periods, + const char *forlog, const char *why); +static void check_master_queue(void); + +static void postfork(const char *what); +static void postfork_inputfile(InputFile *ipf); /*----- configuration options -----*/ @@ -199,28 +285,6 @@ static double nocheck_decay; /* computed in main from _articles */ static int nocheck, nocheck_reported; -/*----- doubly linked lists -----*/ - -#define ISNODE(T) T *next, *back; -#define LIST(T) struct { T *head, *tail, *tailpred; int count; } - -#define NODE(n) ((struct node*)&(n)->head) - -#define LIST_ADDHEAD(l,n) \ - (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++) -#define LIST_ADDTAIL(l,n) \ - (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++) - -#define LIST_REMHEAD(l) \ - ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0) -#define LIST_REMTAIL(l) \ - ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0) -#define LIST_REMOVE(l,n) \ - (list_remove(NODE((n))), (void)(l).count--) -#define LIST_INSERT(l,n,pred) \ - (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++) - - /*----- statistics -----*/ typedef enum { /* in queue in conn->sent */ @@ -240,7 +304,7 @@ typedef enum { /* in queue in conn->sent */ #define RCI_TRIPLE_FMT_BASE "%d(id%d+bd%d+nc%d)" #define RCI_TRIPLE_VALS_BASE(counts,x) \ - , counts[art_Unchecked] x \ + counts[art_Unchecked] x \ + counts[art_Wanted] x \ + counts[art_Unsolicited] x, \ counts[art_Unchecked] x \ @@ -262,19 +326,19 @@ typedef enum { xk_Malloc, xk_Const, xk_Artdata } XmitKind; -typedef struct { +struct XmitDetails { XmitKind kind; union { char *malloc_tofree; ARTHANDLE *sm_art; } info; -} XmitDetails; +}; /*----- core operational data structure types -----*/ -typedef struct InputFile { - /* This is an instance of struct oop_readable */ +struct InputFile { + /* This is also an instance of struct oop_readable */ struct oop_readable readable; /* first */ oop_readable_call *readable_callback; void *readable_callback_user; @@ -288,9 +352,10 @@ typedef struct InputFile { int counts[art_MaxState][RCI_max]; char path[]; -} InputFile; +}; struct Article { + ISNODE(Article); ArtState state; int midlen; InputFile *ipf; @@ -308,10 +373,10 @@ struct Article { X(DROPPING) \ X(DROPPED) -typedef enum { +enum StateMachineState { #define SMS_DEF_ENUM(s) sm_##s, SMS_LIST(SMS_DEF_ENUM) -} StateMachineState; +}; static const char *sms_names[]= { #define SMS_DEF_NAME(s) #s , @@ -322,8 +387,8 @@ static const char *sms_names[]= { struct Conn { ISNODE(Conn); int fd, max_queue, stream; - LIST(Article) queue; /* not yet told peer, or CHECK said send it */ - LIST(Article) sent; /* offered/transmitted - in xmit or waiting reply */ + ArticleList queue; /* not yet told peer, or CHECK said send it */ + ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ struct iovec xmit[CONNIOVS]; XmitDetails xmitd[CONNIOVS]; int xmitu; @@ -334,9 +399,9 @@ struct Conn { static oop_source *loop; -static int nconns; -static LIST(Conn) idle, working, full; -static LIST(Article) *queue; +static int until_connect; +static ConnList conns; +static ArticleList queue; static char *path_lock, *path_flushing, *path_defer; @@ -349,17 +414,6 @@ static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; static int sm_period_counter; -/*----- function predeclarations -----*/ - -static void conn_check_work(Conn *conn); - -static int filemon_init(void); -static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path); -static void filemon_callback(void); - -static void statemc_setstate(StateMachineState newsms, int periods, - const char *forlog, const char *why); - /*========== logging ==========*/ static void logcore(int sysloglevel, const char *fmt, ...) @@ -381,7 +435,7 @@ static void logv(int sysloglevel, const char *pfx, int errnoval, __attribute__((__format__(printf,5,0))); static void logv(int sysloglevel, const char *pfx, int errnoval, int exitstatus, const char *fmt, va_list al) { - char msgbuf[256]; + char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */ vsnprintf(msgbuf,sizeof(msgbuf), fmt,al); msgbuf[sizeof(msgbuf)-1]= 0; @@ -419,15 +473,26 @@ logwrap(debug, " debug", LOG_DEBUG, -1, 0); /*========== utility functions etc. ==========*/ -static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } } - -static void *xmalloc(size_t sz) { - if (!sz) return 0; - void *r= malloc(sz); - if (r) return r; - sysdie("malloc (%ld bytes) failed", (unsigned long)sz); +static char *xvasprintf(const char *fmt, va_list al) + __attribute__((__format__(printf,1,0))); +static char *xvasprintf(const char *fmt, va_list al) { + char *str; + int rc= vasprintf(&str,fmt,al); + if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt); + return str; +} +static char *xasprintf(const char *fmt, ...) + __attribute__((__format__(printf,1,2))); +static char *xasprintf(const char *fmt, ...) { + va_list al; + va_start(al,fmt); + char *str= xvasprintf(fmt,al); + va_end(al); + return str; } +static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } } + static pid_t xfork(const char *what) { pid_t child; @@ -493,7 +558,7 @@ static void check_isreg(const struct stat *stab, const char *path, } static void xfstat(int fd, struct stat *stab_r, const char *what) { - int r= fstab(fd, stab_r); + int r= fstat(fd, stab_r); if (r) sysdie("could not fstat %s", what); } @@ -515,6 +580,13 @@ static void xlstat_isreg(const char *path, struct stat *stab, check_isreg(stab, path, what); } +static void setnonblock(int fd, int nonblocking) { + int r= fcntl(fd, F_GETFL); if (r<0) sysdie("setnonblocking fcntl F_GETFL"); + if (nonblocking) r |= O_NONBLOCK; + else r &= ~O_NONBLOCK; + r= fcntl(fd, F_SETFL, r); if (r<0) sysdie("setnonblocking fcntl F_SETFL"); +} + static int samefile(const struct stat *a, const struct stat *b) { assert(S_ISREG(a->st_mode)); assert(S_ISREG(b->st_mode)); @@ -522,6 +594,23 @@ static int samefile(const struct stat *a, const struct stat *b) { a->st_dev == b->st_dev); } +static char *sanitise(const char *input) { + static char sanibuf[100]; /* returns pointer to this buffer! */ + + const char *p= input; + char *q= sanibuf; + *q++= '`'; + for (;;) { + if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; } + int c= *p++; + if (!c) { *q++= '\''; *q=0; break; } + if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; } + sprintf(q,"\\x%02x",c); + q += 4; + } + return sanibuf; +} + /*========== making new connections ==========*/ static int connecting_sockets[2]= {-1,-1}; @@ -529,7 +618,7 @@ static pid_t connecting_child; static void connect_attempt_discard(void) { if (connecting_sockets[0]) - cancel_fd(connecting_sockets[0]); + cancel_fd_read_except(connecting_sockets[0]); perhaps_close(&connecting_sockets[0]); perhaps_close(&connecting_sockets[1]); @@ -569,11 +658,11 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { assert(got==connecting_child); connecting_child= 0; if (WIFEXITED(status) && - (WEXITSTATUS(status) != 0 + (WEXITSTATUS(status) != 0 && WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)) { /* child already reported the problem */ - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) { + } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { warn("connect: connection attempt timed out"); } else if (!WIFEXITED(status)) { report_child_status("connect", status); @@ -583,7 +672,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { /* child is still running apparently, report the socket problem */ if (rs < 0) syswarn("connect: read from child socket failed"); - else if (e == OOP_EXCEPTIONN) + else if (e == OOP_EXCEPTION) warn("connect: unexpected exception on child socket"); else warn("connect: unexpected EOF on child socket"); @@ -598,13 +687,14 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { } CHK(level, SOL_SOCKET); CHK(type, SCM_RIGHTS); - CHK(len, CMSG_LEN(sizeof(conn-b>fd))); + CHK(len, CMSG_LEN(sizeof(conn->fd))); #undef CHK - if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; } + if (CMSG_NXTHDR(&msg,h)) { die("connect: child sent many cmsgs"); goto x; } memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd)); + int status; pid_t got= waitpid(connecting_child, &status, 0); if (got==-1) sysdie("connect: real wait for child"); assert(got == connecting_child); @@ -619,10 +709,10 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { fatal("connect: child gave unexpected exit status %d", es); } - set nonblocking; - /* Phew! */ - LIST_ADDHEAD(idle, conn); + setnonblock(conn->fd, 1); + conn->max_queue= conn->stream ? max_queue_per_conn : 1; + LIST_ADDHEAD(conns, conn); notice("#%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); connect_attempt_discard(); check_master_queue(); @@ -643,7 +733,7 @@ static void connect_start(void) { notice("starting connection attempt"); - r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets); + int r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets); if (r) { syswarn("connect: cannot create socketpair for child"); goto x; } connecting_child= xfork("connection"); @@ -657,15 +747,11 @@ static void connect_start(void) { if (r) sysdie("connect: close parent socket in child"); alarm(connection_setup_timeout); - if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) { - if (buf[0]) { - sanitise_inplace(buf); - fatal("connect: rejected: %s", buf); - } else { - sysfatal("connect: connection attempt failed"); - } + if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) { + if (buf[0]) fatal("connect: rejected: %s", sanitise(buf)); + else sysfatal("connect: connection attempt failed"); } - if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0) + if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0) sysfatal("connect: authentication failed"); if (try_stream) { if (fputs("MODE STREAM\r\n", cn_to) || @@ -680,19 +766,16 @@ static void connect_start(void) { } int l= strlen(buf); assert(l>=1); - if (buf[-1]!='\n') { - sanitise_inplace(buf); + if (buf[-1]!='\n') fatal("connect: response to MODE STREAM is too long: %.100s...", - remote_host, buf); - } - l--; if (l>0 && buf[1-]=='\r') l--; + remote_host, sanitise(buf)); + l--; if (l>0 && buf[l-1]=='\r') l--; buf[l]= 0; char *ep; int rcode= strtoul(buf,&ep,10); - if (ep != buf[3]) { - sanitise_inplace(buf); - fatal("connect: bad response to MODE STREAM: %.50s", buf); - } + if (ep != &buf[3]) + fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf)); + switch (rcode) { case 203: exitstatus= CONNCHILD_ESTATUS_STREAM; @@ -701,8 +784,8 @@ static void connect_start(void) { case 500: break; default: - sanitise_inplace(buf); - warn("connect: unexpected response to MODE STREAM: %.50s", buf); + warn("connect: unexpected response to MODE STREAM: %.50s", + sanitise(buf)); exitstatus= 2; break; } @@ -727,7 +810,7 @@ static void connect_start(void) { if (r) sysdie("connect: close child socket in parent"); on_fd_read_except(connecting_sockets[0], connchild_event); - return OOP_CONTINUE; + return; x: connect_attempt_discard(); @@ -737,57 +820,40 @@ static void connect_start(void) { /*========== overall control of article flow ==========*/ static void check_master_queue(void) { - if (!queue.count) - return; - - Conn *last_assigned=0; for (;;) { - if (working.head) { - conn_assign_one_article(&working, &last_assigned); - } else if (idle.head) { - conn_assign_one_article(&idle, &last_assigned); - } else if (nconns < maxconns && queue.count >= max_queue_per_conn && - !connecting_child && !connect_delay) { - connect_delay= reconnect_delay_periods; + if (!queue.count) + break; + + Conn *walk, *use=0; + int spare; + for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) { + int inqueue= walk->sent.count + walk->queue.count; + spare= walk->max_queue - inqueue; + assert(inqueue <= max_queue_per_conn); + assert(spare >= 0); + if (inqueue==0) /*idle*/ { if (!use) use= walk; } + else if (spare>0) /*working*/ { use= walk; break; } + } + if (use) { + while (spare>0) { + Article *art= LIST_REMHEAD(queue); + LIST_ADDTAIL(use->queue, art); + spare--; + } + conn_check_work(use); + } else if (conns.count < max_connections && + !connecting_child && !until_connect) { + until_connect= reconnect_delay_periods; connect_start(); + break; } else { break; } } - conn_check_work(last_assigned); -} - -static void conn_assign_one_article(LIST(Conn) *connlist, - Conn **last_assigned) { - Conn *conn= connlist->head; - - LIST_REMOVE(*connlist, conn); - Article *art= LIST_REMHEAD(queue); - LIST_ADDTAIL(conn->queue, art); - LIST_ADD(*conn_determine_right_list(conn), conn); - - /* This slightly odd arrangement is so that we call conn_check_work - * once after filling the queue for a new connection in - * check_master_queue, rather than for each article. */ - if (conn != *last_assigned && *last_assigned) - conn_check_work(*last_assigned); - *last_assigned= conn; -} - -static int conn_total_queued_articles(Conn *conn) { - return conn->sent.count + conn->queue.count; -} - -static LIST(Conn) *conn_determine_right_list(Conn *conn) { - int inqueue= conn_total_queued_articles(conn); - assert(inqueue <= max_queue); - if (inqueue == 0) return &idle; - if (inqueue == conn->max_queue) return &full; - return &working; } -static void *conn_writeable(oop_source *l, int fd, int ev, void *u) { - check_conn_work(u); +static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) { + conn_check_work(u); return OOP_CONTINUE; } @@ -815,42 +881,43 @@ static void conn_check_work(Conn *conn) { } static void vconnfail(Conn *conn, const char *fmt, va_list al) - __attribute__((printf,2,0)); + __attribute__((__format__(printf,2,0))); static void vconnfail(Conn *conn, const char *fmt, va_list al) { int requeue[art_MaxState]; Article *art; - while ((art= LIST_REMHEAD(conn->queue))) LIST_ADDTAIL(queue); + while ((art= LIST_REMHEAD(conn->queue))) LIST_ADDTAIL(queue, art); while ((art= LIST_REMHEAD(conn->sent))) { - counts[art->state]++; + requeue[art->state]++; if (art->state==art_Unsolicited) art->state= art_Unchecked; - LIST_ADDTAIL(queue); + LIST_ADDTAIL(queue,art); } int i; - XmitDetails *xd; - for (i=0, dp=&conn->xmitd; ixmitu; i++, dp++) - xmit_free(dp); + XmitDetails *d; + for (i=0, d=conn->xmitd; ixmitu; i++, d++) + xmit_free(d); char *m= xvasprintf(fmt,al); warn("#%d connection failed, requeueing " RCI_TRIPLE_FMT_BASE ": %s", - conn->fd, RCI_TRIPLE_FMT_VALS(requeue, /*nothing*/), m); + conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); free(m); close(conn->fd); + LIST_REMOVE(conns,conn); free(conn); - connect_delay= reconnect_delay_periods; + until_connect= reconnect_delay_periods; check_master_queue(); } -static void connfail(Connection *conn, const char *fmt, ...) - __attribute__((printf,2,3)); -static void connfail(Connection *conn, const char *fmt, ...) { +static void connfail(Conn *conn, const char *fmt, ...) + __attribute__((__format__(printf,2,3))); +static void connfail(Conn *conn, const char *fmt, ...) { va_list al; va_start(al,fmt); - vconnfail(fmt,al); + vconnfail(conn,fmt,al); va_end(al); } @@ -860,7 +927,7 @@ static XmitDetails *xmit_core(Conn *conn, const char *data, int len, XmitKind kind) { /* caller must then fill in details */ struct iovec *v= &conn->xmit[conn->xmitu]; XmitDetails *d= &conn->xmitd[conn->xmitu++]; - v->iov_base= data; + v->iov_base= (char*)data; v->iov_len= len; d->kind= kind; return d; @@ -872,7 +939,7 @@ static void xmit_noalloc(Conn *conn, const char *data, int len) { #define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1)) static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) { - XmitDetails *d= xmit_core(conn, ah->data, ah->len, sk_Artdata); + XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata); d->info.sm_art= ah; } @@ -904,7 +971,8 @@ static void *conn_write_some_xmits(Conn *conn) { } assert(rs > 0); - for (done=0; rs && donexmitu; done++) { struct iovec *vp= &conn->xmit[done]; XmitDetails *dp= &conn->xmitd[done]; if (rs > vp->iov_len) { @@ -933,12 +1001,12 @@ static void conn_make_some_xmits(Conn *conn) { if (art->state >= art_Wanted || (conn->stream && nocheck)) { /* actually send it */ - ARTHANDLE *artdata= SMretrieve(); + ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL); if (conn->stream) { if (artdata) { XMIT_LITERAL("TAKETHIS "); - xmit_noalloc(conn, art->mid, art->midlen); + xmit_noalloc(conn, art->messageid, art->midlen); XMIT_LITERAL("\r\n"); xmit_artbody(conn, artdata); } @@ -954,8 +1022,8 @@ static void conn_make_some_xmits(Conn *conn) { art->state= art->state == art_Unchecked ? art_Unsolicited : art->state == art_Wanted ? art_Wanted : - abort(); - art->ipf->counts[art->state].sent++; + (abort(),-1); + art->ipf->counts[art->state][RCI_sent]++; LIST_ADDTAIL(conn->sent, art); } else { @@ -965,11 +1033,11 @@ static void conn_make_some_xmits(Conn *conn) { XMIT_LITERAL("IHAVE "); else XMIT_LITERAL("CHECK "); - xmit_noalloc(art->mid, art->midlen); + xmit_noalloc(conn, art->messageid, art->midlen); XMIT_LITERAL("\r\n"); assert(art->state == art_Unchecked); - art->ipf->counts[art->state].sent++; + art->ipf->counts[art->state][RCI_sent]++; LIST_ADDTAIL(conn->sent, art); } } @@ -992,12 +1060,12 @@ static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev, return OOP_CONTINUE; } -static Article *article_reply_check(Connection *conn, const char *response, +static Article *article_reply_check(Conn *conn, const char *response, int code_indicates_streaming, int must_have_sent /* 1:yes, -1:no, 0:dontcare */, const char *sanitised_response) { - Article *art= conn->sent.head; + Article *art= LIST_HEAD(conn->sent); if (!art) { connfail(conn, @@ -1009,20 +1077,20 @@ static Article *article_reply_check(Connection *conn, const char *response, if (code_indicates_streaming) { assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */ if (!conn->stream) { - connfail("peer gave streaming response code " + connfail(conn, "peer gave streaming response code " " to IHAVE or subsequent body: %s", sanitised_response); return 0; } const char *got_mid= response+4; int got_midlen= strcspn(got_mid, " \n\r"); if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') { - connfail("peer gave streaming response with syntactically invalid" + connfail(conn, "peer gave streaming response with syntactically invalid" " messageid: %s", sanitised_response); return 0; } if (got_midlen != art->midlen || memcmp(got_mid, art->messageid, got_midlen)) { - connfail("peer gave streaming response code to wrong article -" + connfail(conn, "peer gave streaming response code to wrong article -" " probable synchronisation problem; we offered: %s;" " peer said: %s", art->messageid, sanitised_response); @@ -1030,19 +1098,19 @@ static Article *article_reply_check(Connection *conn, const char *response, } } else { if (conn->stream) { - connfail("peer gave non-streaming response code to CHECK/TAKETHIS: %s", - sanitised_response); + connfail(conn, "peer gave non-streaming response code to" + " CHECK/TAKETHIS: %s", sanitised_response); return 0; } } if (must_have_sent>0 && art->state < art_Wanted) { - connfail("peer says article accepted but we had not sent the body: %s", - sanitised_response); + connfail(conn, "peer says article accepted but" + " we had not sent the body: %s", sanitised_response); return 0; } if (must_have_sent<0 && art->state >= art_Wanted) { - connfail("peer says please sent the article but we just did: %s", + connfail(conn, "peer says please sent the article but we just did: %s", sanitised_response); return 0; } @@ -1065,10 +1133,10 @@ static void update_nocheck(int accepted) { nocheck= new_nocheck; } -static void article_done(Connection *conn, Article *art, int whichcount) { +static void article_done(Conn *conn, Article *art, int whichcount) { art->ipf->counts[art->state][whichcount]++; - if (whichcount == RC_accepted) update_nocheck(1); - else if (whichcount == RC_unwanted) update_nocheck(0); + if (whichcount == RCI_accepted) update_nocheck(1); + else if (whichcount == RCI_unwanted) update_nocheck(0); InputFile *ipf= art->ipf; while (art->blanklen) { @@ -1109,22 +1177,12 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } assert(ev == OOP_RD_OK); + char *sani= sanitise(data, recsz); + char *ep; unsigned long code= strtoul(data, &ep, 10); if (ep != data+3 || *ep != ' ' || data[0]=='0') { - char sanibuf[100]; - const char *p= data; - char *q= sanibuf; - *q++= '`'; - for (;;) { - if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; } - int c= *p++; - if (!c) { *q++= '\''; break; } - if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; } - sprintf(q,"\\x%02x",c); - q += 4; - } - connfail(conn, "badly formatted response from peer: %s", sanibuf); + connfail(conn, "badly formatted response from peer: %s", sani); return OOP_CONTINUE; } @@ -1746,7 +1804,7 @@ static void notice_processed(InputFile *ipf, const char *what, const char *spec) { #define RCI_NOTHING(x) /* nothing */ #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE -#define RCI_TRIPLE_VALS(x) RCI_TRIPLE_VALS_BASE(ipf->counts, .x) +#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, .x) info("processed %s%s offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)" RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) @@ -2143,13 +2201,6 @@ static void postfork_inputfile(InputFile *ipf) { ipf->fd= -1; } -static void postfork_conns(Connection *conn) { - while (conn) { - close(conn->fd); - conn= conn->next; - } -} - static void postfork_stdio(FILE *f) { /* we have no stdio streams that are buffered long-term */ if (f) fclose(f); @@ -2161,9 +2212,11 @@ static void postfork(const char *what) { postfork_inputfile(main_input_file); postfork_inputfile(flushing_input_file); - postfork_conns(idle.head); - postfork_conns(working.head); - postfork_conns(full.head); + + Conn *conn; + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) + close(conn->fd); + postfork_stdio(defer); } @@ -2195,18 +2248,17 @@ static const char *debug_ipf_path(InputFile *ipf) { EVERY(period, {PERIOD_SECONDS,0}, { debug("PERIOD" - " sms=%s[%d] queue=%d connect_delay=%d" + " sms=%s[%d] conns=%d queue=%d until_connect=%d" " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_FMT(flushing) - " conns idle=%d working=%d full=%d" " children connecting=%ld inndcomm_child" , - sms_names[sms], sm_period_counter, queue.count, connect_delay, + sms_names[sms], sm_period_counter, + queue.count, conns.count, until_connect, DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing), - idle.count, working.count, full.count, (long)connecting_child, (long)inndcomm_child ); - if (connect_delay) connect_delay--; + if (until_connect) until_connect--; poll_backlog_file(); if (!backlog_input_file) close_defer(); /* want to start on a new backlog */