X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=fe7be9bde496db1bc32c5d2f0f3c64ea7ee5b62c;hb=52fe602181677af1a1e453e45e4d6b6557b357cc;hp=f64a453f621f205be0fed7eea366f652eff34b8d;hpb=0b3c60f3feea900138009004d8b801ee9a03f58f;p=innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index f64a453..fe7be9b 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -259,7 +259,6 @@ typedef struct InputFile { void *readable_callback_user; int fd; - const char *path; /* ptr copy of path_ or feedfile */ struct Filemon_Perfile *filemon; oop_read *rd; @@ -267,17 +266,28 @@ typedef struct InputFile { off_t offset; Counts counts; + char path[]; } InputFile; +#define SMS_LIST(X) \ + X(WAITING) \ + X(NORMAL) \ + X(FLUSHING) \ + X(FLUSHFAIL) \ + X(SEPARATED) \ + X(DROPPING) + typedef enum { - sm_WAITING, - sm_NORMAL, - sm_FLUSHING, - sm_FLUSHFAIL, - sm_SEPARATED, - sm_DROPPING, +#define SMS_DEF_ENUM(s) sm_##s, + SMS_LIST(SMS_DEF_ENUM) } StateMachineState; +static const char *sms_names[]= { +#define SMS_DEF_NAME(s) #s , + SMS_LIST(SMS_DEF_NAME) + 0 +}; + struct Conn { ISNODE(Conn); int fd, max_queue, stream; @@ -291,7 +301,6 @@ struct Conn { /*----- operational variables -----*/ -static int since_connect_attempt; static int nconns; static LIST(Conn) idle, working, full; static LIST(Article) *queue; @@ -303,7 +312,7 @@ static char *path_lock, *path_flushing, *path_defer; static StateMachineState sms; static FILE *defer; -static InputFile *main_input_file, *old_input_file, *backlog_input_file; +static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; static int sm_period_counter; @@ -462,6 +471,7 @@ logwrap(warn, " warning", LOG_WARN, -1, 0); logwrap(notice, "", LOG_NOTICE, -1, 0); logwrap(info, " info", LOG_INFO, -1, 0); +logwrap(debug, " debug", LOG_DEBUG, -1, 0); /*========== making new connections ==========*/ @@ -1093,7 +1103,7 @@ static void feedfile_eof(InputFile *ipf) { return; } - assert(ipf == old_input_file); + assert(ipf == flushing_input_file); inputfile_tailing_stop(ipf); assert(ipf->fd >= 0); @@ -1113,7 +1123,7 @@ static InputFile *open_input_file(const char *path) { sysfatal("unable to open input file %s", path); } - InputFile *ipf= xmalloc(sizeof(InputFile)); + InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1); memset(ipf,0,sizeof(*ipf)); ipf->readable.on_readable= tailing_on_readable; @@ -1121,7 +1131,7 @@ static InputFile *open_input_file(const char *path) { ipf->readable.try_read= tailing_try_read; ipf->fd= fd; - ipf->path= path; + strcpy(ipf->path, path); return ipf; } @@ -1135,8 +1145,6 @@ static void close_input_file(InputFile *ipf) { if (ipf->fd >= 0) if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); - fixme maybe free ipf->path; - free(ipf); } @@ -1177,7 +1185,9 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, } ipf->offset += recsz + 1; - if (sms==sm_NORMAL && ipf->offset >= flush_threshold) { + if (sms==sm_NORMAL && ipf==main_input_file && + (ipf->offset >= flush_threshold || !until_spontaneous_flush) { + notice("starting flush (%lu >= %lu)", (unsigned long)ipf->offset, (unsigned long)flush_threshold); @@ -1257,7 +1267,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, if (ipf==main_input_file) { errno=EAGAIN; return -1; - } else if (ipf==old_input_file) { + } else if (ipf==flushing_input_file) { assert(ipf->fd>=0); assert(sms==sm_SEPARATED || sms==sm_DROPPING); } else if (ipf==backlog_input_file) { @@ -1301,11 +1311,15 @@ static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { assert(!filemon_inotify_wd2ipf[wd]); filemon_inotify_wd2ipf[wd]= ipf; + debug("filemon inotify startfile %p wd=%d wdmax=%d", + ipf, wd, filemon_inotify_wdmax); + pf->wd= wd; } static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { int wd= pf->wd; + debug("filemon inotify stopfile %p wd=%d", ipf, wd); int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd); if (r) sysdie("inotify_rm_watch"); filemon_inotify_wd2ipf[wd]= 0; @@ -1325,6 +1339,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev)); } InputFile *ipf= filemon_inotify_wd2ipf[iev.wd]; + debug("filemon inotify readable read %p wd=%p", iev.wd, ipf); filemon_callback(ipf); } return OOP_CONTINUE; @@ -1339,6 +1354,7 @@ static int filemon_method_init(void) { set nonblock; loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable); + debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd); return 1; } @@ -1449,7 +1465,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | V V | ============= ============ | SEPARATED/ DROPPING/ - | old->fd>=0 old->fd>=0 + | flsh->fd>=0 flsh->fd>=0 | [Separated] [Dropping] | main F idle main none | old D tail old D tail @@ -1459,7 +1475,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | V V | =============== =============== | SEPARATED/ DROPPING/ - | old->fd==-1 old->fd==-1 + | flsh->fd==-1 flsh->fd==-1 | [Finishing] [Dropping] | main F tail main none | old D closed old D closed @@ -1599,6 +1615,7 @@ static void statemc_waiting_poll(void) { static void startup_set_input_file(InputFile *f) { assert(!main_input_file); main_input_file= f; + until_spontaneous_flush= spontaneous_flush_periods; inputfile_tailing_start(f); } @@ -1625,7 +1642,7 @@ static void *statemc_check_input_done(oop_source *lp, return; } - assert(ipf == old_input_file); + assert(ipf == flushing_input_file); assert(sms==sm_SEPARATED || sms==sm_DROPPING); notice_processed(ipf,"feed file",0); @@ -1648,8 +1665,8 @@ static void *statemc_check_input_done(oop_source *lp, open_defer(); - close_input_file(old_input_file); - old_input_file= 0; + close_input_file(flushing_input_file); + flushing_input_file= 0; notice("flush complete"); SMS(NORMAL, 0, "flush complete"); @@ -1817,7 +1834,7 @@ static int search_backlog_file(void) { debug("backlog scan: found age=%f deficiency=%ld oldest=%s", age, age_deficiency, oldest_path); - backlog_input_file= open_input_file(); + backlog_input_file= open_input_file(oldest_path); if (!backlog_input_file) { warn("backlog file %s vanished as we opened it", backlog_input_file); goto try_again; @@ -1848,7 +1865,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { loop->cancel_fd(fd); close(fd); - assert(!old_input_file); + assert(!flushing_input_file); if (WIFEXITED(status)) { switch (WEXITSTATUS(status)) { @@ -1858,16 +1875,17 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { case INNDCOMMCHILD_ESTATUS_NONESUCH: warn("feed has been dropped by innd, finishing up"); - old_input_file= main_input_file; + flushing_input_file= main_input_file; main_input_file= 0; SMS(DROPPING, 0, "dropped by innd"); return OOP_CONTINUE; case 0: - old_input_file= main_input_file; + flushing_input_file= main_input_file; main_input_file= open_input_file(feedfile); if (!main_input_file) die("flush succeeded but feedfile %s does not exist!", feedfile); + until_spontaneous_flush= spontaneous_flush_periods; SMS(SEPARATED, 0, "feed file missing"); return OOP_CONTINUE; @@ -1951,7 +1969,7 @@ static void postfork(const char *what) { sysdie("%s child: failed to reset SIGPIPE"); postfork_inputfile(main_input_file); - postfork_inputfile(old_input_file); + postfork_inputfile(flushing_input_file); postfork_conns(idle.head); postfork_conns(working.head); postfork_conns(full.head); @@ -1973,9 +1991,33 @@ EVERY(filepoll, {5,0}, { if (main_input_file && main_input_file->readable_callback) filemon_callback(main_input_file); }); + +#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s" \ +#define DEBUG_IPF(sh) \ + wh##_input_file, debug_ipf_path(wh##_input_file), \ + wh##_input_file->inprogress, (long)wh##_input_file->offset, \ + wh##_input_file->fd, wh##_input_file->rd ? "+" : "" +static const char *debug_ipf_path(InputFile *ipf) { + char *slash= strrchr(ipf->path,'/'); + return slash ? slash+1 : ipf->path; +} EVERY(period, {PERIOD_SECONDS,0}, { + debug("PERIOD" + " sms=%s queue=%d sm_period_counter=%d" + " connect_delay=%d until_spontaneous_flush=%d" + " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_FMT(flushing) + " conns idle=%d working=%d full=%d" + " children connecting=%ld inndcomm_child" + , + sms_names[sms], queue.count, sm_period_counter, + connect_delay, until_spontaneous_flush, + DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing), + idle.count, working.count, full.count, + (long)connecting_child, (long)inndcomm_child + ); if (connect_delay) connect_delay--; + if (until_spontaneous_flush) until_spontaneous_flush--; poll_backlog_file(); if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_poll(); @@ -2156,11 +2198,11 @@ int main(int argc, char **argv) { else if (feedfile[strlen(feedfile)-1]=='/') feedfile= xasprintf("%s%s",feedfile,sitename); - const char *feedfile_forbidden= "?*["; + const char *feedfile_forbidden= "?*[~#"; int c; while ((c= *feedfile_forbidden++)) if (strchr(feedfile, c)) - badusage("feed filename may not contain glob metacharacter %c",c); + badusage("feed filename may not contain metacharacter %c",c); if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) sysdie("could not ignore SIGPIPE");