X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=1328d372f8d8390cb822f734a2476f044b1c6404;hp=fb7bd48b08c2ba7371a92d022904d54d8234d0f8;hb=599596434b5b74c0280006508d195d69e71d0814;hpb=6f45275e9ef394a795727df959501c5fa375949e diff --git a/backends/innduct.c b/backends/innduct.c index fb7bd48..1328d37 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,6 +1,5 @@ /* * TODO - * - close idle connections * - check all structs initialised * - check all fd watches properly undone * - check all init functions called @@ -160,7 +159,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. /*============================== PROGRAM ==============================*/ -#define _GNU_SOURCE +#define _GNU_SOURCE 1 #include "config.h" #include "storage.h" @@ -195,8 +194,6 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. /*----- general definitions, probably best not changed -----*/ -#define PERIOD_SECONDS 60 - #define CONNCHILD_ESTATUS_STREAM 4 #define CONNCHILD_ESTATUS_NOSTREAM 5 @@ -205,6 +202,10 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) +#define VA va_list al; va_start(al,fmt) +#define PRINTF(f,a) __attribute__((__format__(printf,f,a))) +#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a))) + /*----- doubly linked lists -----*/ #define ISNODE(T) struct { T *succ, *pred; } node /* must be at start */ @@ -300,6 +301,9 @@ static void filemon_start(InputFile *ipf); static void filemon_stop(InputFile *ipf); static void filemon_callback(InputFile *ipf); +static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); +static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); + /*----- configuration options -----*/ static const char *sitename, *feedfile, *pathoutgoing; @@ -309,6 +313,8 @@ static int quiet_multiple=0, become_daemon=1; static int max_connections=10, max_queue_per_conn=200; static int target_max_feedfile_size=100000; +static int period_seconds=60; + static double max_bad_data_ratio= 0.01; static int max_bad_data_initial= 30; /* in one corrupt 4096-byte block the number of newlines has @@ -318,7 +324,7 @@ static int connection_setup_timeout=200, port=119, try_stream=1; static int inndcomm_flush_timeout=100; static int reconnect_delay_periods, flushfail_retry_periods, open_wait_periods; static int backlog_retry_minperiods, backlog_spontaneous_rescan_periods; -static int spontaneous_flush_periods; +static int spontaneous_flush_periods, need_activity_periods; static const char *inndconffile; static double nocheck_thresh_pct= 95.0; @@ -431,6 +437,7 @@ static const char *sms_names[]= { struct Conn { ISNODE(Conn); int fd, max_queue, stream, quitting; + int since_activity; /* periods */ ArticleList waiting; /* not yet told peer */ ArticleList priority; /* peer says send it now */ ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ @@ -463,10 +470,6 @@ static int nocheck, nocheck_reported; /*========== logging ==========*/ -#define VA va_list al; va_start(al,fmt) -#define PRINTF(f,a) __attribute__((__format__(printf,f,a))) -#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a))) - static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); static void logcore(int sysloglevel, const char *fmt, ...) { VA; @@ -668,6 +671,10 @@ static char *sanitise(const char *input) { return sanibuf; } +static int isewouldblock(int errnoval) { + return errnoval==EWOULDBLOCK || errnoval==EAGAIN; +} + /*========== making new connections ==========*/ static void conn_dispose(Conn *conn) { @@ -883,6 +890,51 @@ static void connect_start(void) { connect_attempt_discard(); } +static void check_idle_conns(void) { + Conn *conn; + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) + conn->since_activity++; + search_again: + for (conn=LIST_HEAD(conns); conn; conn=LIST_NEXT(conn)) { + if (conn->since_activity <= need_activity_periods) continue; + + /* We need to shut this down */ + if (conn->quitting) + connfail(conn,"timed out waiting for response to QUIT"); + else if (conn->sent.count) + connfail(conn,"timed out waiting for responses"); + else if (conn->waiting.count || conn->priority.count) + connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); + else if (conn->xmitu) + connfail(conn,"peer has been sending responses" + " before receiving our commands!"); + else { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= 1; + conn->since_activity= 0; + debug("C%d is idle, quitting", conn->fd); + break; + } + } + } + goto search_again; + } +} /*========== overall control of article flow ==========*/ @@ -892,7 +944,7 @@ static void check_master_queue(void) { break; Conn *walk, *use=0; - int spare; + int spare, inqueue; /* Find a connection to offer this article. We prefer a busy * connection to an idle one, provided it's not full. We take the @@ -901,8 +953,9 @@ static void check_master_queue(void) { * connections, the spare ones will go away eventually. */ for (walk=LIST_HEAD(conns); walk; walk=LIST_NEXT(walk)) { - int inqueue= walk->sent.count + walk->priority.count - + walk->waiting.count; + if (walk->quitting) continue; + inqueue= walk->sent.count + walk->priority.count + + walk->waiting.count; spare= walk->max_queue - inqueue; assert(inqueue <= max_queue_per_conn); assert(spare >= 0); @@ -910,6 +963,7 @@ static void check_master_queue(void) { else if (spare>0) /*working*/ { use= walk; break; } } if (use) { + if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { Article *art= LIST_REMHEAD(queue); LIST_ADDTAIL(use->waiting, art); @@ -954,7 +1008,6 @@ static void conn_maybe_write(Conn *conn) { } } -static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); static void vconnfail(Conn *conn, const char *fmt, va_list al) { int requeue[art_MaxState]; @@ -982,7 +1035,6 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { check_master_queue(); } -static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); static void connfail(Conn *conn, const char *fmt, ...) { va_list al; va_start(al,fmt); @@ -1034,7 +1086,7 @@ static void *conn_write_some_xmits(Conn *conn) { if (count > IOV_MAX) count= IOV_MAX; ssize_t rs= writev(conn->fd, conn->xmit, count); if (rs < 0) { - if (errno == EAGAIN) return OOP_CONTINUE; + if (isewouldblock(errno)) return OOP_CONTINUE; connfail(conn, "write failed: %s", strerror(errno)); return OOP_HALT; } @@ -1260,7 +1312,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, if (code!=205 && code!=503) { connfail(conn, "peer gave unexpected response to QUIT: %s", sani); } else { - notice("C%d idle connection closed\n"); + notice("C%d idle connection closed", conn->fd); assert(!conn->waiting.count); assert(!conn->priority.count); assert(!conn->sent.count); @@ -1271,6 +1323,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, return OOP_CONTINUE; } + conn->since_activity= 0; Article *art; #define GET_ARTICLE(musthavesent) \ @@ -1584,7 +1637,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, for (;;) { int r= read(filemon_inotify_fd, &iev, sizeof(iev)); if (r==-1) { - if (errno==EAGAIN) break; + if (isewouldblock(errno)) break; sysdie("read from inotify master"); } else if (r==sizeof(iev)) { assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax); @@ -1807,7 +1860,7 @@ static void statemc_init(void) { fl.l_whence= SEEK_SET; int r= fcntl(lockfd, F_SETLK, &fl); if (r==-1) { - if (errno==EACCES || errno==EAGAIN) { + if (errno==EACCES || isewouldblock(errno)) { if (quiet_multiple) exit(0); fatal("another duct holds the lockfile"); } @@ -2193,7 +2246,7 @@ static void search_backlog_file(void) { now= xtime(); double age= difftime(now, oldest_mtime); - long age_deficiency= (backlog_retry_minperiods * PERIOD_SECONDS) - age; + long age_deficiency= (backlog_retry_minperiods * period_seconds) - age; if (age_deficiency <= 0) { debug("backlog scan: found age=%f deficiency=%ld oldest=%s", @@ -2209,7 +2262,7 @@ static void search_backlog_file(void) { return; } - until_backlog_nextscan= age_deficiency / PERIOD_SECONDS; + until_backlog_nextscan= age_deficiency / period_seconds; if (backlog_spontaneous_rescan_periods >= 0 && until_backlog_nextscan > backlog_spontaneous_rescan_periods) @@ -2360,8 +2413,7 @@ static void postfork(const char *what) { } #define EVERY(what, interval_sec, interval_usec, body) \ - static const struct timeval what##_timeout = \ - { interval_sec, interval_usec }; \ + static struct timeval what##_timeout = { interval_sec, interval_usec }; \ static void what##_schedule(void); \ static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ { body } \ @@ -2386,7 +2438,7 @@ static const char *debug_ipf_path(InputFile *ipf) { return slash ? slash+1 : ipf->path; } -EVERY(period, PERIOD_SECONDS,0, { +EVERY(period, -1,0, { debug("PERIOD" " sms=%s[%d] conns=%d queue=%d until_connect=%d" " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_IPF(flushing) @@ -2404,6 +2456,7 @@ EVERY(period, PERIOD_SECONDS,0, { if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_period_poll(); check_master_queue(); + check_idle_conns(); }); @@ -2487,8 +2540,8 @@ static void op_seconds(const Option *o, const char *val) { static void op_periods_rndup(const Option *o, const char *val) { int *store= o->store; op_seconds(o,val); - *store += PERIOD_SECONDS-1; - *store /= PERIOD_SECONDS; + *store += period_seconds-1; + *store /= period_seconds; } static void op_setint(const Option *o, const char *val) { @@ -2642,6 +2695,7 @@ int main(int argc, char **argv) { filepoll_schedule(); } + period_timeout.tv_sec= period_seconds; period_schedule(); statemc_init();