X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=89e15b63f7e2929d18925d7e0df3a13d111b1e89;hb=b104e862f8e83fbd5dc28c228b45c0214e1d1417;hp=2a22ba20a937b0acdd5d845c43e4bc15f3ff66c3;hpb=8c547d5d95d290391bc645a17d195412d20aea9a;p=innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index 2a22ba2..89e15b6 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -259,7 +259,6 @@ typedef struct InputFile { void *readable_callback_user; int fd; - const char *path; /* ptr copy of path_ or feedfile */ struct Filemon_Perfile *filemon; oop_read *rd; @@ -267,17 +266,28 @@ typedef struct InputFile { off_t offset; Counts counts; + char path[]; } InputFile; +#define SMS_LIST(X) \ + X(WAITING) \ + X(NORMAL) \ + X(FLUSHING) \ + X(FLUSHFAIL) \ + X(SEPARATED) \ + X(DROPPING) + typedef enum { - sm_WAITING, - sm_NORMAL, - sm_FLUSHING, - sm_FLUSHFAIL, - sm_SEPARATED, - sm_DROPPING, +#define SMS_DEF_ENUM(s) sm_##s, + SMS_LIST(SMS_DEF_ENUM) } StateMachineState; +static const char *sms_names[]= { +#define SMS_DEF_NAME(s) #s , + SMS_LIST(SMS_DEF_NAME) + 0 +}; + struct Conn { ISNODE(Conn); int fd, max_queue, stream; @@ -291,7 +301,6 @@ struct Conn { /*----- operational variables -----*/ -static int since_connect_attempt; static int nconns; static LIST(Conn) idle, working, full; static LIST(Article) *queue; @@ -303,7 +312,7 @@ static char *path_lock, *path_flushing, *path_defer; static StateMachineState sms; static FILE *defer; -static InputFile *main_input_file, *old_input_file, *backlog_input_file; +static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; static int sm_period_counter; @@ -462,6 +471,7 @@ logwrap(warn, " warning", LOG_WARN, -1, 0); logwrap(notice, "", LOG_NOTICE, -1, 0); logwrap(info, " info", LOG_INFO, -1, 0); +logwrap(debug, " debug", LOG_DEBUG, -1, 0); /*========== making new connections ==========*/ @@ -938,11 +948,14 @@ static Article *article_reply_check(Connection *conn, const char *response, static void update_nocheck(int accepted) { accept_proportion *= accept_decay; accept_proportion += accepted; - nocheck= accept_proportion >= nocheck_thresh; - if (nocheck && !nocheck_reported) { + int new_nocheck= accept_proportion >= nocheck_thresh; + if (new_nocheck && !nocheck_reported) { notice("entering nocheck mode for the first time"); nocheck_reported= 1; + } else if (new_nocheck != nockech) { + debug("nocheck mode %s", new_nocheck ? "start" : "stop"); } + nocheck= new_nocheck; } static void article_done(Connection *conn, Article *art, int whichcount) { @@ -1093,7 +1106,7 @@ static void feedfile_eof(InputFile *ipf) { return; } - assert(ipf == old_input_file); + assert(ipf == flushing_input_file); inputfile_tailing_stop(ipf); assert(ipf->fd >= 0); @@ -1113,7 +1126,7 @@ static InputFile *open_input_file(const char *path) { sysfatal("unable to open input file %s", path); } - InputFile *ipf= xmalloc(sizeof(InputFile)); + InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1); memset(ipf,0,sizeof(*ipf)); ipf->readable.on_readable= tailing_on_readable; @@ -1121,7 +1134,7 @@ static InputFile *open_input_file(const char *path) { ipf->readable.try_read= tailing_try_read; ipf->fd= fd; - ipf->path= path; + strcpy(ipf->path, path); return ipf; } @@ -1135,8 +1148,6 @@ static void close_input_file(InputFile *ipf) { if (ipf->fd >= 0) if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); - fixme maybe free ipf->path; - free(ipf); } @@ -1177,7 +1188,9 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, } ipf->offset += recsz + 1; - if (sms==sm_NORMAL && ipf->offset >= flush_threshold) { + if (sms==sm_NORMAL && ipf==main_input_file && + (ipf->offset >= flush_threshold || !until_spontaneous_flush) { + notice("starting flush (%lu >= %lu)", (unsigned long)ipf->offset, (unsigned long)flush_threshold); @@ -1257,7 +1270,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, if (ipf==main_input_file) { errno=EAGAIN; return -1; - } else if (ipf==old_input_file) { + } else if (ipf==flushing_input_file) { assert(ipf->fd>=0); assert(sms==sm_SEPARATED || sms==sm_DROPPING); } else if (ipf==backlog_input_file) { @@ -1301,11 +1314,15 @@ static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { assert(!filemon_inotify_wd2ipf[wd]); filemon_inotify_wd2ipf[wd]= ipf; + debug("filemon inotify startfile %p wd=%d wdmax=%d", + ipf, wd, filemon_inotify_wdmax); + pf->wd= wd; } static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { int wd= pf->wd; + debug("filemon inotify stopfile %p wd=%d", ipf, wd); int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd); if (r) sysdie("inotify_rm_watch"); filemon_inotify_wd2ipf[wd]= 0; @@ -1325,6 +1342,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev)); } InputFile *ipf= filemon_inotify_wd2ipf[iev.wd]; + debug("filemon inotify readable read %p wd=%p", iev.wd, ipf); filemon_callback(ipf); } return OOP_CONTINUE; @@ -1339,6 +1357,7 @@ static int filemon_method_init(void) { set nonblock; loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable); + debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd); return 1; } @@ -1449,7 +1468,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | V V | ============= ============ | SEPARATED/ DROPPING/ - | old->fd>=0 old->fd>=0 + | flsh->fd>=0 flsh->fd>=0 | [Separated] [Dropping] | main F idle main none | old D tail old D tail @@ -1459,7 +1478,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | V V | =============== =============== | SEPARATED/ DROPPING/ - | old->fd==-1 old->fd==-1 + | flsh->fd==-1 flsh->fd==-1 | [Finishing] [Dropping] | main F tail main none | old D closed old D closed @@ -1566,7 +1585,6 @@ static void statemc_init(void) { } } else { debug("startup: D ENOENT => Nothing"); - fixme need to try flushing innd here - needs state diagram changes; SMS(WAITING, open_wait_periods, "no feed file currently exists"); } } @@ -1600,6 +1618,7 @@ static void statemc_waiting_poll(void) { static void startup_set_input_file(InputFile *f) { assert(!main_input_file); main_input_file= f; + until_spontaneous_flush= spontaneous_flush_periods; inputfile_tailing_start(f); } @@ -1626,7 +1645,7 @@ static void *statemc_check_input_done(oop_source *lp, return; } - assert(ipf == old_input_file); + assert(ipf == flushing_input_file); assert(sms==sm_SEPARATED || sms==sm_DROPPING); notice_processed(ipf,"feed file",0); @@ -1649,8 +1668,8 @@ static void *statemc_check_input_done(oop_source *lp, open_defer(); - close_input_file(old_input_file); - old_input_file= 0; + close_input_file(flushing_input_file); + flushing_input_file= 0; notice("flush complete"); SMS(NORMAL, 0, "flush complete"); @@ -1742,14 +1761,14 @@ static void close_defer(void) { sysdie("could not unlink old defer link %s to backlog file %s", path_defer, backlog); - if (backlog_nextscan_periods < 0 || - backlog_nextscan_periods > backlog_retry_minperiods + 1) - backlog_nextscan_periods= backlog_retry_minperiods + 1; + if (until_backlog_nextscan < 0 || + until_backlog_nextscan > backlog_retry_minperiods + 1) + until_backlog_nextscan= backlog_retry_minperiods + 1; } static void poll_backlog_file(void) { - if (backlog_nextscan_periods < 0) return; - if (backlog_nextscan_periods-- > 0) return; + if (until_backlog_nextscan < 0) return; + if (until_backlog_nextscan-- > 0) return; search_backlog_file(); } @@ -1806,7 +1825,7 @@ static int search_backlog_file(void) { if (!oldest_path) { debug("backlog scan: none"); - backlog_nextscan_periods= backlog_spontaneous_rescan_periods; + until_backlog_nextscan= backlog_spontaneous_rescan_periods; return 0; } @@ -1818,24 +1837,24 @@ static int search_backlog_file(void) { debug("backlog scan: found age=%f deficiency=%ld oldest=%s", age, age_deficiency, oldest_path); - backlog_input_file= open_input_file(); + backlog_input_file= open_input_file(oldest_path); if (!backlog_input_file) { warn("backlog file %s vanished as we opened it", backlog_input_file); goto try_again; } inputfile_tailing_start(backlog_input_file); - backlog_nextscan_periods= -1; + until_backlog_nextscan= -1; return 1; } - backlog_nextscan_periods= age_deficiency / PERIOD_SECONDS; + until_backlog_nextscan= age_deficiency / PERIOD_SECONDS; if (backlog_spontaneous_rescan_periods >= 0 && - backlog_nextscan_periods > backlog_spontaneous_rescan_periods) - backlog_nextscan_periods= backlog_spontaneous_rescan_periods; + until_backlog_nextscan > backlog_spontaneous_rescan_periods) + until_backlog_nextscan= backlog_spontaneous_rescan_periods; debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s", - age, age_deficiency, backlog_nextscan_periods, oldest_path); + age, age_deficiency, until_backlog_nextscan, oldest_path); return 2; } @@ -1849,7 +1868,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { loop->cancel_fd(fd); close(fd); - assert(!old_input_file); + assert(!flushing_input_file); if (WIFEXITED(status)) { switch (WEXITSTATUS(status)) { @@ -1859,16 +1878,17 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { case INNDCOMMCHILD_ESTATUS_NONESUCH: warn("feed has been dropped by innd, finishing up"); - old_input_file= main_input_file; + flushing_input_file= main_input_file; main_input_file= 0; SMS(DROPPING, 0, "dropped by innd"); return OOP_CONTINUE; case 0: - old_input_file= main_input_file; + flushing_input_file= main_input_file; main_input_file= open_input_file(feedfile); if (!main_input_file) die("flush succeeded but feedfile %s does not exist!", feedfile); + until_spontaneous_flush= spontaneous_flush_periods; SMS(SEPARATED, 0, "feed file missing"); return OOP_CONTINUE; @@ -1952,7 +1972,7 @@ static void postfork(const char *what) { sysdie("%s child: failed to reset SIGPIPE"); postfork_inputfile(main_input_file); - postfork_inputfile(old_input_file); + postfork_inputfile(flushing_input_file); postfork_conns(idle.head); postfork_conns(working.head); postfork_conns(full.head); @@ -1974,9 +1994,33 @@ EVERY(filepoll, {5,0}, { if (main_input_file && main_input_file->readable_callback) filemon_callback(main_input_file); }); + +#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s" \ +#define DEBUG_IPF(sh) \ + wh##_input_file, debug_ipf_path(wh##_input_file), \ + wh##_input_file->inprogress, (long)wh##_input_file->offset, \ + wh##_input_file->fd, wh##_input_file->rd ? "+" : "" +static const char *debug_ipf_path(InputFile *ipf) { + char *slash= strrchr(ipf->path,'/'); + return slash ? slash+1 : ipf->path; +} EVERY(period, {PERIOD_SECONDS,0}, { + debug("PERIOD" + " sms=%s queue=%d sm_period_counter=%d" + " connect_delay=%d until_spontaneous_flush=%d" + " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_FMT(flushing) + " conns idle=%d working=%d full=%d" + " children connecting=%ld inndcomm_child" + , + sms_names[sms], queue.count, sm_period_counter, + connect_delay, until_spontaneous_flush, + DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing), + idle.count, working.count, full.count, + (long)connecting_child, (long)inndcomm_child + ); if (connect_delay) connect_delay--; + if (until_spontaneous_flush) until_spontaneous_flush--; poll_backlog_file(); if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_poll(); @@ -2157,11 +2201,11 @@ int main(int argc, char **argv) { else if (feedfile[strlen(feedfile)-1]=='/') feedfile= xasprintf("%s%s",feedfile,sitename); - const char *feedfile_forbidden= "?*["; + const char *feedfile_forbidden= "?*[~#"; int c; while ((c= *feedfile_forbidden++)) if (strchr(feedfile, c)) - badusage("feed filename may not contain glob metacharacter %c",c); + badusage("feed filename may not contain metacharacter %c",c); if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) sysdie("could not ignore SIGPIPE");