X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=385c6416cb59b7fa1f8f850c5613c17f7dffb03b;hb=a4a53bbbbcca1a6a6022d1efd7d02dd49be2f6f2;hp=1328d372f8d8390cb822f734a2476f044b1c6404;hpb=599596434b5b74c0280006508d195d69e71d0814;p=innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index 1328d37..385c641 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,8 +1,5 @@ /* * TODO - * - check all structs initialised - * - check all fd watches properly undone - * - check all init functions called * - actually implement badusage * - options for all options * - manpage @@ -238,6 +235,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) +#define LIST_INIT(l) ((l).hd, list_new((struct list*)&(l))) #define LIST_HEAD(l) ((typeof((l).hd))(list_head((struct list*)&(l)))) #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) @@ -275,12 +273,15 @@ static void *conn_write_some_xmits(Conn *conn); static void xmit_free(XmitDetails *d); +#define SMS(newstate, periods, why) \ + (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) static void statemc_setstate(StateMachineState newsms, int periods, const char *forlog, const char *why); + static void statemc_start_flush(const char *why); /* Normal => Flushing */ static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ -static void check_master_queue(void); +static void check_assign_articles(void); static void queue_check_input_done(void); static void statemc_check_flushing_done(void); @@ -293,10 +294,9 @@ static void open_defer(void); static void close_defer(void); static void search_backlog_file(void); -static void inputfile_tailing_start(InputFile *ipf); -static void inputfile_tailing_stop(InputFile *ipf); +static void inputfile_reading_start(InputFile *ipf); +static void inputfile_reading_stop(InputFile *ipf); -static int filemon_init(void); static void filemon_start(InputFile *ipf); static void filemon_stop(InputFile *ipf); static void filemon_callback(InputFile *ipf); @@ -394,7 +394,7 @@ struct InputFile { int fd; Filemon_Perfile *filemon; - oop_read *rd; + oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ long inprogress; /* no. of articles read but not processed */ off_t offset; int skippinglong; @@ -436,7 +436,8 @@ static const char *sms_names[]= { struct Conn { ISNODE(Conn); - int fd, max_queue, stream, quitting; + int fd; /* may be 0, meaning closed (during construction/destruction) */ + int max_queue, stream, quitting; int since_activity; /* periods */ ArticleList waiting; /* not yet told peer */ ArticleList priority; /* peer says send it now */ @@ -447,24 +448,22 @@ struct Conn { }; -/*----- operational variables -----*/ +/*----- general operational variables -----*/ +/* main initialises */ static oop_source *loop; - -static int until_connect; static ConnList conns; static ArticleList queue; - static char *path_lock, *path_flushing, *path_defer, *globpat_backlog; -#define SMS(newstate, periods, why) \ - (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) - +/* statemc_init initialises */ static StateMachineState sms; static FILE *defer; static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; -static int sm_period_counter, until_backlog_nextscan; +static int sm_period_counter; +/* initialisation to 0 is good */ +static int until_connect, until_backlog_nextscan; static double accept_proportion; static int nocheck, nocheck_reported; @@ -546,7 +545,21 @@ static char *xasprintf(const char *fmt, ...) { return str; } -static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } } +static int close_perhaps(int *fd) { + if (!*fd) return 0; + int r= close(*fd); + *fd=0; + return r; +} +static void xclose(int fd, const char *what, const char *what2) { + int r= close(fd); + if (r) sysdie("close %s%s",what,what2?what2:""); +} +static void xclose_perhaps(int *fd, const char *what, const char *what2) { + if (!*fd) return; + xclose(*fd,what,what2); + *fd=0; +} static pid_t xfork(const char *what) { pid_t child; @@ -675,15 +688,115 @@ static int isewouldblock(int errnoval) { return errnoval==EWOULDBLOCK || errnoval==EAGAIN; } -/*========== making new connections ==========*/ +/*========== management of connections ==========*/ static void conn_dispose(Conn *conn) { if (!conn) return; - perhaps_close(&conn->fd); + if (conn->fd) { + loop->cancel_fd(loop, conn->fd, OOP_WRITE); + loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION); + } + int r= close_perhaps(&conn->fd); + if (r) info("C%d error closing socket: %s", conn->fd, strerror(errno)); free(conn); until_connect= reconnect_delay_periods; } +static void *conn_exception(oop_source *lp, int fd, + oop_event ev, void *conn_v) { + Conn *conn= conn_v; + unsigned char ch; + assert(fd == conn->fd); + assert(ev == OOP_EXCEPTION); + int r= read(conn->fd, &ch, 1); + if (r<0) connfail(conn,"read failed: %s",strerror(errno)); + else connfail(conn,"exceptional condition on socket (peer sent urgent" + " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch); + return OOP_CONTINUE; +} + +static void vconnfail(Conn *conn, const char *fmt, va_list al) { + int requeue[art_MaxState]; + + Article *art; + while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->sent))) { + requeue[art->state]++; + if (art->state==art_Unsolicited) art->state= art_Unchecked; + LIST_ADDTAIL(queue,art); + } + + int i; + XmitDetails *d; + for (i=0, d=conn->xmitd; ixmitu; i++, d++) + xmit_free(d); + + char *m= xvasprintf(fmt,al); + warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s", + conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); + free(m); + + LIST_REMOVE(conns,conn); + conn_dispose(conn); + check_assign_articles(); +} + +static void connfail(Conn *conn, const char *fmt, ...) { + va_list al; + va_start(al,fmt); + vconnfail(conn,fmt,al); + va_end(al); +} + +static void check_idle_conns(void) { + Conn *conn; + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) + conn->since_activity++; + search_again: + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { + if (conn->since_activity <= need_activity_periods) continue; + + /* We need to shut this down */ + if (conn->quitting) + connfail(conn,"timed out waiting for response to QUIT"); + else if (conn->sent.count) + connfail(conn,"timed out waiting for responses"); + else if (conn->waiting.count || conn->priority.count) + connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); + else if (conn->xmitu) + connfail(conn,"peer has been sending responses" + " before receiving our commands!"); + else { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= 1; + conn->since_activity= 0; + debug("C%d is idle, quitting", conn->fd); + break; + } + } + } + goto search_again; + } +} + +/*---------- making new connections ----------*/ + static int connecting_sockets[2]= {-1,-1}; static pid_t connecting_child; @@ -691,8 +804,8 @@ static void connect_attempt_discard(void) { if (connecting_sockets[0]) cancel_fd_read_except(connecting_sockets[0]); - perhaps_close(&connecting_sockets[0]); - perhaps_close(&connecting_sockets[1]); + xclose_perhaps(&connecting_sockets[0], "connecting socketpair (read)",0); + xclose_perhaps(&connecting_sockets[1], "connecting socketpair (write)",0); if (connecting_child) { int r= kill(connecting_child, SIGTERM); @@ -728,16 +841,18 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { if (got != -1) { assert(got==connecting_child); connecting_child= 0; - if (WIFEXITED(status) && - (WEXITSTATUS(status) != 0 && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)) { - /* child already reported the problem */ + if (WIFEXITED(status)) { + if (WEXITSTATUS(status) != 0 && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) + /* child already reported the problem */; + else + warn("connect: connection child exited code %d but no cmsg", + WEXITSTATUS(status)); } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { warn("connect: connection attempt timed out"); - } else if (!WIFEXITED(status)) { + } else { report_child_status("connect", status); - /* that's probably the root cause then */ } } else { /* child is still running apparently, report the socket problem */ @@ -745,8 +860,10 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { syswarn("connect: read from child socket failed"); else if (e == OOP_EXCEPTION) warn("connect: unexpected exception on child socket"); - else + else if (!rs) warn("connect: unexpected EOF on child socket"); + else + fatal("connect: unexpected lack of cmsg from child"); } goto x; } @@ -761,9 +878,10 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { CHK(len, CMSG_LEN(sizeof(conn->fd))); #undef CHK - if (CMSG_NXTHDR(&msg,h)) { die("connect: child sent many cmsgs"); goto x; } + if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs"); memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd)); + loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); int status; pid_t got= waitpid(connecting_child, &status, 0); @@ -781,12 +899,16 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { } /* Phew! */ + LIST_INIT(conn->waiting); + LIST_INIT(conn->priority); + LIST_INIT(conn->sent); setnonblock(conn->fd, 1); conn->max_queue= conn->stream ? max_queue_per_conn : 1; - LIST_ADDHEAD(conns, conn); notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); + LIST_ADDHEAD(conns, conn); + connect_attempt_discard(); - check_master_queue(); + check_assign_articles(); return 0; x: @@ -890,55 +1012,9 @@ static void connect_start(void) { connect_attempt_discard(); } -static void check_idle_conns(void) { - Conn *conn; - for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) - conn->since_activity++; - search_again: - for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { - if (conn->since_activity <= need_activity_periods) continue; +/*---------- assigning articles to conns, and transmitting ----------*/ - /* We need to shut this down */ - if (conn->quitting) - connfail(conn,"timed out waiting for response to QUIT"); - else if (conn->sent.count) - connfail(conn,"timed out waiting for responses"); - else if (conn->waiting.count || conn->priority.count) - connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); - else if (conn->xmitu) - connfail(conn,"peer has been sending responses" - " before receiving our commands!"); - else { - static const char quitcmd[]= "QUIT\r\n"; - int todo= sizeof(quitcmd)-1; - const char *p= quitcmd; - for (;;) { - int r= write(conn->fd, p, todo); - if (r<0) { - if (isewouldblock(errno)) - connfail(conn, "blocked writing QUIT to idle connection"); - else - connfail(conn, "failed to write QUIT to idle connection: %s", - strerror(errno)); - break; - } - assert(r<=todo); - todo -= r; - if (!todo) { - conn->quitting= 1; - conn->since_activity= 0; - debug("C%d is idle, quitting", conn->fd); - break; - } - } - } - goto search_again; - } -} - -/*========== overall control of article flow ==========*/ - -static void check_master_queue(void) { +static void check_assign_articles(void) { for (;;) { if (!queue.count) break; @@ -1008,40 +1084,6 @@ static void conn_maybe_write(Conn *conn) { } } -static void vconnfail(Conn *conn, const char *fmt, va_list al) { - int requeue[art_MaxState]; - - Article *art; - while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->sent))) { - requeue[art->state]++; - if (art->state==art_Unsolicited) art->state= art_Unchecked; - LIST_ADDTAIL(queue,art); - } - - int i; - XmitDetails *d; - for (i=0, d=conn->xmitd; ixmitu; i++, d++) - xmit_free(d); - - char *m= xvasprintf(fmt,al); - warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s", - conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); - free(m); - - LIST_REMOVE(conns,conn); - conn_dispose(conn); - check_master_queue(); -} - -static void connfail(Conn *conn, const char *fmt, ...) { - va_list al; - va_start(al,fmt); - vconnfail(conn,fmt,al); - va_end(al); -} - /*========== article transmission ==========*/ static XmitDetails *xmit_core(Conn *conn, const char *data, int len, @@ -1261,6 +1303,7 @@ static void article_done(Conn *conn, Article *art, int whichcount) { else if (whichcount == RC_unwanted) update_nocheck(0); InputFile *ipf= art->ipf; + while (art->blanklen) { static const char spaces[]= " " @@ -1281,11 +1324,10 @@ static void article_done(Conn *conn, Article *art, int whichcount) { ipf->inprogress--; assert(ipf->inprogress >= 0); + free(art); if (!ipf->inprogress && ipf != main_input_file) queue_check_input_done(); - - free(art); } static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, @@ -1378,7 +1420,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } conn_maybe_write(conn); - check_master_queue(); + check_assign_articles(); return OOP_CONTINUE; } @@ -1387,15 +1429,11 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, static void feedfile_eof(InputFile *ipf) { assert(ipf != main_input_file); /* promised by tailing_try_read */ - - inputfile_tailing_stop(ipf); - assert(ipf->fd >= 0); - if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); - ipf->fd= -1; + inputfile_reading_stop(ipf); if (ipf == flushing_input_file) { assert(sms==sm_SEPARATED || sms==sm_DROPPING); - if (main_input_file) inputfile_tailing_start(main_input_file); + if (main_input_file) inputfile_reading_start(main_input_file); statemc_check_flushing_done(); } else if (ipf == backlog_input_file) { statemc_check_backlog_done(); @@ -1410,6 +1448,7 @@ static InputFile *open_input_file(const char *path) { if (errno==ENOENT) return 0; sysfatal("unable to open input file %s", path); } + assert(fd>0); InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1); memset(ipf,0,sizeof(*ipf)); @@ -1420,14 +1459,12 @@ static InputFile *open_input_file(const char *path) { return ipf; } -static void close_input_file(InputFile *ipf) { +static void close_input_file(InputFile *ipf) { /* does not free */ assert(!ipf->readable_callback); /* must have had ->on_cancel */ - assert(!ipf->filemon); /* must have had inputfile_tailing_stop */ - assert(!ipf->rd); /* must have had inputfile_tailing_stop */ + assert(!ipf->filemon); /* must have had inputfile_reading_stop */ + assert(!ipf->rd); /* must have had inputfile_reading_stop */ assert(!ipf->inprogress); /* no dangling pointers pointing here */ - - if (ipf->fd >= 0) - if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); + xclose_perhaps(&ipf->fd, "input file ", ipf->path); } @@ -1507,12 +1544,12 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->readcount_ok++; art= xmalloc(sizeof(*art) - 1 + midlen + 1); - art->offset= ipf->offset; - art->blanklen= recsz; - art->midlen= midlen; art->state= art_Unchecked; + art->midlen= midlen; art->ipf= ipf; ipf->inprogress++; art->token= TextToToken(tokentextbuf); + art->offset= ipf->offset; + art->blanklen= recsz; strcpy(art->messageid, space+1); LIST_ADDTAIL(queue, art); @@ -1520,7 +1557,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->offset >= target_max_feedfile_size) statemc_start_flush("feed file size"); - check_master_queue(); + check_assign_articles(); return OOP_CONTINUE; } @@ -1574,10 +1611,10 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, errno=EAGAIN; return -1; } else if (ipf==flushing_input_file) { - assert(ipf->fd>=0); + assert(ipf->rd); assert(sms==sm_SEPARATED || sms==sm_DROPPING); } else if (ipf==backlog_input_file) { - assert(ipf->fd>=0); + assert(ipf->rd); } else { abort(); } @@ -1707,8 +1744,8 @@ static const oop_rd_style feedfile_rdstyle= { OOP_RD_SHORTREC_LONG, }; -static void inputfile_tailing_start(InputFile *ipf) { - assert(!ipf->fd); +static void inputfile_reading_start(InputFile *ipf) { + assert(!ipf->rd); ipf->readable.on_readable= tailing_on_readable; ipf->readable.on_cancel= tailing_on_cancel; ipf->readable.try_read= tailing_try_read; @@ -1719,15 +1756,15 @@ static void inputfile_tailing_start(InputFile *ipf) { ipf->readable_callback_user= 0; ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0); - assert(ipf->fd); + assert(ipf->rd); int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, feedfile_got_article,ipf, feedfile_read_err, ipf); if (r) sysdie("unable start reading feedfile %s",ipf->path); } -static void inputfile_tailing_stop(InputFile *ipf) { - assert(ipf->fd); +static void inputfile_reading_stop(InputFile *ipf) { + assert(ipf->rd); oop_rd_cancel(ipf->rd); oop_rd_delete(ipf->rd); ipf->rd= 0; @@ -1794,7 +1831,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | V | | V | ============= V V ============ | SEPARATED-1 | | DROPPING-1 - | flsh->fd>=0 | | flsh->fd>=0 + | flsh->fd>0 | | flsh->fd>0 | [Separated] | | [Dropping] | main F idle | | main none | old D tail | | old D tail @@ -1804,7 +1841,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | V | | V | =============== | | =============== | SEPARATED-2 | | DROPPING-2 - | flsh->fd==-1 | V flsh->fd==-1 + | flsh->fd==0 | V flsh->fd==0 | [Finishing] | | [Dropping] | main F tail | `. main none | old D closed | `. old D closed @@ -1839,7 +1876,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { static void startup_set_input_file(InputFile *f) { assert(!main_input_file); main_input_file= f; - inputfile_tailing_start(f); + inputfile_reading_start(f); } static void statemc_init(void) { @@ -1976,7 +2013,7 @@ static void statemc_period_poll(void) { static int inputfile_is_done(InputFile *ipf) { if (!ipf) return 0; if (ipf->inprogress) return 0; /* new article in the meantime */ - if (ipf->fd >= 0); return 0; /* not had EOF */ + if (ipf->rd) return 0; /* not had EOF */ return 1; } @@ -2078,7 +2115,7 @@ static void statemc_setstate(StateMachineState newsms, int periods, if (!main_input_file) xtra= "-ABSENT"; break; case sm_SEPARATED: case sm_DROPPING: - xtra= flushing_input_file->fd >= 0 ? "-1" : "-2"; + xtra= flushing_input_file->rd ? "-1" : "-2"; break; default:; } @@ -2257,7 +2294,7 @@ static void search_backlog_file(void) { warn("backlog file %s vanished as we opened it", backlog_input_file); goto try_again; } - inputfile_tailing_start(backlog_input_file); + inputfile_reading_start(backlog_input_file); until_backlog_nextscan= -1; return; } @@ -2388,14 +2425,13 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ static void postfork_inputfile(InputFile *ipf) { if (!ipf) return; - assert(ipf->fd >= 0); - close(ipf->fd); - ipf->fd= -1; + xclose(ipf->fd, "(in child) input file ", ipf->path); } -static void postfork_stdio(FILE *f) { +static void postfork_stdio(FILE *f, const char *what, const char *what2) { /* we have no stdio streams that are buffered long-term */ - if (f) fclose(f); + if (!f) return; + if (fclose(f)) sysdie("close (in child) %s%s", what, what2?what2:0); } static void postfork(const char *what) { @@ -2407,9 +2443,9 @@ static void postfork(const char *what) { Conn *conn; for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) - close(conn->fd); + close_perhaps(&conn->fd); - postfork_stdio(defer); + postfork_stdio(defer, "defer file ", path_defer); } #define EVERY(what, interval_sec, interval_usec, body) \ @@ -2455,7 +2491,7 @@ EVERY(period, -1,0, { poll_backlog_file(); if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_period_poll(); - check_master_queue(); + check_assign_articles(); check_idle_conns(); }); @@ -2690,6 +2726,9 @@ int main(int argc, char **argv) { notice("starting"); + LIST_INIT(conns); + LIST_INIT(queue); + if (!filemon_method_init()) { warn("no file monitoring available, polling"); filepoll_schedule();