+ xfstat_isreg(lockfd, &stabf, "lockfile");
+ xlstat_isreg(path_ductlock, &stab, &noent, "lockfile");
+ if (!noent && samefile(&stab, &stabf))
+ break;
+
+ if (close(lockfd))
+ sysdie("could not close stale lockfile %s", path_ductlock);
+ }
+ debug("startup: locked");
+
+ xlstat_isreg(path_ductdefer, &stab, &noent, "defer file");
+ if (noent) {
+ debug("startup: ductdefer ENOENT");
+ } else {
+ debug("startup: ductdefer nlink=%ld", (long)stab.st_nlink);
+ switch (stab.st_nlink==1) {
+ case 1: /* ok */ break;
+ case 2:
+ if (unlink(path_defer))
+ sysdie("could not unlink stale defer file link %s (presumably"
+ " hardlink to backlog file)", path_defer);
+ break;
+ default:
+ die("defer file %s has unexpected link count %d",
+ path_defer, stab.st_nlink);
+ }
+ }
+
+ InputFile *file_d= open_input_file(path_duct);
+
+ if (file_d) {
+ struct stat stab_f, stab_d;
+
+ xlstat_isreg(feedfile, &stab_f, &noent, "feed file");
+ if (noent) {
+ debug("startup: D exists, F ENOENT => Moved");
+ goto found_moved;
+ }
+
+ debug("startup: F and D both exist");
+
+ xfstat_isreg(file_d->fd, &stab_d, "ductfile");
+
+ if (samefile(&stab_d, &stab_f)) {
+ debug("startup: F==D => Hardlinked");
+ r= unlink(path_duct);
+ if (r) sysdie("unlink feed file %s during startup", feedfile);
+ found_moved:
+ debug(" => Moved");
+ startup_set_input_file(file_d);
+ spawn_inndcomm_flush(); /* => Flushing, sets sms to sm_FLUSHING */
+ } else {
+ debug("F!=D => Separated");
+ SMS(SEPARATED, 0, "found both old and current feed files");
+ startup_set_input_file(file_d);
+ }
+ } else {
+ debug("startup: D ENOENT => Nothing");
+ SMS(WAITING, open_wait_periods, "no feed file currently exists");
+ }
+}
+
+static void statemc_poll(void) {
+ if (sms==sm_WAITING) { statemc_waiting_poll(); return; }
+
+ if (!sm_period_counter) return;
+ sm_period_counter--;
+ assert(sm_period_counter>=0);
+
+ if (sm_period_counter) return;
+ switch (sms) {
+ case sm_WAITING:
+ fatal("timed out waiting for innd to create feed file %s", feedfile);
+ case sm_FLUSHFAIL:
+ spawn_inndcomm_flush(void);
+ break;
+ default:
+ abort();
+ }
+}
+
+static void statemc_waiting_poll(void) {
+ InputFile *file_f= open_input_file(feedfile);
+ if (!file_f) return;
+ startup_set_input_file(file_d);
+ SMS(NORMAL, 0, "found and opened feed file");
+}
+
+static void startup_set_input_file(InputFile *f) {
+ assert(!main_input_file);
+ main_input_file= f;
+ inputfile_tailing_start(f);
+}
+
+static void *statemc_check_input_done(oop_source *lp,
+ struct timeval now, void *ipf_v) {
+ InputFile *ipf= ipf_v;
+ struct stat stab;
+
+ if (ipf->inprogress) return; /* new article in the meantime */
+ if (ipf->fd >= 0); return; /* not had EOF */
+
+ if (ipf == backlog_input_file) {
+ notice_processed(ipf,"backlog file",ipf->path);
+ close_input_file(ipf);
+ if (unlink(ipf->path))
+ sysdie("could not unlink done backlog file %s", ipf->path);
+ backlog_input_file= 0;
+ search_backlog_file();
+ return;
+ }
+
+ assert(ipf == old_input_file);
+ assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+
+ notice_processed(ipf,"feed file",0);
+
+ close_defer();
+
+ if (unlink(path_duct))
+ sysdie("could not unlink old duct file %s", path_duct);
+
+ if (sms==sm_DROPPING) {
+ notice("feed dropped and our work is complete"
+ " (but check for backlog files)");
+ exit(0);
+ }
+
+ open_defer();
+
+ close_input_file(old_input_file);
+ old_input_file= 0;
+
+ notice("flush complete");
+ SMS(NORMAL, 0, "flush complete");
+}
+
+static void statemc_setstate(StateMachineState newsms, int periods,
+ const char *forlog, const char *why) {
+ sms= newsms;
+ sm_period_counter= periods;
+ if (periods) {
+ info("%s[%d] %s",periods,forlog,why);
+ } else {
+ info("%s %s",forlog,why);
+ }
+}
+
+/*---------- defer and backlog files ----------*/
+
+static void open_defer(void) {
+ struct stat stab;
+
+ if (defer) return;
+
+ defer= fopen(path_ductdefer, "a+");
+ if (!defer) sysfatal("could not open defer file %s", path_ductdefer);
+
+ /* truncate away any half-written records */
+
+ xfstat_isreg(fileno(defer), &stab, "newly opened defer file");
+
+ if (stab.st_size > LONG_MAX)
+ die("defer file %s size is far too large", path_ductdefer);
+
+ if (!stab.st_size)
+ return;
+
+ long orgsize= stab.st_size;
+ long truncto= stab.st_size;
+ for (;;) {
+ if (!truncto) break; /* was only (if anything) one half-truncated record */
+ if (fseek(defer, truncto-1, SEEK_SET) < 0)
+ sysdie("seek in defer file %s while truncating partial", path_ductdefer);
+
+ r= getc(defer);
+ if (r==EOF) {
+ if (ferror(defer))
+ sysdie("failed read from defer file %s", path_ductdefer);
+ else
+ die("defer file %s shrank while we were checking it!", path_ductdefer);
+ }
+ if (r=='\n') break;
+ truncto--;
+ }
+
+ if (stab.st_size != truncto) {
+ warn("truncating half-record at end of defer file %s -"
+ " shrinking by %ld bytes from %ld to %ld",
+ path_ductdefer, orgsize - truncto, orgsize, truncto);
+
+ if (fflush(defer))
+ sysfatal("could not flush defer file %s", path_ductdefer);
+ if (ftruncate(fileno(defer), truncto))
+ sysdie("could not truncate defer file %s", path_ductdefer);
+
+ } else {
+ info("continuing existing defer file %s (%ld bytes)",
+ path_ductdefer, orgsize);
+ }
+ if (fseek(defer, truncto, SEEK_SET))
+ sysdie("could not seek to new end of defer file %s", path_ductdefer);
+}
+
+static void close_defer(void) {
+ if (!defer)
+ return;
+
+ xfstat(fileno(defer), &stab, "defer file");
+
+ if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
+ defer= 0;
+
+ char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
+ (unsigned long)now.tv_sec,
+ (unsigned long)stab.st_ino);
+ if (link(path_defer, path_backlog))
+ sysfatal("could not install defer file %s as backlog file %s",
+ path_defer, backlog);
+ if (unlink(path_defer))
+ sysdie("could not unlink old defer link %s to backlog file %s",
+ path_defer, backlog);
+
+ if (backlog_nextscan_periods < 0 ||
+ backlog_nextscan_periods > backlog_retry_minperiods + 1)
+ backlog_nextscan_periods= backlog_retry_minperiods + 1;
+}
+
+static void poll_backlog_file(void) {
+ if (backlog_nextscan_periods < 0) return;
+ if (backlog_nextscan_periods-- > 0) return;
+ search_backlog_file();
+}
+
+static void search_backlog_file(void) {
+ glob_t gl;
+ int r;
+ struct stat stab;
+ const char *oldest_path=0;
+ time_t oldest_mtime, now;
+
+ assert(!backlog_input_file);
+
+ r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
+
+ switch (r) {
+ case GLOB_ABORTED:
+ sysdie("failed to expand backlog pattern %s", globpat_backlog);
+ case GLOB_NOSPACE:
+ die("out of memory expanding backlog pattern %s", globpat_backlog);
+ case 0:
+ for (i=0; i<gl.gl_pathc; i++) {
+ const char *path= gl.gl_pathv[i];
+ r= stat(path, &stab);
+ if (r) {
+ syswarn("failed to stat backlog file %s", path);
+ continue;
+ }
+ if (!S_ISREG(stab.st_mode)) {
+ warn("backlog file %s is not a plain file (or link to one)", path);
+ continue;
+ }
+ if (!oldest_path || stab.st_mtime < oldest_mtime) {
+ oldest_path= path;
+ oldest_mtime= stab.st_mtime;
+ }
+ }
+ case GLOB_NOMATCH: /* fall through */
+ break;
+ default:
+ sysdie("glob expansion of backlog pattern %s gave unexpected"
+ " nonzero (error?) return value %d", globpat_backlog, r);
+ }
+
+ globfree(&gl);
+
+ if (!oldest_path) {
+ debug("backlog scan: none");
+ backlog_nextscan_periods= backlog_spontaneous_rescan_periods;
+ return;
+ }
+
+ now= time(); if (now==-1) sysdie("time(2) failed");
+ double age= difftime(now, oldest_mtime);
+ long age_deficiency= (backlog_retry_minperiods * PERIOD_SECONDS) - age;
+
+ if (age_deficiency <= 0) {
+ debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
+ age, age_deficiency, oldest_path);
+
+ backlog_input_file= open_input_file();
+ inputfile_tailing_start(backlog_input_file);
+ backlog_nextscan_periods= -1;
+ return;
+ }
+
+ backlog_nextscan_periods= age_deficiency / PERIOD_SECONDS;
+
+ if (backlog_spontaneous_rescan_periods >= 0 &&
+ backlog_nextscan_periods > backlog_spontaneous_rescan_periods)
+ backlog_nextscan_periods= backlog_spontaneous_rescan_periods;
+
+ debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
+ age, age_deficiency, backlog_nextscan_periods, oldest_path);
+}
+
+/*========== flushing the feed ==========*/
+
+static pid_t inndcomm_child;
+
+static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
+ assert(inndcomm_child);
+ int status= xwaitpid(&inndcomm_child, "inndcomm");
+ loop->cancel_fd(fd);
+ close(fd);
+
+ assert(!old_input_file);
+
+ if (WIFEXITED(status)) {
+ switch (WEXITSTATUS(status)) {
+
+ case INNDCOMMCHILD_ESTATUS_FAIL:
+ goto failed;
+
+ case INNDCOMMCHILD_ESTATUS_NONESUCH:
+ warn("feed has been dropped by innd, finishing up");
+ old_input_file= main_input_file;
+ main_input_file= 0;
+ SMS(DROPPING, 0, "dropped by innd");
+ return OOP_CONTINUE;
+
+ case 0:
+ old_input_file= main_input_file;
+ main_input_file= open_input_file(feedfile);
+ if (!main_input_file)
+ die("flush succeeded but feedfile %s does not exist!", feedfile);
+ SMS(SEPARATED, 0, "feed file missing");
+ return OOP_CONTINUE;
+
+ default:
+ goto unexpected_exitstatus;
+
+ }
+ } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
+ warn("flush timed out trying to talk to innd");
+ goto failed;
+ } else {
+ unexpected_exitstatus:
+ report_child_status("inndcomm child", status);
+ }
+
+ failed:
+ SMS(FLUSHFAIL, flushfail_retry_periods, "flush failed, will retry");
+}
+
+static void inndcommfail(const char *what) {
+ syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
+ exit(INNDCOMMCHILD_ESTATUS_FAIL);
+}
+
+void spawn_inndcomm_flush(void) {
+ int pipefds[2];
+
+ assert(sms==sm_NORMAL || sms==sm_FLUSHFAIL);
+ assert(!inndcomm_child);
+
+ if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel");
+
+ inndcomm_child= xfork();
+
+ if (!inndcomm_child) {
+ static char flushargv[2]= { sitename, 0 };
+ char *reply;
+
+ close(pipefds[0]);
+
+ alarm(inndcomm_flush_timeout);
+ r= ICCopen(); if (r) inndcommfail("connect");
+ r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit");
+ if (!r) exit(0); /* yay! */
+
+ if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
+ syswarn("innd ctlinnd flush failed: innd said %s", reply);
+ exit(INNDCOMMCHILD_ESTATUS_FAIL);
+ }
+
+ close(pipefds[1]);
+ int sentinel_fd= pipefds[0];
+ on_fd_read_except(sentinel_fd, inndcomm_event);
+
+ SMS(FLUSHING, 0, "flush is in progress");
+}
+
+/*========== main program ==========*/
+
+static void postfork_inputfile(InputFile *ipf) {
+ if (!ipf) return;
+ assert(ipf->fd >= 0);
+ close(ipf->fd);
+ ipf->fd= -1;
+}
+
+static void postfork_conns(Connection *conn) {
+ while (conn) {
+ close(conn->fd);
+ conn= conn->next;