X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=adf6cad9573b4b925b79c2dacd8a46278f5500ec;hp=7557b0f694ed6863c53f3fffdf4de230a638b3ca;hb=ac93723ebee6801697191caf50f3d0b714717068;hpb=2b227ef223abbe8dc57abd6509490f73cdc48755 diff --git a/backends/innduct.c b/backends/innduct.c index 7557b0f..adf6cad 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -23,6 +23,13 @@ * with GPLv3. If not then please let me know. -Ian Jackson.) */ +/* + * todo + * + * don't mind reconnecting if we just disconnected due to idle + * some weird disconnection event still investigating + */ + /* * Newsfeeds file entries should look like this: * host.name.of.site[/exclude,exclude,...]\ @@ -343,7 +350,7 @@ static void inputfile_reading_resume(InputFile *ipf); static void filemon_start(InputFile *ipf); static void filemon_stop(InputFile *ipf); -static void filemon_callback(InputFile *ipf); +static void tailing_make_readable(InputFile *ipf); static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); static void connfail(Conn *conn, const char *fmt, ...) 
PRINTF(2,3); @@ -356,7 +363,7 @@ static oop_rd_call peer_rd_err, peer_rd_ok; /* when changing defaults, remember to update the manpage */ static const char *sitename, *remote_host; -static const char *feedfile, *path_cli, *path_cli_dir; +static const char *feedfile, *path_run, *path_cli, *path_cli_dir; static int quiet_multiple=0; static int become_daemon=1, try_filemon=1; static int try_stream=1; @@ -379,11 +386,13 @@ static double nocheck_decay= 100; /* conv'd from articles to lambda by main */ /* all these are initialised to seconds, and converted to periods in main */ static int reconnect_delay_periods=1000; static int flushfail_retry_periods=1000; -static int backlog_retry_minperiods=50; +static int backlog_retry_minperiods=100; static int backlog_spontrescan_periods=300; static int spontaneous_flush_periods=100000; static int max_separated_periods=2000; static int need_activity_periods=1000; +static int recentact_thresh=3; +static int recentact_periods=1000; static double max_bad_data_ratio= 1; /* conv'd from percentage by main */ static int max_bad_data_initial= 30; @@ -457,7 +466,7 @@ struct InputFile { oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ off_t offset; - int skippinglong, paused; + int skippinglong, paused, fake_readable; ArticleList queue; long inprogress; /* includes queue.count and also articles in conns */ @@ -521,6 +530,9 @@ static ConnList conns; static char *path_lock, *path_flushing, *path_defer, *path_dump; static char *globpat_backlog; static pid_t self_pid; +static int *recentact_perperiod; +static int recentact_circptr; +static int recentact_total; /* statemc_init initialises */ static StateMachineState sms; @@ -1057,12 +1069,24 @@ static void cli_init(void) { /*========== management of connections ==========*/ +static void reconnect_blocking_event(void) { + until_connect= reconnect_delay_periods; +} + static void conn_closefd(Conn *conn, const char *msgprefix) { int r= close_perhaps(&conn->fd); if (r) info("C%d 
%serror closing socket: %s", conn->fd, msgprefix, strerror(errno)); } +static int conn_busy(Conn *conn) { + return + conn->waiting.count || + conn->priority.count || + conn->sent.count || + conn->xmitu; +} + static void conn_dispose(Conn *conn) { if (!conn) return; if (conn->rd) { @@ -1076,7 +1100,6 @@ static void conn_dispose(Conn *conn) { } conn_closefd(conn,""); free(conn); - until_connect= reconnect_delay_periods; } static void *conn_exception(oop_source *lp, int fd, @@ -1116,12 +1139,14 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++) xmit_free(d); + LIST_REMOVE(conns,conn); + char *m= xvasprintf(fmt,al); - warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s", - conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); + warn("C%d (now %d) connection failed requeueing " RCI_TRIPLE_FMT_BASE ": %s", + conn->fd, conns.count, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); free(m); - LIST_REMOVE(conns,conn); + reconnect_blocking_event(); conn_dispose(conn); check_assign_articles(); } @@ -1133,10 +1158,47 @@ static void connfail(Conn *conn, const char *fmt, ...) 
{ va_end(al); } +static void conn_idle_close(Conn *conn, const char *why) { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= 1; + conn->since_activity= 0; + debug("C%d is idle (%s), quitting", conn->fd, why); + break; + } + } +} + +/* + * For our last connection, we also shut it down if we have had + * less than K in the last L + */ static void check_idle_conns(void) { Conn *conn; + + int veryrecentact= recentact_perperiod[recentact_circptr]; + recentact_circptr++; + recentact_circptr %= recentact_periods; + recentact_total -= recentact_perperiod[recentact_circptr]; + recentact_perperiod[recentact_circptr]= 0; + FOR_CONN(conn) conn->since_activity++; + search_again: FOR_CONN(conn) { if (conn->since_activity <= need_activity_periods) continue; @@ -1151,32 +1213,18 @@ static void check_idle_conns(void) { else if (conn->xmitu) connfail(conn,"peer has been sending responses" " before receiving our commands!"); - else { - static const char quitcmd[]= "QUIT\r\n"; - int todo= sizeof(quitcmd)-1; - const char *p= quitcmd; - for (;;) { - int r= write(conn->fd, p, todo); - if (r<0) { - if (isewouldblock(errno)) - connfail(conn, "blocked writing QUIT to idle connection"); - else - connfail(conn, "failed to write QUIT to idle connection: %s", - strerror(errno)); - break; - } - assert(r<=todo); - todo -= r; - if (!todo) { - conn->quitting= 1; - conn->since_activity= 0; - debug("C%d is idle, quitting", conn->fd); - break; - } - } - } + else + conn_idle_close(conn, "no activity"); + goto search_again; } + + conn= LIST_HEAD(conns); + if (!veryrecentact && + conns.count==1 && + recentact_total+veryrecentact < 
recentact_thresh && + !conn_busy(conn)) + conn_idle_close(conn, "low volume"); } /*---------- making new connections ----------*/ @@ -1298,8 +1346,9 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { &peer_rd_err, conn); if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd); - notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); LIST_ADDHEAD(conns, conn); + notice("C%d (now %d) connected %s", + conn->fd, conns.count, conn->stream ? "streaming" : "plain"); connect_attempt_discard(); check_assign_articles(); @@ -1308,6 +1357,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { x: conn_dispose(conn); connect_attempt_discard(); + reconnect_blocking_event(); return OOP_CONTINUE; } @@ -1322,6 +1372,8 @@ static void connect_start(void) { assert(!connecting_fdpass_sock); info("starting connection attempt"); + int ok_reconnect_delay_periods= reconnect_delay_periods; + reconnect_blocking_event(); int socks[2]; int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks); @@ -1414,6 +1466,9 @@ static void connect_start(void) { connecting_fdpass_sock= socks[0]; xsetnonblock(connecting_fdpass_sock, 1); on_fd_read_except(connecting_fdpass_sock, connchild_event); + + if (!conns.count) + reconnect_delay_periods= ok_reconnect_delay_periods; } /*---------- assigning articles to conns, and transmitting ----------*/ @@ -1466,11 +1521,11 @@ static void check_assign_articles(void) { Article *art= dequeue(0); if (!art) break; LIST_ADDTAIL(use->waiting, art); + recentact_perperiod[recentact_circptr]++; spare--; } conn_maybe_write(use); } else if (allow_connect_start()) { - until_connect= reconnect_delay_periods; connect_start(); break; } else { @@ -1661,15 +1716,18 @@ static void *conn_write_some_xmits(Conn *conn) { assert(rs > 0); int done; - for (done=0; rs && done<conn->xmitu; done++) { + for (done=0; rs; ) { + assert(done<conn->xmitu); struct iovec *vp= &conn->xmit[done]; XmitDetails *dp= &conn->xmitd[done]; - if (rs > 
vp->iov_len) { + if (rs >= vp->iov_len) { rs -= vp->iov_len; - xmit_free(dp); + xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */ + done++; } else { vp->iov_base= (char*)vp->iov_base + rs; vp->iov_len -= rs; + break; /* rs -= rs */ } } int newu= conn->xmitu - done; @@ -1889,19 +1947,16 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, return OOP_CONTINUE; } - int conn_busy= - conn->waiting.count || - conn->priority.count || - conn->sent.count || - conn->xmitu; + int busy= conn_busy(conn); if (conn->quitting) { - if (code!=205 && code!=503) { + if (code!=205 && code!=400) { connfail(conn, "peer gave unexpected response to QUIT: %s", sani); } else { - notice("C%d idle connection closed by us", conn->fd); - assert(!conn_busy); LIST_REMOVE(conns,conn); + notice("C%d (now %d) idle connection closed by us", + conn->fd, conns.count); + assert(!busy); conn_dispose(conn); } return OOP_CONTINUE; @@ -1933,11 +1988,12 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, default: PEERBADMSG("peer sent unexpected message"); case 400: - if (conn_busy) + if (busy) PEERBADMSG("peer timed us out or stopped accepting articles"); - notice("C%d idle connection closed by peer", conn->fd); LIST_REMOVE(conns,conn); + notice("C%d (now %d) idle connection closed by peer", + conns.count, conn->fd); conn_dispose(conn); return OOP_CONTINUE; @@ -2125,7 +2181,16 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, static void *tailing_rable_call_time(oop_source *loop, struct timeval tv, void *user) { + /* lifetime of ipf here is OK because destruction will cause + * on_cancel which will cancel this callback */ InputFile *ipf= user; + + if (!ipf->fake_readable) return OOP_CONTINUE; + + /* we just keep calling readable until our caller (oop_rd) + * has called try_read, and try_read has found EOF so given EAGAIN */ + loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); + return ipf->readable_callback(loop, 
&ipf->readable, ipf->readable_callback_user); } @@ -2138,9 +2203,10 @@ static void tailing_on_cancel(struct oop_readable *rable) { ipf->readable_callback= 0; } -static void tailing_queue_readable(InputFile *ipf) { - /* lifetime of ipf here is OK because destruction will cause - * on_cancel which will cancel this callback */ +static void tailing_make_readable(InputFile *ipf) { + if (!ipf || !ipf->readable_callback) /* so callers can be naive */ + return; + ipf->fake_readable= 1; loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); } @@ -2152,8 +2218,7 @@ static int tailing_on_readable(struct oop_readable *rable, ipf->readable_callback= cb; ipf->readable_callback_user= user; filemon_start(ipf); - - tailing_queue_readable(ipf); + tailing_make_readable(ipf); return 0; } @@ -2164,11 +2229,13 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, ssize_t r= read(ipf->fd, buffer, length); if (r==-1) { if (errno==EINTR) continue; + ipf->fake_readable= 0; return r; } if (!r) { if (ipf==main_input_file) { errno=EAGAIN; + ipf->fake_readable= 0; return -1; } else if (ipf==flushing_input_file) { assert(ipf->rd); @@ -2179,7 +2246,6 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, abort(); } } - tailing_queue_readable(ipf); return r; } } @@ -2244,7 +2310,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, } InputFile *ipf= filemon_inotify_wd2ipf[iev.wd]; /*debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);*/ - filemon_callback(ipf); + tailing_make_readable(ipf); } return OOP_CONTINUE; } @@ -2305,11 +2371,6 @@ static void filemon_stop(InputFile *ipf) { ipf->filemon= 0; } -static void filemon_callback(InputFile *ipf) { - if (ipf && ipf->readable_callback) /* so filepoll() can be naive */ - ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); -} - /*---------- interface to start and stop an input file ----------*/ static const oop_rd_style feedfile_rdstyle= { @@ -3048,7 
+3109,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { case INNDCOMMCHILD_ESTATUS_NONESUCH: notice("feed has been dropped by innd, finishing up"); flushing_input_file= main_input_file; - tailing_queue_readable(flushing_input_file); + tailing_make_readable(flushing_input_file); /* we probably previously returned EAGAIN from our fake read method * when in fact we were at EOF, so signal another readable event * so we actually see the EOF */ @@ -3068,17 +3129,19 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { case 0: /* as above */ flushing_input_file= main_input_file; - tailing_queue_readable(flushing_input_file); + tailing_make_readable(flushing_input_file); main_input_file= open_input_file(feedfile); if (!main_input_file) - die("flush succeeded but feedfile %s does not exist!", feedfile); + die("flush succeeded but feedfile %s does not exist!" + " (this probably means feedfile does not correspond" + " to site %s in newsfeeds)", feedfile, sitename); if (flushing_input_file) { - SMS(SEPARATED, max_separated_periods, "recovery flush complete"); + SMS(SEPARATED, max_separated_periods, "flush complete"); } else { close_defer(); - SMS(NORMAL, spontaneous_flush_periods, "flush complete"); + SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete"); } return OOP_CONTINUE; @@ -3217,8 +3280,8 @@ static void every(int interval, int fixed_rate, void (*f)(void)) { } static void filepoll(void) { - filemon_callback(main_input_file); - filemon_callback(flushing_input_file); + tailing_make_readable(main_input_file); + tailing_make_readable(flushing_input_file); } static char *debug_report_ipf(InputFile *ipf) { @@ -3567,6 +3630,7 @@ static const Option innduct_options[]= { {0,"no-filemon", 0, &try_filemon, op_setint, 0 }, {'C',"inndconf", "F", &inndconffile, op_string }, {'P',"port", "PORT", &port, op_integer }, +{0,"chdir", "DIR", &path_run, op_string }, {0,"cli", "DIR/|PATH", &path_cli, op_string }, {0,"help", 
0, 0, help }, @@ -3590,6 +3654,8 @@ static const Option innduct_options[]= { {0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds }, {0,"flush-finish-timeout", "PERIOD", &max_separated_periods, op_seconds }, {0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds }, +{0,"low-volume-thresh", "PERIOD", &recentact_thresh, op_integer }, +{0,"low-volume-window", "PERIOD", &recentact_periods, op_seconds }, {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double }, {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer }, @@ -3635,7 +3701,10 @@ int main(int argc, char **argv) { sitename= *argv++; if (!sitename) badusage("need site name argument"); - remote_host= *argv++; + + if (*argv) remote_host= *argv++; + else remote_host= sitename; + if (*argv) badusage("too many non-option arguments"); /* defaults */ @@ -3660,21 +3729,24 @@ int main(int argc, char **argv) { convert_to_periods_rndup(&spontaneous_flush_periods); convert_to_periods_rndup(&max_separated_periods); convert_to_periods_rndup(&need_activity_periods); + convert_to_periods_rndup(&recentact_periods); if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100) badusage("bad input data ratio must be between 0..100"); max_bad_data_ratio *= 0.01; - - if (!feedfile) { - feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename); - } else if (!feedfile[0]) { - badusage("feed filename, if specified, must be nonempty"); - } else if (path_ends_slash(feedfile)) { + + if (!path_run) + path_run= innconf->pathrun; + + if (!feedfile) feedfile= sitename; + if (!feedfile[0]) badusage("feed filename, if specified, must be nonempty"); + if (path_ends_slash(feedfile)) feedfile= xasprintf("%s%s", feedfile, sitename); - } + if (feedfile[0] != '/') + feedfile= xasprintf("%s/%s", innconf->pathoutgoing, feedfile); if (!path_cli) { - path_cli_dir= xasprintf("%s/innduct", innconf->pathrun); + path_cli_dir= "innduct"; } else if (!path_cli[0] || !strcmp(path_cli,"none")) { 
path_cli= 0; /* ok, don't then */ } else if (path_ends_slash(path_cli)) { @@ -3692,6 +3764,13 @@ int main(int argc, char **argv) { if (strchr(feedfile, c)) badusage("feed filename may not contain metacharacter %c",c); + int i; + recentact_perperiod= xcalloc(sizeof(*recentact_perperiod),recentact_periods); + for (i=0; i