X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=6e9dd262dbfd5bd4c573dc5b3c858df6fb949a91;hp=f8af279a37693ddabec10495c1ab7ef4d2ddbfd8;hb=c07a9d0aee10d37d55c8a6f60cf261da9e2d44cd;hpb=94d174d6b8da51513defa3bbba662d4332e9902c diff --git a/backends/innduct.c b/backends/innduct.c index f8af279..6e9dd26 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -23,6 +23,13 @@ * with GPLv3. If not then please let me know. -Ian Jackson.) */ +/* + * todo + * + * don't mind reconnecting if we just disconnected due to idle + * some weird disconnection event still investigating + */ + /* * Newsfeeds file entries should look like this: * host.name.of.site[/exclude,exclude,...]\ @@ -384,6 +391,8 @@ static int backlog_spontrescan_periods=300; static int spontaneous_flush_periods=100000; static int max_separated_periods=2000; static int need_activity_periods=1000; +static int lowvol_thresh=3; +static int lowvol_periods=1000; static double max_bad_data_ratio= 1; /* conv'd from percentage by main */ static int max_bad_data_initial= 30; @@ -502,7 +511,8 @@ struct Conn { ISNODE(Conn); int fd; /* may be 0, meaning closed (during construction/destruction) */ oop_read *rd; /* likewise */ - int max_queue, stream, quitting; + int max_queue, stream; + const char *quitting; int since_activity; /* periods */ ArticleList waiting; /* not yet told peer */ ArticleList priority; /* peer says send it now */ @@ -521,6 +531,9 @@ static ConnList conns; static char *path_lock, *path_flushing, *path_defer, *path_dump; static char *globpat_backlog; static pid_t self_pid; +static int *lowvol_perperiod; +static int lowvol_circptr; +static int lowvol_total; /* does not include current period */ /* statemc_init initialises */ static StateMachineState sms; @@ -1057,12 +1070,24 @@ static void cli_init(void) { /*========== management of connections ==========*/ +static void reconnect_blocking_event(void) { + until_connect= reconnect_delay_periods; +} + static void conn_closefd(Conn *conn, const char *msgprefix) { int r= close_perhaps(&conn->fd); if (r) info("C%d %serror closing socket: %s", conn->fd, msgprefix, strerror(errno)); } +static int conn_busy(Conn *conn) { + return + conn->waiting.count || + conn->priority.count || + conn->sent.count || + conn->xmitu; +} + static void conn_dispose(Conn *conn) { if (!conn) return; if (conn->rd) { @@ -1076,7 +1101,6 @@ static void conn_dispose(Conn *conn) { } conn_closefd(conn,""); free(conn); - until_connect= reconnect_delay_periods; } static void *conn_exception(oop_source *lp, int fd, @@ -1119,10 +1143,11 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { LIST_REMOVE(conns,conn); char *m= xvasprintf(fmt,al); - warn("C%d[%d] connection failed requeueing " RCI_TRIPLE_FMT_BASE ": %s", + warn("C%d (now %d) connection failed requeueing " RCI_TRIPLE_FMT_BASE ": %s", conn->fd, conns.count, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); free(m); + reconnect_blocking_event(); conn_dispose(conn); check_assign_articles(); } @@ -1134,17 +1159,56 @@ static void connfail(Conn *conn, const char *fmt, ...) { va_end(al); } +static void conn_idle_close(Conn *conn, const char *why) { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= why; + conn->since_activity= 0; + debug("C%d is idle (%s), quitting", conn->fd, why); + break; + } + } +} + +/* + * For our last connection, we also shut it down if we have had + * less than K in the last L + */ static void check_idle_conns(void) { Conn *conn; + + int volthisperiod= lowvol_perperiod[lowvol_circptr]; + lowvol_circptr++; + lowvol_circptr %= lowvol_periods; + lowvol_total += volthisperiod; + lowvol_total -= lowvol_perperiod[lowvol_circptr]; + lowvol_perperiod[lowvol_circptr]= 0; + FOR_CONN(conn) conn->since_activity++; + search_again: FOR_CONN(conn) { if (conn->since_activity <= need_activity_periods) continue; /* We need to shut this down */ if (conn->quitting) - connfail(conn,"timed out waiting for response to QUIT"); + connfail(conn,"timed out waiting for response to QUIT (%s)", + conn->quitting); else if (conn->sent.count) connfail(conn,"timed out waiting for responses"); else if (conn->waiting.count || conn->priority.count) @@ -1152,32 +1216,18 @@ static void check_idle_conns(void) { else if (conn->xmitu) connfail(conn,"peer has been sending responses" " before receiving our commands!"); - else { - static const char quitcmd[]= "QUIT\r\n"; - int todo= sizeof(quitcmd)-1; - const char *p= quitcmd; - for (;;) { - int r= write(conn->fd, p, todo); - if (r<0) { - if (isewouldblock(errno)) - connfail(conn, "blocked writing QUIT to idle connection"); - else - connfail(conn, "failed to write QUIT to idle connection: %s", - strerror(errno)); - break; - } - assert(r<=todo); - todo -= r; - if (!todo) { - conn->quitting= 1; - conn->since_activity= 0; - debug("C%d is idle, quitting", conn->fd); - break; - } - } - } + else + conn_idle_close(conn, "no activity"); + goto search_again; } + + conn= LIST_HEAD(conns); + if (!volthisperiod && + conns.count==1 && + lowvol_total < lowvol_thresh && + !conn_busy(conn)) + conn_idle_close(conn, "low volume"); } /*---------- making new connections ----------*/ @@ -1300,8 +1350,8 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd); LIST_ADDHEAD(conns, conn); - notice("C%d[%d] connected %s", - conns.count, conn->fd, conn->stream ? "streaming" : "plain"); + notice("C%d (now %d) connected %s", + conn->fd, conns.count, conn->stream ? "streaming" : "plain"); connect_attempt_discard(); check_assign_articles(); @@ -1310,6 +1360,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { x: conn_dispose(conn); connect_attempt_discard(); + reconnect_blocking_event(); return OOP_CONTINUE; } @@ -1324,6 +1375,8 @@ static void connect_start(void) { assert(!connecting_fdpass_sock); info("starting connection attempt"); + int ok_until_connect= until_connect; + reconnect_blocking_event(); int socks[2]; int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks); @@ -1416,6 +1469,9 @@ static void connect_start(void) { connecting_fdpass_sock= socks[0]; xsetnonblock(connecting_fdpass_sock, 1); on_fd_read_except(connecting_fdpass_sock, connchild_event); + + if (!conns.count) + until_connect= ok_until_connect; } /*---------- assigning articles to conns, and transmitting ----------*/ @@ -1468,11 +1524,11 @@ static void check_assign_articles(void) { Article *art= dequeue(0); if (!art) break; LIST_ADDTAIL(use->waiting, art); + lowvol_perperiod[lowvol_circptr]++; spare--; } conn_maybe_write(use); } else if (allow_connect_start()) { - until_connect= reconnect_delay_periods; connect_start(); break; } else { @@ -1586,6 +1642,7 @@ static void autodefer_input_file_articles(InputFile *ipf) { } static void autodefer_input_file(InputFile *ipf) { + static const char *const abandon= "stuck"; ipf->autodefer= 0; autodefer_input_file_articles(ipf); @@ -1596,11 +1653,11 @@ static void autodefer_input_file(InputFile *ipf) { if (has_article_in(&walk->waiting, ipf) || has_article_in(&walk->priority, ipf) || has_article_in(&walk->sent, ipf)) - walk->quitting= -1; + walk->quitting= abandon; } while (ipf->inprogress) { FOR_CONN(walk) - if (walk->quitting < 0) goto found; + if (walk->quitting == abandon) goto found; abort(); /* where are they ?? */ found: @@ -1663,15 +1720,18 @@ static void *conn_write_some_xmits(Conn *conn) { assert(rs > 0); int done; - for (done=0; rs && donexmitu; done++) { + for (done=0; rs; ) { + assert(donexmitu); struct iovec *vp= &conn->xmit[done]; XmitDetails *dp= &conn->xmitd[done]; - if (rs > vp->iov_len) { + if (rs >= vp->iov_len) { rs -= vp->iov_len; - xmit_free(dp); + xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */ + done++; } else { vp->iov_base= (char*)vp->iov_base + rs; vp->iov_len -= rs; + break; /* rs -= rs */ } } int newu= conn->xmitu - done; @@ -1891,19 +1951,17 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, return OOP_CONTINUE; } - int conn_busy= - conn->waiting.count || - conn->priority.count || - conn->sent.count || - conn->xmitu; + int busy= conn_busy(conn); if (conn->quitting) { - if (code!=205 && code!=503) { - connfail(conn, "peer gave unexpected response to QUIT: %s", sani); + if (code!=205 && code!=400) { + connfail(conn, "peer gave unexpected response to QUIT (%s): %s", + conn->quitting, sani); } else { LIST_REMOVE(conns,conn); - notice("C%d[%d] idle connection closed by us", conns.count, conn->fd); - assert(!conn_busy); + notice("C%d (now %d) idle connection closed (%s)", + conn->fd, conns.count, conn->quitting); + assert(!busy); conn_dispose(conn); } return OOP_CONTINUE; @@ -1935,11 +1993,12 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, default: PEERBADMSG("peer sent unexpected message"); case 400: - if (conn_busy) + if (busy) PEERBADMSG("peer timed us out or stopped accepting articles"); LIST_REMOVE(conns,conn); - notice("C%d[%d] idle connection closed by peer", conns.count, conn->fd); + notice("C%d (now %d) idle connection closed by peer", + conns.count, conn->fd); conn_dispose(conn); return OOP_CONTINUE; @@ -3253,11 +3312,11 @@ static void period(void) { debug("PERIOD" " sms=%s[%d] conns=%d until_connect=%d" " input_files main:%s flushing:%s backlog:%s[%d]" - " children connecting=%ld inndcomm=%ld" + " children connecting=%ld inndcomm=%ld lowvol_total=%d" , sms_names[sms], until_flush, conns.count, until_connect, dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan, - (long)connecting_child, (long)inndcomm_child + (long)connecting_child, (long)inndcomm_child, lowvol_total ); free(dipf_main); @@ -3348,6 +3407,17 @@ CCMD(dump) { DUMPV("%d", , cli_master); fprintf(f,"\n"); + fprintf(f,"lowvol"); + DUMPV("%d", , lowvol_circptr); + DUMPV("%d", , lowvol_total); + fprintf(f,":"); + for (i=0; ifd); DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue); - DUMPV("%d",conn->,stream); DUMPV("%d",conn->,quitting); + DUMPV("%d",conn->,stream); DUMPV("\"%s\"",conn->,quitting); DUMPV("%d",conn->,since_activity); fprintf(f,"\n"); @@ -3600,6 +3670,8 @@ static const Option innduct_options[]= { {0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds }, {0,"flush-finish-timeout", "PERIOD", &max_separated_periods, op_seconds }, {0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds }, +{0,"low-volume-thresh", "PERIOD", &lowvol_thresh, op_integer }, +{0,"low-volume-window", "PERIOD", &lowvol_periods, op_seconds }, {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double }, {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer }, @@ -3673,6 +3745,7 @@ int main(int argc, char **argv) { convert_to_periods_rndup(&spontaneous_flush_periods); convert_to_periods_rndup(&max_separated_periods); convert_to_periods_rndup(&need_activity_periods); + convert_to_periods_rndup(&lowvol_periods); if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100) badusage("bad input data ratio must be between 0..100"); @@ -3707,6 +3780,14 @@ int main(int argc, char **argv) { if (strchr(feedfile, c)) badusage("feed filename may not contain metacharacter %c",c); + int i; + lowvol_perperiod= xcalloc(sizeof(*lowvol_perperiod), lowvol_periods); + for (i=0; i