From bd976de6f0a8f28a87c4261be5b3df3f59bdb661 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Sun, 18 Apr 2010 18:08:50 +0100 Subject: [PATCH] WIP tidy up, etc, rename old_input_file to flushing_input_file --- backends/innduct.c | 84 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 20 deletions(-) diff --git a/backends/innduct.c b/backends/innduct.c index f0cbf34..8b4b285 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -269,15 +269,25 @@ typedef struct InputFile { char path[]; } InputFile; +#define SMS_LIST(X) \ + X(WAITING) \ + X(NORMAL) \ + X(FLUSHING) \ + X(FLUSHFAIL) \ + X(SEPARATED) \ + X(DROPPING) + typedef enum { - sm_WAITING, - sm_NORMAL, - sm_FLUSHING, - sm_FLUSHFAIL, - sm_SEPARATED, - sm_DROPPING, +#define SMS_DEF_ENUM(s) sm_##s, + SMS_LIST(SMS_DEF_ENUM) } StateMachineState; +static const char *sms_names[]= { +#define SMS_DEF_NAME(s) #s , + SMS_LIST(SMS_DEF_NAME) + 0 +}; + struct Conn { ISNODE(Conn); int fd, max_queue, stream; @@ -291,7 +301,6 @@ struct Conn { /*----- operational variables -----*/ -static int since_connect_attempt; static int nconns; static LIST(Conn) idle, working, full; static LIST(Article) *queue; @@ -303,7 +312,7 @@ static char *path_lock, *path_flushing, *path_defer; static StateMachineState sms; static FILE *defer; -static InputFile *main_input_file, *old_input_file, *backlog_input_file; +static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; static int sm_period_counter; @@ -462,6 +471,7 @@ logwrap(warn, " warning", LOG_WARN, -1, 0); logwrap(notice, "", LOG_NOTICE, -1, 0); logwrap(info, " info", LOG_INFO, -1, 0); +logwrap(debug, " debug", LOG_DEBUG, -1, 0); /*========== making new connections ==========*/ @@ -1093,7 +1103,7 @@ static void feedfile_eof(InputFile *ipf) { return; } - assert(ipf == old_input_file); + assert(ipf == flushing_input_file); inputfile_tailing_stop(ipf); assert(ipf->fd >= 0); @@ -1175,7 +1185,9 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, } ipf->offset += recsz + 1; - if (sms==sm_NORMAL && ipf->offset >= flush_threshold) { + if (sms==sm_NORMAL && ipf==main_input_file && + (ipf->offset >= flush_threshold || !until_spontaneous_flush) { + notice("starting flush (%lu >= %lu)", (unsigned long)ipf->offset, (unsigned long)flush_threshold); @@ -1255,7 +1267,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, if (ipf==main_input_file) { errno=EAGAIN; return -1; - } else if (ipf==old_input_file) { + } else if (ipf==flushing_input_file) { assert(ipf->fd>=0); assert(sms==sm_SEPARATED || sms==sm_DROPPING); } else if (ipf==backlog_input_file) { @@ -1299,11 +1311,15 @@ static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { assert(!filemon_inotify_wd2ipf[wd]); filemon_inotify_wd2ipf[wd]= ipf; + debug("filemon inotify startfile %p wd=%d wdmax=%d", + ipf, wd, filemon_inotify_wdmax); + pf->wd= wd; } static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { int wd= pf->wd; + debug("filemon inotify stopfile %p wd=%d", ipf, wd); int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd); if (r) sysdie("inotify_rm_watch"); filemon_inotify_wd2ipf[wd]= 0; @@ -1323,6 +1339,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev)); } InputFile *ipf= filemon_inotify_wd2ipf[iev.wd]; + debug("filemon inotify readable read %p wd=%p", iev.wd, ipf); filemon_callback(ipf); } return OOP_CONTINUE; @@ -1337,6 +1354,7 @@ static int filemon_method_init(void) { set nonblock; loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable); + debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd); return 1; } @@ -1598,6 +1616,7 @@ static void statemc_waiting_poll(void) { static void startup_set_input_file(InputFile *f) { assert(!main_input_file); main_input_file= f; + until_spontaneous_flush= spontaneous_flush_periods; inputfile_tailing_start(f); } @@ -1624,7 +1643,7 @@ static void *statemc_check_input_done(oop_source *lp, return; } - assert(ipf == old_input_file); + assert(ipf == flushing_input_file); assert(sms==sm_SEPARATED || sms==sm_DROPPING); notice_processed(ipf,"feed file",0); @@ -1647,8 +1666,8 @@ static void *statemc_check_input_done(oop_source *lp, open_defer(); - close_input_file(old_input_file); - old_input_file= 0; + close_input_file(flushing_input_file); + flushing_input_file= 0; notice("flush complete"); SMS(NORMAL, 0, "flush complete"); @@ -1847,7 +1866,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { loop->cancel_fd(fd); close(fd); - assert(!old_input_file); + assert(!flushing_input_file); if (WIFEXITED(status)) { switch (WEXITSTATUS(status)) { @@ -1857,16 +1876,17 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { case INNDCOMMCHILD_ESTATUS_NONESUCH: warn("feed has been dropped by innd, finishing up"); - old_input_file= main_input_file; + flushing_input_file= main_input_file; main_input_file= 0; SMS(DROPPING, 0, "dropped by innd"); return OOP_CONTINUE; case 0: - old_input_file= main_input_file; + flushing_input_file= main_input_file; main_input_file= open_input_file(feedfile); if (!main_input_file) die("flush succeeded but feedfile %s does not exist!", feedfile); + until_spontaneous_flush= spontaneous_flush_periods; SMS(SEPARATED, 0, "feed file missing"); return OOP_CONTINUE; @@ -1950,7 +1970,7 @@ static void postfork(const char *what) { sysdie("%s child: failed to reset SIGPIPE"); postfork_inputfile(main_input_file); - postfork_inputfile(old_input_file); + postfork_inputfile(flushing_input_file); postfork_conns(idle.head); postfork_conns(working.head); postfork_conns(full.head); @@ -1972,9 +1992,33 @@ EVERY(filepoll, {5,0}, { if (main_input_file && main_input_file->readable_callback) filemon_callback(main_input_file); }); + +#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s" \ +#define DEBUG_IPF(sh) \ + wh##_input_file, debug_ipf_path(wh##_input_file), \ + wh##_input_file->inprogress, (long)wh##_input_file->offset, \ + wh##_input_file->fd, wh##_input_file->rd ? "+" : "" +static const char *debug_ipf_path(InputFile *ipf) { + char *slash= strrchr(ipf->path,'/'); + return slash ? slash+1 : ipf->path; +} EVERY(period, {PERIOD_SECONDS,0}, { + debug("PERIOD" + " sms=%s queue=%d sm_period_counter=%d" + " connect_delay=%d until_spontaneous_flush=%d" + " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_FMT(flushing) + " conns idle=%d working=%d full=%d" + " children connecting=%ld inndcomm_child" + , + sms_names[sms], queue.count, sm_period_counter, + connect_delay, until_spontaneous_flush, + DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing), + idle.count, working.count, full.count, + (long)connecting_child, (long)inndcomm_child + ); if (connect_delay) connect_delay--; + if (until_spontaneous_flush) until_spontaneous_flush--; poll_backlog_file(); if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_poll(); @@ -2155,11 +2199,11 @@ int main(int argc, char **argv) { else if (feedfile[strlen(feedfile)-1]=='/') feedfile= xasprintf("%s%s",feedfile,sitename); - const char *feedfile_forbidden= "?*["; + const char *feedfile_forbidden= "?*[~#"; int c; while ((c= *feedfile_forbidden++)) if (strchr(feedfile, c)) - badusage("feed filename may not contain glob metacharacter %c",c); + badusage("feed filename may not contain metacharacter %c",c); if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) sysdie("could not ignore SIGPIPE"); -- 2.30.2