X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=8e599f56026835595780ad3a28725754699fd06f;hb=fc66b511debe93980fd49331d96939fb876024da;hp=8e5181a84e9fc797684d20b7446f7163ee7f68bb;hpb=65ba173dea53ef47288f95aecf8642ac9869181f;p=inn-innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index 8e5181a..8e599f5 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -306,34 +306,43 @@ static void filemon_callback(InputFile *ipf); static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); +static const oop_rd_style peer_rd_style; +static oop_rd_call peer_rd_err, peer_rd_ok; + /*----- configuration options -----*/ -static const char *sitename, *feedfile, *pathoutgoing; -static const char *remote_host; -static int quiet_multiple=0, become_daemon=1; +static const char *sitename, *remote_host; +static const char *feedfile; +static int quiet_multiple=0; +static int become_daemon=1; +static int try_stream=1; +static int port=119; +static const char *inndconffile; -static int max_connections=10, max_queue_per_conn=200; +static int max_connections=10; +static int max_queue_per_conn=200; static int target_max_feedfile_size=100000; - static int period_seconds=60; -static double max_bad_data_ratio= 0.01; +static int connection_setup_timeout=200; +static int inndcomm_flush_timeout=100; + +static double nocheck_thresh= 95.0; /* converted from percentage by main */ +static double nocheck_decay= 100; /* conv'd from articles to lambda by main */ + +/* all these are initialised to seconds, and converted to periods in main */ +static int reconnect_delay_periods=1000; +static int flushfail_retry_periods=1000; +static int backlog_retry_minperiods=50; +static int backlog_spontrescan_periods=300; +static int spontaneous_flush_periods=100000; +static int need_activity_periods=1000; + +static double max_bad_data_ratio= 1; /* conv'd from percentage by main */ static int max_bad_data_initial= 30; /* in one corrupt 4096-byte block the number of newlines has * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */ -static int connection_setup_timeout=200, port=119, try_stream=1; -static int inndcomm_flush_timeout=100; -static int reconnect_delay_periods, flushfail_retry_periods, open_wait_periods; -static int backlog_retry_minperiods, backlog_spontaneous_rescan_periods; -static int spontaneous_flush_periods, need_activity_periods; -static const char *inndconffile; - -static double nocheck_thresh_pct= 95.0; -static double nocheck_thresh; /* computed in main from _pct */ -static double nocheck_decay_articles= 100; /* converted to _decay */ -static double nocheck_decay; /* computed in main from _articles */ - /*----- statistics -----*/ @@ -439,6 +448,7 @@ static const char *sms_names[]= { struct Conn { ISNODE(Conn); int fd; /* may be 0, meaning closed (during construction/destruction) */ + oop_read *rd; /* likewise */ int max_queue, stream, quitting; int since_activity; /* periods */ ArticleList waiting; /* not yet told peer */ @@ -623,6 +633,7 @@ static void xunlink(const char *path, const char *what) { static time_t xtime(void) { time_t now= time(0); if (now==-1) sysdie("time(2) failed"); + return now; } static void check_isreg(const struct stat *stab, const char *path, @@ -655,13 +666,6 @@ static void xlstat_isreg(const char *path, struct stat *stab, check_isreg(stab, path, what); } -static void setnonblock(int fd, int nonblocking) { - int r= fcntl(fd, F_GETFL); if (r<0) sysdie("setnonblocking fcntl F_GETFL"); - if (nonblocking) r |= O_NONBLOCK; - else r &= ~O_NONBLOCK; - r= fcntl(fd, F_SETFL, r); if (r<0) sysdie("setnonblocking fcntl F_SETFL"); -} - static int samefile(const struct stat *a, const struct stat *b) { assert(S_ISREG(a->st_mode)); assert(S_ISREG(b->st_mode)); @@ -700,6 +704,11 @@ static void conn_closefd(Conn *conn, const char *msgprefix) { static void conn_dispose(Conn *conn) { if (!conn) return; + if (conn->rd) { + oop_rd_cancel(conn->rd); + oop_rd_delete_kill(conn->rd); + conn->rd= 0; + } if (conn->fd) { loop->cancel_fd(loop, conn->fd, OOP_WRITE); loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION); @@ -875,10 +884,11 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { goto x; } -#define CHK(field, val) \ - if (h->cmsg_##field != val) { \ - die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \ - goto x; \ +#define CHK(field, val) \ + if (h->cmsg_##field != val) { \ + die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \ + h->cmsg_##field, val); \ + goto x; \ } CHK(level, SOL_SOCKET); CHK(type, SCM_RIGHTS); @@ -888,7 +898,6 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs"); memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd)); - loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); int status; pid_t got= waitpid(connecting_child, &status, 0); @@ -909,8 +918,16 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { LIST_INIT(conn->waiting); LIST_INIT(conn->priority); LIST_INIT(conn->sent); - setnonblock(conn->fd, 1); conn->max_queue= conn->stream ? max_queue_per_conn : 1; + + loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); + conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */ + if (!conn->fd) sysdie("oop_rd_new_fd (fd=%d)",conn->fd); + int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN, + &peer_rd_ok, conn, + &peer_rd_err, conn); + if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd); + notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); LIST_ADDHEAD(conns, conn); @@ -921,6 +938,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { x: conn_dispose(conn); connect_attempt_discard(); + return OOP_CONTINUE; } static int allow_connect_start(void) { @@ -970,7 +988,7 @@ static void connect_start(void) { assert(l>=1); if (buf[-1]!='\n') fatal("connect: response to MODE STREAM is too long: %.100s...", - remote_host, sanitise(buf)); + sanitise(buf)); l--; if (l>0 && buf[l-1]=='\r') l--; buf[l]= 0; char *ep; @@ -1021,7 +1039,7 @@ static void check_assign_articles(void) { break; Conn *walk, *use=0; - int spare, inqueue; + int spare=0, inqueue=0; /* Find a connection to offer this article. We prefer a busy * connection to an idle one, provided it's not full. We take the @@ -1063,7 +1081,6 @@ static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) { } static void conn_maybe_write(Conn *conn) { - void *rp= 0; for (;;) { conn_make_some_xmits(conn); if (!conn->xmitu) { @@ -1143,7 +1160,7 @@ static void *conn_write_some_xmits(Conn *conn) { rs -= vp->iov_len; xmit_free(dp); } else { - vp->iov_base += rs; + vp->iov_base= (char*)vp->iov_base + rs; vp->iov_len -= rs; } } @@ -1217,7 +1234,7 @@ static const oop_rd_style peer_rd_style= { OOP_RD_SHORTREC_FORBID }; -static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev, +static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, const char *errmsg, int errnoval, const char *data, size_t recsz, void *conn_v) { Conn *conn= conn_v; @@ -1316,7 +1333,8 @@ static void article_done(Conn *conn, Article *art, int whichcount) { if (r==-1) { if (errno==EINTR) continue; sysdie("failed to blank entry for %s (length %d at offset %lu) in %s", - art->messageid, art->blanklen, art->offset, ipf->path); + art->messageid, art->blanklen, + (unsigned long)art->offset, ipf->path); } assert(r>=0 && r<=w); art->blanklen -= w; @@ -1331,7 +1349,7 @@ static void article_done(Conn *conn, Article *art, int whichcount) { queue_check_input_done(); } -static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, +static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, const char *errmsg, int errnoval, const char *data, size_t recsz, void *conn_v) { Conn *conn= conn_v; @@ -1515,7 +1533,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, if (ipf->skippinglong) { if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */ - return; + return OOP_CONTINUE; } if (ev==OOP_RD_LONG) { ipf->skippinglong= 1; @@ -2111,10 +2129,12 @@ static void statemc_setstate(StateMachineState newsms, int periods, const char *xtra= ""; switch (sms) { - case sm_FLUSHING: sm_FLUSHFAILED: + case sm_FLUSHING: + case sm_FLUSHFAILED: if (!main_input_file) xtra= "-ABSENT"; break; - case sm_SEPARATED: case sm_DROPPING: + case sm_SEPARATED: + case sm_DROPPING: xtra= flushing_input_file->rd ? "-1" : "-2"; break; default:; @@ -2225,7 +2245,7 @@ static void search_backlog_file(void) { int r, i; struct stat stab; const char *oldest_path=0; - time_t oldest_mtime, now; + time_t oldest_mtime=0, now; if (backlog_input_file) return; @@ -2267,8 +2287,6 @@ static void search_backlog_file(void) { " nonzero (error?) return value %d", globpat_backlog, r); } - globfree(&gl); - if (!oldest_path) { debug("backlog scan: none"); @@ -2277,8 +2295,8 @@ static void search_backlog_file(void) { xunlink(path_lock, "lockfile for old feed"); exit(0); } - until_backlog_nextscan= backlog_spontaneous_rescan_periods; - return; + until_backlog_nextscan= backlog_spontrescan_periods; + goto xfree; } now= xtime(); @@ -2291,22 +2309,26 @@ static void search_backlog_file(void) { backlog_input_file= open_input_file(oldest_path); if (!backlog_input_file) { - warn("backlog file %s vanished as we opened it", backlog_input_file); + warn("backlog file %s vanished as we opened it", oldest_path); + globfree(&gl); goto try_again; } inputfile_reading_start(backlog_input_file); until_backlog_nextscan= -1; - return; + goto xfree; } until_backlog_nextscan= age_deficiency / period_seconds; - if (backlog_spontaneous_rescan_periods >= 0 && - until_backlog_nextscan > backlog_spontaneous_rescan_periods) - until_backlog_nextscan= backlog_spontaneous_rescan_periods; + if (backlog_spontrescan_periods >= 0 && + until_backlog_nextscan > backlog_spontrescan_periods) + until_backlog_nextscan= backlog_spontrescan_periods; debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s", age, age_deficiency, until_backlog_nextscan, oldest_path); + + xfree: + globfree(&gl); return; } @@ -2380,6 +2402,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { failed: SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry"); + return OOP_CONTINUE; } static void inndcommfail(const char *what) { @@ -2457,40 +2480,51 @@ static void postfork(void) { static struct timeval what##_timeout = { interval_sec, interval_usec }; \ static void what##_schedule(void); \ static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ - { body } \ + body; \ what##_schedule(); \ + return OOP_CONTINUE; \ } \ static void what##_schedule(void) { \ loop->on_time(loop, what##_timeout, what##_timedout, 0); \ } -EVERY(filepoll, 5,0, { +EVERY(filepoll, 5,0, ({ if (main_input_file && main_input_file->readable_callback) filemon_callback(main_input_file); -}); +})); + +static char *debug_report_ipf(InputFile *ipf) { + if (!ipf) return xasprintf("-"); + + const char *slash= strrchr(ipf->path,'/'); + const char *path= slash ? slash+1 : ipf->path; -#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s" -#define DEBUG_IPF(wh) \ - wh##_input_file, debug_ipf_path(wh##_input_file), \ - wh##_input_file->inprogress, (long)wh##_input_file->offset, \ - wh##_input_file->fd, wh##_input_file->rd ? "+" : "" -static const char *debug_ipf_path(InputFile *ipf) { - char *slash= strrchr(ipf->path,'/'); - return slash ? slash+1 : ipf->path; + return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s", + ipf, path, + ipf->inprogress, (long)ipf->offset, + ipf->fd, ipf->rd ? "+" : ""); } -EVERY(period, -1,0, { +EVERY(period, -1,0, ({ + char *dipf_main= debug_report_ipf(main_input_file); + char *dipf_flushing= debug_report_ipf(flushing_input_file); + char *dipf_backlog= debug_report_ipf(backlog_input_file); + debug("PERIOD" " sms=%s[%d] conns=%d queue=%d until_connect=%d" - " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_IPF(flushing) - " children connecting=%ld inndcomm_child" + " input_files main:%s old:%s flushing:%s" + " children connecting=%ld inndcomm_child=%ld" , sms_names[sms], sm_period_counter, - queue.count, conns.count, until_connect, - DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing), + conns.count, queue.count, until_connect, + dipf_main, dipf_flushing, dipf_backlog, (long)connecting_child, (long)inndcomm_child ); + free(dipf_main); + free(dipf_flushing); + free(dipf_backlog); + if (until_connect) until_connect--; poll_backlog_file(); @@ -2498,7 +2532,7 @@ EVERY(period, -1,0, { statemc_period_poll(); check_assign_articles(); check_idle_conns(); -}); +})); /*========== option parsing ==========*/ @@ -2506,7 +2540,7 @@ EVERY(period, -1,0, { static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0); static void vbadusage(const char *fmt, va_list al) { char *m= xvasprintf(fmt,al); - fprintf(stderr, "bad usage: %s\n%s" + fprintf(stderr, "bad usage: %s\n" "say --help for help, or read the manpage\n", m); if (become_daemon) @@ -2599,7 +2633,7 @@ static void print_options(const Option *options, FILE *f) { for (o=options; o->shrt || o->lng; o++) { char shrt[2] = { o->shrt, 0 }; char *optspec= xasprintf("%s%s%s%s%s", - o->shrt ? "-" : "", o->shrt, + o->shrt ? "-" : "", shrt, o->shrt && o->lng ? "|" : "", DELIMPERHAPS("--", o->lng)); fprintf(f, " %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg)); @@ -2653,13 +2687,6 @@ static void op_seconds(const Option *o, const char *val) { *store= v; } -static void op_periods_rndup(const Option *o, const char *val) { - int *store= o->store; - op_seconds(o,val); - *store += period_seconds-1; - *store /= period_seconds; -} - static void op_setint(const Option *o, const char *val) { int *store= o->store; *store= o->intval; @@ -2670,26 +2697,35 @@ static void op_setint(const Option *o, const char *val) { static void help(const Option *o, const char *val); static const Option innduct_options[]= { -{'f',"feedfile", "F", &feedfile, op_string }, -{'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 }, -{0,"help", 0, help }, - -{0,"max-connections", "N", &max_connections, op_integer }, -{0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer }, - +{'f',"feedfile", "F", &feedfile, op_string }, +{'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 }, +{0,"no-daemon", 0, &become_daemon, op_setint, 0 }, +{0,"no-streaming", 0, &try_stream, op_setint, 0 }, +{0,"inndconf", "F", &inndconffile, op_string }, +{'P',"port", "PORT", &port, op_integer }, +{0,"help", 0, 0, help }, + +{0,"max-connections", "N", &max_connections, op_integer }, +{0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer }, +{0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer }, +{0,"period-interval", "TIME", &period_seconds, op_seconds }, + +{0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds }, +{0,"stuck-flush-timeout","TIME", &inndcomm_flush_timeout, op_seconds }, + +{0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double }, +{0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double }, + +{0,"reconnect-interval", "PERIOD", &reconnect_delay_periods, op_seconds }, +{0,"flush-retry-interval", "PERIOD", &flushfail_retry_periods, op_seconds }, +{0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds }, +{0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds}, +{0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds }, +{0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds }, + +{0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double }, +{0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer }, -{0,"streaming", 0, &try_stream, op_setint, 1 }, -{0,"no-streaming", 0, &try_stream, op_setint, 0 }, -{'P',"port", "PORT",&port, op_integer }, -{0,"inndconf", "F", &inndconffile, op_string }, -{0,"no-daemon", 0, &become_daemon, op_setint, 0 }, - -{0,"no-check-proportion","PERCENT", &nocheck_thresh_pct, op_double }, -{0,"no-check-filter", "ARTICLES", &nocheck_decay_articles, op_double }, - -{0,"reconnect-interval", "TIME", &reconnect_delay_periods, op_periods_rndup }, -{0,"flush-retry-interval","TIME", &flushfail_retry_periods, op_periods_rndup }, -{0,"inndcomm-timeout", "TIME", &inndcomm_flush_timeout, op_seconds }, {0,0} }; @@ -2708,6 +2744,11 @@ static void help(const Option *o, const char *val) { exit(0); } +static void convert_to_periods_rndup(int *store) { + *store += period_seconds-1; + *store /= period_seconds; +} + int main(int argc, char **argv) { if (!argv[1]) { printusage(stderr); @@ -2727,24 +2768,33 @@ int main(int argc, char **argv) { if (!remote_host) remote_host= sitename; - if (nocheck_thresh_pct < 0 || nocheck_thresh_pct > 100) + if (nocheck_thresh < 0 || nocheck_thresh > 100) badusage("nocheck threshold percentage must be between 0..100"); - nocheck_thresh= nocheck_thresh_pct * 0.01; + nocheck_thresh *= 0.01; - if (nocheck_decay_articles < 0.1) + if (nocheck_decay < 0.1) badusage("nocheck decay articles must be at least 0.1"); - nocheck_decay= 1 - 1/nocheck_decay_articles; - - if (!pathoutgoing) - pathoutgoing= innconf->pathoutgoing; - innconf_read(inndconffile); - - if (!feedfile) - feedfile= xasprintf("%s/%s",pathoutgoing,sitename); - else if (!feedfile[0]) + nocheck_decay= 1 - 1.0/nocheck_decay; + + convert_to_periods_rndup(&reconnect_delay_periods); + convert_to_periods_rndup(&flushfail_retry_periods); + convert_to_periods_rndup(&backlog_retry_minperiods); + convert_to_periods_rndup(&backlog_spontrescan_periods); + convert_to_periods_rndup(&spontaneous_flush_periods); + convert_to_periods_rndup(&need_activity_periods); + + if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100) + badusage("bad input data ratio must be between 0..100"); + max_bad_data_ratio *= 0.01; + + if (!feedfile) { + innconf_read(inndconffile); + feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename); + } else if (!feedfile[0]) { badusage("feed filename must be nonempty"); - else if (feedfile[strlen(feedfile)-1]=='/') + } else if (feedfile[strlen(feedfile)-1]=='/') { feedfile= xasprintf("%s%s",feedfile,sitename); + } const char *feedfile_forbidden= "?*[~#"; int c;