X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=8e599f56026835595780ad3a28725754699fd06f;hp=1328d372f8d8390cb822f734a2476f044b1c6404;hb=fc66b511debe93980fd49331d96939fb876024da;hpb=599596434b5b74c0280006508d195d69e71d0814 diff --git a/backends/innduct.c b/backends/innduct.c index 1328d37..8e599f5 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,8 +1,5 @@ /* * TODO - * - check all structs initialised - * - check all fd watches properly undone - * - check all init functions called * - actually implement badusage * - options for all options * - manpage @@ -73,8 +70,8 @@ OVERALL STATES: - START - | + START + | ,-->--. check F, D | | | | | | @@ -136,14 +133,14 @@ | V duct unlinks D D: duct reading | | | `--<--' | duct finishes - | processing D - | duct unlinks D - | duct exits - V - Dropped - F: ENOENT - D: ENOENT - duct not running + | processing D + | duct unlinks D + | duct exits + V + Dropped + F: ENOENT + D: ENOENT + duct not running "duct reading" means innduct is reading the file but also overwriting processed tokens. @@ -208,26 +205,29 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. /*----- doubly linked lists -----*/ -#define ISNODE(T) struct { T *succ, *pred; } node /* must be at start */ -#define DEFLIST(T) typedef struct { T *hd, *tl, *tp; int count; } T##List +#define ISNODE(T) struct node list_node +#define DEFLIST(T) \ + typedef struct { \ + union { struct list li; T *for_type; } u; \ + int count; \ + } T##List -#define NODE(n) (assert((void*)&(n)->node == &(n)), \ - (struct node*)&(n)->node) +#define NODE(n) (assert((void*)&(n)->list_node == &(n)), &(n)->list_node) #define LIST_CHECKCANHAVENODE(l,n) \ - ((void)((n) == ((l).hd))) /* just for the type check */ + ((void)((n) == ((l).u.for_type))) /* just for the type check */ -#define LIST_ADDSOMEHOW(l,n,list_addsomehow) \ - ( LIST_CHECKCANHAVENODE(l,n), \ - list_addsomehow((struct list*)&(l), NODE((n))), \ - (void)(l).count++ \ +#define LIST_ADDSOMEHOW(l,n,list_addsomehow) \ + ( LIST_CHECKCANHAVENODE(l,n), \ + list_addsomehow(&(l).u.li, NODE((n))), \ + (void)(l).count++ \ ) #define LIST_REMSOMEHOW(l,list_remsomehow) \ - ( (typeof((l).hd)) \ + ( (typeof((l).u.for_type)) \ ( (l).count \ ? ( (l).count--, \ - list_remsomehow((struct list*)&(l)) ) \ + list_remsomehow(&(l).u.li) ) \ : 0 \ ) \ ) @@ -238,7 +238,8 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) -#define LIST_HEAD(l) ((typeof((l).hd))(list_head((struct list*)&(l)))) +#define LIST_INIT(l) (list_new(&(l).u.li)) +#define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l)))) #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) @@ -275,28 +276,29 @@ static void *conn_write_some_xmits(Conn *conn); static void xmit_free(XmitDetails *d); +#define SMS(newstate, periods, why) \ + (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) static void statemc_setstate(StateMachineState newsms, int periods, const char *forlog, const char *why); + static void statemc_start_flush(const char *why); /* Normal => Flushing */ static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ -static void check_master_queue(void); +static void check_assign_articles(void); static void queue_check_input_done(void); static void statemc_check_flushing_done(void); static void statemc_check_backlog_done(void); -static void postfork(const char *what); -static void postfork_inputfile(InputFile *ipf); +static void postfork(void); static void open_defer(void); static void close_defer(void); static void search_backlog_file(void); -static void inputfile_tailing_start(InputFile *ipf); -static void inputfile_tailing_stop(InputFile *ipf); +static void inputfile_reading_start(InputFile *ipf); +static void inputfile_reading_stop(InputFile *ipf); -static int filemon_init(void); static void filemon_start(InputFile *ipf); static void filemon_stop(InputFile *ipf); static void filemon_callback(InputFile *ipf); @@ -304,34 +306,43 @@ static void filemon_callback(InputFile *ipf); static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); +static const oop_rd_style peer_rd_style; +static oop_rd_call peer_rd_err, peer_rd_ok; + /*----- configuration options -----*/ -static const char *sitename, *feedfile, *pathoutgoing; -static const char *remote_host; -static int quiet_multiple=0, become_daemon=1; +static const char *sitename, *remote_host; +static const char *feedfile; +static int quiet_multiple=0; +static int become_daemon=1; +static int try_stream=1; +static int port=119; +static const char *inndconffile; -static int max_connections=10, max_queue_per_conn=200; +static int max_connections=10; +static int max_queue_per_conn=200; static int target_max_feedfile_size=100000; - static int period_seconds=60; -static double max_bad_data_ratio= 0.01; +static int connection_setup_timeout=200; +static int inndcomm_flush_timeout=100; + +static double nocheck_thresh= 95.0; /* converted from percentage by main */ +static double nocheck_decay= 100; /* conv'd from articles to lambda by main */ + +/* all these are initialised to seconds, and converted to periods in main */ +static int reconnect_delay_periods=1000; +static int flushfail_retry_periods=1000; +static int backlog_retry_minperiods=50; +static int backlog_spontrescan_periods=300; +static int spontaneous_flush_periods=100000; +static int need_activity_periods=1000; + +static double max_bad_data_ratio= 1; /* conv'd from percentage by main */ static int max_bad_data_initial= 30; /* in one corrupt 4096-byte block the number of newlines has * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */ -static int connection_setup_timeout=200, port=119, try_stream=1; -static int inndcomm_flush_timeout=100; -static int reconnect_delay_periods, flushfail_retry_periods, open_wait_periods; -static int backlog_retry_minperiods, backlog_spontaneous_rescan_periods; -static int spontaneous_flush_periods, need_activity_periods; -static const char *inndconffile; - -static double nocheck_thresh_pct= 95.0; -static double nocheck_thresh; /* computed in main from _pct */ -static double nocheck_decay_articles= 100; /* converted to _decay */ -static double nocheck_decay; /* computed in main from _articles */ - /*----- statistics -----*/ @@ -394,7 +405,7 @@ struct InputFile { int fd; Filemon_Perfile *filemon; - oop_read *rd; + oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ long inprogress; /* no. of articles read but not processed */ off_t offset; int skippinglong; @@ -436,7 +447,9 @@ static const char *sms_names[]= { struct Conn { ISNODE(Conn); - int fd, max_queue, stream, quitting; + int fd; /* may be 0, meaning closed (during construction/destruction) */ + oop_read *rd; /* likewise */ + int max_queue, stream, quitting; int since_activity; /* periods */ ArticleList waiting; /* not yet told peer */ ArticleList priority; /* peer says send it now */ @@ -447,24 +460,22 @@ struct Conn { }; -/*----- operational variables -----*/ +/*----- general operational variables -----*/ +/* main initialises */ static oop_source *loop; - -static int until_connect; static ConnList conns; static ArticleList queue; - static char *path_lock, *path_flushing, *path_defer, *globpat_backlog; -#define SMS(newstate, periods, why) \ - (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) - +/* statemc_init initialises */ static StateMachineState sms; static FILE *defer; static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; -static int sm_period_counter, until_backlog_nextscan; +static int sm_period_counter; +/* initialisation to 0 is good */ +static int until_connect, until_backlog_nextscan; static double accept_proportion; static int nocheck, nocheck_reported; @@ -546,15 +557,29 @@ static char *xasprintf(const char *fmt, ...) { return str; } -static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } } +static int close_perhaps(int *fd) { + if (!*fd) return 0; + int r= close(*fd); + *fd=0; + return r; +} +static void xclose(int fd, const char *what, const char *what2) { + int r= close(fd); + if (r) sysdie("close %s%s",what,what2?what2:""); +} +static void xclose_perhaps(int *fd, const char *what, const char *what2) { + if (!*fd) return; + xclose(*fd,what,what2); + *fd=0; +} static pid_t xfork(const char *what) { pid_t child; child= fork(); if (child==-1) sysdie("cannot fork for %s",what); - if (!child) postfork(what); debug("forked %s %ld", what, (unsigned long)child); + if (!child) postfork(); return child; } @@ -571,7 +596,7 @@ static void report_child_status(const char *what, int status) { if (WIFEXITED(status)) { int es= WEXITSTATUS(status); if (es) - warn("%s: child died with error exit status %d",es); + warn("%s: child died with error exit status %d", what, es); } else if (WIFSIGNALED(status)) { int sig= WTERMSIG(status); const char *sigstr= strsignal(sig); @@ -582,7 +607,7 @@ static void report_child_status(const char *what, int status) { warn("%s: child died due to unknown fatal signal %d%s", what, sig, coredump); } else { - warn("%s: child died with unknown wait status %d", status); + warn("%s: child died with unknown wait status %d", what,status); } } @@ -608,6 +633,7 @@ static void xunlink(const char *path, const char *what) { static time_t xtime(void) { time_t now= time(0); if (now==-1) sysdie("time(2) failed"); + return now; } static void check_isreg(const struct stat *stab, const char *path, @@ -640,13 +666,6 @@ static void xlstat_isreg(const char *path, struct stat *stab, check_isreg(stab, path, what); } -static void setnonblock(int fd, int nonblocking) { - int r= fcntl(fd, F_GETFL); if (r<0) sysdie("setnonblocking fcntl F_GETFL"); - if (nonblocking) r |= O_NONBLOCK; - else r &= ~O_NONBLOCK; - r= fcntl(fd, F_SETFL, r); if (r<0) sysdie("setnonblocking fcntl F_SETFL"); -} - static int samefile(const struct stat *a, const struct stat *b) { assert(S_ISREG(a->st_mode)); assert(S_ISREG(b->st_mode)); @@ -675,25 +694,129 @@ static int isewouldblock(int errnoval) { return errnoval==EWOULDBLOCK || errnoval==EAGAIN; } -/*========== making new connections ==========*/ +/*========== management of connections ==========*/ + +static void conn_closefd(Conn *conn, const char *msgprefix) { + int r= close_perhaps(&conn->fd); + if (r) info("C%d %serror closing socket: %s", + conn->fd, msgprefix, strerror(errno)); +} static void conn_dispose(Conn *conn) { if (!conn) return; - perhaps_close(&conn->fd); + if (conn->rd) { + oop_rd_cancel(conn->rd); + oop_rd_delete_kill(conn->rd); + conn->rd= 0; + } + if (conn->fd) { + loop->cancel_fd(loop, conn->fd, OOP_WRITE); + loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION); + } + conn_closefd(conn,""); free(conn); until_connect= reconnect_delay_periods; } -static int connecting_sockets[2]= {-1,-1}; -static pid_t connecting_child; +static void *conn_exception(oop_source *lp, int fd, + oop_event ev, void *conn_v) { + Conn *conn= conn_v; + unsigned char ch; + assert(fd == conn->fd); + assert(ev == OOP_EXCEPTION); + int r= read(conn->fd, &ch, 1); + if (r<0) connfail(conn,"read failed: %s",strerror(errno)); + else connfail(conn,"exceptional condition on socket (peer sent urgent" + " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch); + return OOP_CONTINUE; +} -static void connect_attempt_discard(void) { - if (connecting_sockets[0]) - cancel_fd_read_except(connecting_sockets[0]); +static void vconnfail(Conn *conn, const char *fmt, va_list al) { + int requeue[art_MaxState]; - perhaps_close(&connecting_sockets[0]); - perhaps_close(&connecting_sockets[1]); + Article *art; + while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); + while ((art= LIST_REMHEAD(conn->sent))) { + requeue[art->state]++; + if (art->state==art_Unsolicited) art->state= art_Unchecked; + LIST_ADDTAIL(queue,art); + } + int i; + XmitDetails *d; + for (i=0, d=conn->xmitd; ixmitu; i++, d++) + xmit_free(d); + + char *m= xvasprintf(fmt,al); + warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s", + conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); + free(m); + + LIST_REMOVE(conns,conn); + conn_dispose(conn); + check_assign_articles(); +} + +static void connfail(Conn *conn, const char *fmt, ...) { + va_list al; + va_start(al,fmt); + vconnfail(conn,fmt,al); + va_end(al); +} + +static void check_idle_conns(void) { + Conn *conn; + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) + conn->since_activity++; + search_again: + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { + if (conn->since_activity <= need_activity_periods) continue; + + /* We need to shut this down */ + if (conn->quitting) + connfail(conn,"timed out waiting for response to QUIT"); + else if (conn->sent.count) + connfail(conn,"timed out waiting for responses"); + else if (conn->waiting.count || conn->priority.count) + connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); + else if (conn->xmitu) + connfail(conn,"peer has been sending responses" + " before receiving our commands!"); + else { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= 1; + conn->since_activity= 0; + debug("C%d is idle, quitting", conn->fd); + break; + } + } + } + goto search_again; + } +} + +/*---------- making new connections ----------*/ + +static pid_t connecting_child; +static int connecting_fdpass_sock; + +static void connect_attempt_discard(void) { if (connecting_child) { int r= kill(connecting_child, SIGTERM); if (r) syswarn("failed to kill connecting child"); @@ -703,6 +826,10 @@ static void connect_attempt_discard(void) { (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) report_child_status("connect", status); } + if (connecting_fdpass_sock) { + cancel_fd_read_except(connecting_fdpass_sock); + xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0); + } } #define PREP_DECL_MSG_CMSG(msg) \ @@ -715,6 +842,8 @@ static void connect_attempt_discard(void) { static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { Conn *conn= 0; + assert(fd == connecting_fdpass_sock); + conn= xmalloc(sizeof(*conn)); memset(conn,0,sizeof(*conn)); @@ -728,40 +857,45 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { if (got != -1) { assert(got==connecting_child); connecting_child= 0; - if (WIFEXITED(status) && - (WEXITSTATUS(status) != 0 && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)) { - /* child already reported the problem */ + if (WIFEXITED(status)) { + if (WEXITSTATUS(status) != 0 && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) + /* child already reported the problem */; + else + warn("connect: connection child exited code %d but no cmsg", + WEXITSTATUS(status)); } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { warn("connect: connection attempt timed out"); - } else if (!WIFEXITED(status)) { + } else { report_child_status("connect", status); - /* that's probably the root cause then */ } } else { /* child is still running apparently, report the socket problem */ if (rs < 0) - syswarn("connect: read from child socket failed"); + syswarn("connect: read from fdpass socket failed"); else if (e == OOP_EXCEPTION) - warn("connect: unexpected exception on child socket"); + warn("connect: unexpected exception on fdpass socket"); + else if (!rs) + warn("connect: unexpected EOF on fdpass socket"); else - warn("connect: unexpected EOF on child socket"); + fatal("connect: unexpected lack of cmsg from child"); } goto x; } -#define CHK(field, val) \ - if (h->cmsg_##field != val) { \ - die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \ - goto x; \ +#define CHK(field, val) \ + if (h->cmsg_##field != val) { \ + die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \ + h->cmsg_##field, val); \ + goto x; \ } CHK(level, SOL_SOCKET); CHK(type, SCM_RIGHTS); CHK(len, CMSG_LEN(sizeof(conn->fd))); #undef CHK - if (CMSG_NXTHDR(&msg,h)) { die("connect: child sent many cmsgs"); goto x; } + if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs"); memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd)); @@ -781,17 +915,30 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { } /* Phew! */ - setnonblock(conn->fd, 1); + LIST_INIT(conn->waiting); + LIST_INIT(conn->priority); + LIST_INIT(conn->sent); conn->max_queue= conn->stream ? max_queue_per_conn : 1; - LIST_ADDHEAD(conns, conn); + + loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); + conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */ + if (!conn->fd) sysdie("oop_rd_new_fd (fd=%d)",conn->fd); + int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN, + &peer_rd_ok, conn, + &peer_rd_err, conn); + if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd); + notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); + LIST_ADDHEAD(conns, conn); + connect_attempt_discard(); - check_master_queue(); + check_assign_articles(); return 0; x: conn_dispose(conn); connect_attempt_discard(); + return OOP_CONTINUE; } static int allow_connect_start(void) { @@ -801,14 +948,14 @@ static int allow_connect_start(void) { } static void connect_start(void) { - assert(!connecting_sockets[0]); - assert(!connecting_sockets[1]); assert(!connecting_child); + assert(!connecting_fdpass_sock); notice("starting connection attempt"); - int r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets); - if (r) { syswarn("connect: cannot create socketpair for child"); goto x; } + int socks[2]; + int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks); + if (r) { syswarn("connect: cannot create socketpair for child"); return; } connecting_child= xfork("connection"); @@ -817,8 +964,7 @@ static void connect_start(void) { char buf[NNTP_STRLEN+100]; int exitstatus= CONNCHILD_ESTATUS_NOSTREAM; - r= close(connecting_sockets[0]); - if (r) sysdie("connect: close parent socket in child"); + xclose(socks[0], "(in child) parent's connection fdpass socket",0); alarm(connection_setup_timeout); if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) { @@ -842,7 +988,7 @@ static void connect_start(void) { assert(l>=1); if (buf[-1]!='\n') fatal("connect: response to MODE STREAM is too long: %.100s...", - remote_host, sanitise(buf)); + sanitise(buf)); l--; if (l>0 && buf[l-1]=='\r') l--; buf[l]= 0; char *ep; @@ -874,77 +1020,26 @@ static void connect_start(void) { memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd)); msg.msg_controllen= cmsg->cmsg_len; - r= sendmsg(connecting_sockets[1], &msg, 0); + r= sendmsg(socks[1], &msg, 0); if (r) sysdie("sendmsg failed for new connection"); _exit(exitstatus); } - r= close(connecting_sockets[1]); connecting_sockets[1]= 0; - if (r) sysdie("connect: close child socket in parent"); - - on_fd_read_except(connecting_sockets[0], connchild_event); - return; - - x: - connect_attempt_discard(); + xclose(socks[1], "connecting fdpass child's socket",0); + connecting_fdpass_sock= socks[0]; + on_fd_read_except(connecting_fdpass_sock, connchild_event); } -static void check_idle_conns(void) { - Conn *conn; - for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) - conn->since_activity++; - search_again: - for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { - if (conn->since_activity <= need_activity_periods) continue; +/*---------- assigning articles to conns, and transmitting ----------*/ - /* We need to shut this down */ - if (conn->quitting) - connfail(conn,"timed out waiting for response to QUIT"); - else if (conn->sent.count) - connfail(conn,"timed out waiting for responses"); - else if (conn->waiting.count || conn->priority.count) - connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); - else if (conn->xmitu) - connfail(conn,"peer has been sending responses" - " before receiving our commands!"); - else { - static const char quitcmd[]= "QUIT\r\n"; - int todo= sizeof(quitcmd)-1; - const char *p= quitcmd; - for (;;) { - int r= write(conn->fd, p, todo); - if (r<0) { - if (isewouldblock(errno)) - connfail(conn, "blocked writing QUIT to idle connection"); - else - connfail(conn, "failed to write QUIT to idle connection: %s", - strerror(errno)); - break; - } - assert(r<=todo); - todo -= r; - if (!todo) { - conn->quitting= 1; - conn->since_activity= 0; - debug("C%d is idle, quitting", conn->fd); - break; - } - } - } - goto search_again; - } -} - -/*========== overall control of article flow ==========*/ - -static void check_master_queue(void) { +static void check_assign_articles(void) { for (;;) { if (!queue.count) break; - + Conn *walk, *use=0; - int spare, inqueue; + int spare=0, inqueue=0; /* Find a connection to offer this article. We prefer a busy * connection to an idle one, provided it's not full. We take the @@ -986,7 +1081,6 @@ static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) { } static void conn_maybe_write(Conn *conn) { - void *rp= 0; for (;;) { conn_make_some_xmits(conn); if (!conn->xmitu) { @@ -1008,44 +1102,10 @@ static void conn_maybe_write(Conn *conn) { } } -static void vconnfail(Conn *conn, const char *fmt, va_list al) { - int requeue[art_MaxState]; - - Article *art; - while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->sent))) { - requeue[art->state]++; - if (art->state==art_Unsolicited) art->state= art_Unchecked; - LIST_ADDTAIL(queue,art); - } - - int i; - XmitDetails *d; - for (i=0, d=conn->xmitd; ixmitu; i++, d++) - xmit_free(d); - - char *m= xvasprintf(fmt,al); - warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s", - conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); - free(m); - - LIST_REMOVE(conns,conn); - conn_dispose(conn); - check_master_queue(); -} - -static void connfail(Conn *conn, const char *fmt, ...) { - va_list al; - va_start(al,fmt); - vconnfail(conn,fmt,al); - va_end(al); -} - /*========== article transmission ==========*/ static XmitDetails *xmit_core(Conn *conn, const char *data, int len, - XmitKind kind) { /* caller must then fill in details */ + XmitKind kind) { /* caller must then fill in details */ struct iovec *v= &conn->xmit[conn->xmitu]; XmitDetails *d= &conn->xmitd[conn->xmitu++]; v->iov_base= (char*)data; @@ -1100,7 +1160,7 @@ static void *conn_write_some_xmits(Conn *conn) { rs -= vp->iov_len; xmit_free(dp); } else { - vp->iov_base += rs; + vp->iov_base= (char*)vp->iov_base + rs; vp->iov_len -= rs; } } @@ -1174,7 +1234,7 @@ static const oop_rd_style peer_rd_style= { OOP_RD_SHORTREC_FORBID }; -static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev, +static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, const char *errmsg, int errnoval, const char *data, size_t recsz, void *conn_v) { Conn *conn= conn_v; @@ -1185,7 +1245,7 @@ static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev, static Article *article_reply_check(Conn *conn, const char *response, int code_indicates_streaming, int must_have_sent - /* 1:yes, -1:no, 0:dontcare */, + /* 1:yes, -1:no, 0:dontcare */, const char *sanitised_response) { Article *art= LIST_HEAD(conn->sent); @@ -1261,6 +1321,7 @@ static void article_done(Conn *conn, Article *art, int whichcount) { else if (whichcount == RC_unwanted) update_nocheck(0); InputFile *ipf= art->ipf; + while (art->blanklen) { static const char spaces[]= " " @@ -1272,7 +1333,8 @@ static void article_done(Conn *conn, Article *art, int whichcount) { if (r==-1) { if (errno==EINTR) continue; sysdie("failed to blank entry for %s (length %d at offset %lu) in %s", - art->messageid, art->blanklen, art->offset, ipf->path); + art->messageid, art->blanklen, + (unsigned long)art->offset, ipf->path); } assert(r>=0 && r<=w); art->blanklen -= w; @@ -1281,14 +1343,13 @@ static void article_done(Conn *conn, Article *art, int whichcount) { ipf->inprogress--; assert(ipf->inprogress >= 0); + free(art); if (!ipf->inprogress && ipf != main_input_file) queue_check_input_done(); - - free(art); } -static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, +static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, const char *errmsg, int errnoval, const char *data, size_t recsz, void *conn_v) { Conn *conn= conn_v; @@ -1378,7 +1439,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, } conn_maybe_write(conn); - check_master_queue(); + check_assign_articles(); return OOP_CONTINUE; } @@ -1387,15 +1448,11 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, static void feedfile_eof(InputFile *ipf) { assert(ipf != main_input_file); /* promised by tailing_try_read */ - - inputfile_tailing_stop(ipf); - assert(ipf->fd >= 0); - if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); - ipf->fd= -1; + inputfile_reading_stop(ipf); if (ipf == flushing_input_file) { assert(sms==sm_SEPARATED || sms==sm_DROPPING); - if (main_input_file) inputfile_tailing_start(main_input_file); + if (main_input_file) inputfile_reading_start(main_input_file); statemc_check_flushing_done(); } else if (ipf == backlog_input_file) { statemc_check_backlog_done(); @@ -1410,6 +1467,7 @@ static InputFile *open_input_file(const char *path) { if (errno==ENOENT) return 0; sysfatal("unable to open input file %s", path); } + assert(fd>0); InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1); memset(ipf,0,sizeof(*ipf)); @@ -1420,14 +1478,12 @@ static InputFile *open_input_file(const char *path) { return ipf; } -static void close_input_file(InputFile *ipf) { +static void close_input_file(InputFile *ipf) { /* does not free */ assert(!ipf->readable_callback); /* must have had ->on_cancel */ - assert(!ipf->filemon); /* must have had inputfile_tailing_stop */ - assert(!ipf->rd); /* must have had inputfile_tailing_stop */ + assert(!ipf->filemon); /* must have had inputfile_reading_stop */ + assert(!ipf->rd); /* must have had inputfile_reading_stop */ assert(!ipf->inprogress); /* no dangling pointers pointing here */ - - if (ipf->fd >= 0) - if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); + xclose_perhaps(&ipf->fd, "input file ", ipf->path); } @@ -1477,7 +1533,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, if (ipf->skippinglong) { if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */ - return; + return OOP_CONTINUE; } if (ev==OOP_RD_LONG) { ipf->skippinglong= 1; @@ -1492,7 +1548,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->readcount_blank++; return OOP_CONTINUE; } - + char *space= strchr(data,' '); int tokenlen= space-data; int midlen= (int)recsz-tokenlen-1; @@ -1507,12 +1563,12 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->readcount_ok++; art= xmalloc(sizeof(*art) - 1 + midlen + 1); - art->offset= ipf->offset; - art->blanklen= recsz; - art->midlen= midlen; art->state= art_Unchecked; + art->midlen= midlen; art->ipf= ipf; ipf->inprogress++; art->token= TextToToken(tokentextbuf); + art->offset= ipf->offset; + art->blanklen= recsz; strcpy(art->messageid, space+1); LIST_ADDTAIL(queue, art); @@ -1520,7 +1576,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->offset >= target_max_feedfile_size) statemc_start_flush("feed file size"); - check_master_queue(); + check_assign_articles(); return OOP_CONTINUE; } @@ -1574,10 +1630,10 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, errno=EAGAIN; return -1; } else if (ipf==flushing_input_file) { - assert(ipf->fd>=0); + assert(ipf->rd); assert(sms==sm_SEPARATED || sms==sm_DROPPING); } else if (ipf==backlog_input_file) { - assert(ipf->fd>=0); + assert(ipf->rd); } else { abort(); } @@ -1707,8 +1763,8 @@ static const oop_rd_style feedfile_rdstyle= { OOP_RD_SHORTREC_LONG, }; -static void inputfile_tailing_start(InputFile *ipf) { - assert(!ipf->fd); +static void inputfile_reading_start(InputFile *ipf) { + assert(!ipf->rd); ipf->readable.on_readable= tailing_on_readable; ipf->readable.on_cancel= tailing_on_cancel; ipf->readable.try_read= tailing_try_read; @@ -1719,15 +1775,15 @@ static void inputfile_tailing_start(InputFile *ipf) { ipf->readable_callback_user= 0; ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0); - assert(ipf->fd); + assert(ipf->rd); int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, feedfile_got_article,ipf, feedfile_read_err, ipf); if (r) sysdie("unable start reading feedfile %s",ipf->path); } -static void inputfile_tailing_stop(InputFile *ipf) { - assert(ipf->fd); +static void inputfile_reading_stop(InputFile *ipf) { + assert(ipf->rd); oop_rd_cancel(ipf->rd); oop_rd_delete(ipf->rd); ipf->rd= 0; @@ -1743,16 +1799,16 @@ static void inputfile_tailing_stop(InputFile *ipf) { .=======. ||START|| - `=======' - | - | open F - | - | F ENOENT - |`---------------------------------------------------. + `=======' + | + | open F + | + | F ENOENT + |`---------------------------------------------------. F OPEN OK | | - |`---------------- - - - | + |`---------------- - - - | D ENOENT | D EXISTS see OVERALL STATES diagram | - | for full startup logic | + | for full startup logic | ,--------->| | | V | | ============ try to | @@ -1794,7 +1850,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | V | | V | ============= V V ============ | SEPARATED-1 | | DROPPING-1 - | flsh->fd>=0 | | flsh->fd>=0 + | flsh->rd!=0 | | flsh->rd!=0 | [Separated] | | [Dropping] | main F idle | | main none | old D tail | | old D tail @@ -1804,7 +1860,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | V | | V | =============== | | =============== | SEPARATED-2 | | DROPPING-2 - | flsh->fd==-1 | V flsh->fd==-1 + | flsh->rd==0 | V flsh->rd==0 | [Finishing] | | [Dropping] | main F tail | `. main none | old D closed | `. old D closed @@ -1817,29 +1873,29 @@ static void inputfile_tailing_stop(InputFile *ipf) { | | | | | | V V `----------' ============== - DROPPED - [Dropped] - main none - old none - some backlog - ============== - | - | ALL BACKLOG DONE - | - | unlink lock - | exit - V - ========== - (ESRCH) - [Droppped] - ========== + DROPPED + [Dropped] + main none + old none + some backlog + ============== + | + | ALL BACKLOG DONE + | + | unlink lock + | exit + V + ========== + (ESRCH) + [Droppped] + ========== * ->8- */ static void startup_set_input_file(InputFile *f) { assert(!main_input_file); main_input_file= f; - inputfile_tailing_start(f); + inputfile_reading_start(f); } static void statemc_init(void) { @@ -1874,8 +1930,7 @@ static void statemc_init(void) { if (!lock_noent && samefile(&stab, &stabf)) break; - if (close(lockfd)) - sysdie("could not close stale lockfile %s", path_lock); + xclose(lockfd, "stale lockfile ", path_lock); } debug("startup: locked"); @@ -1976,7 +2031,7 @@ static void statemc_period_poll(void) { static int inputfile_is_done(InputFile *ipf) { if (!ipf) return 0; if (ipf->inprogress) return 0; /* new article in the meantime */ - if (ipf->fd >= 0); return 0; /* not had EOF */ + if (ipf->rd) return 0; /* not had EOF */ return 1; } @@ -2014,7 +2069,7 @@ static void statemc_check_backlog_done(void) { const char *rest= under ? under+1 : leaf; if (!strncmp(rest,"backlog",7)) rest += 7; notice_processed(ipf,"backlog:",rest); - + close_input_file(ipf); if (unlink(ipf->path)) { if (errno != ENOENT) @@ -2074,11 +2129,13 @@ static void statemc_setstate(StateMachineState newsms, int periods, const char *xtra= ""; switch (sms) { - case sm_FLUSHING: sm_FLUSHFAILED: + case sm_FLUSHING: + case sm_FLUSHFAILED: if (!main_input_file) xtra= "-ABSENT"; break; - case sm_SEPARATED: case sm_DROPPING: - xtra= flushing_input_file->fd >= 0 ? "-1" : "-2"; + case sm_SEPARATED: + case sm_DROPPING: + xtra= flushing_input_file->rd ? "-1" : "-2"; break; default:; } @@ -2188,7 +2245,7 @@ static void search_backlog_file(void) { int r, i; struct stat stab; const char *oldest_path=0; - time_t oldest_mtime, now; + time_t oldest_mtime=0, now; if (backlog_input_file) return; @@ -2230,8 +2287,6 @@ static void search_backlog_file(void) { " nonzero (error?) return value %d", globpat_backlog, r); } - globfree(&gl); - if (!oldest_path) { debug("backlog scan: none"); @@ -2240,8 +2295,8 @@ static void search_backlog_file(void) { xunlink(path_lock, "lockfile for old feed"); exit(0); } - until_backlog_nextscan= backlog_spontaneous_rescan_periods; - return; + until_backlog_nextscan= backlog_spontrescan_periods; + goto xfree; } now= xtime(); @@ -2254,34 +2309,40 @@ static void search_backlog_file(void) { backlog_input_file= open_input_file(oldest_path); if (!backlog_input_file) { - warn("backlog file %s vanished as we opened it", backlog_input_file); + warn("backlog file %s vanished as we opened it", oldest_path); + globfree(&gl); goto try_again; } - inputfile_tailing_start(backlog_input_file); + inputfile_reading_start(backlog_input_file); until_backlog_nextscan= -1; - return; + goto xfree; } until_backlog_nextscan= age_deficiency / period_seconds; - if (backlog_spontaneous_rescan_periods >= 0 && - until_backlog_nextscan > backlog_spontaneous_rescan_periods) - until_backlog_nextscan= backlog_spontaneous_rescan_periods; + if (backlog_spontrescan_periods >= 0 && + until_backlog_nextscan > backlog_spontrescan_periods) + until_backlog_nextscan= backlog_spontrescan_periods; debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s", age, age_deficiency, until_backlog_nextscan, oldest_path); + + xfree: + globfree(&gl); return; } /*========== flushing the feed ==========*/ static pid_t inndcomm_child; +static int inndcomm_sentinel_fd; static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { assert(inndcomm_child); + assert(fd == inndcomm_sentinel_fd); int status= xwaitpid(&inndcomm_child, "inndcomm"); cancel_fd_read_except(fd); - close(fd); + xclose_perhaps(&fd, "inndcomm sentinel pipe",0); assert(!flushing_input_file); @@ -2295,7 +2356,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { warn("feed has been dropped by innd, finishing up"); flushing_input_file= main_input_file; tailing_queue_readable(flushing_input_file); - /* we probably previously returned EAGAIN from our fake read method + /* we probably previously returned EAGAIN from our fake read method * when in fact we were at EOF, so signal another readable event * so we actually see the EOF */ @@ -2341,6 +2402,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { failed: SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry"); + return OOP_CONTINUE; } static void inndcommfail(const char *what) { @@ -2355,6 +2417,7 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED); assert(!inndcomm_child); + assert(!inndcomm_sentinel_fd); if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel"); @@ -2365,7 +2428,8 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ char *reply; int r; - close(pipefds[0]); + xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0); + /* parent spots the autoclose of pipefds[1] when we die or exit */ alarm(inndcomm_flush_timeout); r= ICCopen(); if (r) inndcommfail("connect"); @@ -2377,9 +2441,10 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ exit(INNDCOMMCHILD_ESTATUS_FAIL); } - close(pipefds[1]); - int sentinel_fd= pipefds[0]; - on_fd_read_except(sentinel_fd, inndcomm_event); + xclose(pipefds[1], "inndcomm sentinel child's end",0); + inndcomm_sentinel_fd= pipefds[0]; + assert(inndcomm_sentinel_fd); + on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event); SMS(FLUSHING, 0, why); } @@ -2388,86 +2453,103 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ static void postfork_inputfile(InputFile *ipf) { if (!ipf) return; - assert(ipf->fd >= 0); - close(ipf->fd); - ipf->fd= -1; + xclose(ipf->fd, "(in child) input file ", ipf->path); } -static void postfork_stdio(FILE *f) { +static void postfork_stdio(FILE *f, const char *what, const char *what2) { /* we have no stdio streams that are buffered long-term */ - if (f) fclose(f); + if (!f) return; + if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:0); } -static void postfork(const char *what) { +static void postfork(void) { if (signal(SIGPIPE, SIG_DFL) == SIG_ERR) - sysdie("%s child: failed to reset SIGPIPE"); + sysdie("(in child) failed to reset SIGPIPE"); postfork_inputfile(main_input_file); postfork_inputfile(flushing_input_file); Conn *conn; for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) - close(conn->fd); + conn_closefd(conn,"(in child) "); - postfork_stdio(defer); + postfork_stdio(defer, "defer file ", path_defer); } #define EVERY(what, interval_sec, interval_usec, body) \ static struct timeval what##_timeout = { interval_sec, interval_usec }; \ static void what##_schedule(void); \ static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ - { body } \ + body; \ what##_schedule(); \ + return OOP_CONTINUE; \ } \ static void what##_schedule(void) { \ loop->on_time(loop, what##_timeout, what##_timedout, 0); \ } -EVERY(filepoll, 5,0, { +EVERY(filepoll, 5,0, ({ if (main_input_file && main_input_file->readable_callback) filemon_callback(main_input_file); -}); +})); + +static char *debug_report_ipf(InputFile *ipf) { + if (!ipf) return xasprintf("-"); + + const char *slash= strrchr(ipf->path,'/'); + const char *path= slash ? slash+1 : ipf->path; -#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s" -#define DEBUG_IPF(wh) \ - wh##_input_file, debug_ipf_path(wh##_input_file), \ - wh##_input_file->inprogress, (long)wh##_input_file->offset, \ - wh##_input_file->fd, wh##_input_file->rd ? "+" : "" -static const char *debug_ipf_path(InputFile *ipf) { - char *slash= strrchr(ipf->path,'/'); - return slash ? slash+1 : ipf->path; + return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s", + ipf, path, + ipf->inprogress, (long)ipf->offset, + ipf->fd, ipf->rd ? "+" : ""); } -EVERY(period, -1,0, { +EVERY(period, -1,0, ({ + char *dipf_main= debug_report_ipf(main_input_file); + char *dipf_flushing= debug_report_ipf(flushing_input_file); + char *dipf_backlog= debug_report_ipf(backlog_input_file); + debug("PERIOD" " sms=%s[%d] conns=%d queue=%d until_connect=%d" - " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_IPF(flushing) - " children connecting=%ld inndcomm_child" + " input_files main:%s old:%s flushing:%s" + " children connecting=%ld inndcomm_child=%ld" , sms_names[sms], sm_period_counter, - queue.count, conns.count, until_connect, - DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing), + conns.count, queue.count, until_connect, + dipf_main, dipf_flushing, dipf_backlog, (long)connecting_child, (long)inndcomm_child ); + free(dipf_main); + free(dipf_flushing); + free(dipf_backlog); + if (until_connect) until_connect--; poll_backlog_file(); if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_period_poll(); - check_master_queue(); + check_assign_articles(); check_idle_conns(); -}); +})); /*========== option parsing ==========*/ -/*---------- generic option parser and logging ----------*/ - static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0); static void vbadusage(const char *fmt, va_list al) { - abort(); + char *m= xvasprintf(fmt,al); + fprintf(stderr, "bad usage: %s\n" + "say --help for help, or read the manpage\n", + m); + if (become_daemon) + syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m); + exit(8); } + +/*---------- generic option parser ----------*/ + static void badusage(const char *fmt, ...) NORET_PRINTF(1,2); static void badusage(const char *fmt, ...) { va_list al; @@ -2485,12 +2567,80 @@ typedef void OptionParser(const Option*, const char *val); struct Option { int shrt; - const char *lng; + const char *lng, *formarg; void *store; OptionParser *fn; - int noarg, intval; + int intval; }; +static void parse_options(const Option *options, char ***argvp) { + /* on return *argvp is first non-option arg; argc is not updated */ + + for (;;) { + const char *arg= *++(*argvp); + if (!arg) break; + if (*arg != '-') break; + if (!strcmp(arg,"--")) { arg= *++(*argvp); break; } + int a; + while ((a= *++arg)) { + const Option *o; + if (a=='-') { + arg++; + char *equals= strchr(arg,'='); + int len= equals ? (equals - arg) : strlen(arg); + for (o=options; o->shrt || o->lng; o++) + if (strlen(o->lng) == len && !memcmp(o->lng,arg,len)) + goto found_long; + badusage("unknown long option --%s",arg); + found_long: + if (!o->formarg) { + if (equals) badusage("option --%s does not take a value",o->lng); + arg= 0; + } else if (equals) { + arg= equals+1; + } else { + arg= *++(*argvp); + if (!arg) badusage("option --%s needs a value for %s", + o->lng, o->formarg); + } + o->fn(o, arg); + break; /* eaten the whole argument now */ + } + for (o=options; o->shrt || o->lng; o++) + if (a == o->shrt) + goto found_short; + badusage("unknown short option -%c",a); + found_short: + if (!o->formarg) { + o->fn(o,0); + } else { + if (!*++arg) { + arg= *++(*argvp); + if (!arg) badusage("option -%c needs a value for %s", + o->shrt, o->formarg); + } + o->fn(o,arg); + break; /* eaten the whole argument now */ + } + } + } +} + +#define DELIMPERHAPS(delim,str) (str) ? (delim) : "", (str) ? (str) : "" + +static void print_options(const Option *options, FILE *f) { + const Option *o; + for (o=options; o->shrt || o->lng; o++) { + char shrt[2] = { o->shrt, 0 }; + char *optspec= xasprintf("%s%s%s%s%s", + o->shrt ? "-" : "", shrt, + o->shrt && o->lng ? "|" : "", + DELIMPERHAPS("--", o->lng)); + fprintf(f, " %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg)); + free(optspec); + } +} + /*---------- specific option types ----------*/ static void op_integer(const Option *o, const char *val) { @@ -2537,13 +2687,6 @@ static void op_seconds(const Option *o, const char *val) { *store= v; } -static void op_periods_rndup(const Option *o, const char *val) { - int *store= o->store; - op_seconds(o,val); - *store += period_seconds-1; - *store /= period_seconds; -} - static void op_setint(const Option *o, const char *val) { int *store= o->store; *store= o->intval; @@ -2551,105 +2694,107 @@ static void op_setint(const Option *o, const char *val) { /*---------- specific options ----------*/ -static const Option options[]= { -{'f',"feedfile", &feedfile, op_string }, -{'q',"quiet-multiple", &quiet_multiple, op_setint, 1,1 }, +static void help(const Option *o, const char *val); + +static const Option innduct_options[]= { +{'f',"feedfile", "F", &feedfile, op_string }, +{'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 }, +{0,"no-daemon", 0, &become_daemon, op_setint, 0 }, +{0,"no-streaming", 0, &try_stream, op_setint, 0 }, +{0,"inndconf", "F", &inndconffile, op_string }, +{'P',"port", "PORT", &port, op_integer }, +{0,"help", 0, 0, help }, + +{0,"max-connections", "N", &max_connections, op_integer }, +{0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer }, +{0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer }, +{0,"period-interval", "TIME", &period_seconds, op_seconds }, -{ 0, "max-connections", &max_connections, op_integer }, -{ 0, "max-queue-per-conn", &max_queue_per_conn, op_integer }, +{0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds }, +{0,"stuck-flush-timeout","TIME", &inndcomm_flush_timeout, op_seconds }, +{0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double }, +{0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double }, -{ 0, "streaming", &try_stream, op_setint, 1,1 }, -{ 0, "no-streaming", &try_stream, op_setint, 1,0 }, -{'P',"port", &port, op_integer }, -{ 0, "inndconf", &inndconffile, op_string }, -{'d',"daemon", &become_daemon, op_setint, 1,1 }, -{ 0, "no-daemon", &become_daemon, op_setint, 1,0 }, +{0,"reconnect-interval", "PERIOD", &reconnect_delay_periods, op_seconds }, +{0,"flush-retry-interval", "PERIOD", &flushfail_retry_periods, op_seconds }, +{0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds }, +{0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds}, +{0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds }, +{0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds }, -{ 0, "no-check-proportion", &nocheck_thresh_pct, op_double }, -{ 0, "no-check-filter", &nocheck_decay_articles, op_double }, +{0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double }, +{0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer }, -{ 0, "reconnect-interval", &reconnect_delay_periods, op_periods_rndup }, -{ 0, "flush-retry-interval", &flushfail_retry_periods, op_periods_rndup }, -{ 0, "inndcomm-timeout", &inndcomm_flush_timeout, op_seconds }, +{0,0} }; -int main(int argc, char **argv) { - const char *arg; +static void printusage(FILE *f) { + fputs("usage: innduct [options] site [fqdn]\n" + "available options are:\n", f); + print_options(innduct_options, f); +} - for (;;) { - arg= *++argv; - if (!arg) break; - if (*arg != '-') break; - if (!strcmp(arg,"--")) { arg= *++argv; break; } - int a; - while ((a= *++arg)) { - const Option *o; - if (a=='-') { - arg++; - char *equals= strchr(arg,'='); - int len= equals ? (equals - arg) : strlen(arg); - for (o=options; o->lng; o++) - if (strlen(o->lng) == len && !memcmp(o->lng,arg,len)) - goto found_long; - badusage("unknown long option --%s",arg); - found_long: - if (o->noarg) { - if (equals) badusage("option --%s does not take a value",o->lng); - arg= 0; - } else if (equals) { - arg= equals+1; - } else { - arg= *++argv; - if (!arg) badusage("option --%s needs a value",o->lng); - } - o->fn(o, arg); - break; /* eaten the whole argument now */ - } - for (o=options; o->lng; o++) - if (a == o->shrt) - goto found_short; - badusage("unknown short option -%c",a); - found_short: - if (o->noarg) { - o->fn(o,0); - } else { - if (!*++arg) { - arg= *++argv; - if (!arg) badusage("option -%c needs a value",o->shrt); - } - o->fn(o,arg); - break; /* eaten the whole argument now */ - } - } +static void help(const Option *o, const char *val) { + printusage(stdout); + if (ferror(stdout) || fflush(stdout)) { + perror("innduct: writing help"); + exit(12); } + exit(0); +} - if (!arg) badusage("need site name argument"); - sitename= arg; +static void convert_to_periods_rndup(int *store) { + *store += period_seconds-1; + *store /= period_seconds; +} - if ((arg= *++argv)) - remote_host= arg; +int main(int argc, char **argv) { + if (!argv[1]) { + printusage(stderr); + exit(12); + } - if (*++argv) badusage("too many non-option arguments"); + parse_options(innduct_options, &argv); - if (nocheck_thresh_pct < 0 || nocheck_thresh_pct > 100) - badusage("nocheck threshold percentage must be between 0..100"); - nocheck_thresh= nocheck_thresh_pct * 0.01; + /* arguments */ - if (nocheck_decay_articles < 0.1) - badusage("nocheck decay articles must be at least 0.1"); - nocheck_decay= 1 - 1/nocheck_decay_articles; + sitename= *argv++; + if (!sitename) badusage("need site name argument"); + remote_host= *argv++; + if (*argv) badusage("too many non-option arguments"); + + /* defaults */ - if (!pathoutgoing) - pathoutgoing= innconf->pathoutgoing; - innconf_read(inndconffile); + if (!remote_host) remote_host= sitename; + + if (nocheck_thresh < 0 || nocheck_thresh > 100) + badusage("nocheck threshold percentage must be between 0..100"); + nocheck_thresh *= 0.01; - if (!feedfile) - feedfile= xasprintf("%s/%s",pathoutgoing,sitename); - else if (!feedfile[0]) + if (nocheck_decay < 0.1) + badusage("nocheck decay articles must be at least 0.1"); + nocheck_decay= 1 - 1.0/nocheck_decay; + + convert_to_periods_rndup(&reconnect_delay_periods); + convert_to_periods_rndup(&flushfail_retry_periods); + convert_to_periods_rndup(&backlog_retry_minperiods); + convert_to_periods_rndup(&backlog_spontrescan_periods); + convert_to_periods_rndup(&spontaneous_flush_periods); + convert_to_periods_rndup(&need_activity_periods); + + if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100) + badusage("bad input data ratio must be between 0..100"); + max_bad_data_ratio *= 0.01; + + if (!feedfile) { + innconf_read(inndconffile); + feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename); + } else if (!feedfile[0]) { badusage("feed filename must be nonempty"); - else if (feedfile[strlen(feedfile)-1]=='/') + } else if (feedfile[strlen(feedfile)-1]=='/') { feedfile= xasprintf("%s%s",feedfile,sitename); + } const char *feedfile_forbidden= "?*[~#"; int c; @@ -2657,6 +2802,8 @@ int main(int argc, char **argv) { if (strchr(feedfile, c)) badusage("feed filename may not contain metacharacter %c",c); + /* set things up */ + oop_source_sys *sysloop= oop_sys_new(); if (!sysloop) sysdie("could not create liboop event loop"); loop= (oop_source*)sysloop; @@ -2664,6 +2811,9 @@ int main(int argc, char **argv) { if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) sysdie("could not ignore SIGPIPE"); + LIST_INIT(conns); + LIST_INIT(queue); + if (become_daemon) { int i; for (i=3; i<255; i++) @@ -2676,7 +2826,7 @@ int main(int argc, char **argv) { dup2(null,0); dup2(null,1); dup2(null,2); - close(null); + xclose(null, "/dev/null original fd",0); pid_t child1= xfork("daemonise first fork"); if (child1) _exit(0); @@ -2700,6 +2850,8 @@ int main(int argc, char **argv) { statemc_init(); + /* let's go */ + void *r= oop_sys_run(sysloop); assert(r == OOP_ERROR); sysdie("event loop failed");