X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=385c6416cb59b7fa1f8f850c5613c17f7dffb03b;hb=a4a53bbbbcca1a6a6022d1efd7d02dd49be2f6f2;hp=92e80061deb544dc94503db62c8d367dbd77646f;hpb=7b2769c6878f854cb3601fceb3b14383852f9cdf;p=innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index 92e8006..385c641 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,6 +1,5 @@ /* * TODO - * - xperhaps_close * - actually implement badusage * - options for all options * - manpage @@ -282,7 +281,7 @@ static void statemc_setstate(StateMachineState newsms, int periods, static void statemc_start_flush(const char *why); /* Normal => Flushing */ static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ -static void check_master_queue(void); +static void check_assign_articles(void); static void queue_check_input_done(void); static void statemc_check_flushing_done(void); @@ -295,8 +294,8 @@ static void open_defer(void); static void close_defer(void); static void search_backlog_file(void); -static void inputfile_tailing_start(InputFile *ipf); -static void inputfile_tailing_stop(InputFile *ipf); +static void inputfile_reading_start(InputFile *ipf); +static void inputfile_reading_stop(InputFile *ipf); static void filemon_start(InputFile *ipf); static void filemon_stop(InputFile *ipf); @@ -392,10 +391,10 @@ struct InputFile { oop_readable_call *readable_callback; void *readable_callback_user; - int fd; /* may be 0, meaning closed */ + int fd; Filemon_Perfile *filemon; - oop_read *rd; + oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ long inprogress; /* no. of articles read but not processed */ off_t offset; int skippinglong; @@ -546,7 +545,21 @@ static char *xasprintf(const char *fmt, ...) { return str; } -static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } } +static int close_perhaps(int *fd) { + if (!*fd) return 0; + int r= close(*fd); + *fd=0; + return r; +} +static void xclose(int fd, const char *what, const char *what2) { + int r= close(fd); + if (r) sysdie("close %s%s",what,what2?what2:""); +} +static void xclose_perhaps(int *fd, const char *what, const char *what2) { + if (!*fd) return; + xclose(*fd,what,what2); + *fd=0; +} static pid_t xfork(const char *what) { pid_t child; @@ -675,7 +688,7 @@ static int isewouldblock(int errnoval) { return errnoval==EWOULDBLOCK || errnoval==EAGAIN; } -/*========== making new connections ==========*/ +/*========== management of connections ==========*/ static void conn_dispose(Conn *conn) { if (!conn) return; @@ -683,11 +696,107 @@ static void conn_dispose(Conn *conn) { loop->cancel_fd(loop, conn->fd, OOP_WRITE); loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION); } - perhaps_close(&conn->fd); + int r= close_perhaps(&conn->fd); + if (r) info("C%d error closing socket: %s", conn->fd, strerror(errno)); free(conn); until_connect= reconnect_delay_periods; } +static void *conn_exception(oop_source *lp, int fd, + oop_event ev, void *conn_v) { + Conn *conn= conn_v; + unsigned char ch; + assert(fd == conn->fd); + assert(ev == OOP_EXCEPTION); + int r= read(conn->fd, &ch, 1); + if (r<0) connfail(conn,"read failed: %s",strerror(errno)); + else connfail(conn,"exceptional condition on socket (peer sent urgent" + " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch); + return OOP_CONTINUE; +} + +static void vconnfail(Conn *conn, const char *fmt, va_list al) { + int requeue[art_MaxState]; + + Article *art; + while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->sent))) { + requeue[art->state]++; + if (art->state==art_Unsolicited) art->state= art_Unchecked; + LIST_ADDTAIL(queue,art); + } + + int i; + XmitDetails *d; + for (i=0, d=conn->xmitd; ixmitu; i++, d++) + xmit_free(d); + + char *m= xvasprintf(fmt,al); + warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s", + conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); + free(m); + + LIST_REMOVE(conns,conn); + conn_dispose(conn); + check_assign_articles(); +} + +static void connfail(Conn *conn, const char *fmt, ...) { + va_list al; + va_start(al,fmt); + vconnfail(conn,fmt,al); + va_end(al); +} + +static void check_idle_conns(void) { + Conn *conn; + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) + conn->since_activity++; + search_again: + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { + if (conn->since_activity <= need_activity_periods) continue; + + /* We need to shut this down */ + if (conn->quitting) + connfail(conn,"timed out waiting for response to QUIT"); + else if (conn->sent.count) + connfail(conn,"timed out waiting for responses"); + else if (conn->waiting.count || conn->priority.count) + connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); + else if (conn->xmitu) + connfail(conn,"peer has been sending responses" + " before receiving our commands!"); + else { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= 1; + conn->since_activity= 0; + debug("C%d is idle, quitting", conn->fd); + break; + } + } + } + goto search_again; + } +} + +/*---------- making new connections ----------*/ + static int connecting_sockets[2]= {-1,-1}; static pid_t connecting_child; @@ -695,8 +804,8 @@ static void connect_attempt_discard(void) { if (connecting_sockets[0]) cancel_fd_read_except(connecting_sockets[0]); - perhaps_close(&connecting_sockets[0]); - perhaps_close(&connecting_sockets[1]); + xclose_perhaps(&connecting_sockets[0], "connecting socketpair (read)",0); + xclose_perhaps(&connecting_sockets[1], "connecting socketpair (write)",0); if (connecting_child) { int r= kill(connecting_child, SIGTERM); @@ -799,7 +908,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { LIST_ADDHEAD(conns, conn); connect_attempt_discard(); - check_master_queue(); + check_assign_articles(); return 0; x: @@ -807,18 +916,6 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { connect_attempt_discard(); } -static void conn_exception(oop_source *lp, int fd, - oop_event ev, void *conn_v) { - Conn *conn= conn_v; - char ch; - assert(fd == conn->fd); - assert(ev == OOP_EXCEPTION); - r= read(conn->fd, &ch, 1); - if (r<0) connfail(conn,"read failed: %s",strerror(errno)); - connfail(conn,"exceptional condition on socket (peer sent urgent" - " data? read(,,1)=%d)",r); -} - static int allow_connect_start(void) { return conns.count < max_connections && !connecting_child @@ -915,55 +1012,9 @@ static void connect_start(void) { connect_attempt_discard(); } -static void check_idle_conns(void) { - Conn *conn; - for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) - conn->since_activity++; - search_again: - for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { - if (conn->since_activity <= need_activity_periods) continue; +/*---------- assigning articles to conns, and transmitting ----------*/ - /* We need to shut this down */ - if (conn->quitting) - connfail(conn,"timed out waiting for response to QUIT"); - else if (conn->sent.count) - connfail(conn,"timed out waiting for responses"); - else if (conn->waiting.count || conn->priority.count) - connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); - else if (conn->xmitu) - connfail(conn,"peer has been sending responses" - " before receiving our commands!"); - else { - static const char quitcmd[]= "QUIT\r\n"; - int todo= sizeof(quitcmd)-1; - const char *p= quitcmd; - for (;;) { - int r= write(conn->fd, p, todo); - if (r<0) { - if (isewouldblock(errno)) - connfail(conn, "blocked writing QUIT to idle connection"); - else - connfail(conn, "failed to write QUIT to idle connection: %s", - strerror(errno)); - break; - } - assert(r<=todo); - todo -= r; - if (!todo) { - conn->quitting= 1; - conn->since_activity= 0; - debug("C%d is idle, quitting", conn->fd); - break; - } - } - } - goto search_again; - } -} - -/*========== overall control of article flow ==========*/ - -static void check_master_queue(void) { +static void check_assign_articles(void) { for (;;) { if (!queue.count) break; @@ -1033,40 +1084,6 @@ static void conn_maybe_write(Conn *conn) { } } -static void vconnfail(Conn *conn, const char *fmt, va_list al) { - int requeue[art_MaxState]; - - Article *art; - while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->sent))) { - requeue[art->state]++; - if (art->state==art_Unsolicited) art->state= art_Unchecked; - LIST_ADDTAIL(queue,art); - } - - int i; - XmitDetails *d; - for (i=0, d=conn->xmitd; ixmitu; i++, d++) - xmit_free(d); - - char *m= xvasprintf(fmt,al); - warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s", - conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); - free(m); - - LIST_REMOVE(conns,conn); - conn_dispose(conn); - check_master_queue(); -} - -static void connfail(Conn *conn, const char *fmt, ...) { - va_list al; - va_start(al,fmt); - vconnfail(conn,fmt,al); - va_end(al); -} - /*========== article transmission ==========*/ static XmitDetails *xmit_core(Conn *conn, const char *data, int len, @@ -1286,7 +1303,6 @@ static void article_done(Conn *conn, Article *art, int whichcount) { else if (whichcount == RC_unwanted) update_nocheck(0); InputFile *ipf= art->ipf; - assert(ipf->fd); while (art->blanklen) { static const char spaces[]= @@ -1404,7 +1420,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } conn_maybe_write(conn); - check_master_queue(); + check_assign_articles(); return OOP_CONTINUE; } @@ -1413,16 +1429,11 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, static void feedfile_eof(InputFile *ipf) { assert(ipf != main_input_file); /* promised by tailing_try_read */ - - inputfile_tailing_stop(ipf); - assert(ipf->fd > 0); -fixme do not close fd do something else because art_done needs it to blank entries; - if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); - ipf->fd= 0; + inputfile_reading_stop(ipf); if (ipf == flushing_input_file) { assert(sms==sm_SEPARATED || sms==sm_DROPPING); - if (main_input_file) inputfile_tailing_start(main_input_file); + if (main_input_file) inputfile_reading_start(main_input_file); statemc_check_flushing_done(); } else if (ipf == backlog_input_file) { statemc_check_backlog_done(); @@ -1450,14 +1461,10 @@ static InputFile *open_input_file(const char *path) { static void close_input_file(InputFile *ipf) { /* does not free */ assert(!ipf->readable_callback); /* must have had ->on_cancel */ - assert(!ipf->filemon); /* must have had inputfile_tailing_stop */ - assert(!ipf->rd); /* must have had inputfile_tailing_stop */ + assert(!ipf->filemon); /* must have had inputfile_reading_stop */ + assert(!ipf->rd); /* must have had inputfile_reading_stop */ assert(!ipf->inprogress); /* no dangling pointers pointing here */ - - if (ipf->fd) { - if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); - ipf->fd= 0; - } + xclose_perhaps(&ipf->fd, "input file ", ipf->path); } @@ -1550,7 +1557,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->offset >= target_max_feedfile_size) statemc_start_flush("feed file size"); - check_master_queue(); + check_assign_articles(); return OOP_CONTINUE; } @@ -1604,10 +1611,10 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, errno=EAGAIN; return -1; } else if (ipf==flushing_input_file) { - assert(ipf->fd); + assert(ipf->rd); assert(sms==sm_SEPARATED || sms==sm_DROPPING); } else if (ipf==backlog_input_file) { - assert(ipf->fd); + assert(ipf->rd); } else { abort(); } @@ -1737,7 +1744,7 @@ static const oop_rd_style feedfile_rdstyle= { OOP_RD_SHORTREC_LONG, }; -static void inputfile_tailing_start(InputFile *ipf) { +static void inputfile_reading_start(InputFile *ipf) { assert(!ipf->rd); ipf->readable.on_readable= tailing_on_readable; ipf->readable.on_cancel= tailing_on_cancel; @@ -1749,14 +1756,14 @@ static void inputfile_tailing_start(InputFile *ipf) { ipf->readable_callback_user= 0; ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0); - assert(ipf->fd); + assert(ipf->rd); int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, feedfile_got_article,ipf, feedfile_read_err, ipf); if (r) sysdie("unable start reading feedfile %s",ipf->path); } -static void inputfile_tailing_stop(InputFile *ipf) { +static void inputfile_reading_stop(InputFile *ipf) { assert(ipf->rd); oop_rd_cancel(ipf->rd); oop_rd_delete(ipf->rd); @@ -1869,7 +1876,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { static void startup_set_input_file(InputFile *f) { assert(!main_input_file); main_input_file= f; - inputfile_tailing_start(f); + inputfile_reading_start(f); } static void statemc_init(void) { @@ -2006,7 +2013,7 @@ static void statemc_period_poll(void) { static int inputfile_is_done(InputFile *ipf) { if (!ipf) return 0; if (ipf->inprogress) return 0; /* new article in the meantime */ - if (ipf->fd) return 0; /* not had EOF */ + if (ipf->rd) return 0; /* not had EOF */ return 1; } @@ -2108,7 +2115,7 @@ static void statemc_setstate(StateMachineState newsms, int periods, if (!main_input_file) xtra= "-ABSENT"; break; case sm_SEPARATED: case sm_DROPPING: - xtra= flushing_input_file->fd ? "-1" : "-2"; + xtra= flushing_input_file->rd ? "-1" : "-2"; break; default:; } @@ -2287,7 +2294,7 @@ static void search_backlog_file(void) { warn("backlog file %s vanished as we opened it", backlog_input_file); goto try_again; } - inputfile_tailing_start(backlog_input_file); + inputfile_reading_start(backlog_input_file); until_backlog_nextscan= -1; return; } @@ -2418,13 +2425,13 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ static void postfork_inputfile(InputFile *ipf) { if (!ipf) return; - assert(ipf->fd); - perhaps_close(&ipf->fd); + xclose(ipf->fd, "(in child) input file ", ipf->path); } -static void postfork_stdio(FILE *f) { +static void postfork_stdio(FILE *f, const char *what, const char *what2) { /* we have no stdio streams that are buffered long-term */ - if (f) fclose(f); + if (!f) return; + if (fclose(f)) sysdie("close (in child) %s%s", what, what2?what2:0); } static void postfork(const char *what) { @@ -2436,9 +2443,9 @@ static void postfork(const char *what) { Conn *conn; for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) - close(conn->fd); + close_perhaps(&conn->fd); - postfork_stdio(defer); + postfork_stdio(defer, "defer file ", path_defer); } #define EVERY(what, interval_sec, interval_usec, body) \ @@ -2484,7 +2491,7 @@ EVERY(period, -1,0, { poll_backlog_file(); if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_period_poll(); - check_master_queue(); + check_assign_articles(); check_idle_conns(); });