From: Ian Jackson Date: Mon, 26 Apr 2010 09:54:13 +0000 (+0100) Subject: wip compile X-Git-Tag: innduct-0.1~136 X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=innduct.git;a=commitdiff_plain;h=8da1c74b5f5cb7279a82ab4c31d3d251cc611ce1 wip compile --- diff --git a/backends/innduct.c b/backends/innduct.c index 4df15d3..6cb8eb4 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,6 +1,8 @@ /* * TODO * - close idle connections + * - cope better with garbage in feed file + * - cope better with NULs in feed file * - -k kill mode ? */ @@ -267,11 +269,18 @@ static void statemc_setstate(StateMachineState newsms, int periods, static void check_master_queue(void); static void queue_check_input_done(void); +static void statemc_start_flush(const char *why); /* Normal => Flushing */ +static void statemc_check_flushing_done(void); +static void statemc_check_backlog_done(void); + static void postfork(const char *what); static void postfork_inputfile(InputFile *ipf); static void open_defer(void); +static void inputfile_tailing_start(InputFile *ipf); +static void inputfile_tailing_stop(InputFile *ipf); + /*----- configuration options -----*/ static char *sitename, *feedfile; @@ -279,6 +288,7 @@ static const char *remote_host; static int quiet_multiple=0, become_daemon=1; static int max_connections=10, max_queue_per_conn=200; +static int target_max_feedfile_size=100000; static int connection_setup_timeout=200, port=119, try_stream=1; static int inndcomm_flush_timeout=100; @@ -1317,10 +1327,6 @@ static InputFile *open_input_file(const char *path) { InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1); memset(ipf,0,sizeof(*ipf)); - ipf->readable.on_readable= tailing_on_readable; - ipf->readable.on_cancel= tailing_on_cancel; - ipf->readable.try_read= tailing_try_read; - ipf->fd= fd; strcpy(ipf->path, path); @@ -1340,11 +1346,10 @@ static void close_input_file(InputFile *ipf) { /*---------- dealing with articles read in the input file ----------*/ -typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, - oop_rd_event ev, const char *errmsg, - int errnoval, - const char *data, size_t recsz, - void *ipf_v) { +void *feedfile_got_article(oop_source *lp, oop_read *rd, + oop_rd_event ev, const char *errmsg, + int errnoval, const char *data, size_t recsz, + void *ipf_v) { InputFile *ipf= ipf_v; Article *art; char tokentextbuf[sizeof(TOKEN)*2+3]; @@ -1375,31 +1380,12 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->offset += recsz + 1; if (sms==sm_NORMAL && ipf==main_input_file && - ipf->offset >= flush_threshold) + ipf->offset >= target_max_feedfile_size) statemc_start_flush("feed file size"); check_master_queue(); } -static void statemc_start_flush(const char *why) { /* Normal => Flushing */ - assert(sms == sm_NORMAL); - - debug("starting flush (%s) (%lu >= %lu) (%d)", - why, - (unsigned long)ipf->offset, (unsigned long)flush_threshold, - sm_period_counter); - - int r= link(feedfile, duct_path); - if (r) sysdie("link feedfile %s to flushing file %s", feedfile, - path_duct); - /* => Hardlinked */ - - xunlink(feedfile, "old feedfile link"); - /* => Moved */ - - spawn_inndcomm_flush(why); /* => Flushing FLUSHING */ -} - /*========== tailing input file ==========*/ static void filemon_start(InputFile *ipf) { @@ -1802,6 +1788,25 @@ static void statemc_init(void) { } } +static void statemc_start_flush(const char *why) { /* Normal => Flushing */ + assert(sms == sm_NORMAL); + + debug("starting flush (%s) (%lu >= %lu) (%d)", + why, + (unsigned long)ipf->offset, (unsigned long)flush_threshold, + sm_period_counter); + + int r= link(feedfile, duct_path); + if (r) sysdie("link feedfile %s to flushing file %s", feedfile, + path_duct); + /* => Hardlinked */ + + xunlink(feedfile, "old feedfile link"); + /* => Moved */ + + spawn_inndcomm_flush(why); /* => Flushing FLUSHING */ +} + static void statemc_period_poll(void) { if (!sm_period_counter) return; sm_period_counter--;