X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=f64a453f621f205be0fed7eea366f652eff34b8d;hb=0b3c60f3feea900138009004d8b801ee9a03f58f;hp=a915b17c1e66db76380733be7fdee0947a367dc0;hpb=333df31c1aa4a2f0eaa14655993d44d0e101f183;p=innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index a915b17..f64a453 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -15,28 +15,38 @@ * or might be blanked out * .... * - * site.name_duct.lock lock preventing multiple ducts - * holder of this lock is "duct" * F site.name main feed file * opened/created, then written, by innd * read by duct * unlinked by duct * tokens blanked out by duct when processed - * D site.name_duct temporary feed file during flush (or crash) + * site.name_lock lock preventing multiple ducts + * to hold lock must open,F_SETLK[W] + * and then stat to check that locked file + * still has name site.name_lock + * holder of this lock is "duct" + * (only) lockholder may remove the lockfile + * D site.name_flushing temporary feed file during flush (or crash) * hardlink created by duct * unlinked by duct - * site.name_duct.defer 431'd articles, still being written, + * site.name_defer 431'd articles, still being written, * created, written, used by duct - * site.name_backlog.lock lock taken out by innxmit wrapper - * holder and its child are "xmit" - * site.name_backlog_. + * + * site.name_backlog.. * 431'd articles, ready for innxmit or duct * created (link/mv) by duct - * site.name_backlog_ eg - * site.name_backlog_manual + * site.name_backlog (where does not + * contain '#' or '~') eg + * site.name_backlog.manual * anything the sysadmin likes (eg, feed files * from old feeds to be merged into this one) * created (link/mv) by admin + * may be symlinks (in which case links + * may be written through, but only links + * will be removed. + * + * It is safe to remove backlog files manually, + * if it's desired to throw away the backlog. * * Backlog files are also processed by innduct. We find the oldest * backlog file which is at least a certain amount old, and feed it @@ -286,7 +296,7 @@ static int nconns; static LIST(Conn) idle, working, full; static LIST(Article) *queue; -static char *path_ductlock, *path_duct, *path_ductdefer; +static char *path_lock, *path_flushing, *path_defer; #define SMS(newstate, periods, why) \ (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) @@ -1058,7 +1068,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, open_defer(); if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0 || fflush(defer)) - sysfatal("write to defer file %s",path_ductdefer); + sysfatal("write to defer file %s",path_defer); article_done(conn, art, RC_deferred); break; @@ -1172,7 +1182,7 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, (unsigned long)ipf->offset, (unsigned long)flush_threshold); int r= link(feedfile, duct_path); - if (r) sysdie("link feedfile %s to ductfile %s", feedfile, dut_path); + if (r) sysdie("link feedfile %s to flushing file %s", feedfile, dut_path); /* => Hardlinked */ r= unlink(feedfile); @@ -1393,9 +1403,9 @@ static void inputfile_tailing_stop(InputFile *ipf) { poll for F ================ | - | TIMEOUT + | TIMEOUT and no defer, no backlog |`--------------------------. - | | install defer as backlog + | | | OPEN F SUCCEEDS | exit ,--------->| V | V ========= @@ -1471,14 +1481,14 @@ static void statemc_init(void) { struct stat stab, stabf; int noent; - path_ductlock= xasprintf("%s_duct.lock", feedfile); - path_duct= xasprintf("%s_duct", feedfile); - path_ductdefer= xasprintf("%s_duct.defer", feedfile); - globpat_backlog= xasprintf("%s_backlog_*", feedfile); + path_lock= xasprintf("%s_lock", feedfile); + path_flushing= xasprintf("%s_flushing", feedfile); + path_defer= xasprintf("%s_defer", feedfile); + globpat_backlog= xasprintf("%s_backlog*", feedfile); for (;;) { - int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600); - if (lockfd<0) sysfatal("open lockfile %s", path_ductlock); + int lockfd= open(path_lock, O_CREAT|O_RDWR, 0600); + if (lockfd<0) sysfatal("open lockfile %s", path_lock); struct flock fl; memset(&fl,0,sizeof(fl)); @@ -1490,26 +1500,31 @@ static void statemc_init(void) { if (quiet_if_locked) exit(0); fatal("another duct holds the lockfile"); } - sysdie("fcntl F_SETLK lockfile %s", path_ductlock); + sysdie("fcntl F_SETLK lockfile %s", path_lock); } xfstat_isreg(lockfd, &stabf, "lockfile"); - xlstat_isreg(path_ductlock, &stab, &noent, "lockfile"); + xlstat_isreg(path_lock, &stab, &noent, "lockfile"); + if (!noent && samefile(&stab, &stabf)) break; if (close(lockfd)) - sysdie("could not close stale lockfile %s", path_ductlock); + sysdie("could not close stale lockfile %s", path_lock); } debug("startup: locked"); - xlstat_isreg(path_ductdefer, &stab, &noent, "defer file"); + search_backlog_file(); + + xlstat_isreg(path_defer, &stab, &noent, "defer file"); if (noent) { debug("startup: ductdefer ENOENT"); } else { debug("startup: ductdefer nlink=%ld", (long)stab.st_nlink); switch (stab.st_nlink==1) { - case 1: /* ok */ break; + case 1: + open_defer(); /* so that we will later close it and rename it */ + break; case 2: if (unlink(path_defer)) sysdie("could not unlink stale defer file link %s (presumably" @@ -1521,7 +1536,7 @@ static void statemc_init(void) { } } - InputFile *file_d= open_input_file(path_duct); + InputFile *file_d= open_input_file(path_flushing); if (file_d) { struct stat stab_f, stab_d; @@ -1534,11 +1549,11 @@ static void statemc_init(void) { debug("startup: F and D both exist"); - xfstat_isreg(file_d->fd, &stab_d, "ductfile"); + xfstat_isreg(file_d->fd, &stab_d, "flushing file"); if (samefile(&stab_d, &stab_f)) { debug("startup: F==D => Hardlinked"); - r= unlink(path_duct); + r= unlink(path_flushing); if (r) sysdie("unlink feed file %s during startup", feedfile); found_moved: debug(" => Moved"); @@ -1598,8 +1613,13 @@ static void *statemc_check_input_done(oop_source *lp, if (ipf == backlog_input_file) { notice_processed(ipf,"backlog file",ipf->path); close_input_file(ipf); - if (unlink(ipf->path)) - sysdie("could not unlink done backlog file %s", ipf->path); + if (unlink(ipf->path)) { + if (errno != ENOENT) + sysdie("could not unlink processed backlog file %s", ipf->path); + warn("backlog file %s vanished while we were reading it" + " so we couldn't remove it (but it's done now, anyway)", + ipf->path); + } backlog_input_file= 0; search_backlog_file(); return; @@ -1612,12 +1632,17 @@ static void *statemc_check_input_done(oop_source *lp, close_defer(); - if (unlink(path_duct)) - sysdie("could not unlink old duct file %s", path_duct); + if (unlink(path_flushing)) + sysdie("could not unlink old flushing file %s", path_flushing); if (sms==sm_DROPPING) { - notice("feed dropped and our work is complete" - " (but check for backlog files)"); + if (search_backlog_file()) { + debug("feed dropped but still backlogs to process"); + return; + } + notice("feed dropped and our work is complete"); + r= unlink(path_lock); + if (r) sysdie("unlink lockfile for old feed %s", path_lock); exit(0); } @@ -1648,15 +1673,15 @@ static void open_defer(void) { if (defer) return; - defer= fopen(path_ductdefer, "a+"); - if (!defer) sysfatal("could not open defer file %s", path_ductdefer); + defer= fopen(path_defer, "a+"); + if (!defer) sysfatal("could not open defer file %s", path_defer); /* truncate away any half-written records */ xfstat_isreg(fileno(defer), &stab, "newly opened defer file"); if (stab.st_size > LONG_MAX) - die("defer file %s size is far too large", path_ductdefer); + die("defer file %s size is far too large", path_defer); if (!stab.st_size) return; @@ -1666,14 +1691,14 @@ static void open_defer(void) { for (;;) { if (!truncto) break; /* was only (if anything) one half-truncated record */ if (fseek(defer, truncto-1, SEEK_SET) < 0) - sysdie("seek in defer file %s while truncating partial", path_ductdefer); + sysdie("seek in defer file %s while truncating partial", path_defer); r= getc(defer); if (r==EOF) { if (ferror(defer)) - sysdie("failed read from defer file %s", path_ductdefer); + sysdie("failed read from defer file %s", path_defer); else - die("defer file %s shrank while we were checking it!", path_ductdefer); + die("defer file %s shrank while we were checking it!", path_defer); } if (r=='\n') break; truncto--; @@ -1682,19 +1707,19 @@ static void open_defer(void) { if (stab.st_size != truncto) { warn("truncating half-record at end of defer file %s -" " shrinking by %ld bytes from %ld to %ld", - path_ductdefer, orgsize - truncto, orgsize, truncto); + path_defer, orgsize - truncto, orgsize, truncto); if (fflush(defer)) - sysfatal("could not flush defer file %s", path_ductdefer); + sysfatal("could not flush defer file %s", path_defer); if (ftruncate(fileno(defer), truncto)) - sysdie("could not truncate defer file %s", path_ductdefer); + sysdie("could not truncate defer file %s", path_defer); } else { info("continuing existing defer file %s (%ld bytes)", - path_ductdefer, orgsize); + path_defer, orgsize); } if (fseek(defer, truncto, SEEK_SET)) - sysdie("could not seek to new end of defer file %s", path_ductdefer); + sysdie("could not seek to new end of defer file %s", path_defer); } static void close_defer(void) { @@ -1727,14 +1752,18 @@ static void poll_backlog_file(void) { search_backlog_file(); } -static void search_backlog_file(void) { +static int search_backlog_file(void) { + /* returns non-0 iff there are any backlog files */ + glob_t gl; int r; struct stat stab; const char *oldest_path=0; time_t oldest_mtime, now; - assert(!backlog_input_file); + if (backlog_input_file) return 3; + + try_again: r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl); @@ -1746,6 +1775,11 @@ static void search_backlog_file(void) { case 0: for (i=0; i