X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=d59d888362c3fd838ecab027abec067ab0826f37;hp=925b7fc9611a64970e7ad960e91c34878c75cede;hb=8dabf92db4447ec601ecbab0a709650f427d016b;hpb=8f9c239a1c4116bb35218911c12582380987a842 diff --git a/backends/innduct.c b/backends/innduct.c index 925b7fc..d59d888 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -358,7 +358,7 @@ static oop_rd_call peer_rd_err, peer_rd_ok; static const char *sitename, *remote_host; static const char *feedfile, *path_run, *path_cli, *path_cli_dir; static int quiet_multiple=0; -static int become_daemon=1, try_filemon=1; +static int interactive=0, try_filemon=1; static int try_stream=1; static int port=119; static const char *inndconffile; @@ -366,7 +366,7 @@ static const char *inndconffile; static int max_connections=10; static int max_queue_per_conn=200; static int target_max_feedfile_size=100000; -static int period_seconds=60; +static int period_seconds=30; static int filepoll_seconds=5; static int max_queue_per_ipf=-1; @@ -379,11 +379,13 @@ static double nocheck_decay= 100; /* conv'd from articles to lambda by main */ /* all these are initialised to seconds, and converted to periods in main */ static int reconnect_delay_periods=1000; static int flushfail_retry_periods=1000; -static int backlog_retry_minperiods=50; +static int backlog_retry_minperiods=100; static int backlog_spontrescan_periods=300; static int spontaneous_flush_periods=100000; static int max_separated_periods=2000; static int need_activity_periods=1000; +static int lowvol_thresh=3; +static int lowvol_periods=1000; static double max_bad_data_ratio= 1; /* conv'd from percentage by main */ static int max_bad_data_initial= 30; @@ -502,7 +504,9 @@ struct Conn { ISNODE(Conn); int fd; /* may be 0, meaning closed (during construction/destruction) */ oop_read *rd; /* likewise */ - int max_queue, stream, quitting; + int oopwriting; /* since on_fd is not idempotent */ + int max_queue, stream; + const char *quitting; int since_activity; /* periods */ ArticleList waiting; /* not yet told peer */ ArticleList priority; /* peer says send it now */ @@ -521,6 +525,9 @@ static ConnList conns; static char *path_lock, *path_flushing, *path_defer, *path_dump; static char *globpat_backlog; static pid_t self_pid; +static int *lowvol_perperiod; +static int lowvol_circptr; +static int lowvol_total; /* does not include current period */ /* statemc_init initialises */ static StateMachineState sms; @@ -541,7 +548,7 @@ int simulate_flush= -1; static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); static void logcore(int sysloglevel, const char *fmt, ...) { VA; - if (become_daemon) { + if (interactive < 2) { vsyslog(sysloglevel,fmt,al); } else { if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid); @@ -1057,12 +1064,24 @@ static void cli_init(void) { /*========== management of connections ==========*/ +static void reconnect_blocking_event(void) { + until_connect= reconnect_delay_periods; +} + static void conn_closefd(Conn *conn, const char *msgprefix) { int r= close_perhaps(&conn->fd); if (r) info("C%d %serror closing socket: %s", conn->fd, msgprefix, strerror(errno)); } +static int conn_busy(Conn *conn) { + return + conn->waiting.count || + conn->priority.count || + conn->sent.count || + conn->xmitu; +} + static void conn_dispose(Conn *conn) { if (!conn) return; if (conn->rd) { @@ -1076,7 +1095,6 @@ static void conn_dispose(Conn *conn) { } conn_closefd(conn,""); free(conn); - until_connect= reconnect_delay_periods; } static void *conn_exception(oop_source *lp, int fd, @@ -1116,12 +1134,14 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { for (i=0, d=conn->xmitd; ixmitu; i++, d++) xmit_free(d); + LIST_REMOVE(conns,conn); + char *m= xvasprintf(fmt,al); - warn("C%d connection failed requeueing " RCI_TRIPLE_FMT_BASE ": %s", - conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); + warn("C%d (now %d) connection failed requeueing " RCI_TRIPLE_FMT_BASE ": %s", + conn->fd, conns.count, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); free(m); - LIST_REMOVE(conns,conn); + reconnect_blocking_event(); conn_dispose(conn); check_assign_articles(); } @@ -1133,17 +1153,56 @@ static void connfail(Conn *conn, const char *fmt, ...) { va_end(al); } +static void conn_idle_close(Conn *conn, const char *why) { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= why; + conn->since_activity= 0; + debug("C%d is idle (%s), quitting", conn->fd, why); + break; + } + } +} + +/* + * For our last connection, we also shut it down if we have had + * less than K in the last L + */ static void check_idle_conns(void) { Conn *conn; + + int volthisperiod= lowvol_perperiod[lowvol_circptr]; + lowvol_circptr++; + lowvol_circptr %= lowvol_periods; + lowvol_total += volthisperiod; + lowvol_total -= lowvol_perperiod[lowvol_circptr]; + lowvol_perperiod[lowvol_circptr]= 0; + FOR_CONN(conn) conn->since_activity++; + search_again: FOR_CONN(conn) { if (conn->since_activity <= need_activity_periods) continue; /* We need to shut this down */ if (conn->quitting) - connfail(conn,"timed out waiting for response to QUIT"); + connfail(conn,"timed out waiting for response to QUIT (%s)", + conn->quitting); else if (conn->sent.count) connfail(conn,"timed out waiting for responses"); else if (conn->waiting.count || conn->priority.count) @@ -1151,32 +1210,18 @@ static void check_idle_conns(void) { else if (conn->xmitu) connfail(conn,"peer has been sending responses" " before receiving our commands!"); - else { - static const char quitcmd[]= "QUIT\r\n"; - int todo= sizeof(quitcmd)-1; - const char *p= quitcmd; - for (;;) { - int r= write(conn->fd, p, todo); - if (r<0) { - if (isewouldblock(errno)) - connfail(conn, "blocked writing QUIT to idle connection"); - else - connfail(conn, "failed to write QUIT to idle connection: %s", - strerror(errno)); - break; - } - assert(r<=todo); - todo -= r; - if (!todo) { - conn->quitting= 1; - conn->since_activity= 0; - debug("C%d is idle, quitting", conn->fd); - break; - } - } - } + else + conn_idle_close(conn, "no activity"); + goto search_again; } + + conn= LIST_HEAD(conns); + if (!volthisperiod && + conns.count==1 && + lowvol_total < lowvol_thresh && + !conn_busy(conn)) + conn_idle_close(conn, "low volume"); } /*---------- making new connections ----------*/ @@ -1298,8 +1343,9 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { &peer_rd_err, conn); if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd); - notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); LIST_ADDHEAD(conns, conn); + notice("C%d (now %d) connected %s", + conn->fd, conns.count, conn->stream ? "streaming" : "plain"); connect_attempt_discard(); check_assign_articles(); @@ -1308,6 +1354,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { x: conn_dispose(conn); connect_attempt_discard(); + reconnect_blocking_event(); return OOP_CONTINUE; } @@ -1322,6 +1369,8 @@ static void connect_start(void) { assert(!connecting_fdpass_sock); info("starting connection attempt"); + int ok_until_connect= until_connect; + reconnect_blocking_event(); int socks[2]; int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks); @@ -1414,6 +1463,9 @@ static void connect_start(void) { connecting_fdpass_sock= socks[0]; xsetnonblock(connecting_fdpass_sock, 1); on_fd_read_except(connecting_fdpass_sock, connchild_event); + + if (!conns.count) + until_connect= ok_until_connect; } /*---------- assigning articles to conns, and transmitting ----------*/ @@ -1466,11 +1518,11 @@ static void check_assign_articles(void) { Article *art= dequeue(0); if (!art) break; LIST_ADDTAIL(use->waiting, art); + lowvol_perperiod[lowvol_circptr]++; spare--; } conn_maybe_write(use); } else if (allow_connect_start()) { - until_connect= reconnect_delay_periods; connect_start(); break; } else { @@ -1489,12 +1541,16 @@ static void conn_maybe_write(Conn *conn) { conn_make_some_xmits(conn); if (!conn->xmitu) { loop->cancel_fd(loop, conn->fd, OOP_WRITE); + conn->oopwriting= 0; return; } void *rp= conn_write_some_xmits(conn); if (rp==OOP_CONTINUE) { - loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn); + if (!conn->oopwriting) { + loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn); + conn->oopwriting= 1; + } return; } else if (rp==OOP_HALT) { return; @@ -1584,6 +1640,7 @@ static void autodefer_input_file_articles(InputFile *ipf) { } static void autodefer_input_file(InputFile *ipf) { + static const char *const abandon= "stuck"; ipf->autodefer= 0; autodefer_input_file_articles(ipf); @@ -1594,11 +1651,11 @@ static void autodefer_input_file(InputFile *ipf) { if (has_article_in(&walk->waiting, ipf) || has_article_in(&walk->priority, ipf) || has_article_in(&walk->sent, ipf)) - walk->quitting= -1; + walk->quitting= abandon; } while (ipf->inprogress) { FOR_CONN(walk) - if (walk->quitting < 0) goto found; + if (walk->quitting == abandon) goto found; abort(); /* where are they ?? */ found: @@ -1661,15 +1718,18 @@ static void *conn_write_some_xmits(Conn *conn) { assert(rs > 0); int done; - for (done=0; rs && donexmitu; done++) { + for (done=0; rs; ) { + assert(donexmitu); struct iovec *vp= &conn->xmit[done]; XmitDetails *dp= &conn->xmitd[done]; - if (rs > vp->iov_len) { + if (rs >= vp->iov_len) { rs -= vp->iov_len; - xmit_free(dp); + xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */ + done++; } else { vp->iov_base= (char*)vp->iov_base + rs; vp->iov_len -= rs; + break; /* rs -= rs */ } } int newu= conn->xmitu - done; @@ -1889,19 +1949,17 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, return OOP_CONTINUE; } - int conn_busy= - conn->waiting.count || - conn->priority.count || - conn->sent.count || - conn->xmitu; + int busy= conn_busy(conn); if (conn->quitting) { - if (code!=205 && code!=503) { - connfail(conn, "peer gave unexpected response to QUIT: %s", sani); + if (code!=205 && code!=400) { + connfail(conn, "peer gave unexpected response to QUIT (%s): %s", + conn->quitting, sani); } else { - notice("C%d idle connection closed by us", conn->fd); - assert(!conn_busy); LIST_REMOVE(conns,conn); + notice("C%d (now %d) idle connection closed (%s)", + conn->fd, conns.count, conn->quitting); + assert(!busy); conn_dispose(conn); } return OOP_CONTINUE; @@ -1933,11 +1991,12 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, default: PEERBADMSG("peer sent unexpected message"); case 400: - if (conn_busy) + if (busy) PEERBADMSG("peer timed us out or stopped accepting articles"); - notice("C%d idle connection closed by peer", conn->fd); LIST_REMOVE(conns,conn); + notice("C%d (now %d) idle connection closed by peer", + conns.count, conn->fd); conn_dispose(conn); return OOP_CONTINUE; @@ -3077,13 +3136,15 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { main_input_file= open_input_file(feedfile); if (!main_input_file) - die("flush succeeded but feedfile %s does not exist!", feedfile); + die("flush succeeded but feedfile %s does not exist!" + " (this probably means feedfile does not correspond" + " to site %s in newsfeeds)", feedfile, sitename); if (flushing_input_file) { - SMS(SEPARATED, max_separated_periods, "recovery flush complete"); + SMS(SEPARATED, max_separated_periods, "flush complete"); } else { close_defer(); - SMS(NORMAL, spontaneous_flush_periods, "flush complete"); + SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete"); } return OOP_CONTINUE; @@ -3249,11 +3310,11 @@ static void period(void) { debug("PERIOD" " sms=%s[%d] conns=%d until_connect=%d" " input_files main:%s flushing:%s backlog:%s[%d]" - " children connecting=%ld inndcomm=%ld" + " children connecting=%ld inndcomm=%ld lowvol_total=%d" , sms_names[sms], until_flush, conns.count, until_connect, dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan, - (long)connecting_child, (long)inndcomm_child + (long)connecting_child, (long)inndcomm_child, lowvol_total ); free(dipf_main); @@ -3344,6 +3405,17 @@ CCMD(dump) { DUMPV("%d", , cli_master); fprintf(f,"\n"); + fprintf(f,"lowvol"); + DUMPV("%d", , lowvol_circptr); + DUMPV("%d", , lowvol_total); + fprintf(f,":"); + for (i=0; ifd); DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue); - DUMPV("%d",conn->,stream); DUMPV("%d",conn->,quitting); + DUMPV("%d",conn->,stream); DUMPV("\"%s\"",conn->,quitting); DUMPV("%d",conn->,since_activity); fprintf(f,"\n"); @@ -3407,7 +3479,7 @@ static void vbadusage(const char *fmt, va_list al) { fprintf(stderr, "bad usage: %s\n" "say --help for help, or read the manpage\n", m); - if (become_daemon) + if (interactive < 2) syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m); exit(8); } @@ -3567,7 +3639,8 @@ static void help(const Option *o, const char *val); static const Option innduct_options[]= { {'f',"feedfile", "F", &feedfile, op_string }, {'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 }, -{0,"no-daemon", 0, &become_daemon, op_setint, 0 }, +{0,"no-daemon", 0, &interactive, op_setint, 1 }, +{0,"interactive", 0, &interactive, op_setint, 2 }, {0,"no-streaming", 0, &try_stream, op_setint, 0 }, {0,"no-filemon", 0, &try_filemon, op_setint, 0 }, {'C',"inndconf", "F", &inndconffile, op_string }, @@ -3596,6 +3669,8 @@ static const Option innduct_options[]= { {0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds }, {0,"flush-finish-timeout", "PERIOD", &max_separated_periods, op_seconds }, {0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds }, +{0,"low-volume-thresh", "PERIOD", &lowvol_thresh, op_integer }, +{0,"low-volume-window", "PERIOD", &lowvol_periods, op_seconds }, {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double }, {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer }, @@ -3641,7 +3716,10 @@ int main(int argc, char **argv) { sitename= *argv++; if (!sitename) badusage("need site name argument"); - remote_host= *argv++; + + if (*argv) remote_host= *argv++; + else remote_host= sitename; + if (*argv) badusage("too many non-option arguments"); /* defaults */ @@ -3666,6 +3744,7 @@ int main(int argc, char **argv) { convert_to_periods_rndup(&spontaneous_flush_periods); convert_to_periods_rndup(&max_separated_periods); convert_to_periods_rndup(&need_activity_periods); + convert_to_periods_rndup(&lowvol_periods); if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100) badusage("bad input data ratio must be between 0..100"); @@ -3700,6 +3779,14 @@ int main(int argc, char **argv) { if (strchr(feedfile, c)) badusage("feed filename may not contain metacharacter %c",c); + int i; + lowvol_perperiod= xcalloc(sizeof(*lowvol_perperiod), lowvol_periods); + for (i=0; i= 2) cli_stdio(); cli_init();