X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=772fc89c0079e99be2146d40a53fd7e65bcf344c;hp=561ff0022ff30e3d43130aa31e4bcc5ce4f56d6a;hb=3fca16a4ee8d432329ce49383132b0de5137a70f;hpb=f4edee10297eb6cf34c745a1a68ed796339abe5e diff --git a/backends/innduct.c b/backends/innduct.c index 561ff00..772fc89 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -391,6 +391,8 @@ static int backlog_spontrescan_periods=300; static int spontaneous_flush_periods=100000; static int max_separated_periods=2000; static int need_activity_periods=1000; +static int recentact_thresh=3; +static int recentact_periods=1000; static double max_bad_data_ratio= 1; /* conv'd from percentage by main */ static int max_bad_data_initial= 30; @@ -528,6 +530,9 @@ static ConnList conns; static char *path_lock, *path_flushing, *path_defer, *path_dump; static char *globpat_backlog; static pid_t self_pid; +static int *recentact_perperiod; +static int recentact_circptr; +static int recentact_total; /* statemc_init initialises */ static StateMachineState sms; @@ -1064,12 +1069,24 @@ static void cli_init(void) { /*========== management of connections ==========*/ +static void reconnect_blocking_event(void) { + until_connect= reconnect_delay_periods; +} + static void conn_closefd(Conn *conn, const char *msgprefix) { int r= close_perhaps(&conn->fd); if (r) info("C%d %serror closing socket: %s", conn->fd, msgprefix, strerror(errno)); } +static int conn_busy(Conn *conn) { + return + conn->waiting.count || + conn->priority.count || + conn->sent.count || + conn->xmitu; +} + static void conn_dispose(Conn *conn) { if (!conn) return; if (conn->rd) { @@ -1083,7 +1100,6 @@ static void conn_dispose(Conn *conn) { } conn_closefd(conn,""); free(conn); - until_connect= reconnect_delay_periods; } static void *conn_exception(oop_source *lp, int fd, @@ -1130,6 +1146,7 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { conn->fd, conns.count, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); free(m); + reconnect_blocking_event(); conn_dispose(conn); check_assign_articles(); } @@ -1141,10 +1158,47 @@ static void connfail(Conn *conn, const char *fmt, ...) { va_end(al); } +static void conn_idle_close(Conn *conn, const char *why) { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= 1; + conn->since_activity= 0; + debug("C%d is idle (%s), quitting", conn->fd, why); + break; + } + } +} + +/* + * For our last connection, we also shut it down if we have had + * less than K in the last L + */ static void check_idle_conns(void) { Conn *conn; + + int veryrecentact= recentact_perperiod[recentact_circptr]; + recentact_circptr++; + recentact_circptr %= recentact_periods; + recentact_total -= recentact_perperiod[recentact_circptr]; + recentact_perperiod[recentact_circptr]= 0; + FOR_CONN(conn) conn->since_activity++; + search_again: FOR_CONN(conn) { if (conn->since_activity <= need_activity_periods) continue; @@ -1159,32 +1213,18 @@ static void check_idle_conns(void) { else if (conn->xmitu) connfail(conn,"peer has been sending responses" " before receiving our commands!"); - else { - static const char quitcmd[]= "QUIT\r\n"; - int todo= sizeof(quitcmd)-1; - const char *p= quitcmd; - for (;;) { - int r= write(conn->fd, p, todo); - if (r<0) { - if (isewouldblock(errno)) - connfail(conn, "blocked writing QUIT to idle connection"); - else - connfail(conn, "failed to write QUIT to idle connection: %s", - strerror(errno)); - break; - } - assert(r<=todo); - todo -= r; - if (!todo) { - conn->quitting= 1; - conn->since_activity= 0; - debug("C%d is idle, quitting", conn->fd); - break; - } - } - } + else + conn_idle_close(conn, "no activity"); + goto search_again; } + + conn= LIST_HEAD(conns); + if (!veryrecentact && + conns.count==1 && + recentact_total+veryrecentact < recentact_thresh && + !conn_busy(conn)) + conn_idle_close(conn, "low volume"); } /*---------- making new connections ----------*/ @@ -1193,6 +1233,7 @@ static pid_t connecting_child; static int connecting_fdpass_sock; static void connect_attempt_discard(void) { + reconnect_blocking_event(); if (connecting_child) { int status= xwaitpid(&connecting_child, "connect"); if (!(WIFEXITED(status) || @@ -1331,6 +1372,7 @@ static void connect_start(void) { assert(!connecting_fdpass_sock); info("starting connection attempt"); + reconnect_blocking_event(); int socks[2]; int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks); @@ -1475,11 +1517,11 @@ static void check_assign_articles(void) { Article *art= dequeue(0); if (!art) break; LIST_ADDTAIL(use->waiting, art); + recentact_perperiod[recentact_circptr]++; spare--; } conn_maybe_write(use); } else if (allow_connect_start()) { - until_connect= reconnect_delay_periods; connect_start(); break; } else { @@ -1901,20 +1943,16 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, return OOP_CONTINUE; } - int conn_busy= - conn->waiting.count || - conn->priority.count || - conn->sent.count || - conn->xmitu; + int busy= conn_busy(conn); if (conn->quitting) { - if (code!=205 && code!=503) { + if (code!=205 && code!=400) { connfail(conn, "peer gave unexpected response to QUIT: %s", sani); } else { LIST_REMOVE(conns,conn); notice("C%d (now %d) idle connection closed by us", conn->fd, conns.count); - assert(!conn_busy); + assert(!busy); conn_dispose(conn); } return OOP_CONTINUE; @@ -1946,7 +1984,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, default: PEERBADMSG("peer sent unexpected message"); case 400: - if (conn_busy) + if (busy) PEERBADMSG("peer timed us out or stopped accepting articles"); LIST_REMOVE(conns,conn); @@ -3612,6 +3650,8 @@ static const Option innduct_options[]= { {0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds }, {0,"flush-finish-timeout", "PERIOD", &max_separated_periods, op_seconds }, {0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds }, +{0,"low-volume-thresh", "PERIOD", &recentact_thresh, op_integer }, +{0,"low-volume-window", "PERIOD", &recentact_periods, op_seconds }, {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double }, {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer }, @@ -3685,6 +3725,7 @@ int main(int argc, char **argv) { convert_to_periods_rndup(&spontaneous_flush_periods); convert_to_periods_rndup(&max_separated_periods); convert_to_periods_rndup(&need_activity_periods); + convert_to_periods_rndup(&recentact_periods); if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100) badusage("bad input data ratio must be between 0..100"); @@ -3719,6 +3760,13 @@ int main(int argc, char **argv) { if (strchr(feedfile, c)) badusage("feed filename may not contain metacharacter %c",c); + int i; + recentact_perperiod= xcalloc(sizeof(*recentact_perperiod),recentact_periods); + for (i=0; i