X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=8b4b2859359150848e8cccd79a842ed1ddb38ba2;hp=092c69356ad5f838e5a0ffa1067ab4224436dcb5;hb=bd976de6f0a8f28a87c4261be5b3df3f59bdb661;hpb=cba40d7fec8017adb811e512bafa6eb2f9c13685 diff --git a/backends/innduct.c b/backends/innduct.c index 092c693..8b4b285 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -15,24 +15,29 @@ * or might be blanked out * .... * - * site.name_duct.lock lock preventing multiple ducts - * holder of this lock is "duct" * F site.name main feed file * opened/created, then written, by innd * read by duct * unlinked by duct * tokens blanked out by duct when processed - * D site.name_duct temporary feed file during flush (or crash) + * site.name_lock lock preventing multiple ducts + * to hold lock must open,F_SETLK[W] + * and then stat to check that locked file + * still has name site.name_lock + * holder of this lock is "duct" + * (only) lockholder may remove the lockfile + * D site.name_flushing temporary feed file during flush (or crash) * hardlink created by duct * unlinked by duct - * site.name_duct.defer 431'd articles, still being written, + * site.name_defer 431'd articles, still being written, * created, written, used by duct * - * site.name_backlog_. + * site.name_backlog.. * 431'd articles, ready for innxmit or duct * created (link/mv) by duct - * site.name_backlog_ eg - * site.name_backlog_manual + * site.name_backlog (where does not + * contain '#' or '~') eg + * site.name_backlog.manual * anything the sysadmin likes (eg, feed files * from old feeds to be merged into this one) * created (link/mv) by admin @@ -254,7 +259,6 @@ typedef struct InputFile { void *readable_callback_user; int fd; - const char *path; /* ptr copy of path_ or feedfile */ struct Filemon_Perfile *filemon; oop_read *rd; @@ -262,17 +266,28 @@ typedef struct InputFile { off_t offset; Counts counts; + char path[]; } InputFile; +#define SMS_LIST(X) \ + X(WAITING) \ + X(NORMAL) \ + X(FLUSHING) \ + X(FLUSHFAIL) \ + X(SEPARATED) \ + X(DROPPING) + typedef enum { - sm_WAITING, - sm_NORMAL, - sm_FLUSHING, - sm_FLUSHFAIL, - sm_SEPARATED, - sm_DROPPING, +#define SMS_DEF_ENUM(s) sm_##s, + SMS_LIST(SMS_DEF_ENUM) } StateMachineState; +static const char *sms_names[]= { +#define SMS_DEF_NAME(s) #s , + SMS_LIST(SMS_DEF_NAME) + 0 +}; + struct Conn { ISNODE(Conn); int fd, max_queue, stream; @@ -286,19 +301,18 @@ struct Conn { /*----- operational variables -----*/ -static int since_connect_attempt; static int nconns; static LIST(Conn) idle, working, full; static LIST(Article) *queue; -static char *path_ductlock, *path_duct, *path_ductdefer; +static char *path_lock, *path_flushing, *path_defer; #define SMS(newstate, periods, why) \ (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) static StateMachineState sms; static FILE *defer; -static InputFile *main_input_file, *old_input_file, *backlog_input_file; +static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; static int sm_period_counter; @@ -457,6 +471,7 @@ logwrap(warn, " warning", LOG_WARN, -1, 0); logwrap(notice, "", LOG_NOTICE, -1, 0); logwrap(info, " info", LOG_INFO, -1, 0); +logwrap(debug, " debug", LOG_DEBUG, -1, 0); /*========== making new connections ==========*/ @@ -1063,7 +1078,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, open_defer(); if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0 || fflush(defer)) - sysfatal("write to defer file %s",path_ductdefer); + sysfatal("write to defer file %s",path_defer); article_done(conn, art, RC_deferred); break; @@ -1088,7 +1103,7 @@ static void feedfile_eof(InputFile *ipf) { return; } - assert(ipf == old_input_file); + assert(ipf == flushing_input_file); inputfile_tailing_stop(ipf); assert(ipf->fd >= 0); @@ -1108,7 +1123,7 @@ static InputFile *open_input_file(const char *path) { sysfatal("unable to open input file %s", path); } - InputFile *ipf= xmalloc(sizeof(InputFile)); + InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1); memset(ipf,0,sizeof(*ipf)); ipf->readable.on_readable= tailing_on_readable; @@ -1116,7 +1131,7 @@ static InputFile *open_input_file(const char *path) { ipf->readable.try_read= tailing_try_read; ipf->fd= fd; - ipf->path= path; + strcpy(ipf->path, path); return ipf; } @@ -1130,8 +1145,6 @@ static void close_input_file(InputFile *ipf) { if (ipf->fd >= 0) if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); - fixme maybe free ipf->path; - free(ipf); } @@ -1172,12 +1185,14 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, } ipf->offset += recsz + 1; - if (sms==sm_NORMAL && ipf->offset >= flush_threshold) { + if (sms==sm_NORMAL && ipf==main_input_file && + (ipf->offset >= flush_threshold || !until_spontaneous_flush) { + notice("starting flush (%lu >= %lu)", (unsigned long)ipf->offset, (unsigned long)flush_threshold); int r= link(feedfile, duct_path); - if (r) sysdie("link feedfile %s to ductfile %s", feedfile, dut_path); + if (r) sysdie("link feedfile %s to flushing file %s", feedfile, dut_path); /* => Hardlinked */ r= unlink(feedfile); @@ -1252,7 +1267,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, if (ipf==main_input_file) { errno=EAGAIN; return -1; - } else if (ipf==old_input_file) { + } else if (ipf==flushing_input_file) { assert(ipf->fd>=0); assert(sms==sm_SEPARATED || sms==sm_DROPPING); } else if (ipf==backlog_input_file) { @@ -1296,11 +1311,15 @@ static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { assert(!filemon_inotify_wd2ipf[wd]); filemon_inotify_wd2ipf[wd]= ipf; + debug("filemon inotify startfile %p wd=%d wdmax=%d", + ipf, wd, filemon_inotify_wdmax); + pf->wd= wd; } static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { int wd= pf->wd; + debug("filemon inotify stopfile %p wd=%d", ipf, wd); int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd); if (r) sysdie("inotify_rm_watch"); filemon_inotify_wd2ipf[wd]= 0; @@ -1320,6 +1339,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev)); } InputFile *ipf= filemon_inotify_wd2ipf[iev.wd]; + debug("filemon inotify readable read %p wd=%p", iev.wd, ipf); filemon_callback(ipf); } return OOP_CONTINUE; @@ -1334,6 +1354,7 @@ static int filemon_method_init(void) { set nonblock; loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable); + debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd); return 1; } @@ -1476,14 +1497,14 @@ static void statemc_init(void) { struct stat stab, stabf; int noent; - path_ductlock= xasprintf("%s_duct.lock", feedfile); - path_duct= xasprintf("%s_duct", feedfile); - path_ductdefer= xasprintf("%s_duct.defer", feedfile); - globpat_backlog= xasprintf("%s_backlog_*", feedfile); + path_lock= xasprintf("%s_lock", feedfile); + path_flushing= xasprintf("%s_flushing", feedfile); + path_defer= xasprintf("%s_defer", feedfile); + globpat_backlog= xasprintf("%s_backlog*", feedfile); for (;;) { - int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600); - if (lockfd<0) sysfatal("open lockfile %s", path_ductlock); + int lockfd= open(path_lock, O_CREAT|O_RDWR, 0600); + if (lockfd<0) sysfatal("open lockfile %s", path_lock); struct flock fl; memset(&fl,0,sizeof(fl)); @@ -1495,23 +1516,23 @@ static void statemc_init(void) { if (quiet_if_locked) exit(0); fatal("another duct holds the lockfile"); } - sysdie("fcntl F_SETLK lockfile %s", path_ductlock); + sysdie("fcntl F_SETLK lockfile %s", path_lock); } xfstat_isreg(lockfd, &stabf, "lockfile"); - xlstat_isreg(path_ductlock, &stab, &noent, "lockfile"); + xlstat_isreg(path_lock, &stab, &noent, "lockfile"); if (!noent && samefile(&stab, &stabf)) break; if (close(lockfd)) - sysdie("could not close stale lockfile %s", path_ductlock); + sysdie("could not close stale lockfile %s", path_lock); } debug("startup: locked"); search_backlog_file(); - xlstat_isreg(path_ductdefer, &stab, &noent, "defer file"); + xlstat_isreg(path_defer, &stab, &noent, "defer file"); if (noent) { debug("startup: ductdefer ENOENT"); } else { @@ -1531,7 +1552,7 @@ static void statemc_init(void) { } } - InputFile *file_d= open_input_file(path_duct); + InputFile *file_d= open_input_file(path_flushing); if (file_d) { struct stat stab_f, stab_d; @@ -1544,11 +1565,11 @@ static void statemc_init(void) { debug("startup: F and D both exist"); - xfstat_isreg(file_d->fd, &stab_d, "ductfile"); + xfstat_isreg(file_d->fd, &stab_d, "flushing file"); if (samefile(&stab_d, &stab_f)) { debug("startup: F==D => Hardlinked"); - r= unlink(path_duct); + r= unlink(path_flushing); if (r) sysdie("unlink feed file %s during startup", feedfile); found_moved: debug(" => Moved"); @@ -1561,6 +1582,7 @@ static void statemc_init(void) { } } else { debug("startup: D ENOENT => Nothing"); + fixme need to try flushing innd here - needs state diagram changes; SMS(WAITING, open_wait_periods, "no feed file currently exists"); } } @@ -1594,6 +1616,7 @@ static void statemc_waiting_poll(void) { static void startup_set_input_file(InputFile *f) { assert(!main_input_file); main_input_file= f; + until_spontaneous_flush= spontaneous_flush_periods; inputfile_tailing_start(f); } @@ -1620,15 +1643,15 @@ static void *statemc_check_input_done(oop_source *lp, return; } - assert(ipf == old_input_file); + assert(ipf == flushing_input_file); assert(sms==sm_SEPARATED || sms==sm_DROPPING); notice_processed(ipf,"feed file",0); close_defer(); - if (unlink(path_duct)) - sysdie("could not unlink old duct file %s", path_duct); + if (unlink(path_flushing)) + sysdie("could not unlink old flushing file %s", path_flushing); if (sms==sm_DROPPING) { if (search_backlog_file()) { @@ -1636,15 +1659,15 @@ static void *statemc_check_input_done(oop_source *lp, return; } notice("feed dropped and our work is complete"); - r= unlink(path_ductlock); - if (r) sysdie("unlink lock file for old feed %s", path_ductlock); + r= unlink(path_lock); + if (r) sysdie("unlink lockfile for old feed %s", path_lock); exit(0); } open_defer(); - close_input_file(old_input_file); - old_input_file= 0; + close_input_file(flushing_input_file); + flushing_input_file= 0; notice("flush complete"); SMS(NORMAL, 0, "flush complete"); @@ -1668,15 +1691,15 @@ static void open_defer(void) { if (defer) return; - defer= fopen(path_ductdefer, "a+"); - if (!defer) sysfatal("could not open defer file %s", path_ductdefer); + defer= fopen(path_defer, "a+"); + if (!defer) sysfatal("could not open defer file %s", path_defer); /* truncate away any half-written records */ xfstat_isreg(fileno(defer), &stab, "newly opened defer file"); if (stab.st_size > LONG_MAX) - die("defer file %s size is far too large", path_ductdefer); + die("defer file %s size is far too large", path_defer); if (!stab.st_size) return; @@ -1686,14 +1709,14 @@ static void open_defer(void) { for (;;) { if (!truncto) break; /* was only (if anything) one half-truncated record */ if (fseek(defer, truncto-1, SEEK_SET) < 0) - sysdie("seek in defer file %s while truncating partial", path_ductdefer); + sysdie("seek in defer file %s while truncating partial", path_defer); r= getc(defer); if (r==EOF) { if (ferror(defer)) - sysdie("failed read from defer file %s", path_ductdefer); + sysdie("failed read from defer file %s", path_defer); else - die("defer file %s shrank while we were checking it!", path_ductdefer); + die("defer file %s shrank while we were checking it!", path_defer); } if (r=='\n') break; truncto--; @@ -1702,19 +1725,19 @@ static void open_defer(void) { if (stab.st_size != truncto) { warn("truncating half-record at end of defer file %s -" " shrinking by %ld bytes from %ld to %ld", - path_ductdefer, orgsize - truncto, orgsize, truncto); + path_defer, orgsize - truncto, orgsize, truncto); if (fflush(defer)) - sysfatal("could not flush defer file %s", path_ductdefer); + sysfatal("could not flush defer file %s", path_defer); if (ftruncate(fileno(defer), truncto)) - sysdie("could not truncate defer file %s", path_ductdefer); + sysdie("could not truncate defer file %s", path_defer); } else { info("continuing existing defer file %s (%ld bytes)", - path_ductdefer, orgsize); + path_defer, orgsize); } if (fseek(defer, truncto, SEEK_SET)) - sysdie("could not seek to new end of defer file %s", path_ductdefer); + sysdie("could not seek to new end of defer file %s", path_defer); } static void close_defer(void) { @@ -1770,6 +1793,11 @@ static int search_backlog_file(void) { case 0: for (i=0; icancel_fd(fd); close(fd); - assert(!old_input_file); + assert(!flushing_input_file); if (WIFEXITED(status)) { switch (WEXITSTATUS(status)) { @@ -1848,16 +1876,17 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { case INNDCOMMCHILD_ESTATUS_NONESUCH: warn("feed has been dropped by innd, finishing up"); - old_input_file= main_input_file; + flushing_input_file= main_input_file; main_input_file= 0; SMS(DROPPING, 0, "dropped by innd"); return OOP_CONTINUE; case 0: - old_input_file= main_input_file; + flushing_input_file= main_input_file; main_input_file= open_input_file(feedfile); if (!main_input_file) die("flush succeeded but feedfile %s does not exist!", feedfile); + until_spontaneous_flush= spontaneous_flush_periods; SMS(SEPARATED, 0, "feed file missing"); return OOP_CONTINUE; @@ -1941,7 +1970,7 @@ static void postfork(const char *what) { sysdie("%s child: failed to reset SIGPIPE"); postfork_inputfile(main_input_file); - postfork_inputfile(old_input_file); + postfork_inputfile(flushing_input_file); postfork_conns(idle.head); postfork_conns(working.head); postfork_conns(full.head); @@ -1963,9 +1992,33 @@ EVERY(filepoll, {5,0}, { if (main_input_file && main_input_file->readable_callback) filemon_callback(main_input_file); }); + +#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s" \ +#define DEBUG_IPF(sh) \ + wh##_input_file, debug_ipf_path(wh##_input_file), \ + wh##_input_file->inprogress, (long)wh##_input_file->offset, \ + wh##_input_file->fd, wh##_input_file->rd ? "+" : "" +static const char *debug_ipf_path(InputFile *ipf) { + char *slash= strrchr(ipf->path,'/'); + return slash ? slash+1 : ipf->path; +} EVERY(period, {PERIOD_SECONDS,0}, { + debug("PERIOD" + " sms=%s queue=%d sm_period_counter=%d" + " connect_delay=%d until_spontaneous_flush=%d" + " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_FMT(flushing) + " conns idle=%d working=%d full=%d" + " children connecting=%ld inndcomm_child" + , + sms_names[sms], queue.count, sm_period_counter, + connect_delay, until_spontaneous_flush, + DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing), + idle.count, working.count, full.count, + (long)connecting_child, (long)inndcomm_child + ); if (connect_delay) connect_delay--; + if (until_spontaneous_flush) until_spontaneous_flush--; poll_backlog_file(); if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_poll(); @@ -2146,11 +2199,11 @@ int main(int argc, char **argv) { else if (feedfile[strlen(feedfile)-1]=='/') feedfile= xasprintf("%s%s",feedfile,sitename); - const char *feedfile_forbidden= "?*["; + const char *feedfile_forbidden= "?*[~#"; int c; while ((c= *feedfile_forbidden++)) if (strchr(feedfile, c)) - badusage("feed filename may not contain glob metacharacter %c",c); + badusage("feed filename may not contain metacharacter %c",c); if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) sysdie("could not ignore SIGPIPE");