X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=385c6416cb59b7fa1f8f850c5613c17f7dffb03b;hb=a4a53bbbbcca1a6a6022d1efd7d02dd49be2f6f2;hp=968a177be7eeb893c4056a57a6fa65f68d4cdd9d;hpb=b54c887b2cb991ab68d6d70984128f425b54eabc;p=innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index 968a177..385c641 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,8 +1,9 @@ /* * TODO - * - close idle connections - * - cope better with garbage in feed file - * - cope better with NULs in feed file + * - actually implement badusage + * - options for all options + * - manpage + * - pid, sitename, hostname in lockfile * - -k kill mode ? */ @@ -155,13 +156,16 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. /*============================== PROGRAM ==============================*/ -#define _GNU_SOURCE +#define _GNU_SOURCE 1 -#include "inn/list.h" #include "config.h" #include "storage.h" #include "nntp.h" #include "libinn.h" +#include "inndcomm.h" + +#include "inn/list.h" +#include "inn/innconf.h" #include #include @@ -179,14 +183,14 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #include #include #include +#include +#include #include #include /*----- general definitions, probably best not changed -----*/ -#define PERIOD_SECONDS 60 - #define CONNCHILD_ESTATUS_STREAM 4 #define CONNCHILD_ESTATUS_NOSTREAM 5 @@ -195,6 +199,10 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) +#define VA va_list al; va_start(al,fmt) +#define PRINTF(f,a) __attribute__((__format__(printf,f,a))) +#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a))) + /*----- doubly linked lists -----*/ #define ISNODE(T) struct { T *succ, *pred; } node /* must be at start */ @@ -227,6 +235,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) +#define LIST_INIT(l) ((l).hd, list_new((struct list*)&(l))) #define LIST_HEAD(l) ((typeof((l).hd))(list_head((struct list*)&(l)))) #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) @@ -264,12 +273,15 @@ static void *conn_write_some_xmits(Conn *conn); static void xmit_free(XmitDetails *d); +#define SMS(newstate, periods, why) \ + (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) static void statemc_setstate(StateMachineState newsms, int periods, const char *forlog, const char *why); + static void statemc_start_flush(const char *why); /* Normal => Flushing */ static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ -static void check_master_queue(void); +static void check_assign_articles(void); static void queue_check_input_done(void); static void statemc_check_flushing_done(void); @@ -282,23 +294,27 @@ static void open_defer(void); static void close_defer(void); static void search_backlog_file(void); -static void inputfile_tailing_start(InputFile *ipf); -static void inputfile_tailing_stop(InputFile *ipf); +static void inputfile_reading_start(InputFile *ipf); +static void inputfile_reading_stop(InputFile *ipf); -static int filemon_init(void); static void filemon_start(InputFile *ipf); static void filemon_stop(InputFile *ipf); static void filemon_callback(InputFile *ipf); +static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); +static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); + /*----- configuration options -----*/ -static char *sitename, *feedfile; +static const char *sitename, *feedfile, *pathoutgoing; static const char *remote_host; static int quiet_multiple=0, become_daemon=1; static int max_connections=10, max_queue_per_conn=200; static int target_max_feedfile_size=100000; +static int period_seconds=60; + static double max_bad_data_ratio= 0.01; static int max_bad_data_initial= 30; /* in one corrupt 4096-byte block the number of newlines has @@ -308,6 +324,7 @@ static int connection_setup_timeout=200, port=119, try_stream=1; static int inndcomm_flush_timeout=100; static int reconnect_delay_periods, flushfail_retry_periods, open_wait_periods; static int backlog_retry_minperiods, backlog_spontaneous_rescan_periods; +static int spontaneous_flush_periods, need_activity_periods; static const char *inndconffile; static double nocheck_thresh_pct= 95.0; @@ -377,7 +394,7 @@ struct InputFile { int fd; Filemon_Perfile *filemon; - oop_read *rd; + oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ long inprogress; /* no. of articles read but not processed */ off_t offset; int skippinglong; @@ -419,7 +436,9 @@ static const char *sms_names[]= { struct Conn { ISNODE(Conn); - int fd, max_queue, stream, quitting; + int fd; /* may be 0, meaning closed (during construction/destruction) */ + int max_queue, stream, quitting; + int since_activity; /* periods */ ArticleList waiting; /* not yet told peer */ ArticleList priority; /* peer says send it now */ ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ @@ -429,34 +448,30 @@ struct Conn { }; -/*----- operational variables -----*/ +/*----- general operational variables -----*/ +/* main initialises */ static oop_source *loop; - -static int until_connect; static ConnList conns; static ArticleList queue; - static char *path_lock, *path_flushing, *path_defer, *globpat_backlog; -#define SMS(newstate, periods, why) \ - (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) - +/* statemc_init initialises */ static StateMachineState sms; static FILE *defer; static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; -static int sm_period_counter, until_backlog_nextscan; +static int sm_period_counter; +/* initialisation to 0 is good */ +static int until_connect, until_backlog_nextscan; static double accept_proportion; static int nocheck, nocheck_reported; /*========== logging ==========*/ -static void logcore(int sysloglevel, const char *fmt, ...) - __attribute__((__format__(printf,2,3))); +static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); static void logcore(int sysloglevel, const char *fmt, ...) { - va_list al; - va_start(al,fmt); + VA; if (become_daemon) { vsyslog(sysloglevel,fmt,al); } else { @@ -467,10 +482,9 @@ static void logcore(int sysloglevel, const char *fmt, ...) { } static void logv(int sysloglevel, const char *pfx, int errnoval, - int exitstatus, const char *fmt, va_list al) - __attribute__((__format__(printf,5,0))); + const char *fmt, va_list al) PRINTF(5,0); static void logv(int sysloglevel, const char *pfx, int errnoval, - int exitstatus, const char *fmt, va_list al) { + const char *fmt, va_list al) { char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */ vsnprintf(msgbuf,sizeof(msgbuf), fmt,al); msgbuf[sizeof(msgbuf)-1]= 0; @@ -484,50 +498,68 @@ static void logv(int sysloglevel, const char *pfx, int errnoval, errnoval>=0 ? strerror(errnoval) : ""); } -#define logwrap(fn, pfx, sysloglevel, err, estatus) \ - static void fn(const char *fmt, ...) \ - __attribute__((__format__(printf,1,2))); \ +#define diewrap(fn, pfx, sysloglevel, err, estatus) \ + static void fn(const char *fmt, ...) NORET_PRINTF(1,2); \ + static void fn(const char *fmt, ...) { \ + VA; \ + logv(sysloglevel, pfx, err, fmt, al); \ + exit(estatus); \ + } + +#define logwrap(fn, pfx, sysloglevel, err) \ + static void fn(const char *fmt, ...) PRINTF(1,2); \ static void fn(const char *fmt, ...) { \ - va_list al; \ - va_start(al,fmt); \ - logv(sysloglevel, pfx, err, estatus, fmt, al); \ + VA; \ + logv(sysloglevel, pfx, err, fmt, al); \ + va_end(al); \ } -logwrap(sysdie, " critical", LOG_CRIT, errno, 16); -logwrap(die, " critical", LOG_CRIT, -1, 16); +diewrap(sysdie, " critical", LOG_CRIT, errno, 16); +diewrap(die, " critical", LOG_CRIT, -1, 16); -logwrap(sysfatal, " fatal", LOG_ERR, errno, 12); -logwrap(fatal, " fatal", LOG_ERR, -1, 12); +diewrap(sysfatal, " fatal", LOG_ERR, errno, 12); +diewrap(fatal, " fatal", LOG_ERR, -1, 12); -logwrap(syswarn, " warning", LOG_WARNING, errno, 0); -logwrap(warn, " warning", LOG_WARNING, -1, 0); +logwrap(syswarn, " warning", LOG_WARNING, errno); +logwrap(warn, " warning", LOG_WARNING, -1); -logwrap(notice, "", LOG_NOTICE, -1, 0); -logwrap(info, " info", LOG_INFO, -1, 0); -logwrap(debug, " debug", LOG_DEBUG, -1, 0); +logwrap(notice, "", LOG_NOTICE, -1); +logwrap(info, " info", LOG_INFO, -1); +logwrap(debug, " debug", LOG_DEBUG, -1); /*========== utility functions etc. ==========*/ -static char *xvasprintf(const char *fmt, va_list al) - __attribute__((__format__(printf,1,0))); +static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0); static char *xvasprintf(const char *fmt, va_list al) { char *str; int rc= vasprintf(&str,fmt,al); if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt); return str; } -static char *xasprintf(const char *fmt, ...) - __attribute__((__format__(printf,1,2))); +static char *xasprintf(const char *fmt, ...) PRINTF(1,2); static char *xasprintf(const char *fmt, ...) { - va_list al; - va_start(al,fmt); + VA; char *str= xvasprintf(fmt,al); va_end(al); return str; } -static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } } +static int close_perhaps(int *fd) { + if (!*fd) return 0; + int r= close(*fd); + *fd=0; + return r; +} +static void xclose(int fd, const char *what, const char *what2) { + int r= close(fd); + if (r) sysdie("close %s%s",what,what2?what2:""); +} +static void xclose_perhaps(int *fd, const char *what, const char *what2) { + if (!*fd) return; + xclose(*fd,what,what2); + *fd=0; +} static pid_t xfork(const char *what) { pid_t child; @@ -586,6 +618,11 @@ static void xunlink(const char *path, const char *what) { if (r) sysdie("can't unlink %s %s", path, what); } +static time_t xtime(void) { + time_t now= time(0); + if (now==-1) sysdie("time(2) failed"); +} + static void check_isreg(const struct stat *stab, const char *path, const char *what) { if (!S_ISREG(stab->st_mode)) @@ -647,15 +684,119 @@ static char *sanitise(const char *input) { return sanibuf; } -/*========== making new connections ==========*/ +static int isewouldblock(int errnoval) { + return errnoval==EWOULDBLOCK || errnoval==EAGAIN; +} + +/*========== management of connections ==========*/ static void conn_dispose(Conn *conn) { if (!conn) return; - perhaps_close(&conn->fd); + if (conn->fd) { + loop->cancel_fd(loop, conn->fd, OOP_WRITE); + loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION); + } + int r= close_perhaps(&conn->fd); + if (r) info("C%d error closing socket: %s", conn->fd, strerror(errno)); free(conn); until_connect= reconnect_delay_periods; } +static void *conn_exception(oop_source *lp, int fd, + oop_event ev, void *conn_v) { + Conn *conn= conn_v; + unsigned char ch; + assert(fd == conn->fd); + assert(ev == OOP_EXCEPTION); + int r= read(conn->fd, &ch, 1); + if (r<0) connfail(conn,"read failed: %s",strerror(errno)); + else connfail(conn,"exceptional condition on socket (peer sent urgent" + " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch); + return OOP_CONTINUE; +} + +static void vconnfail(Conn *conn, const char *fmt, va_list al) { + int requeue[art_MaxState]; + + Article *art; + while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->sent))) { + requeue[art->state]++; + if (art->state==art_Unsolicited) art->state= art_Unchecked; + LIST_ADDTAIL(queue,art); + } + + int i; + XmitDetails *d; + for (i=0, d=conn->xmitd; ixmitu; i++, d++) + xmit_free(d); + + char *m= xvasprintf(fmt,al); + warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s", + conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); + free(m); + + LIST_REMOVE(conns,conn); + conn_dispose(conn); + check_assign_articles(); +} + +static void connfail(Conn *conn, const char *fmt, ...) { + va_list al; + va_start(al,fmt); + vconnfail(conn,fmt,al); + va_end(al); +} + +static void check_idle_conns(void) { + Conn *conn; + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) + conn->since_activity++; + search_again: + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { + if (conn->since_activity <= need_activity_periods) continue; + + /* We need to shut this down */ + if (conn->quitting) + connfail(conn,"timed out waiting for response to QUIT"); + else if (conn->sent.count) + connfail(conn,"timed out waiting for responses"); + else if (conn->waiting.count || conn->priority.count) + connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); + else if (conn->xmitu) + connfail(conn,"peer has been sending responses" + " before receiving our commands!"); + else { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= 1; + conn->since_activity= 0; + debug("C%d is idle, quitting", conn->fd); + break; + } + } + } + goto search_again; + } +} + +/*---------- making new connections ----------*/ + static int connecting_sockets[2]= {-1,-1}; static pid_t connecting_child; @@ -663,8 +804,8 @@ static void connect_attempt_discard(void) { if (connecting_sockets[0]) cancel_fd_read_except(connecting_sockets[0]); - perhaps_close(&connecting_sockets[0]); - perhaps_close(&connecting_sockets[1]); + xclose_perhaps(&connecting_sockets[0], "connecting socketpair (read)",0); + xclose_perhaps(&connecting_sockets[1], "connecting socketpair (write)",0); if (connecting_child) { int r= kill(connecting_child, SIGTERM); @@ -700,16 +841,18 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { if (got != -1) { assert(got==connecting_child); connecting_child= 0; - if (WIFEXITED(status) && - (WEXITSTATUS(status) != 0 && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)) { - /* child already reported the problem */ + if (WIFEXITED(status)) { + if (WEXITSTATUS(status) != 0 && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) + /* child already reported the problem */; + else + warn("connect: connection child exited code %d but no cmsg", + WEXITSTATUS(status)); } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { warn("connect: connection attempt timed out"); - } else if (!WIFEXITED(status)) { + } else { report_child_status("connect", status); - /* that's probably the root cause then */ } } else { /* child is still running apparently, report the socket problem */ @@ -717,8 +860,10 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { syswarn("connect: read from child socket failed"); else if (e == OOP_EXCEPTION) warn("connect: unexpected exception on child socket"); - else + else if (!rs) warn("connect: unexpected EOF on child socket"); + else + fatal("connect: unexpected lack of cmsg from child"); } goto x; } @@ -733,9 +878,10 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { CHK(len, CMSG_LEN(sizeof(conn->fd))); #undef CHK - if (CMSG_NXTHDR(&msg,h)) { die("connect: child sent many cmsgs"); goto x; } + if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs"); memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd)); + loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); int status; pid_t got= waitpid(connecting_child, &status, 0); @@ -753,12 +899,16 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { } /* Phew! */ + LIST_INIT(conn->waiting); + LIST_INIT(conn->priority); + LIST_INIT(conn->sent); setnonblock(conn->fd, 1); conn->max_queue= conn->stream ? max_queue_per_conn : 1; - LIST_ADDHEAD(conns, conn); notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); + LIST_ADDHEAD(conns, conn); + connect_attempt_discard(); - check_master_queue(); + check_assign_articles(); return 0; x: @@ -862,16 +1012,15 @@ static void connect_start(void) { connect_attempt_discard(); } +/*---------- assigning articles to conns, and transmitting ----------*/ -/*========== overall control of article flow ==========*/ - -static void check_master_queue(void) { +static void check_assign_articles(void) { for (;;) { if (!queue.count) break; Conn *walk, *use=0; - int spare; + int spare, inqueue; /* Find a connection to offer this article. We prefer a busy * connection to an idle one, provided it's not full. We take the @@ -880,8 +1029,9 @@ static void check_master_queue(void) { * connections, the spare ones will go away eventually. */ for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) { - int inqueue= walk->sent.count + walk->priority.count - + walk->waiting.count; + if (walk->quitting) continue; + inqueue= walk->sent.count + walk->priority.count + + walk->waiting.count; spare= walk->max_queue - inqueue; assert(inqueue <= max_queue_per_conn); assert(spare >= 0); @@ -889,6 +1039,7 @@ static void check_master_queue(void) { else if (spare>0) /*working*/ { use= walk; break; } } if (use) { + if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { Article *art= LIST_REMHEAD(queue); LIST_ADDTAIL(use->waiting, art); @@ -933,45 +1084,6 @@ static void conn_maybe_write(Conn *conn) { } } -static void vconnfail(Conn *conn, const char *fmt, va_list al) - __attribute__((__format__(printf,2,0))); - -static void vconnfail(Conn *conn, const char *fmt, va_list al) { - int requeue[art_MaxState]; - - Article *art; - while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->sent))) { - requeue[art->state]++; - if (art->state==art_Unsolicited) art->state= art_Unchecked; - LIST_ADDTAIL(queue,art); - } - - int i; - XmitDetails *d; - for (i=0, d=conn->xmitd; ixmitu; i++, d++) - xmit_free(d); - - char *m= xvasprintf(fmt,al); - warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s", - conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); - free(m); - - LIST_REMOVE(conns,conn); - conn_dispose(conn); - check_master_queue(); -} - -static void connfail(Conn *conn, const char *fmt, ...) - __attribute__((__format__(printf,2,3))); -static void connfail(Conn *conn, const char *fmt, ...) { - va_list al; - va_start(al,fmt); - vconnfail(conn,fmt,al); - va_end(al); -} - /*========== article transmission ==========*/ static XmitDetails *xmit_core(Conn *conn, const char *data, int len, @@ -1016,7 +1128,7 @@ static void *conn_write_some_xmits(Conn *conn) { if (count > IOV_MAX) count= IOV_MAX; ssize_t rs= writev(conn->fd, conn->xmit, count); if (rs < 0) { - if (errno == EAGAIN) return OOP_CONTINUE; + if (isewouldblock(errno)) return OOP_CONTINUE; connfail(conn, "write failed: %s", strerror(errno)); return OOP_HALT; } @@ -1191,6 +1303,7 @@ static void article_done(Conn *conn, Article *art, int whichcount) { else if (whichcount == RC_unwanted) update_nocheck(0); InputFile *ipf= art->ipf; + while (art->blanklen) { static const char spaces[]= " " @@ -1211,11 +1324,10 @@ static void article_done(Conn *conn, Article *art, int whichcount) { ipf->inprogress--; assert(ipf->inprogress >= 0); + free(art); if (!ipf->inprogress && ipf != main_input_file) queue_check_input_done(); - - free(art); } static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, @@ -1242,7 +1354,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, if (code!=205 && code!=503) { connfail(conn, "peer gave unexpected response to QUIT: %s", sani); } else { - notice("C%d idle connection closed\n"); + notice("C%d idle connection closed", conn->fd); assert(!conn->waiting.count); assert(!conn->priority.count); assert(!conn->sent.count); @@ -1253,6 +1365,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, return OOP_CONTINUE; } + conn->since_activity= 0; Article *art; #define GET_ARTICLE(musthavesent) \ @@ -1307,7 +1420,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } conn_maybe_write(conn); - check_master_queue(); + check_assign_articles(); return OOP_CONTINUE; } @@ -1316,15 +1429,11 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, static void feedfile_eof(InputFile *ipf) { assert(ipf != main_input_file); /* promised by tailing_try_read */ - - inputfile_tailing_stop(ipf); - assert(ipf->fd >= 0); - if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); - ipf->fd= -1; + inputfile_reading_stop(ipf); if (ipf == flushing_input_file) { assert(sms==sm_SEPARATED || sms==sm_DROPPING); - if (main_input_file) inputfile_tailing_start(main_input_file); + if (main_input_file) inputfile_reading_start(main_input_file); statemc_check_flushing_done(); } else if (ipf == backlog_input_file) { statemc_check_backlog_done(); @@ -1339,6 +1448,7 @@ static InputFile *open_input_file(const char *path) { if (errno==ENOENT) return 0; sysfatal("unable to open input file %s", path); } + assert(fd>0); InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1); memset(ipf,0,sizeof(*ipf)); @@ -1349,14 +1459,12 @@ static InputFile *open_input_file(const char *path) { return ipf; } -static void close_input_file(InputFile *ipf) { +static void close_input_file(InputFile *ipf) { /* does not free */ assert(!ipf->readable_callback); /* must have had ->on_cancel */ - assert(!ipf->filemon); /* must have had inputfile_tailing_stop */ - assert(!ipf->rd); /* must have had inputfile_tailing_stop */ + assert(!ipf->filemon); /* must have had inputfile_reading_stop */ + assert(!ipf->rd); /* must have had inputfile_reading_stop */ assert(!ipf->inprogress); /* no dangling pointers pointing here */ - - if (ipf->fd >= 0) - if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); + xclose_perhaps(&ipf->fd, "input file ", ipf->path); } @@ -1436,12 +1544,12 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->readcount_ok++; art= xmalloc(sizeof(*art) - 1 + midlen + 1); - art->offset= ipf->offset; - art->blanklen= recsz; - art->midlen= midlen; art->state= art_Unchecked; + art->midlen= midlen; art->ipf= ipf; ipf->inprogress++; art->token= TextToToken(tokentextbuf); + art->offset= ipf->offset; + art->blanklen= recsz; strcpy(art->messageid, space+1); LIST_ADDTAIL(queue, art); @@ -1449,7 +1557,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->offset >= target_max_feedfile_size) statemc_start_flush("feed file size"); - check_master_queue(); + check_assign_articles(); return OOP_CONTINUE; } @@ -1503,10 +1611,10 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, errno=EAGAIN; return -1; } else if (ipf==flushing_input_file) { - assert(ipf->fd>=0); + assert(ipf->rd); assert(sms==sm_SEPARATED || sms==sm_DROPPING); } else if (ipf==backlog_input_file) { - assert(ipf->fd>=0); + assert(ipf->rd); } else { abort(); } @@ -1566,7 +1674,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, for (;;) { int r= read(filemon_inotify_fd, &iev, sizeof(iev)); if (r==-1) { - if (errno==EAGAIN) break; + if (isewouldblock(errno)) break; sysdie("read from inotify master"); } else if (r==sizeof(iev)) { assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax); @@ -1636,8 +1744,8 @@ static const oop_rd_style feedfile_rdstyle= { OOP_RD_SHORTREC_LONG, }; -static void inputfile_tailing_start(InputFile *ipf) { - assert(!ipf->fd); +static void inputfile_reading_start(InputFile *ipf) { + assert(!ipf->rd); ipf->readable.on_readable= tailing_on_readable; ipf->readable.on_cancel= tailing_on_cancel; ipf->readable.try_read= tailing_try_read; @@ -1648,15 +1756,15 @@ static void inputfile_tailing_start(InputFile *ipf) { ipf->readable_callback_user= 0; ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0); - assert(ipf->fd); + assert(ipf->rd); int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, feedfile_got_article,ipf, feedfile_read_err, ipf); if (r) sysdie("unable start reading feedfile %s",ipf->path); } -static void inputfile_tailing_stop(InputFile *ipf) { - assert(ipf->fd); +static void inputfile_reading_stop(InputFile *ipf) { + assert(ipf->rd); oop_rd_cancel(ipf->rd); oop_rd_delete(ipf->rd); ipf->rd= 0; @@ -1723,7 +1831,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | V | | V | ============= V V ============ | SEPARATED-1 | | DROPPING-1 - | flsh->fd>=0 | | flsh->fd>=0 + | flsh->fd>0 | | flsh->fd>0 | [Separated] | | [Dropping] | main F idle | | main none | old D tail | | old D tail @@ -1733,7 +1841,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | V | | V | =============== | | =============== | SEPARATED-2 | | DROPPING-2 - | flsh->fd==-1 | V flsh->fd==-1 + | flsh->fd==0 | V flsh->fd==0 | [Finishing] | | [Dropping] | main F tail | `. main none | old D closed | `. old D closed @@ -1768,7 +1876,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { static void startup_set_input_file(InputFile *f) { assert(!main_input_file); main_input_file= f; - inputfile_tailing_start(f); + inputfile_reading_start(f); } static void statemc_init(void) { @@ -1789,7 +1897,7 @@ static void statemc_init(void) { fl.l_whence= SEEK_SET; int r= fcntl(lockfd, F_SETLK, &fl); if (r==-1) { - if (errno==EACCES || errno==EAGAIN) { + if (errno==EACCES || isewouldblock(errno)) { if (quiet_multiple) exit(0); fatal("another duct holds the lockfile"); } @@ -1905,7 +2013,7 @@ static void statemc_period_poll(void) { static int inputfile_is_done(InputFile *ipf) { if (!ipf) return 0; if (ipf->inprogress) return 0; /* new article in the meantime */ - if (ipf->fd >= 0); return 0; /* not had EOF */ + if (ipf->rd) return 0; /* not had EOF */ return 1; } @@ -2007,7 +2115,7 @@ static void statemc_setstate(StateMachineState newsms, int periods, if (!main_input_file) xtra= "-ABSENT"; break; case sm_SEPARATED: case sm_DROPPING: - xtra= flushing_input_file->fd >= 0 ? "-1" : "-2"; + xtra= flushing_input_file->rd ? "-1" : "-2"; break; default:; } @@ -2079,13 +2187,13 @@ static void close_defer(void) { if (!defer) return; - xfstat(fileno(defer), &stab, path_defer, "defer file"); + struct stat stab; + xfstat_isreg(fileno(defer), &stab, path_defer, "defer file"); if (fclose(defer)) sysfatal("could not close defer file %s", path_defer); defer= 0; - time_t now= time(0); - if (now==-1) sysdie("time(2) failed"); + time_t now= xtime(); char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile, (unsigned long)now, @@ -2114,12 +2222,12 @@ static void search_backlog_file(void) { /* returns non-0 iff there are any backlog files */ glob_t gl; - int r; + int r, i; struct stat stab; const char *oldest_path=0; time_t oldest_mtime, now; - if (backlog_input_file) return 3; + if (backlog_input_file) return; try_again: @@ -2170,12 +2278,12 @@ static void search_backlog_file(void) { exit(0); } until_backlog_nextscan= backlog_spontaneous_rescan_periods; - return 0; + return; } - now= time(); if (now==-1) sysdie("time(2) failed"); + now= xtime(); double age= difftime(now, oldest_mtime); - long age_deficiency= (backlog_retry_minperiods * PERIOD_SECONDS) - age; + long age_deficiency= (backlog_retry_minperiods * period_seconds) - age; if (age_deficiency <= 0) { debug("backlog scan: found age=%f deficiency=%ld oldest=%s", @@ -2186,12 +2294,12 @@ static void search_backlog_file(void) { warn("backlog file %s vanished as we opened it", backlog_input_file); goto try_again; } - inputfile_tailing_start(backlog_input_file); + inputfile_reading_start(backlog_input_file); until_backlog_nextscan= -1; - return 1; + return; } - until_backlog_nextscan= age_deficiency / PERIOD_SECONDS; + until_backlog_nextscan= age_deficiency / period_seconds; if (backlog_spontaneous_rescan_periods >= 0 && until_backlog_nextscan > backlog_spontaneous_rescan_periods) @@ -2199,7 +2307,7 @@ static void search_backlog_file(void) { debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s", age, age_deficiency, until_backlog_nextscan, oldest_path); - return 2; + return; } /*========== flushing the feed ==========*/ @@ -2209,7 +2317,7 @@ static pid_t inndcomm_child; static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { assert(inndcomm_child); int status= xwaitpid(&inndcomm_child, "inndcomm"); - loop->cancel_fd(fd); + cancel_fd_read_except(fd); close(fd); assert(!flushing_input_file); @@ -2287,11 +2395,12 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel"); - inndcomm_child= xfork(); + inndcomm_child= xfork("inndcomm child"); if (!inndcomm_child) { - static char flushargv[2]= { sitename, 0 }; + const char *flushargv[2]= { sitename, 0 }; char *reply; + int r; close(pipefds[0]); @@ -2316,14 +2425,13 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ static void postfork_inputfile(InputFile *ipf) { if (!ipf) return; - assert(ipf->fd >= 0); - close(ipf->fd); - ipf->fd= -1; + xclose(ipf->fd, "(in child) input file ", ipf->path); } -static void postfork_stdio(FILE *f) { +static void postfork_stdio(FILE *f, const char *what, const char *what2) { /* we have no stdio streams that are buffered long-term */ - if (f) fclose(f); + if (!f) return; + if (fclose(f)) sysdie("close (in child) %s%s", what, what2?what2:0); } static void postfork(const char *what) { @@ -2335,13 +2443,13 @@ static void postfork(const char *what) { Conn *conn; for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) - close(conn->fd); + close_perhaps(&conn->fd); - postfork_stdio(defer); + postfork_stdio(defer, "defer file ", path_defer); } -#define EVERY(what, interval, body) \ - static const struct timeval what##_timeout = { 5, 0 }; \ +#define EVERY(what, interval_sec, interval_usec, body) \ + static struct timeval what##_timeout = { interval_sec, interval_usec }; \ static void what##_schedule(void); \ static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ { body } \ @@ -2351,13 +2459,13 @@ static void postfork(const char *what) { loop->on_time(loop, what##_timeout, what##_timedout, 0); \ } -EVERY(filepoll, {5,0}, { +EVERY(filepoll, 5,0, { if (main_input_file && main_input_file->readable_callback) filemon_callback(main_input_file); }); -#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s" \ -#define DEBUG_IPF(sh) \ +#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s" +#define DEBUG_IPF(wh) \ wh##_input_file, debug_ipf_path(wh##_input_file), \ wh##_input_file->inprogress, (long)wh##_input_file->offset, \ wh##_input_file->fd, wh##_input_file->rd ? "+" : "" @@ -2366,10 +2474,10 @@ static const char *debug_ipf_path(InputFile *ipf) { return slash ? slash+1 : ipf->path; } -EVERY(period, {PERIOD_SECONDS,0}, { +EVERY(period, -1,0, { debug("PERIOD" " sms=%s[%d] conns=%d queue=%d until_connect=%d" - " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_FMT(flushing) + " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_IPF(flushing) " children connecting=%ld inndcomm_child" , sms_names[sms], sm_period_counter, @@ -2383,109 +2491,122 @@ EVERY(period, {PERIOD_SECONDS,0}, { poll_backlog_file(); if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_period_poll(); - check_master_queue(); + check_assign_articles(); + check_idle_conns(); }); /*========== option parsing ==========*/ +/*---------- generic option parser and logging ----------*/ + +static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0); +static void vbadusage(const char *fmt, va_list al) { + abort(); +} +static void badusage(const char *fmt, ...) NORET_PRINTF(1,2); +static void badusage(const char *fmt, ...) { + va_list al; + va_start(al,fmt); + vbadusage(fmt,al); +} + enum OptFlags { - of_seconds= 001000u; - of_boolean= 002000u; + of_seconds= 001000u, + of_boolean= 002000u, }; typedef struct Option Option; typedef void OptionParser(const Option*, const char *val); struct Option { - int short; - const char *long; + int shrt; + const char *lng; void *store; OptionParser *fn; - int noarg; + int noarg, intval; }; -void op_integer(const Option *o, const char *val) { +/*---------- specific option types ----------*/ + +static void op_integer(const Option *o, const char *val) { char *ep; errno= 0; unsigned long ul= strtoul(val,&ep,10); if (*ep || ep==val || errno || ul>INT_MAX) - badusage("bad integer value for %s",o->long); + badusage("bad integer value for %s",o->lng); int *store= o->store; *store= ul; } -void op_double(const Option *o, const char *val) { +static void op_double(const Option *o, const char *val) { int *store= o->store; char *ep; errno= 0; *store= strtod(val, &ep); if (*ep || ep==val || errno) - badusage("bad floating point value for %s",o->long); + badusage("bad floating point value for %s",o->lng); } -void op_string(const Option *o, const char *val) { - char **store= o->store; - free(*store); +static void op_string(const Option *o, const char *val) { + const char **store= o->store; *store= val; } -void op_seconds(const Option *o, const char *val) { +static void op_seconds(const Option *o, const char *val) { int *store= o->store; char *ep; + int unit; double v= strtod(val,&ep); - if (ep==val) badusage("bad time/duration value for %s",o->long); + if (ep==val) badusage("bad time/duration value for %s",o->lng); if (!*ep || !strcmp(ep,"s")) unit= 1; else if (!strcmp(ep,"m")) unit= 60; else if (!strcmp(ep,"h")) unit= 3600; else if (!strcmp(ep,"d")) unit= 86400; - else badusage("bad units %s for time/duration value for %s",ep,o->long); + else badusage("bad units %s for time/duration value for %s",ep,o->lng); v *= unit; v= ceil(v); - if (v > INT_MAX) badusage("time/duration value for %s out of range",o->long); + if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng); *store= v; } -void op_periods_rndup(const Option *o, const char *val) { +static void op_periods_rndup(const Option *o, const char *val) { int *store= o->store; op_seconds(o,val); - *store += PERIOD_SECONDS-1; - *store /= PERIOD_SECONDS; + *store += period_seconds-1; + *store /= period_seconds; } -void op_periods_booltrue(const Option *o, const char *val) { +static void op_setint(const Option *o, const char *val) { int *store= o->store; - *store= 1; -} -void op_periods_boolfalse(const Option *o, const char *val) { - int *store= o->store; - *store= 0; + *store= o->intval; } +/*---------- specific options ----------*/ + static const Option options[]= { {'f',"feedfile", &feedfile, op_string }, -{'q',"quiet-multiple", &quiet_multiple, op_booltrue, 1 }, +{'q',"quiet-multiple", &quiet_multiple, op_setint, 1,1 }, -{ 0, "max-connections", &max_connections op_integer }, -{ 0, "max-queue-per-conn", &max_queue_per_conn op_integer }, +{ 0, "max-connections", &max_connections, op_integer }, +{ 0, "max-queue-per-conn", &max_queue_per_conn, op_integer }, -{ 0, "streaming", &try_stream, op_booltrue, 1 }, -{ 0, "no-streaming", &try_stream, op_boolfalse, 1 }, -{'P',"port", &port op_integer }, +{ 0, "streaming", &try_stream, op_setint, 1,1 }, +{ 0, "no-streaming", &try_stream, op_setint, 1,0 }, +{'P',"port", &port, op_integer }, { 0, "inndconf", &inndconffile, op_string }, -{'d',"daemon", &become_daemon, op_booltrue, 1 }, -{ 0, "no-daemon", &become_daemon, op_boolfalse, 1 }, +{'d',"daemon", &become_daemon, op_setint, 1,1 }, +{ 0, "no-daemon", &become_daemon, op_setint, 1,0 }, { 0, "no-check-proportion", &nocheck_thresh_pct, op_double }, { 0, "no-check-filter", &nocheck_decay_articles, op_double }, { 0, "reconnect-interval", &reconnect_delay_periods, op_periods_rndup }, { 0, "flush-retry-interval", &flushfail_retry_periods, op_periods_rndup }, -{ 0, "connection-timeout", &connection_timeout, op_seconds }, { 0, "inndcomm-timeout", &inndcomm_flush_timeout, op_seconds }, }; @@ -2504,25 +2625,25 @@ int main(int argc, char **argv) { arg++; char *equals= strchr(arg,'='); int len= equals ? (equals - arg) : strlen(arg); - for (o=options; o->long; o++) - if (strlen(o->long) == len && !memcmp(o->long,arg,len)) + for (o=options; o->lng; o++) + if (strlen(o->lng) == len && !memcmp(o->lng,arg,len)) goto found_long; badusage("unknown long option --%s",arg); found_long: if (o->noarg) { - if (equals) badusage("option --%s does not take a value",o->long); + if (equals) badusage("option --%s does not take a value",o->lng); arg= 0; } else if (equals) { arg= equals+1; } else { arg= *++argv; - if (!arg) badusage("option --%s needs a value",o->long); + if (!arg) badusage("option --%s needs a value",o->lng); } o->fn(o, arg); break; /* eaten the whole argument now */ } - for (o=options; o->long; o++) - if (a == o->short) + for (o=options; o->lng; o++) + if (a == o->shrt) goto found_short; badusage("unknown short option -%c",a); found_short: @@ -2531,7 +2652,7 @@ int main(int argc, char **argv) { } else { if (!*++arg) { arg= *++argv; - if (!arg) badusage("option -%c needs a value",o->short); + if (!arg) badusage("option -%c needs a value",o->shrt); } o->fn(o,arg); break; /* eaten the whole argument now */ @@ -2572,13 +2693,15 @@ int main(int argc, char **argv) { if (strchr(feedfile, c)) badusage("feed filename may not contain metacharacter %c",c); - loop= oop_sys_new(); - if (!loop) sysdie("could not create liboop event loop"); + oop_source_sys *sysloop= oop_sys_new(); + if (!sysloop) sysdie("could not create liboop event loop"); + loop= (oop_source*)sysloop; if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) sysdie("could not ignore SIGPIPE"); if (become_daemon) { + int i; for (i=3; i<255; i++) /* do this now before we open syslog, etc. */ close(i); @@ -2603,14 +2726,20 @@ int main(int argc, char **argv) { notice("starting"); - if (!filemon_init()) { + LIST_INIT(conns); + LIST_INIT(queue); + + if (!filemon_method_init()) { warn("no file monitoring available, polling"); filepoll_schedule(); } + period_timeout.tv_sec= period_seconds; period_schedule(); statemc_init(); - loop->execute. + void *r= oop_sys_run(sysloop); + assert(r == OOP_ERROR); + sysdie("event loop failed"); }