From: Ian Jackson Date: Tue, 27 Apr 2010 11:38:29 +0000 (+0100) Subject: close idle connections and spot unresponsive ones X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=commitdiff_plain;h=e9b81dd830ea514915e276345a31dcf0ed016aa2 close idle connections and spot unresponsive ones --- diff --git a/backends/innduct.c b/backends/innduct.c index fb7bd48..47561e3 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,6 +1,6 @@ /* * TODO - * - close idle connections + * - make period size a tuneable * - check all structs initialised * - check all fd watches properly undone * - check all init functions called @@ -160,7 +160,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. /*============================== PROGRAM ==============================*/ -#define _GNU_SOURCE +#define _GNU_SOURCE 1 #include "config.h" #include "storage.h" @@ -205,6 +205,10 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) +#define VA va_list al; va_start(al,fmt) +#define PRINTF(f,a) __attribute__((__format__(printf,f,a))) +#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a))) + /*----- doubly linked lists -----*/ #define ISNODE(T) struct { T *succ, *pred; } node /* must be at start */ @@ -300,6 +304,9 @@ static void filemon_start(InputFile *ipf); static void filemon_stop(InputFile *ipf); static void filemon_callback(InputFile *ipf); +static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); +static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); + /*----- configuration options -----*/ static const char *sitename, *feedfile, *pathoutgoing; @@ -318,7 +325,7 @@ static int connection_setup_timeout=200, port=119, try_stream=1; static int inndcomm_flush_timeout=100; static int reconnect_delay_periods, flushfail_retry_periods, open_wait_periods; static int backlog_retry_minperiods, backlog_spontaneous_rescan_periods; -static int spontaneous_flush_periods; +static int spontaneous_flush_periods, need_activity_periods; static const char *inndconffile; static double nocheck_thresh_pct= 95.0; @@ -431,6 +438,7 @@ static const char *sms_names[]= { struct Conn { ISNODE(Conn); int fd, max_queue, stream, quitting; + int since_activity; /* periods */ ArticleList waiting; /* not yet told peer */ ArticleList priority; /* peer says send it now */ ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ @@ -463,10 +471,6 @@ static int nocheck, nocheck_reported; /*========== logging ==========*/ -#define VA va_list al; va_start(al,fmt) -#define PRINTF(f,a) __attribute__((__format__(printf,f,a))) -#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a))) - static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); static void logcore(int sysloglevel, const char *fmt, ...) { VA; @@ -668,6 +672,10 @@ static char *sanitise(const char *input) { return sanibuf; } +static int isewouldblock(int errnoval) { + return errnoval==EWOULDBLOCK || errnoval==EAGAIN; +} + /*========== making new connections ==========*/ static void conn_dispose(Conn *conn) { @@ -883,6 +891,51 @@ static void connect_start(void) { connect_attempt_discard(); } +static void check_idle_conns(void) { + Conn *conn; + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) + conn->since_activity++; + search_again: + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { + if (conn->since_activity <= need_activity_periods) continue; + + /* We need to shut this down */ + if (conn->quitting) + connfail(conn,"timed out waiting for response to QUIT"); + else if (conn->sent.count) + connfail(conn,"timed out waiting for responses"); + else if (conn->waiting.count || conn->priority.count) + connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); + else if (conn->xmitu) + connfail(conn,"peer has been sending responses" + " before receiving our commands!"); + else { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= 1; + conn->since_activity= 0; + debug("C%d is idle, quitting", conn->fd); + break; + } + } + } + goto search_again; + } +} /*========== overall control of article flow ==========*/ @@ -892,7 +945,7 @@ static void check_master_queue(void) { break; Conn *walk, *use=0; - int spare; + int spare, inqueue; /* Find a connection to offer this article. We prefer a busy * connection to an idle one, provided it's not full. We take the @@ -901,8 +954,9 @@ static void check_master_queue(void) { * connections, the spare ones will go away eventually. */ for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) { - int inqueue= walk->sent.count + walk->priority.count - + walk->waiting.count; + if (walk->quitting) continue; + inqueue= walk->sent.count + walk->priority.count + + walk->waiting.count; spare= walk->max_queue - inqueue; assert(inqueue <= max_queue_per_conn); assert(spare >= 0); @@ -910,6 +964,7 @@ static void check_master_queue(void) { else if (spare>0) /*working*/ { use= walk; break; } } if (use) { + if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { Article *art= LIST_REMHEAD(queue); LIST_ADDTAIL(use->waiting, art); @@ -954,7 +1009,6 @@ static void conn_maybe_write(Conn *conn) { } } -static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); static void vconnfail(Conn *conn, const char *fmt, va_list al) { int requeue[art_MaxState]; @@ -982,7 +1036,6 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { check_master_queue(); } -static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); static void connfail(Conn *conn, const char *fmt, ...) { va_list al; va_start(al,fmt); @@ -1034,7 +1087,7 @@ static void *conn_write_some_xmits(Conn *conn) { if (count > IOV_MAX) count= IOV_MAX; ssize_t rs= writev(conn->fd, conn->xmit, count); if (rs < 0) { - if (errno == EAGAIN) return OOP_CONTINUE; + if (isewouldblock(errno)) return OOP_CONTINUE; connfail(conn, "write failed: %s", strerror(errno)); return OOP_HALT; } @@ -1260,7 +1313,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, if (code!=205 && code!=503) { connfail(conn, "peer gave unexpected response to QUIT: %s", sani); } else { - notice("C%d idle connection closed\n"); + notice("C%d idle connection closed", conn->fd); assert(!conn->waiting.count); assert(!conn->priority.count); assert(!conn->sent.count); @@ -1271,6 +1324,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, return OOP_CONTINUE; } + conn->since_activity= 0; Article *art; #define GET_ARTICLE(musthavesent) \ @@ -1584,7 +1638,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, for (;;) { int r= read(filemon_inotify_fd, &iev, sizeof(iev)); if (r==-1) { - if (errno==EAGAIN) break; + if (isewouldblock(errno)) break; sysdie("read from inotify master"); } else if (r==sizeof(iev)) { assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax); @@ -1807,7 +1861,7 @@ static void statemc_init(void) { fl.l_whence= SEEK_SET; int r= fcntl(lockfd, F_SETLK, &fl); if (r==-1) { - if (errno==EACCES || errno==EAGAIN) { + if (errno==EACCES || isewouldblock(errno)) { if (quiet_multiple) exit(0); fatal("another duct holds the lockfile"); } @@ -2404,6 +2458,7 @@ EVERY(period, PERIOD_SECONDS,0, { if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_period_poll(); check_master_queue(); + check_idle_conns(); });