+ if (!inndcomm_child) {
+ static char flushargv[2]= { sitename, 0 };
+ char *reply;
+
+ close(pipefds[0]);
+
+ alarm(inndcomm_flush_timeout);
+ r= ICCopen(); if (r) inndcommfail("connect");
+ r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit");
+ if (!r) exit(0); /* yay! */
+
+ if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
+ syswarn("innd ctlinnd flush failed: innd said %s", reply);
+ exit(INNDCOMMCHILD_ESTATUS_FAIL);
+ }
+
+ close(pipefds[1]);
+ int sentinel_fd= pipefds[0];
+ on_fd_read_except(sentinel_fd, inndcomm_event);
+
+ SMS(FLUSHING, 0, "flush is in progress");
+}
+
+/*========== main program ==========*/
+
+static void postfork_inputfile(InputFile *ipf) {
+ if (!ipf) return;
+ assert(ipf->fd >= 0);
+ close(ipf->fd);
+ ipf->fd= -1;
+}
+
+static void postfork_conns(Connection *conn) {
+ while (conn) {
+ close(conn->fd);
+ conn= conn->next;
+ }
+}
+
+static void postfork_stdio(FILE *f) {
+ /* we have no stdio streams that are buffered long-term */
+ if (f) fclose(f);
+}
+
+static void postfork(const char *what) {
+ if (signal(SIGPIPE, SIG_DFL) == SIG_ERR)
+ sysdie("%s child: failed to reset SIGPIPE");
+
+ postfork_inputfile(main_input_file);
+ postfork_inputfile(flushing_input_file);
+ postfork_conns(idle.head);
+ postfork_conns(working.head);
+ postfork_conns(full.head);
+ postfork_stdio(defer);
+}
+
+#define EVERY(what, interval, body) \
+ static const struct timeval what##_timeout = { 5, 0 }; \
+ static void what##_schedule(void); \
+ static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \
+ { body } \
+ what##_schedule(); \
+ } \
+ static void what##_schedule(void) { \
+ loop->on_time(loop, what##_timeout, what##_timedout, 0); \
+ }
+
+EVERY(filepoll, {5,0}, {
+ if (main_input_file && main_input_file->readable_callback)
+ filemon_callback(main_input_file);
+});
+
+#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s" \
+#define DEBUG_IPF(sh) \
+ wh##_input_file, debug_ipf_path(wh##_input_file), \
+ wh##_input_file->inprogress, (long)wh##_input_file->offset, \
+ wh##_input_file->fd, wh##_input_file->rd ? "+" : ""
+static const char *debug_ipf_path(InputFile *ipf) {
+ char *slash= strrchr(ipf->path,'/');
+ return slash ? slash+1 : ipf->path;
+}
+
+EVERY(period, {PERIOD_SECONDS,0}, {
+ debug("PERIOD"
+ " sms=%s queue=%d sm_period_counter=%d"
+ " connect_delay=%d until_spontaneous_flush=%d"
+ " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_FMT(flushing)
+ " conns idle=%d working=%d full=%d"
+ " children connecting=%ld inndcomm_child"
+ ,
+ sms_names[sms], queue.count, sm_period_counter,
+ connect_delay, until_spontaneous_flush,
+ DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing),
+ idle.count, working.count, full.count,
+ (long)connecting_child, (long)inndcomm_child
+ );
+ if (connect_delay) connect_delay--;
+ if (until_spontaneous_flush) until_spontaneous_flush--;
+ poll_backlog_file();
+ if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
+ statemc_poll();
+ check_master_queue();
+});
+
+
+/*========== option parsing ==========*/
+
+enum OptFlags {
+ of_seconds= 001000u;
+ of_boolean= 002000u;
+};
+
+typedef struct Option Option;
+typedef void OptionParser(const Option*, const char *val);
+
+struct Option {
+ int short;
+ const char *long;
+ void *store;
+ OptionParser *fn;
+ int noarg;
+};
+
+void op_integer(const Option *o, const char *val) {
+ char *ep;
+ errno= 0;
+ unsigned long ul= strtoul(val,&ep,10);
+ if (*ep || ep==val || errno || ul>INT_MAX)
+ badusage("bad integer value for %s",o->long);
+ int *store= o->store;
+ *store= ul;
+}
+
+void op_double(const Option *o, const char *val) {
+ int *store= o->store;
+ char *ep;
+ errno= 0;
+ *store= strtod(val, &ep);
+ if (*ep || ep==val || errno)
+ badusage("bad floating point value for %s",o->long);
+}
+
+void op_string(const Option *o, const char *val) {
+ char **store= o->store;
+ free(*store);
+ *store= val;
+}
+
+void op_seconds(const Option *o, const char *val) {
+ int *store= o->store;
+ char *ep;
+
+ double v= strtod(val,&ep);
+ if (ep==val) badusage("bad time/duration value for %s",o->long);
+
+ if (!*ep || !strcmp(ep,"s")) unit= 1;
+ else if (!strcmp(ep,"m")) unit= 60;
+ else if (!strcmp(ep,"h")) unit= 3600;
+ else if (!strcmp(ep,"d")) unit= 86400;
+ else badusage("bad units %s for time/duration value for %s",ep,o->long);
+
+ v *= unit;
+ v= ceil(v);
+ if (v > INT_MAX) badusage("time/duration value for %s out of range",o->long);
+ *store= v;
+}
+
+void op_periods_rndup(const Option *o, const char *val) {
+ int *store= o->store;
+ op_seconds(o,val);
+ *store += PERIOD_SECONDS-1;
+ *store /= PERIOD_SECONDS;
+}
+
+void op_periods_booltrue(const Option *o, const char *val) {
+ int *store= o->store;
+ *store= 1;
+}
+void op_periods_boolfalse(const Option *o, const char *val) {
+ int *store= o->store;
+ *store= 0;
+}
+
+static const Option options[]= {
+{ 0, "max-connections", &max_connections op_integer },
+{ 0, "streaming", &try_stream, op_booltrue, 1 },
+{ 0, "no-streaming", &try_stream, op_boolfalse, 1 },
+{'h',"host", &remote_host, op_string },
+{'P',"port", &port op_integer },
+{ 0, "inndconf", &inndconffile, op_string },
+{'f',"feedfile", &feedfile, op_string },
+{'q',"quiet-multiple", &quiet_if_locked, op_booltrue, 1 },
+{ 0, "no-quiet-multiple", &quiet_if_locked, op_boolfalse, 1 },
+{'d',"daemon", &become_daemon, op_booltrue, 1 },
+{ 0, "no-daemon", &become_daemon, op_boolfalse, 1 },
+
+{ 0, "no-check-proportion", &nocheck_thresh_pct, op_double },
+{ 0, "no-check-filter", &nocheck_decay_articles, op_double },
+
+{ 0, "max-queue-size", &max_queue_per_conn op_integer },
+{ 0, "reconnect-interval", &reconnect_delay_periods, op_periods_rndup },
+{ 0, "flush-retry-interval", &flushfail_retry_periods, op_periods_rndup },
+{ 0, "feedfile-open-timeout", &open_wait_periods, op_periods_rndup },
+{ 0, "connection-timeout", &connection_timeout, op_seconds },
+{ 0, "inndcomm-timeout", &inndcomm_flush_timeout, op_seconds },