X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=1328d372f8d8390cb822f734a2476f044b1c6404;hp=60cfea6625e5d4c5e226ec53c695854b9369bc90;hb=599596434b5b74c0280006508d195d69e71d0814;hpb=8ebd40aedb307a863ee236116e708e8f11529a12 diff --git a/backends/innduct.c b/backends/innduct.c index 60cfea6..1328d37 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,8 +1,12 @@ /* * TODO - * - close idle connections - * - cope better with garbage in feed file - * - cope better with NULs in feed file + * - check all structs initialised + * - check all fd watches properly undone + * - check all init functions called + * - actually implement badusage + * - options for all options + * - manpage + * - pid, sitename, hostname in lockfile * - -k kill mode ? */ @@ -155,15 +159,17 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. /*============================== PROGRAM ==============================*/ -#define _GNU_SOURCE +#define _GNU_SOURCE 1 -#include "inn/list.h" #include "config.h" #include "storage.h" #include "nntp.h" #include "libinn.h" #include "inndcomm.h" +#include "inn/list.h" +#include "inn/innconf.h" + #include #include #include @@ -181,14 +187,13 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #include #include #include +#include #include #include /*----- general definitions, probably best not changed -----*/ -#define PERIOD_SECONDS 60 - #define CONNCHILD_ESTATUS_STREAM 4 #define CONNCHILD_ESTATUS_NOSTREAM 5 @@ -197,6 +202,10 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) +#define VA va_list al; va_start(al,fmt) +#define PRINTF(f,a) __attribute__((__format__(printf,f,a))) +#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a))) + /*----- doubly linked lists -----*/ #define ISNODE(T) struct { T *succ, *pred; } node /* must be at start */ @@ -292,15 +301,20 @@ static void filemon_start(InputFile *ipf); static void filemon_stop(InputFile *ipf); static void filemon_callback(InputFile *ipf); +static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); +static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); + /*----- configuration options -----*/ -static char *sitename, *feedfile; +static const char *sitename, *feedfile, *pathoutgoing; static const char *remote_host; static int quiet_multiple=0, become_daemon=1; static int max_connections=10, max_queue_per_conn=200; static int target_max_feedfile_size=100000; +static int period_seconds=60; + static double max_bad_data_ratio= 0.01; static int max_bad_data_initial= 30; /* in one corrupt 4096-byte block the number of newlines has @@ -310,7 +324,7 @@ static int connection_setup_timeout=200, port=119, try_stream=1; static int inndcomm_flush_timeout=100; static int reconnect_delay_periods, flushfail_retry_periods, open_wait_periods; static int backlog_retry_minperiods, backlog_spontaneous_rescan_periods; -static int spontaneous_flush_periods; +static int spontaneous_flush_periods, need_activity_periods; static const char *inndconffile; static double nocheck_thresh_pct= 95.0; @@ -423,6 +437,7 @@ static const char *sms_names[]= { struct Conn { ISNODE(Conn); int fd, max_queue, stream, quitting; + int since_activity; /* periods */ ArticleList waiting; /* not yet told peer */ ArticleList priority; /* peer says send it now */ ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ @@ -455,11 +470,9 @@ static int nocheck, nocheck_reported; /*========== logging ==========*/ -static void logcore(int sysloglevel, const char *fmt, ...) - __attribute__((__format__(printf,2,3))); +static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); static void logcore(int sysloglevel, const char *fmt, ...) { - va_list al; - va_start(al,fmt); + VA; if (become_daemon) { vsyslog(sysloglevel,fmt,al); } else { @@ -470,8 +483,7 @@ static void logcore(int sysloglevel, const char *fmt, ...) { } static void logv(int sysloglevel, const char *pfx, int errnoval, - const char *fmt, va_list al) - __attribute__((__format__(printf,5,0))); + const char *fmt, va_list al) PRINTF(5,0); static void logv(int sysloglevel, const char *pfx, int errnoval, const char *fmt, va_list al) { char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */ @@ -488,21 +500,17 @@ static void logv(int sysloglevel, const char *pfx, int errnoval, } #define diewrap(fn, pfx, sysloglevel, err, estatus) \ - static void fn(const char *fmt, ...) \ - __attribute__((__noreturn__,__format__(printf,1,2))); \ + static void fn(const char *fmt, ...) NORET_PRINTF(1,2); \ static void fn(const char *fmt, ...) { \ - va_list al; \ - va_start(al,fmt); \ + VA; \ logv(sysloglevel, pfx, err, fmt, al); \ exit(estatus); \ } #define logwrap(fn, pfx, sysloglevel, err) \ - static void fn(const char *fmt, ...) \ - __attribute__((__format__(printf,1,2))); \ + static void fn(const char *fmt, ...) PRINTF(1,2); \ static void fn(const char *fmt, ...) { \ - va_list al; \ - va_start(al,fmt); \ + VA; \ logv(sysloglevel, pfx, err, fmt, al); \ va_end(al); \ } @@ -523,19 +531,16 @@ logwrap(debug, " debug", LOG_DEBUG, -1); /*========== utility functions etc. ==========*/ -static char *xvasprintf(const char *fmt, va_list al) - __attribute__((__format__(printf,1,0))); +static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0); static char *xvasprintf(const char *fmt, va_list al) { char *str; int rc= vasprintf(&str,fmt,al); if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt); return str; } -static char *xasprintf(const char *fmt, ...) - __attribute__((__format__(printf,1,2))); +static char *xasprintf(const char *fmt, ...) PRINTF(1,2); static char *xasprintf(const char *fmt, ...) { - va_list al; - va_start(al,fmt); + VA; char *str= xvasprintf(fmt,al); va_end(al); return str; @@ -666,6 +671,10 @@ static char *sanitise(const char *input) { return sanibuf; } +static int isewouldblock(int errnoval) { + return errnoval==EWOULDBLOCK || errnoval==EAGAIN; +} + /*========== making new connections ==========*/ static void conn_dispose(Conn *conn) { @@ -881,6 +890,51 @@ static void connect_start(void) { connect_attempt_discard(); } +static void check_idle_conns(void) { + Conn *conn; + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) + conn->since_activity++; + search_again: + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { + if (conn->since_activity <= need_activity_periods) continue; + + /* We need to shut this down */ + if (conn->quitting) + connfail(conn,"timed out waiting for response to QUIT"); + else if (conn->sent.count) + connfail(conn,"timed out waiting for responses"); + else if (conn->waiting.count || conn->priority.count) + connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); + else if (conn->xmitu) + connfail(conn,"peer has been sending responses" + " before receiving our commands!"); + else { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= 1; + conn->since_activity= 0; + debug("C%d is idle, quitting", conn->fd); + break; + } + } + } + goto search_again; + } +} /*========== overall control of article flow ==========*/ @@ -890,7 +944,7 @@ static void check_master_queue(void) { break; Conn *walk, *use=0; - int spare; + int spare, inqueue; /* Find a connection to offer this article. We prefer a busy * connection to an idle one, provided it's not full. We take the @@ -899,8 +953,9 @@ static void check_master_queue(void) { * connections, the spare ones will go away eventually. */ for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) { - int inqueue= walk->sent.count + walk->priority.count - + walk->waiting.count; + if (walk->quitting) continue; + inqueue= walk->sent.count + walk->priority.count + + walk->waiting.count; spare= walk->max_queue - inqueue; assert(inqueue <= max_queue_per_conn); assert(spare >= 0); @@ -908,6 +963,7 @@ static void check_master_queue(void) { else if (spare>0) /*working*/ { use= walk; break; } } if (use) { + if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { Article *art= LIST_REMHEAD(queue); LIST_ADDTAIL(use->waiting, art); @@ -952,9 +1008,6 @@ static void conn_maybe_write(Conn *conn) { } } -static void vconnfail(Conn *conn, const char *fmt, va_list al) - __attribute__((__format__(printf,2,0))); - static void vconnfail(Conn *conn, const char *fmt, va_list al) { int requeue[art_MaxState]; @@ -982,8 +1035,6 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { check_master_queue(); } -static void connfail(Conn *conn, const char *fmt, ...) - __attribute__((__format__(printf,2,3))); static void connfail(Conn *conn, const char *fmt, ...) { va_list al; va_start(al,fmt); @@ -1035,7 +1086,7 @@ static void *conn_write_some_xmits(Conn *conn) { if (count > IOV_MAX) count= IOV_MAX; ssize_t rs= writev(conn->fd, conn->xmit, count); if (rs < 0) { - if (errno == EAGAIN) return OOP_CONTINUE; + if (isewouldblock(errno)) return OOP_CONTINUE; connfail(conn, "write failed: %s", strerror(errno)); return OOP_HALT; } @@ -1261,7 +1312,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, if (code!=205 && code!=503) { connfail(conn, "peer gave unexpected response to QUIT: %s", sani); } else { - notice("C%d idle connection closed\n"); + notice("C%d idle connection closed", conn->fd); assert(!conn->waiting.count); assert(!conn->priority.count); assert(!conn->sent.count); @@ -1272,6 +1323,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, return OOP_CONTINUE; } + conn->since_activity= 0; Article *art; #define GET_ARTICLE(musthavesent) \ @@ -1585,7 +1637,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, for (;;) { int r= read(filemon_inotify_fd, &iev, sizeof(iev)); if (r==-1) { - if (errno==EAGAIN) break; + if (isewouldblock(errno)) break; sysdie("read from inotify master"); } else if (r==sizeof(iev)) { assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax); @@ -1808,7 +1860,7 @@ static void statemc_init(void) { fl.l_whence= SEEK_SET; int r= fcntl(lockfd, F_SETLK, &fl); if (r==-1) { - if (errno==EACCES || errno==EAGAIN) { + if (errno==EACCES || isewouldblock(errno)) { if (quiet_multiple) exit(0); fatal("another duct holds the lockfile"); } @@ -2194,7 +2246,7 @@ static void search_backlog_file(void) { now= xtime(); double age= difftime(now, oldest_mtime); - long age_deficiency= (backlog_retry_minperiods * PERIOD_SECONDS) - age; + long age_deficiency= (backlog_retry_minperiods * period_seconds) - age; if (age_deficiency <= 0) { debug("backlog scan: found age=%f deficiency=%ld oldest=%s", @@ -2210,7 +2262,7 @@ static void search_backlog_file(void) { return; } - until_backlog_nextscan= age_deficiency / PERIOD_SECONDS; + until_backlog_nextscan= age_deficiency / period_seconds; if (backlog_spontaneous_rescan_periods >= 0 && until_backlog_nextscan > backlog_spontaneous_rescan_periods) @@ -2361,8 +2413,7 @@ static void postfork(const char *what) { } #define EVERY(what, interval_sec, interval_usec, body) \ - static const struct timeval what##_timeout = \ - { interval_sec, interval_usec }; \ + static struct timeval what##_timeout = { interval_sec, interval_usec }; \ static void what##_schedule(void); \ static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ { body } \ @@ -2387,7 +2438,7 @@ static const char *debug_ipf_path(InputFile *ipf) { return slash ? slash+1 : ipf->path; } -EVERY(period, PERIOD_SECONDS,0, { +EVERY(period, -1,0, { debug("PERIOD" " sms=%s[%d] conns=%d queue=%d until_connect=%d" " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_IPF(flushing) @@ -2405,18 +2456,19 @@ EVERY(period, PERIOD_SECONDS,0, { if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_period_poll(); check_master_queue(); + check_idle_conns(); }); /*========== option parsing ==========*/ -static void vbadusage(const char *fmt, va_list al) - __attribute__((__noreturn__,__format__(printf,1,0))); +/*---------- generic option parser and logging ----------*/ + +static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0); static void vbadusage(const char *fmt, va_list al) { abort(); } -static void badusage(const char *fmt, ...) - __attribute__((__noreturn__,__format__(printf,1,2))); +static void badusage(const char *fmt, ...) NORET_PRINTF(1,2); static void badusage(const char *fmt, ...) { va_list al; va_start(al,fmt); @@ -2436,9 +2488,11 @@ struct Option { const char *lng; void *store; OptionParser *fn; - int noarg; + int noarg, intval; }; +/*---------- specific option types ----------*/ + static void op_integer(const Option *o, const char *val) { char *ep; errno= 0; @@ -2459,14 +2513,14 @@ static void op_double(const Option *o, const char *val) { } static void op_string(const Option *o, const char *val) { - char **store= o->store; - free(*store); + const char **store= o->store; *store= val; } static void op_seconds(const Option *o, const char *val) { int *store= o->store; char *ep; + int unit; double v= strtod(val,&ep); if (ep==val) badusage("bad time/duration value for %s",o->lng); @@ -2486,40 +2540,37 @@ static void op_seconds(const Option *o, const char *val) { static void op_periods_rndup(const Option *o, const char *val) { int *store= o->store; op_seconds(o,val); - *store += PERIOD_SECONDS-1; - *store /= PERIOD_SECONDS; + *store += period_seconds-1; + *store /= period_seconds; } -static void op_periods_booltrue(const Option *o, const char *val) { +static void op_setint(const Option *o, const char *val) { int *store= o->store; - *store= 1; -} -static void op_periods_boolfalse(const Option *o, const char *val) { - int *store= o->store; - *store= 0; + *store= o->intval; } +/*---------- specific options ----------*/ + static const Option options[]= { {'f',"feedfile", &feedfile, op_string }, -{'q',"quiet-multiple", &quiet_multiple, op_booltrue, 1 }, +{'q',"quiet-multiple", &quiet_multiple, op_setint, 1,1 }, -{ 0, "max-connections", &max_connections op_integer }, -{ 0, "max-queue-per-conn", &max_queue_per_conn op_integer }, +{ 0, "max-connections", &max_connections, op_integer }, +{ 0, "max-queue-per-conn", &max_queue_per_conn, op_integer }, -{ 0, "streaming", &try_stream, op_booltrue, 1 }, -{ 0, "no-streaming", &try_stream, op_boolfalse, 1 }, -{'P',"port", &port op_integer }, +{ 0, "streaming", &try_stream, op_setint, 1,1 }, +{ 0, "no-streaming", &try_stream, op_setint, 1,0 }, +{'P',"port", &port, op_integer }, { 0, "inndconf", &inndconffile, op_string }, -{'d',"daemon", &become_daemon, op_booltrue, 1 }, -{ 0, "no-daemon", &become_daemon, op_boolfalse, 1 }, +{'d',"daemon", &become_daemon, op_setint, 1,1 }, +{ 0, "no-daemon", &become_daemon, op_setint, 1,0 }, { 0, "no-check-proportion", &nocheck_thresh_pct, op_double }, { 0, "no-check-filter", &nocheck_decay_articles, op_double }, { 0, "reconnect-interval", &reconnect_delay_periods, op_periods_rndup }, { 0, "flush-retry-interval", &flushfail_retry_periods, op_periods_rndup }, -{ 0, "connection-timeout", &connection_timeout, op_seconds }, { 0, "inndcomm-timeout", &inndcomm_flush_timeout, op_seconds }, }; @@ -2606,13 +2657,15 @@ int main(int argc, char **argv) { if (strchr(feedfile, c)) badusage("feed filename may not contain metacharacter %c",c); - loop= oop_sys_new(); - if (!loop) sysdie("could not create liboop event loop"); + oop_source_sys *sysloop= oop_sys_new(); + if (!sysloop) sysdie("could not create liboop event loop"); + loop= (oop_source*)sysloop; if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) sysdie("could not ignore SIGPIPE"); if (become_daemon) { + int i; for (i=3; i<255; i++) /* do this now before we open syslog, etc. */ close(i); @@ -2637,14 +2690,17 @@ int main(int argc, char **argv) { notice("starting"); - if (!filemon_init()) { + if (!filemon_method_init()) { warn("no file monitoring available, polling"); filepoll_schedule(); } + period_timeout.tv_sec= period_seconds; period_schedule(); statemc_init(); - loop->execute. + void *r= oop_sys_run(sysloop); + assert(r == OOP_ERROR); + sysdie("event loop failed"); }