+static void startup_set_input_file(InputFile *f) {
+ assert(!main_input_file);
+ main_input_file= f;
+ inputfile_tailing_start(f);
+}
+
+static void statemc_init(void) {
+ struct stat stab, stabf;
+
+ path_lock= xasprintf("%s_lock", feedfile);
+ path_flushing= xasprintf("%s_flushing", feedfile);
+ path_defer= xasprintf("%s_defer", feedfile);
+ globpat_backlog= xasprintf("%s_backlog*", feedfile);
+
+ for (;;) {
+ int lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
+ if (lockfd<0) sysfatal("open lockfile %s", path_lock);
+
+ struct flock fl;
+ memset(&fl,0,sizeof(fl));
+ fl.l_type= F_WRLCK;
+ fl.l_whence= SEEK_SET;
+ int r= fcntl(lockfd, F_SETLK, &fl);
+ if (r==-1) {
+ if (errno==EACCES || errno==EAGAIN) {
+ if (quiet_multiple) exit(0);
+ fatal("another duct holds the lockfile");
+ }
+ sysdie("fcntl F_SETLK lockfile %s", path_lock);
+ }
+
+ xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
+ int lock_noent;
+ xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
+
+ if (!lock_noent && samefile(&stab, &stabf))
+ break;
+
+ if (close(lockfd))
+ sysdie("could not close stale lockfile %s", path_lock);
+ }
+ debug("startup: locked");
+
+ search_backlog_file();
+
+ int defer_noent;
+ xlstat_isreg(path_defer, &stab, &defer_noent, "defer file");
+ if (defer_noent) {
+ debug("startup: ductdefer ENOENT");
+ } else {
+ debug("startup: ductdefer nlink=%ld", (long)stab.st_nlink);
+ switch (stab.st_nlink==1) {
+ case 1:
+ open_defer(); /* so that we will later close it and rename it */
+ break;
+ case 2:
+ xunlink(path_defer, "stale defer file link"
+ " (presumably hardlink to backlog file)");
+ break;
+ default:
+ die("defer file %s has unexpected link count %d",
+ path_defer, stab.st_nlink);
+ }
+ }
+
+ struct stat stab_f, stab_d;
+ int noent_f;
+
+ InputFile *file_d= open_input_file(path_flushing);
+ if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
+
+ xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
+
+ if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
+ debug("startup: F==D => Hardlinked");
+ xunlink(feedfile, "feed file (during startup)"); /* => Moved */
+ noent_f= 1;
+ }
+
+ if (noent_f) {
+ debug("startup: F ENOENT => Moved");
+ if (file_d) startup_set_input_file(file_d);
+ spawn_inndcomm_flush("feedfile missing at startup");
+ /* => Flushing, sms:=FLUSHING */
+ } else {
+ if (file_d) {
+ debug("startup: F!=D => Separated");
+ startup_set_input_file(file_d);
+ SMS(SEPARATED, 0, "found both old and current feed files");
+ } else {
+ debug("startup: F exists, D ENOENT => Normal");
+ InputFile *file_f= open_input_file(feedfile);
+ if (!file_f) die("feed file vanished during startup");
+ startup_set_input_file(file_f);
+ SMS(NORMAL, flushfail_retry_periods, "normal startup");
+ }
+ }
+}
+
+static void statemc_start_flush(const char *why) { /* Normal => Flushing */
+ assert(sms == sm_NORMAL);
+
+ debug("starting flush (%s) (%lu >?= %lu) (%d)",
+ why,
+ (unsigned long)(main_input_file ? main_input_file->offset : 0),
+ (unsigned long)target_max_feedfile_size,
+ sm_period_counter);
+
+ int r= link(feedfile, path_flushing);
+ if (r) sysdie("link feedfile %s to flushing file %s",
+ feedfile, path_flushing);
+ /* => Hardlinked */
+
+ xunlink(feedfile, "old feedfile link");
+ /* => Moved */
+
+ spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
+}
+
+static void statemc_period_poll(void) {
+ if (!sm_period_counter) return;
+ sm_period_counter--;
+ assert(sm_period_counter>=0);
+
+ if (sm_period_counter) return;
+ switch (sms) {
+ case sm_NORMAL:
+ statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
+ break;
+ case sm_FLUSHFAILED:
+ spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
+ break;
+ default:
+ abort();
+ }
+}
+
+static int inputfile_is_done(InputFile *ipf) {
+ if (!ipf) return 0;
+ if (ipf->inprogress) return 0; /* new article in the meantime */
+ if (ipf->fd >= 0); return 0; /* not had EOF */
+ return 1;
+}
+
+static void notice_processed(InputFile *ipf, const char *what,
+ const char *spec) {
+#define RCI_NOTHING(x) /* nothing */
+#define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
+#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
+
+#define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
+
+ info("processed %s%s read=%d(+%dbl,+%derr)"
+ " offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)"
+ RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
+ ,
+ what, spec,
+ ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
+ CNT(Unchecked,sent) + CNT(Unsolicited,sent)
+ , CNT(Unchecked,sent), CNT(Unsolicited,sent),
+ CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
+ , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
+ RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
+ );
+
+#undef CNT
+}
+
+static void statemc_check_backlog_done(void) {
+ InputFile *ipf= backlog_input_file;
+ if (!inputfile_is_done(ipf)) return;
+
+ const char *slash= strrchr(ipf->path, '/');
+ const char *leaf= slash ? slash+1 : ipf->path;
+ const char *under= strchr(slash, '_');
+ const char *rest= under ? under+1 : leaf;
+ if (!strncmp(rest,"backlog",7)) rest += 7;
+ notice_processed(ipf,"backlog:",rest);
+
+ close_input_file(ipf);
+ if (unlink(ipf->path)) {
+ if (errno != ENOENT)
+ sysdie("could not unlink processed backlog file %s", ipf->path);
+ warn("backlog file %s vanished while we were reading it"
+ " so we couldn't remove it (but it's done now, anyway)",
+ ipf->path);
+ }
+ free(ipf);
+ backlog_input_file= 0;
+ search_backlog_file();
+ return;
+}
+
+static void statemc_check_flushing_done(void) {
+ InputFile *ipf= flushing_input_file;
+ if (!inputfile_is_done(ipf)) return;
+
+ assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+
+ notice_processed(ipf,"feedfile",0);
+
+ close_defer();
+
+ xunlink(path_flushing, "old flushing file");
+
+ close_input_file(flushing_input_file);
+ free(flushing_input_file);
+ flushing_input_file= 0;
+
+ if (sms==sm_SEPARATED) {
+ notice("flush complete");
+ SMS(NORMAL, 0, "flush complete");
+ } else if (sms==sm_DROPPING) {
+ SMS(DROPPED, 0, "old flush complete");
+ search_backlog_file();
+ notice("feed dropped, but will continue until backlog is finished");
+ }
+}
+
+static void *statemc_check_input_done(oop_source *lp, struct timeval now,
+ void *u) {
+ assert(!inputfile_is_done(main_input_file));
+ statemc_check_flushing_done();
+ statemc_check_backlog_done();
+ return OOP_CONTINUE;
+}
+
+static void queue_check_input_done(void) {
+ loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
+}
+
+static void statemc_setstate(StateMachineState newsms, int periods,
+ const char *forlog, const char *why) {
+ sms= newsms;
+ sm_period_counter= periods;
+
+ const char *xtra= "";
+ switch (sms) {
+ case sm_FLUSHING: sm_FLUSHFAILED:
+ if (!main_input_file) xtra= "-ABSENT";
+ break;
+ case sm_SEPARATED: case sm_DROPPING:
+ xtra= flushing_input_file->fd >= 0 ? "-1" : "-2";
+ break;
+ default:;
+ }
+
+ if (periods) {
+ info("%s%s[%d] %s",forlog,xtra,periods,why);
+ } else {
+ info("%s%s %s",forlog,xtra,why);
+ }
+}
+
+/*---------- defer and backlog files ----------*/
+