+ xunlink(feedfile, "old feedfile link");
+ /* => Moved */
+
+ spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
+}
+
+static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */
+ switch (sms) {
+
+ case sm_NORMAL:
+ statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */
+ return 1;
+
+ case sm_FLUSHFAILED:
+ spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */
+ return 1;
+
+ case sm_SEPARATED:
+ case sm_DROPPING:
+ warn("took too long to complete old feedfile after flush, autodeferring");
+ assert(flushing_input_file);
+ autodefer_input_file(flushing_input_file);
+ return 1;
+
+ default:
+ return 0;
+ }
+}
+
+static void statemc_period_poll(void) {
+ if (!until_flush) return;
+ until_flush--;
+ assert(until_flush>=0);
+
+ if (until_flush) return;
+ int ok= trigger_flush_ok();
+ assert(ok);
+}
+
+static int inputfile_is_done(InputFile *ipf) {
+ if (!ipf) return 0;
+ if (ipf->inprogress) return 0; /* new article in the meantime */
+ if (ipf->rd) return 0; /* not had EOF */
+ return 1;
+}
+
+static void notice_processed(InputFile *ipf, int completed,
+ const char *what, const char *spec) {
+ if (!ipf) return; /* allows preterminate to be lazy */
+
+#define RCI_NOTHING(x) /* nothing */
+#define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
+#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
+
+#define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
+
+ char *inprog= completed
+ ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
+ : xasprintf(" inprogress=%ld", ipf->inprogress);
+ char *autodefer= ipf->autodefer >= 0
+ ? xasprintf(" autodeferred=%ld", ipf->autodefer)
+ : xasprintf("%s","");
+
+ info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
+ " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
+ RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
+ ,
+ completed?"completed":"processed", what, spec,
+ ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
+ inprog, autodefer, ipf->count_nooffer_missing,
+ CNT(Unchecked,sent) + CNT(Unsolicited,sent)
+ , CNT(Unchecked,sent), CNT(Unsolicited,sent),
+ CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
+ , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
+ RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
+ );
+
+ free(inprog);
+ free(autodefer);
+
+#undef CNT
+}
+
+static void statemc_check_backlog_done(void) {
+ InputFile *ipf= backlog_input_file;
+ if (!inputfile_is_done(ipf)) return;
+
+ const char *slash= strrchr(ipf->path, '/');
+ const char *leaf= slash ? slash+1 : ipf->path;
+ const char *under= strchr(slash, '_');
+ const char *rest= under ? under+1 : leaf;
+ if (!strncmp(rest,"backlog",7)) rest += 7;
+ notice_processed(ipf,1,"backlog ",rest);
+
+ close_input_file(ipf);
+ if (unlink(ipf->path)) {
+ if (errno != ENOENT)
+ sysdie("could not unlink processed backlog file %s", ipf->path);
+ warn("backlog file %s vanished while we were reading it"
+ " so we couldn't remove it (but it's done now, anyway)",
+ ipf->path);
+ }
+ free(ipf);
+ backlog_input_file= 0;
+ search_backlog_file();
+ return;
+}
+
+static void statemc_check_flushing_done(void) {
+ InputFile *ipf= flushing_input_file;
+ if (!inputfile_is_done(ipf)) return;
+
+ assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+
+ notice_processed(ipf,1,"feedfile","");
+
+ close_defer();
+
+ xunlink(path_flushing, "old flushing file");
+
+ close_input_file(flushing_input_file);
+ free(flushing_input_file);
+ flushing_input_file= 0;
+
+ if (sms==sm_SEPARATED) {
+ notice("flush complete");
+ SMS(NORMAL, spontaneous_flush_periods, "flush complete");
+ } else if (sms==sm_DROPPING) {
+ SMS(DROPPED, max_separated_periods, "old flush complete");
+ search_backlog_file();
+ notice("feed dropped, but will continue until backlog is finished");
+ }
+}
+
+static void *statemc_check_input_done(oop_source *lp, struct timeval now,
+ void *u) {
+ assert(!inputfile_is_done(main_input_file));
+ statemc_check_flushing_done();
+ statemc_check_backlog_done();
+ return OOP_CONTINUE;
+}
+
+static void queue_check_input_done(void) {
+ loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
+}
+
+static void statemc_setstate(StateMachineState newsms, int periods,
+ const char *forlog, const char *why) {
+ sms= newsms;
+ until_flush= periods;
+
+ const char *xtra= "";
+ switch (sms) {
+ case sm_FLUSHING:
+ case sm_FLUSHFAILED:
+ if (!main_input_file) xtra= "-ABSENT";
+ break;
+ case sm_SEPARATED:
+ case sm_DROPPING:
+ xtra= flushing_input_file->rd ? "-1" : "-2";
+ break;
+ default:;
+ }
+
+ if (periods) {
+ info("state %s%s[%d] %s",forlog,xtra,periods,why);
+ } else {
+ info("state %s%s %s",forlog,xtra,why);
+ }
+}
+
+/*---------- defer and backlog files ----------*/
+
+static void open_defer(void) {
+ struct stat stab;
+
+ if (defer) return;
+
+ defer= fopen(path_defer, "a+");
+ if (!defer) sysfatal("could not open defer file %s", path_defer);
+
+ /* truncate away any half-written records */
+
+ xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
+
+ if (stab.st_size > LONG_MAX)
+ die("defer file %s size is far too large", path_defer);
+
+ if (!stab.st_size)
+ return;
+
+ long orgsize= stab.st_size;
+ long truncto= stab.st_size;
+ for (;;) {
+ if (!truncto) break; /* was only (if anything) one half-truncated record */
+ if (fseek(defer, truncto-1, SEEK_SET) < 0)
+ sysdie("seek in defer file %s while truncating partial", path_defer);
+
+ int r= getc(defer);
+ if (r==EOF) {
+ if (ferror(defer))
+ sysdie("failed read from defer file %s", path_defer);
+ else
+ die("defer file %s shrank while we were checking it!", path_defer);
+ }
+ if (r=='\n') break;
+ truncto--;
+ }
+
+ if (stab.st_size != truncto) {
+ warn("truncating half-record at end of defer file %s -"
+ " shrinking by %ld bytes from %ld to %ld",
+ path_defer, orgsize - truncto, orgsize, truncto);
+
+ if (fflush(defer))
+ sysfatal("could not flush defer file %s", path_defer);
+ if (ftruncate(fileno(defer), truncto))
+ sysdie("could not truncate defer file %s", path_defer);
+
+ } else {
+ info("continuing existing defer file %s (%ld bytes)",
+ path_defer, orgsize);
+ }
+ if (fseek(defer, truncto, SEEK_SET))
+ sysdie("could not seek to new end of defer file %s", path_defer);
+}
+
+static void close_defer(void) {
+ if (!defer)
+ return;
+
+ struct stat stab;
+ xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
+
+ if (fclose(defer)) sysfatal("could not close defer file %s", path_defer);
+ defer= 0;
+
+ time_t now= xtime();
+
+ char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
+ (unsigned long)now,
+ (unsigned long)stab.st_ino);
+ if (link(path_defer, backlog))
+ sysfatal("could not install defer file %s as backlog file %s",
+ path_defer, backlog);
+ if (unlink(path_defer))
+ sysdie("could not unlink old defer link %s to backlog file %s",
+ path_defer, backlog);
+
+ free(backlog);
+
+ if (until_backlog_nextscan < 0 ||
+ until_backlog_nextscan > backlog_retry_minperiods + 1)
+ until_backlog_nextscan= backlog_retry_minperiods + 1;
+}
+
+static void poll_backlog_file(void) {
+ if (until_backlog_nextscan < 0) return;
+ if (until_backlog_nextscan-- > 0) return;
+ search_backlog_file();
+}
+
+static void search_backlog_file(void) {
+ /* returns non-0 iff there are any backlog files */
+
+ glob_t gl;
+ int r, i;
+ struct stat stab;
+ const char *oldest_path=0;
+ time_t oldest_mtime=0, now;
+
+ if (backlog_input_file) return;
+
+ try_again:
+
+ r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
+
+ switch (r) {
+ case GLOB_ABORTED:
+ sysfatal("failed to expand backlog pattern %s", globpat_backlog);
+ case GLOB_NOSPACE:
+ fatal("out of memory expanding backlog pattern %s", globpat_backlog);
+ case 0:
+ for (i=0; i<gl.gl_pathc; i++) {
+ const char *path= gl.gl_pathv[i];
+
+ if (strchr(path,'#') || strchr(path,'~')) {
+ debug("backlog file search skipping %s", path);
+ continue;
+ }
+ r= stat(path, &stab);
+ if (r) {
+ syswarn("failed to stat backlog file %s", path);
+ continue;
+ }
+ if (!S_ISREG(stab.st_mode)) {
+ warn("backlog file %s is not a plain file (or link to one)", path);
+ continue;
+ }
+ if (!oldest_path || stab.st_mtime < oldest_mtime) {
+ oldest_path= path;
+ oldest_mtime= stab.st_mtime;
+ }
+ }
+ case GLOB_NOMATCH: /* fall through */
+ break;
+ default:
+ sysdie("glob expansion of backlog pattern %s gave unexpected"
+ " nonzero (error?) return value %d", globpat_backlog, r);
+ }
+
+ if (!oldest_path) {
+ debug("backlog scan: none");
+
+ if (sms==sm_DROPPED) {
+ preterminate();
+ notice("feed dropped and our work is complete");
+
+ int r= unlink(path_control);
+ if (r && errno!=ENOENT)
+ syswarn("failed to remove control symlink for old feed");
+
+ xunlink(path_lock, "lockfile for old feed");
+ exit(4);
+ }
+ until_backlog_nextscan= backlog_spontrescan_periods;
+ goto xfree;
+ }
+
+ now= xtime();
+ double age= difftime(now, oldest_mtime);
+ long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
+
+ if (age_deficiency <= 0) {
+ debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
+ age, age_deficiency, oldest_path);
+
+ backlog_input_file= open_input_file(oldest_path);
+ if (!backlog_input_file) {
+ warn("backlog file %s vanished as we opened it", oldest_path);
+ globfree(&gl);
+ goto try_again;
+ }
+ inputfile_reading_start(backlog_input_file);
+ until_backlog_nextscan= -1;
+ goto xfree;
+ }
+
+ until_backlog_nextscan= age_deficiency / period_seconds;
+
+ if (backlog_spontrescan_periods >= 0 &&
+ until_backlog_nextscan > backlog_spontrescan_periods)
+ until_backlog_nextscan= backlog_spontrescan_periods;
+
+ debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
+ age, age_deficiency, until_backlog_nextscan, oldest_path);
+
+ xfree:
+ globfree(&gl);
+ return;
+}
+
+/*---------- shutdown and signal handling ----------*/
+
+static void preterminate(void) {
+ if (in_child) return;
+ notice_processed(main_input_file,0,"feedfile","");
+ notice_processed(flushing_input_file,0,"flushing","");
+ if (backlog_input_file)
+ notice_processed(backlog_input_file,0, "backlog file ",
+ backlog_input_file->path);
+}
+
+static int signal_self_pipe[2];
+static sig_atomic_t terminate_sig_flag;
+
+static void raise_default(int signo) {
+ xsigsetdefault(signo);
+ raise(signo);
+ abort();
+}
+
+static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
+ assert(fd=signal_self_pipe[0]);
+ char buf[PIPE_BUF];
+ int r= read(signal_self_pipe[0], buf, sizeof(buf));
+ if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
+ if (r==0) die("eof on signal self pipe");
+ if (terminate_sig_flag) {
+ preterminate();
+ notice("terminating (%s)", strsignal(terminate_sig_flag));
+ raise_default(terminate_sig_flag);
+ }
+ return OOP_CONTINUE;
+}
+
+static void sigarrived_handler(int signum) {
+ static char x;
+ switch (signum) {
+ case SIGTERM:
+ case SIGINT:
+ if (!terminate_sig_flag) terminate_sig_flag= signum;
+ break;
+ default:
+ abort();
+ }
+ write(signal_self_pipe[1],&x,1);
+}
+
+static void init_signals(void) {
+ if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+ sysdie("could not ignore SIGPIPE");
+
+ if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
+
+ xsetnonblock(signal_self_pipe[0],1);
+ xsetnonblock(signal_self_pipe[1],1);
+
+ struct sigaction sa;
+ memset(&sa,0,sizeof(sa));
+ sa.sa_handler= sigarrived_handler;
+ sa.sa_flags= SA_RESTART;
+ xsigaction(SIGTERM,&sa);
+ xsigaction(SIGINT,&sa);
+
+ on_fd_read_except(signal_self_pipe[0], sigarrived_event);
+}
+
+/*========== flushing the feed ==========*/
+
+static pid_t inndcomm_child;
+static int inndcomm_sentinel_fd;
+
+static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
+ assert(inndcomm_child);
+ assert(fd == inndcomm_sentinel_fd);
+ int status= xwaitpid(&inndcomm_child, "inndcomm");
+ inndcomm_child= 0;
+
+ cancel_fd_read_except(fd);
+ xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
+ inndcomm_sentinel_fd= 0;
+
+ assert(!flushing_input_file);
+
+ if (WIFEXITED(status)) {
+ switch (WEXITSTATUS(status)) {
+
+ case INNDCOMMCHILD_ESTATUS_FAIL:
+ goto failed;
+
+ case INNDCOMMCHILD_ESTATUS_NONESUCH:
+ notice("feed has been dropped by innd, finishing up");
+ flushing_input_file= main_input_file;
+ tailing_queue_readable(flushing_input_file);
+ /* we probably previously returned EAGAIN from our fake read method
+ * when in fact we were at EOF, so signal another readable event
+ * so we actually see the EOF */
+
+ main_input_file= 0;
+
+ if (flushing_input_file) {
+ SMS(DROPPING, max_separated_periods,
+ "feed dropped by innd, but must finish last flush");
+ } else {
+ close_defer();
+ SMS(DROPPED, 0, "feed dropped by innd");
+ search_backlog_file();
+ }
+ return OOP_CONTINUE;
+
+ case 0:
+ /* as above */
+ flushing_input_file= main_input_file;
+ tailing_queue_readable(flushing_input_file);
+
+ main_input_file= open_input_file(feedfile);
+ if (!main_input_file)
+ die("flush succeeded but feedfile %s does not exist!", feedfile);
+
+ if (flushing_input_file) {
+ SMS(SEPARATED, max_separated_periods, "recovery flush complete");
+ } else {
+ close_defer();
+ SMS(NORMAL, spontaneous_flush_periods, "flush complete");
+ }
+ return OOP_CONTINUE;
+
+ default:
+ goto unexpected_exitstatus;
+
+ }
+ } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
+ warn("flush timed out trying to talk to innd");
+ goto failed;
+ } else {
+ unexpected_exitstatus:
+ report_child_status("inndcomm child", status);
+ }
+
+ failed:
+ SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
+ return OOP_CONTINUE;
+}
+
+static void inndcommfail(const char *what) {
+ syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
+ exit(INNDCOMMCHILD_ESTATUS_FAIL);
+}
+
+void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
+ int pipefds[2];
+
+ notice("flushing %s",why);
+
+ assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
+ assert(!inndcomm_child);
+ assert(!inndcomm_sentinel_fd);
+
+ if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel");
+
+ inndcomm_child= xfork("inndcomm child");
+
+ if (!inndcomm_child) {
+ const char *flushargv[2]= { sitename, 0 };
+ char *reply;
+ int r;
+
+ xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
+ /* parent spots the autoclose of pipefds[1] when we die or exit */
+
+ if (simulate_flush>=0) {
+ warn("SIMULATING flush child status %d", simulate_flush);
+ if (simulate_flush>128) raise(simulate_flush-128);
+ else exit(simulate_flush);
+ }
+
+ alarm(inndcomm_flush_timeout);
+ r= ICCopen(); if (r) inndcommfail("connect");
+ r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit");
+ if (!r) exit(0); /* yay! */
+
+ if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
+ syswarn("innd ctlinnd flush failed: innd said %s", reply);
+ exit(INNDCOMMCHILD_ESTATUS_FAIL);
+ }
+
+ simulate_flush= -1;
+
+ xclose(pipefds[1], "inndcomm sentinel child's end",0);
+ inndcomm_sentinel_fd= pipefds[0];
+ assert(inndcomm_sentinel_fd);
+ on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
+
+ SMS(FLUSHING, 0, why);
+}
+
+/*========== main program ==========*/
+
+static void postfork_inputfile(InputFile *ipf) {
+ if (!ipf) return;
+ xclose(ipf->fd, "(in child) input file ", ipf->path);
+}
+
+static void postfork_stdio(FILE *f, const char *what, const char *what2) {
+ /* we have no stdio streams that are buffered long-term */
+ if (!f) return;
+ if (fclose(f)) sysdie("(in child) close %s%s", what, what2?what2:0);
+}
+
+static void postfork(void) {
+ in_child= 1;
+
+ xsigsetdefault(SIGTERM);
+ xsigsetdefault(SIGINT);
+ xsigsetdefault(SIGPIPE);
+ if (terminate_sig_flag) raise(terminate_sig_flag);
+
+ postfork_inputfile(main_input_file);
+ postfork_inputfile(flushing_input_file);
+
+ Conn *conn;
+ FOR_CONN(conn)
+ conn_closefd(conn,"(in child) ");
+
+ postfork_stdio(defer, "defer file ", path_defer);
+}
+
+typedef struct Every Every;
+struct Every {
+ struct timeval interval;
+ int fixed_rate;
+ void (*f)(void);
+};
+
+static void every_schedule(Every *e, struct timeval base);
+
+static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
+ Every *e= e_v;
+ e->f();
+ if (!e->fixed_rate) xgettimeofday(&base);
+ every_schedule(e, base);
+ return OOP_CONTINUE;
+}
+
+static void every_schedule(Every *e, struct timeval base) {
+ struct timeval when;
+ timeradd(&base, &e->interval, &when);
+ loop->on_time(loop, when, every_happens, e);
+}
+
+static void every(int interval, int fixed_rate, void (*f)(void)) {
+ NEW_DECL(Every *,e);
+ e->interval.tv_sec= interval;
+ e->interval.tv_usec= 0;
+ e->fixed_rate= fixed_rate;
+ e->f= f;
+ struct timeval now;
+ xgettimeofday(&now);
+ every_schedule(e, now);
+}
+
+static void filepoll(void) {
+ filemon_callback(main_input_file);
+ filemon_callback(flushing_input_file);
+}
+
+static char *debug_report_ipf(InputFile *ipf) {
+ if (!ipf) return xasprintf("none");
+
+ const char *slash= strrchr(ipf->path,'/');
+ const char *path= slash ? slash+1 : ipf->path;
+
+ return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
+ ipf, path,
+ ipf->queue.count, ipf->inprogress, ipf->autodefer,
+ (long)ipf->offset, ipf->fd,
+ ipf->rd ? "" : ",!rd",
+ ipf->skippinglong ? "*skiplong" : "",
+ ipf->rd && ipf->paused ? "*paused" : "");
+}
+
+static void period(void) {
+ char *dipf_main= debug_report_ipf(main_input_file);
+ char *dipf_flushing= debug_report_ipf(flushing_input_file);
+ char *dipf_backlog= debug_report_ipf(backlog_input_file);
+
+ debug("PERIOD"
+ " sms=%s[%d] conns=%d until_connect=%d"
+ " input_files main:%s flushing:%s backlog:%s[%d]"
+ " children connecting=%ld inndcomm=%ld"
+ ,
+ sms_names[sms], until_flush, conns.count, until_connect,
+ dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
+ (long)connecting_child, (long)inndcomm_child
+ );
+
+ free(dipf_main);
+ free(dipf_flushing);
+ free(dipf_backlog);
+
+ if (until_connect) until_connect--;
+
+ inputfile_queue_check_expired(backlog_input_file);
+ poll_backlog_file();
+ if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
+ statemc_period_poll();
+ check_assign_articles();
+ check_idle_conns();
+}
+
+
+/*========== dumping state ==========*/
+
+static void dump_article_list(FILE *f, const ControlCommand *c,
+ const ArticleList *al) {
+ fprintf(f, " count=%d\n", al->count);
+ if (!c->xval) return;
+
+ int i; Article *art;
+ for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) {
+ fprintf(f," #%05d %-11s", i, artstate_names[art->state]);
+ DUMPV("%p", art->,ipf);
+ DUMPV("%d", art->,missing);
+ DUMPV("%lu", (unsigned long)art->,offset);
+ DUMPV("%d", art->,blanklen);
+ DUMPV("%d", art->,midlen);
+ fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid);
+ }
+}
+
+static void dump_input_file(FILE *f, const ControlCommand *c,
+ InputFile *ipf, const char *wh) {
+ char *dipf= debug_report_ipf(ipf);
+ fprintf(f,"input %s %s", wh, dipf);
+ free(dipf);
+
+ if (ipf) {
+ DUMPV("%d", ipf->,readcount_ok);
+ DUMPV("%d", ipf->,readcount_blank);
+ DUMPV("%d", ipf->,readcount_err);
+ DUMPV("%d", ipf->,count_nooffer_missing);
+ }
+ fprintf(f,"\n");
+ if (ipf) {
+ ArtState state; const char *const *statename;
+ for (state=0, statename=artstate_names; *statename; state++,statename++) {
+#define RC_DUMP_FMT(x) " " #x "=%d"
+#define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x]
+ fprintf(f,"input %s counts %-11s"
+ RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n",
+ wh, *statename
+ RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL));
+ }
+ fprintf(f,"input %s queue", wh);
+ dump_article_list(f,c,&ipf->queue);
+ }
+}
+
+CCMD(dump) {
+ int i;
+ fprintf(cc->out, "dumping state to %s\n", path_dump);
+ FILE *f= fopen(path_dump, "w");
+ if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; }
+
+ fprintf(f,"general");
+ DUMPV("%s", sms_names,[sms]);
+ DUMPV("%d", ,until_flush);
+ DUMPV("%ld", (long),self_pid);
+ DUMPV("%p", , defer);
+ DUMPV("%d", , until_connect);
+ DUMPV("%d", , until_backlog_nextscan);
+ DUMPV("%d", , simulate_flush);
+ fprintf(f,"\nnocheck");
+ DUMPV("%#.10f", , accept_proportion);
+ DUMPV("%d", , nocheck);
+ DUMPV("%d", , nocheck_reported);
+ fprintf(f,"\n");
+
+ fprintf(f,"special");
+ DUMPV("%ld", (long),connecting_child);
+ DUMPV("%d", , connecting_fdpass_sock);
+ DUMPV("%d", , control_master);
+ fprintf(f,"\n");
+
+ fprintf(f,"filemon ");
+ filemon_method_dump_info(f);
+
+ dump_input_file(f,c, main_input_file, "main" );
+ dump_input_file(f,c, flushing_input_file, "flushing");
+ dump_input_file(f,c, backlog_input_file, "backlog" );
+
+ fprintf(f,"conns count=%d\n", conns.count);
+
+ Conn *conn;
+ FOR_CONN(conn) {
+
+ fprintf(f,"C%d",conn->fd);
+ DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue);
+ DUMPV("%d",conn->,stream); DUMPV("%d",conn->,quitting);
+ DUMPV("%d",conn->,since_activity);
+ fprintf(f,"\n");
+
+ fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting);
+ fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority);
+ fprintf(f,"C%d sent", conn->fd); dump_article_list(f,c,&conn->sent);
+
+ fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu);
+ for (i=0; i<conn->xmitu; i++) {
+ const struct iovec *iv= &conn->xmit[i];
+ const XmitDetails *xd= &conn->xmitd[i];
+ char *dinfo;
+ switch (xd->kind) {
+ case xk_Const: dinfo= xasprintf("Const"); break;
+ case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break;
+ default:
+ abort();
+ }
+ fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len,
+ sanitise(iv->iov_base, iv->iov_len));
+ free(dinfo);
+ }
+ }
+
+ fprintf(f,"paths");
+ DUMPV("%s", , feedfile);
+ DUMPV("%s", , path_control);
+ DUMPV("%s", , path_lock);
+ DUMPV("%s", , path_flushing);
+ DUMPV("%s", , path_defer);
+ DUMPV("%s", , path_dump);
+ DUMPV("%s", , globpat_backlog);
+ fprintf(f,"\n");
+
+ if (!!ferror(f) + !!fclose(f)) {
+ fprintf(cc->out, "failed: write: %s\n", strerror(errno));
+ return;
+ }
+}
+
+/*========== option parsing ==========*/
+
+static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
+static void vbadusage(const char *fmt, va_list al) {
+ char *m= xvasprintf(fmt,al);
+ fprintf(stderr, "bad usage: %s\n"
+ "say --help for help, or read the manpage\n",
+ m);
+ if (become_daemon)
+ syslog(LOG_CRIT,"innduct: invoked with bad usage: %s",m);
+ exit(8);
+}
+
+/*---------- generic option parser ----------*/
+
+static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
+static void badusage(const char *fmt, ...) {
+ va_list al;
+ va_start(al,fmt);
+ vbadusage(fmt,al);
+}
+
+enum OptFlags {
+ of_seconds= 001000u,
+ of_boolean= 002000u,
+};
+
+typedef struct Option Option;
+typedef void OptionParser(const Option*, const char *val);
+
+struct Option {
+ int shrt;
+ const char *lng, *formarg;
+ void *store;
+ OptionParser *fn;
+ int intval;
+};
+
+static void parse_options(const Option *options, char ***argvp) {
+ /* on return *argvp is first non-option arg; argc is not updated */
+
+ for (;;) {
+ const char *arg= *++(*argvp);
+ if (!arg) break;
+ if (*arg != '-') break;
+ if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
+ int a;
+ while ((a= *++arg)) {
+ const Option *o;
+ if (a=='-') {
+ arg++;
+ char *equals= strchr(arg,'=');
+ int len= equals ? (equals - arg) : strlen(arg);
+ for (o=options; o->shrt || o->lng; o++)
+ if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
+ goto found_long;
+ badusage("unknown long option --%s",arg);
+ found_long:
+ if (!o->formarg) {
+ if (equals) badusage("option --%s does not take a value",o->lng);
+ arg= 0;
+ } else if (equals) {
+ arg= equals+1;
+ } else {
+ arg= *++(*argvp);
+ if (!arg) badusage("option --%s needs a value for %s",
+ o->lng, o->formarg);
+ }
+ o->fn(o, arg);
+ break; /* eaten the whole argument now */
+ }
+ for (o=options; o->shrt || o->lng; o++)
+ if (a == o->shrt)
+ goto found_short;
+ badusage("unknown short option -%c",a);
+ found_short:
+ if (!o->formarg) {
+ o->fn(o,0);
+ } else {
+ if (!*++arg) {
+ arg= *++(*argvp);
+ if (!arg) badusage("option -%c needs a value for %s",
+ o->shrt, o->formarg);
+ }
+ o->fn(o,arg);
+ break; /* eaten the whole argument now */
+ }
+ }
+ }
+}
+
+#define DELIMPERHAPS(delim,str) (str) ? (delim) : "", (str) ? (str) : ""
+
+static void print_options(const Option *options, FILE *f) {
+ const Option *o;
+ for (o=options; o->shrt || o->lng; o++) {
+ char shrt[2] = { o->shrt, 0 };
+ char *optspec= xasprintf("%s%s%s%s%s",
+ o->shrt ? "-" : "", shrt,
+ o->shrt && o->lng ? "|" : "",
+ DELIMPERHAPS("--", o->lng));
+ fprintf(f, " %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
+ free(optspec);
+ }
+}
+
+/*---------- specific option types ----------*/
+
+static void op_integer(const Option *o, const char *val) {
+ char *ep;
+ errno= 0;
+ unsigned long ul= strtoul(val,&ep,10);
+ if (*ep || ep==val || errno || ul>INT_MAX)
+ badusage("bad integer value for %s",o->lng);
+ int *store= o->store;
+ *store= ul;
+}
+
+static void op_double(const Option *o, const char *val) {
+ int *store= o->store;
+ char *ep;
+ errno= 0;
+ *store= strtod(val, &ep);
+ if (*ep || ep==val || errno)
+ badusage("bad floating point value for %s",o->lng);
+}
+
+static void op_string(const Option *o, const char *val) {
+ const char **store= o->store;
+ *store= val;
+}
+
+static void op_seconds(const Option *o, const char *val) {
+ int *store= o->store;
+ char *ep;
+ int unit;
+
+ double v= strtod(val,&ep);
+ if (ep==val) badusage("bad time/duration value for %s",o->lng);
+
+ if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
+ else if (!strcmp(ep,"m") || !strcmp(ep,"min")) unit= 60;
+ else if (!strcmp(ep,"h") || !strcmp(ep,"hour")) unit= 3600;
+ else if (!strcmp(ep,"d") || !strcmp(ep,"day")) unit= 86400;
+ else if (!strcmp(ep,"das")) unit= 10;
+ else if (!strcmp(ep,"hs")) unit= 100;
+ else if (!strcmp(ep,"ks")) unit= 1000;
+ else if (!strcmp(ep,"Ms")) unit= 1000000;
+ else badusage("bad units %s for time/duration value for %s",ep,o->lng);
+
+ v *= unit;
+ v= ceil(v);
+ if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
+ *store= v;
+}
+
+static void op_setint(const Option *o, const char *val) {
+ int *store= o->store;
+ *store= o->intval;
+}
+
+/*---------- specific options ----------*/
+
+static void help(const Option *o, const char *val);
+
+static const Option innduct_options[]= {
+{'f',"feedfile", "F", &feedfile, op_string },
+{'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 },
+{0,"no-daemon", 0, &become_daemon, op_setint, 0 },
+{0,"no-streaming", 0, &try_stream, op_setint, 0 },
+{0,"no-filemon", 0, &try_filemon, op_setint, 0 },
+{'C',"inndconf", "F", &inndconffile, op_string },
+{'P',"port", "PORT", &port, op_integer },
+{0,"cli", 0, &path_control, op_string },
+{0,"help", 0, 0, help },
+
+{0,"max-connections", "N", &max_connections, op_integer },
+{0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer },
+{0,"max-queue-per-file", "N", &max_queue_per_ipf, op_integer },
+{0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer },
+{0,"period-interval", "TIME", &period_seconds, op_seconds },
+
+{0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
+{0,"stuck-flush-timeout", "TIME", &inndcomm_flush_timeout, op_seconds },
+{0,"feedfile-poll", "TIME", &filepoll_seconds, op_seconds },
+
+{0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double },
+{0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double },
+
+{0,"reconnect-interval", "PERIOD", &reconnect_delay_periods, op_seconds },
+{0,"flush-retry-interval", "PERIOD", &flushfail_retry_periods, op_seconds },
+{0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
+{0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
+{0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds },
+{0,"flush-finish-timeout", "PERIOD", &max_separated_periods, op_seconds },
+{0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds },
+
+{0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double },
+{0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer },
+
+{0,0}
+};
+
+static void printusage(FILE *f) {
+ fputs("usage: innduct [options] site [fqdn]\n"
+ "available options are:\n", f);
+ print_options(innduct_options, f);
+}
+
+static void help(const Option *o, const char *val) {
+ printusage(stdout);
+ if (ferror(stdout) || fflush(stdout)) {
+ perror("innduct: writing help");
+ exit(12);
+ }
+ exit(0);
+}
+
+static void convert_to_periods_rndup(int *store) {
+ *store += period_seconds-1;
+ *store /= period_seconds;
+}
+
+static void assemble_path(const char **path_io, const char *suffix,
+ const char *what) {
+ const char *const specified= *path_io;
+ if (!specified[0]) badusage("%s, if specified, must be nonempty", what);
+ if (specified[strlen(specified)-1]=='/')
+ *path_io= xasprintf("%s%s%s", specified, sitename, suffix);
+}
+
+int main(int argc, char **argv) {
+ if (!argv[1]) {
+ printusage(stderr);
+ exit(8);
+ }
+
+ parse_options(innduct_options, &argv);
+
+ /* arguments */
+
+ sitename= *argv++;
+ if (!sitename) badusage("need site name argument");
+ remote_host= *argv++;
+ if (*argv) badusage("too many non-option arguments");
+
+ /* defaults */
+
+ int r= innconf_read(inndconffile);
+ if (!r) badusage("could not read inn.conf (more info on stderr)");
+
+ if (!remote_host) remote_host= sitename;
+
+ if (nocheck_thresh < 0 || nocheck_thresh > 100)
+ badusage("nocheck threshold percentage must be between 0..100");
+ nocheck_thresh *= 0.01;
+
+ if (nocheck_decay < 0.1)
+ badusage("nocheck decay articles must be at least 0.1");
+ nocheck_decay= pow(0.5, 1.0/nocheck_decay);
+
+ convert_to_periods_rndup(&reconnect_delay_periods);
+ convert_to_periods_rndup(&flushfail_retry_periods);
+ convert_to_periods_rndup(&backlog_retry_minperiods);
+ convert_to_periods_rndup(&backlog_spontrescan_periods);
+ convert_to_periods_rndup(&spontaneous_flush_periods);
+ convert_to_periods_rndup(&max_separated_periods);
+ convert_to_periods_rndup(&need_activity_periods);
+
+ if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
+ badusage("bad input data ratio must be between 0..100");
+ max_bad_data_ratio *= 0.01;
+
+ if (!feedfile) feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
+ else assemble_path(&feedfile, "", "feed filename");
+
+ if (path_control) path_control= xasprintf("%s_cli", feedfile);
+ else assemble_path(&path_control, "%s_cli", "control socket path");
+
+ if (max_queue_per_ipf<0)
+ max_queue_per_ipf= max_queue_per_conn * 2;
+
+ const char *feedfile_forbidden= "?*[~#";
+ int c;
+ while ((c= *feedfile_forbidden++))
+ if (strchr(feedfile, c))
+ badusage("feed filename may not contain metacharacter %c",c);
+
+ /* set things up */
+
+ path_lock= xasprintf("%s_lock", feedfile);
+ path_flushing= xasprintf("%s_flushing", feedfile);
+ path_defer= xasprintf("%s_defer", feedfile);
+ path_dump= xasprintf("%s_dump", feedfile);
+ globpat_backlog= xasprintf("%s_backlog*", feedfile);
+
+ oop_source_sys *sysloop= oop_sys_new();
+ if (!sysloop) sysdie("could not create liboop event loop");
+ loop= (oop_source*)sysloop;
+
+ LIST_INIT(conns);
+
+ if (become_daemon) {
+ int i;
+ for (i=3; i<255; i++)
+ /* do this now before we open syslog, etc. */
+ close(i);
+ openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
+
+ int null= open("/dev/null",O_RDWR);
+ if (null<0) sysfatal("failed to open /dev/null");
+ dup2(null,0);
+ dup2(null,1);
+ dup2(null,2);
+ xclose(null, "/dev/null original fd",0);
+
+ pid_t child1= xfork("daemonise first fork");
+ if (child1) _exit(0);
+
+ pid_t sid= setsid();
+ if (sid != child1) sysfatal("setsid failed");
+
+ pid_t child2= xfork("daemonise second fork");
+ if (child2) _exit(0);
+ }
+
+ self_pid= getpid();
+ if (self_pid==-1) sysdie("getpid");
+
+ statemc_lock();
+
+ init_signals();
+
+ notice("starting");
+
+ int val= 1;
+ r= SMsetup(SM_PREOPEN, &val); if (!r) warn("SMsetup SM_PREOPEN failed");
+// r= SMinit(); if (!r) die("storage manager initialisation (SMinit) failed");
+
+ if (!become_daemon)
+ control_stdio();
+
+ control_init();
+
+ int filemon_ok= 0;
+ if (!try_filemon) {
+ notice("filemon: suppressed by command line option, polling");
+ } else {
+ filemon_ok= filemon_method_init();
+ if (!filemon_ok)
+ warn("filemon: no file monitoring available, polling");
+ }
+ if (!filemon_ok)
+ every(filepoll_seconds,0,filepoll);
+
+ every(period_seconds,1,period);
+
+ statemc_init();
+
+ /* let's go */
+
+ void *run= oop_sys_run(sysloop);
+ assert(run == OOP_ERROR);
+ sysdie("event loop failed");
+}