X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=43ae915a1f3646fe58e96c9726b6de405d962d81;hp=58d2870de5b8bed162d49e161635e384584f8834;hb=ffe397bdaa6e5f125df3b5abfd71dc402008be9b;hpb=0556ef460c00ebe81f3c23c8bcfcfeb1f937c95d diff --git a/backends/innduct.c b/backends/innduct.c index 58d2870..43ae915 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,11 +1,12 @@ /* * TODO - * - close idle connections - * - cope better with garbage in feed file - * - cope better with NULs in feed file - * - check all structs initialised - * - check all fd watches properly undone * - check all init functions called + * - check ipf->inprogress managed properly + * - xperhaps_close + * - actually implement badusage + * - options for all options + * - manpage + * - pid, sitename, hostname in lockfile * - -k kill mode ? */ @@ -158,7 +159,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. /*============================== PROGRAM ==============================*/ -#define _GNU_SOURCE +#define _GNU_SOURCE 1 #include "config.h" #include "storage.h" @@ -193,8 +194,6 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. /*----- general definitions, probably best not changed -----*/ -#define PERIOD_SECONDS 60 - #define CONNCHILD_ESTATUS_STREAM 4 #define CONNCHILD_ESTATUS_NOSTREAM 5 @@ -203,6 +202,10 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) +#define VA va_list al; va_start(al,fmt) +#define PRINTF(f,a) __attribute__((__format__(printf,f,a))) +#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a))) + /*----- doubly linked lists -----*/ #define ISNODE(T) struct { T *succ, *pred; } node /* must be at start */ @@ -235,6 +238,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) +#define LIST_INIT(l) ((l).hd, list_new((struct list*)&(l))) #define LIST_HEAD(l) ((typeof((l).hd))(list_head((struct list*)&(l)))) #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) @@ -298,6 +302,9 @@ static void filemon_start(InputFile *ipf); static void filemon_stop(InputFile *ipf); static void filemon_callback(InputFile *ipf); +static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); +static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); + /*----- configuration options -----*/ static const char *sitename, *feedfile, *pathoutgoing; @@ -307,6 +314,8 @@ static int quiet_multiple=0, become_daemon=1; static int max_connections=10, max_queue_per_conn=200; static int target_max_feedfile_size=100000; +static int period_seconds=60; + static double max_bad_data_ratio= 0.01; static int max_bad_data_initial= 30; /* in one corrupt 4096-byte block the number of newlines has @@ -316,7 +325,7 @@ static int connection_setup_timeout=200, port=119, try_stream=1; static int inndcomm_flush_timeout=100; static int reconnect_delay_periods, flushfail_retry_periods, open_wait_periods; static int backlog_retry_minperiods, backlog_spontaneous_rescan_periods; -static int spontaneous_flush_periods; +static int spontaneous_flush_periods, need_activity_periods; static const char *inndconffile; static double nocheck_thresh_pct= 95.0; @@ -383,7 +392,7 @@ struct InputFile { oop_readable_call *readable_callback; void *readable_callback_user; - int fd; + int fd; /* may be 0, meaning closed */ Filemon_Perfile *filemon; oop_read *rd; @@ -428,7 +437,9 @@ static const char *sms_names[]= { struct Conn { ISNODE(Conn); - int fd, max_queue, stream, quitting; + int fd; /* may be 0, meaning closed (during construction/destruction) */ + int max_queue, stream, quitting; + int since_activity; /* periods */ ArticleList waiting; /* not yet told peer */ ArticleList priority; /* peer says send it now */ ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ @@ -461,10 +472,6 @@ static int nocheck, nocheck_reported; /*========== logging ==========*/ -#define VA va_list al; va_start(al,fmt) -#define PRINTF(f,a) __attribute__((__format__(printf,f,a))) -#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a))) - static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); static void logcore(int sysloglevel, const char *fmt, ...) { VA; @@ -666,10 +673,18 @@ static char *sanitise(const char *input) { return sanibuf; } +static int isewouldblock(int errnoval) { + return errnoval==EWOULDBLOCK || errnoval==EAGAIN; +} + /*========== making new connections ==========*/ static void conn_dispose(Conn *conn) { if (!conn) return; + if (conn->fd) { + loop->cancel_fd(loop, conn->fd, OOP_WRITE); + loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION); + } perhaps_close(&conn->fd); free(conn); until_connect= reconnect_delay_periods; @@ -719,16 +734,18 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { if (got != -1) { assert(got==connecting_child); connecting_child= 0; - if (WIFEXITED(status) && - (WEXITSTATUS(status) != 0 && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)) { - /* child already reported the problem */ + if (WIFEXITED(status)) { + if (WEXITSTATUS(status) != 0 && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) + /* child already reported the problem */; + else + warn("connect: connection child exited code %d but no cmsg", + WEXITSTATUS(status)); } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { warn("connect: connection attempt timed out"); - } else if (!WIFEXITED(status)) { + } else { report_child_status("connect", status); - /* that's probably the root cause then */ } } else { /* child is still running apparently, report the socket problem */ @@ -736,8 +753,10 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { syswarn("connect: read from child socket failed"); else if (e == OOP_EXCEPTION) warn("connect: unexpected exception on child socket"); - else + else if (!rs) warn("connect: unexpected EOF on child socket"); + else + fatal("connect: unexpected lack of cmsg from child"); } goto x; } @@ -752,9 +771,10 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { CHK(len, CMSG_LEN(sizeof(conn->fd))); #undef CHK - if (CMSG_NXTHDR(&msg,h)) { die("connect: child sent many cmsgs"); goto x; } + if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs"); memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd)); + loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); int status; pid_t got= waitpid(connecting_child, &status, 0); @@ -772,10 +792,14 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { } /* Phew! */ + LIST_INIT(conn->waiting); + LIST_INIT(conn->priority); + LIST_INIT(conn->sent); setnonblock(conn->fd, 1); conn->max_queue= conn->stream ? max_queue_per_conn : 1; - LIST_ADDHEAD(conns, conn); notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); + LIST_ADDHEAD(conns, conn); + connect_attempt_discard(); check_master_queue(); return 0; @@ -785,6 +809,18 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { connect_attempt_discard(); } +static void conn_exception(oop_source *lp, int fd, + oop_event ev, void *conn_v) { + Conn *conn= conn_v; + char ch; + assert(fd == conn->fd); + assert(ev == OOP_EXCEPTION); + r= read(conn->fd, &ch, 1); + if (r<0) connfail(conn,"read failed: %s",strerror(errno)); + connfail(conn,"exceptional condition on socket (peer sent urgent" + " data? read(,,1)=%d)",r); +} + static int allow_connect_start(void) { return conns.count < max_connections && !connecting_child @@ -881,6 +917,51 @@ static void connect_start(void) { connect_attempt_discard(); } +static void check_idle_conns(void) { + Conn *conn; + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) + conn->since_activity++; + search_again: + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { + if (conn->since_activity <= need_activity_periods) continue; + + /* We need to shut this down */ + if (conn->quitting) + connfail(conn,"timed out waiting for response to QUIT"); + else if (conn->sent.count) + connfail(conn,"timed out waiting for responses"); + else if (conn->waiting.count || conn->priority.count) + connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); + else if (conn->xmitu) + connfail(conn,"peer has been sending responses" + " before receiving our commands!"); + else { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= 1; + conn->since_activity= 0; + debug("C%d is idle, quitting", conn->fd); + break; + } + } + } + goto search_again; + } +} /*========== overall control of article flow ==========*/ @@ -890,7 +971,7 @@ static void check_master_queue(void) { break; Conn *walk, *use=0; - int spare; + int spare, inqueue; /* Find a connection to offer this article. We prefer a busy * connection to an idle one, provided it's not full. We take the @@ -899,8 +980,9 @@ static void check_master_queue(void) { * connections, the spare ones will go away eventually. */ for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) { - int inqueue= walk->sent.count + walk->priority.count - + walk->waiting.count; + if (walk->quitting) continue; + inqueue= walk->sent.count + walk->priority.count + + walk->waiting.count; spare= walk->max_queue - inqueue; assert(inqueue <= max_queue_per_conn); assert(spare >= 0); @@ -908,6 +990,7 @@ static void check_master_queue(void) { else if (spare>0) /*working*/ { use= walk; break; } } if (use) { + if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { Article *art= LIST_REMHEAD(queue); LIST_ADDTAIL(use->waiting, art); @@ -952,7 +1035,6 @@ static void conn_maybe_write(Conn *conn) { } } -static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); static void vconnfail(Conn *conn, const char *fmt, va_list al) { int requeue[art_MaxState]; @@ -980,7 +1062,6 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { check_master_queue(); } -static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); static void connfail(Conn *conn, const char *fmt, ...) { va_list al; va_start(al,fmt); @@ -1032,7 +1113,7 @@ static void *conn_write_some_xmits(Conn *conn) { if (count > IOV_MAX) count= IOV_MAX; ssize_t rs= writev(conn->fd, conn->xmit, count); if (rs < 0) { - if (errno == EAGAIN) return OOP_CONTINUE; + if (isewouldblock(errno)) return OOP_CONTINUE; connfail(conn, "write failed: %s", strerror(errno)); return OOP_HALT; } @@ -1207,6 +1288,8 @@ static void article_done(Conn *conn, Article *art, int whichcount) { else if (whichcount == RC_unwanted) update_nocheck(0); InputFile *ipf= art->ipf; + assert(ipf->fd); + while (art->blanklen) { static const char spaces[]= " " @@ -1227,11 +1310,10 @@ static void article_done(Conn *conn, Article *art, int whichcount) { ipf->inprogress--; assert(ipf->inprogress >= 0); + free(art); if (!ipf->inprogress && ipf != main_input_file) queue_check_input_done(); - - free(art); } static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, @@ -1258,7 +1340,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, if (code!=205 && code!=503) { connfail(conn, "peer gave unexpected response to QUIT: %s", sani); } else { - notice("C%d idle connection closed\n"); + notice("C%d idle connection closed", conn->fd); assert(!conn->waiting.count); assert(!conn->priority.count); assert(!conn->sent.count); @@ -1269,6 +1351,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, return OOP_CONTINUE; } + conn->since_activity= 0; Article *art; #define GET_ARTICLE(musthavesent) \ @@ -1334,9 +1417,10 @@ static void feedfile_eof(InputFile *ipf) { assert(ipf != main_input_file); /* promised by tailing_try_read */ inputfile_tailing_stop(ipf); - assert(ipf->fd >= 0); + assert(ipf->fd > 0); +fixme do not close fd do something else because art_done needs it to blank entries; if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); - ipf->fd= -1; + ipf->fd= 0; if (ipf == flushing_input_file) { assert(sms==sm_SEPARATED || sms==sm_DROPPING); @@ -1355,6 +1439,7 @@ static InputFile *open_input_file(const char *path) { if (errno==ENOENT) return 0; sysfatal("unable to open input file %s", path); } + assert(fd>0); InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1); memset(ipf,0,sizeof(*ipf)); @@ -1365,14 +1450,16 @@ static InputFile *open_input_file(const char *path) { return ipf; } -static void close_input_file(InputFile *ipf) { +static void close_input_file(InputFile *ipf) { /* does not free */ assert(!ipf->readable_callback); /* must have had ->on_cancel */ assert(!ipf->filemon); /* must have had inputfile_tailing_stop */ assert(!ipf->rd); /* must have had inputfile_tailing_stop */ assert(!ipf->inprogress); /* no dangling pointers pointing here */ - if (ipf->fd >= 0) + if (ipf->fd) { if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); + ipf->fd= 0; + } } @@ -1452,12 +1539,12 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->readcount_ok++; art= xmalloc(sizeof(*art) - 1 + midlen + 1); - art->offset= ipf->offset; - art->blanklen= recsz; - art->midlen= midlen; art->state= art_Unchecked; + art->midlen= midlen; art->ipf= ipf; ipf->inprogress++; art->token= TextToToken(tokentextbuf); + art->offset= ipf->offset; + art->blanklen= recsz; strcpy(art->messageid, space+1); LIST_ADDTAIL(queue, art); @@ -1519,10 +1606,10 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, errno=EAGAIN; return -1; } else if (ipf==flushing_input_file) { - assert(ipf->fd>=0); + assert(ipf->fd); assert(sms==sm_SEPARATED || sms==sm_DROPPING); } else if (ipf==backlog_input_file) { - assert(ipf->fd>=0); + assert(ipf->fd); } else { abort(); } @@ -1582,7 +1669,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, for (;;) { int r= read(filemon_inotify_fd, &iev, sizeof(iev)); if (r==-1) { - if (errno==EAGAIN) break; + if (isewouldblock(errno)) break; sysdie("read from inotify master"); } else if (r==sizeof(iev)) { assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax); @@ -1653,7 +1740,7 @@ static const oop_rd_style feedfile_rdstyle= { }; static void inputfile_tailing_start(InputFile *ipf) { - assert(!ipf->fd); + assert(!ipf->rd); ipf->readable.on_readable= tailing_on_readable; ipf->readable.on_cancel= tailing_on_cancel; ipf->readable.try_read= tailing_try_read; @@ -1672,7 +1759,7 @@ static void inputfile_tailing_start(InputFile *ipf) { } static void inputfile_tailing_stop(InputFile *ipf) { - assert(ipf->fd); + assert(ipf->rd); oop_rd_cancel(ipf->rd); oop_rd_delete(ipf->rd); ipf->rd= 0; @@ -1739,7 +1826,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | V | | V | ============= V V ============ | SEPARATED-1 | | DROPPING-1 - | flsh->fd>=0 | | flsh->fd>=0 + | flsh->fd>0 | | flsh->fd>0 | [Separated] | | [Dropping] | main F idle | | main none | old D tail | | old D tail @@ -1749,7 +1836,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | V | | V | =============== | | =============== | SEPARATED-2 | | DROPPING-2 - | flsh->fd==-1 | V flsh->fd==-1 + | flsh->fd==0 | V flsh->fd==0 | [Finishing] | | [Dropping] | main F tail | `. main none | old D closed | `. old D closed @@ -1805,7 +1892,7 @@ static void statemc_init(void) { fl.l_whence= SEEK_SET; int r= fcntl(lockfd, F_SETLK, &fl); if (r==-1) { - if (errno==EACCES || errno==EAGAIN) { + if (errno==EACCES || isewouldblock(errno)) { if (quiet_multiple) exit(0); fatal("another duct holds the lockfile"); } @@ -1921,7 +2008,7 @@ static void statemc_period_poll(void) { static int inputfile_is_done(InputFile *ipf) { if (!ipf) return 0; if (ipf->inprogress) return 0; /* new article in the meantime */ - if (ipf->fd >= 0); return 0; /* not had EOF */ + if (ipf->fd) return 0; /* not had EOF */ return 1; } @@ -2023,7 +2110,7 @@ static void statemc_setstate(StateMachineState newsms, int periods, if (!main_input_file) xtra= "-ABSENT"; break; case sm_SEPARATED: case sm_DROPPING: - xtra= flushing_input_file->fd >= 0 ? "-1" : "-2"; + xtra= flushing_input_file->fd ? "-1" : "-2"; break; default:; } @@ -2191,7 +2278,7 @@ static void search_backlog_file(void) { now= xtime(); double age= difftime(now, oldest_mtime); - long age_deficiency= (backlog_retry_minperiods * PERIOD_SECONDS) - age; + long age_deficiency= (backlog_retry_minperiods * period_seconds) - age; if (age_deficiency <= 0) { debug("backlog scan: found age=%f deficiency=%ld oldest=%s", @@ -2207,7 +2294,7 @@ static void search_backlog_file(void) { return; } - until_backlog_nextscan= age_deficiency / PERIOD_SECONDS; + until_backlog_nextscan= age_deficiency / period_seconds; if (backlog_spontaneous_rescan_periods >= 0 && until_backlog_nextscan > backlog_spontaneous_rescan_periods) @@ -2333,9 +2420,8 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ static void postfork_inputfile(InputFile *ipf) { if (!ipf) return; - assert(ipf->fd >= 0); - close(ipf->fd); - ipf->fd= -1; + assert(ipf->fd); + perhaps_close(&ipf->fd); } static void postfork_stdio(FILE *f) { @@ -2358,8 +2444,7 @@ static void postfork(const char *what) { } #define EVERY(what, interval_sec, interval_usec, body) \ - static const struct timeval what##_timeout = \ - { interval_sec, interval_usec }; \ + static struct timeval what##_timeout = { interval_sec, interval_usec }; \ static void what##_schedule(void); \ static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ { body } \ @@ -2384,7 +2469,7 @@ static const char *debug_ipf_path(InputFile *ipf) { return slash ? slash+1 : ipf->path; } -EVERY(period, PERIOD_SECONDS,0, { +EVERY(period, -1,0, { debug("PERIOD" " sms=%s[%d] conns=%d queue=%d until_connect=%d" " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_IPF(flushing) @@ -2402,6 +2487,7 @@ EVERY(period, PERIOD_SECONDS,0, { if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_period_poll(); check_master_queue(); + check_idle_conns(); }); @@ -2485,8 +2571,8 @@ static void op_seconds(const Option *o, const char *val) { static void op_periods_rndup(const Option *o, const char *val) { int *store= o->store; op_seconds(o,val); - *store += PERIOD_SECONDS-1; - *store /= PERIOD_SECONDS; + *store += period_seconds-1; + *store /= period_seconds; } static void op_setint(const Option *o, const char *val) { @@ -2635,11 +2721,15 @@ int main(int argc, char **argv) { notice("starting"); + LIST_INIT(conns); + LIST_INIT(queue); + if (!filemon_method_init()) { warn("no file monitoring available, polling"); filepoll_schedule(); } + period_timeout.tv_sec= period_seconds; period_schedule(); statemc_init();