chiark / gitweb /
rename innduct.c to duct.c
[innduct.git] / duct.c
diff --git a/duct.c b/duct.c
new file mode 100644 (file)
index 0000000..6ff7bad
--- /dev/null
+++ b/duct.c
@@ -0,0 +1,594 @@
+/*
+ *  innduct
+ *  tailing reliable realtime streaming feeder for inn
+ *  innduct.c - main program, option parsing and startup
+ *
+ *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ * 
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ * 
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ * 
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ *  (I believe that when you compile and link this as part of the inn2
+ *  build, with the Makefile runes I have provided, all the libraries
+ *  and files which end up included in innduct are licence-compatible
+ *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
+ */
+
+#include "innduct.h"
+
+const char *sms_names[]= {
+#define SMS_DEF_NAME(s) #s ,
+  SMS_LIST(SMS_DEF_NAME)
+  0
+};
+
+
+/*----- general operational variables -----*/
+
+/* main initialises */
+oop_source *loop;
+ConnList conns;
+char *path_lock, *path_flushing, *path_defer, *path_dump;
+char *globpat_backlog;
+pid_t self_pid;
+int *lowvol_perperiod;
+int lowvol_circptr;
+int lowvol_total; /* does not include current period */
+
+/*---------- configuration option variables ----------*/
+/* when changing defaults, remember to update the manpage */
+
+const char *sitename, *remote_host;
+const char *feedfile, *path_run, *path_cli, *path_cli_dir;
+int quiet_multiple=0;
+int interactive=0, try_filemon=1;
+int try_stream=1;
+int port=119;
+const char *inndconffile;
+
+int max_connections=10;
+int max_queue_per_conn=200;
+int target_max_feedfile_size=100000;
+int period_seconds=30;
+int filepoll_seconds=5;
+int max_queue_per_ipf=-1;
+
+int connection_setup_timeout=200;
+int inndcomm_flush_timeout=100;
+
+double nocheck_thresh= 95.0; /* converted from percentage by main */
+double nocheck_decay= 100; /* conv'd from articles to lambda by main */
+
+/* all these are initialised to seconds, and converted to periods in main */
+int reconnect_delay_periods=1000;
+int flushfail_retry_periods=1000;
+int backlog_retry_minperiods=100;
+int backlog_spontrescan_periods=300;
+int spontaneous_flush_periods=100000;
+int max_separated_periods=2000;
+int need_activity_periods=1000;
+int lowvol_thresh=3;
+int lowvol_periods=1000;
+
+double max_bad_data_ratio= 1; /* conv'd from percentage by main */
+int max_bad_data_initial= 30;
+  /* in one corrupt 4096-byte block the number of newlines has
+   * mean 16 and standard deviation 3.99.  30 corresponds to z=+3.5 */
+
+/*========== main program ==========*/
+
+static void postfork_inputfile(InputFile *ipf) {
+  if (!ipf) return;
+  xclose(ipf->fd, "(in child) input file ", ipf->path);
+}
+
+static void postfork_stdio(FILE *f, const char *what, const char *what2) {
+  /* we have no stdio streams that are buffered long-term */
+  if (!f) return;
+  if (fclose(f)) syscrash("(in child) close %s%s", what, what2?what2:0);
+}
+
+void postfork(void) {
+  in_child= 1;
+
+  xsigsetdefault(SIGTERM);
+  xsigsetdefault(SIGINT);
+  xsigsetdefault(SIGPIPE);
+  if (terminate_sig_flag) raise(terminate_sig_flag);
+
+  postfork_inputfile(main_input_file);
+  postfork_inputfile(flushing_input_file);
+
+  Conn *conn;
+  FOR_CONN(conn)
+    conn_closefd(conn,"(in child) ");
+
+  postfork_stdio(defer, "defer file ", path_defer);
+}
+
+typedef struct Every Every;
+struct Every {
+  struct timeval interval;
+  int fixed_rate;
+  void (*f)(void);
+};
+
+static void every_schedule(Every *e, struct timeval base);
+
+static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
+  Every *e= e_v;
+  e->f();
+  if (!e->fixed_rate) xgettimeofday(&base);
+  every_schedule(e, base);
+  return OOP_CONTINUE;
+}
+
+static void every_schedule(Every *e, struct timeval base) {
+  struct timeval when;
+  timeradd(&base, &e->interval, &when);
+  loop->on_time(loop, when, every_happens, e);
+}
+
+static void every(int interval, int fixed_rate, void (*f)(void)) {
+  NEW_DECL(Every *,e);
+  e->interval.tv_sec= interval;
+  e->interval.tv_usec= 0;
+  e->fixed_rate= fixed_rate;
+  e->f= f;
+  struct timeval now;
+  xgettimeofday(&now);
+  every_schedule(e, now);
+}
+
+void period(void) {
+  char *dipf_main=     dbg_report_ipf(main_input_file);
+  char *dipf_flushing= dbg_report_ipf(flushing_input_file);
+  char *dipf_backlog=  dbg_report_ipf(backlog_input_file);
+
+  dbg("PERIOD"
+      " sms=%s[%d] conns=%d until_connect=%d"
+      " input_files main:%s flushing:%s backlog:%s[%d]"
+      " children connecting=%ld inndcomm=%ld lowvol_total=%d"
+      ,
+      sms_names[sms], until_flush, conns.count, until_connect,
+      dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
+      (long)connecting_child, (long)inndcomm_child, lowvol_total
+      );
+
+  free(dipf_main);
+  free(dipf_flushing);
+  free(dipf_backlog);
+
+  if (until_connect) until_connect--;
+
+  inputfile_queue_check_expired(backlog_input_file);
+  poll_backlog_file();
+  if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
+  statemc_period_poll();
+  check_assign_articles();
+  check_idle_conns();
+}
+
+
+/*========== option parsing ==========*/
+
+static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
+static void vbadusage(const char *fmt, va_list al) {
+  char *m= xvasprintf(fmt,al);
+  fprintf(stderr, "bad usage: %s\n"
+         "say --help for help, or read the manpage\n",
+         m);
+  if (interactive < 2)
+    syslog(LOG_ERR,"innduct: invoked with bad usage: %s",m);
+  exit(8);
+}
+
+/*---------- generic option parser ----------*/
+
+static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
+static void badusage(const char *fmt, ...) {
+  va_list al;
+  va_start(al,fmt);
+  vbadusage(fmt,al);
+}
+
+enum OptFlags {
+  of_seconds= 001000u,
+  of_boolean= 002000u,
+};
+
+typedef struct Option Option;
+typedef void OptionParser(const Option*, const char *val);
+
+struct Option {
+  int shrt;
+  const char *lng, *formarg;
+  void *store;
+  OptionParser *fn;
+  int intval;
+};
+
+static void parse_options(const Option *options, char ***argvp) {
+  /* on return *argvp is first non-option arg; argc is not updated */
+
+  for (;;) {
+    const char *arg= *++(*argvp);
+    if (!arg) break;
+    if (*arg != '-') break;
+    if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
+    int a;
+    while ((a= *++arg)) {
+      const Option *o;
+      if (a=='-') {
+       arg++;
+       char *equals= strchr(arg,'=');
+       unsigned len= equals ? (size_t)(equals - arg) : strlen(arg);
+       for (o=options; o->shrt || o->lng; o++)
+         if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
+           goto found_long;
+       badusage("unknown long option --%s",arg);
+      found_long:
+       if (!o->formarg) {
+         if (equals) badusage("option --%s does not take a value",o->lng);
+         arg= 0;
+       } else if (equals) {
+         arg= equals+1;
+       } else {
+         arg= *++(*argvp);
+         if (!arg) badusage("option --%s needs a value for %s",
+                            o->lng, o->formarg);
+       }
+       o->fn(o, arg);
+       break; /* eaten the whole argument now */
+      }
+      for (o=options; o->shrt || o->lng; o++)
+       if (a == o->shrt)
+         goto found_short;
+      badusage("unknown short option -%c",a);
+    found_short:
+      if (!o->formarg) {
+       o->fn(o,0);
+      } else {
+       if (!*++arg) {
+         arg= *++(*argvp);
+         if (!arg) badusage("option -%c needs a value for %s",
+                            o->shrt, o->formarg);
+       }
+       o->fn(o,arg);
+       break; /* eaten the whole argument now */
+      }
+    }
+  }
+}
+
+#define DELIMPERHAPS(delim,str)  (str) ? (delim) : "", (str) ? (str) : ""
+
+static void print_options(const Option *options, FILE *f) {
+  const Option *o;
+  for (o=options; o->shrt || o->lng; o++) {
+    char shrt[2] = { o->shrt, 0 };
+    char *optspec= xasprintf("%s%s%s%s%s",
+                            o->shrt ? "-" : "", shrt,
+                            o->shrt && o->lng ? "|" : "",
+                            DELIMPERHAPS("--", o->lng));
+    fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
+    free(optspec);
+  }
+}
+
+/*---------- specific option types ----------*/
+
+static void op_integer(const Option *o, const char *val) {
+  char *ep;
+  errno= 0;
+  unsigned long ul= strtoul(val,&ep,10);
+  if (*ep || ep==val || errno || ul>INT_MAX)
+    badusage("bad integer value for %s",o->lng);
+  int *store= o->store;
+  *store= ul;
+}
+
+static void op_double(const Option *o, const char *val) {
+  int *store= o->store;
+  char *ep;
+  errno= 0;
+  *store= strtod(val, &ep);
+  if (*ep || ep==val || errno)
+    badusage("bad floating point value for %s",o->lng);
+}
+
+static void op_string(const Option *o, const char *val) {
+  const char **store= o->store;
+  *store= val;
+}
+
+static void op_seconds(const Option *o, const char *val) {
+  int *store= o->store;
+  char *ep;
+  int unit;
+
+  double v= strtod(val,&ep);
+  if (ep==val) badusage("bad time/duration value for %s",o->lng);
+
+  if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
+  else if (!strcmp(ep,"m") || !strcmp(ep,"min"))    unit= 60;
+  else if (!strcmp(ep,"h") || !strcmp(ep,"hour"))   unit= 3600;
+  else if (!strcmp(ep,"d") || !strcmp(ep,"day"))    unit= 86400;
+  else if (!strcmp(ep,"das")) unit= 10;
+  else if (!strcmp(ep,"hs"))  unit= 100;
+  else if (!strcmp(ep,"ks"))  unit= 1000;
+  else if (!strcmp(ep,"Ms"))  unit= 1000000;
+  else badusage("bad units %s for time/duration value for %s",ep,o->lng);
+
+  v *= unit;
+  v= ceil(v);
+  if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
+  *store= v;
+}
+
+static void op_setint(const Option *o, const char *val) {
+  int *store= o->store;
+  *store= o->intval;
+}
+
+/*---------- specific options ----------*/
+
+static void help(const Option *o, const char *val);
+
+static const Option innduct_options[]= {
+{'f',"feedfile",         "F",     &feedfile,                 op_string      },
+{'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
+{0,"no-daemon",          0,       &interactive,              op_setint, 1   },
+{0,"interactive",        0,       &interactive,              op_setint, 2   },
+{0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
+{0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
+{'C',"inndconf",         "F",     &inndconffile,             op_string      },
+{'P',"port",             "PORT",  &port,                     op_integer     },
+{0,"chdir",              "DIR",   &path_run,                 op_string      },
+{0,"cli",            "DIR/|PATH", &path_cli,                 op_string      },
+{0,"help",               0,       0,                         help           },
+
+{0,"max-connections",    "N",     &max_connections,          op_integer     },
+{0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
+{0,"max-queue-per-file", "N",     &max_queue_per_ipf,        op_integer     },
+{0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
+{0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
+
+{0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
+{0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
+{0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
+
+{0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
+{0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
+
+{0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
+{0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
+{0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
+{0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
+{0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
+{0,"flush-finish-timeout",   "PERIOD", &max_separated_periods,    op_seconds },
+{0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
+{0,"low-volume-thresh",      "PERIOD", &lowvol_thresh,            op_integer },
+{0,"low-volume-window",      "PERIOD", &lowvol_periods,           op_seconds },
+
+{0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
+{0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
+
+{0,0}
+};
+
+static void printusage(FILE *f) {
+  fputs("usage: innduct [options] site [fqdn]\n"
+       "available options are:\n", f);
+  print_options(innduct_options, f);
+}
+
+static void help(const Option *o, const char *val) {
+  printusage(stdout);
+  if (ferror(stdout) || fflush(stdout)) {
+    perror("innduct: writing help");
+    exit(12);
+  }
+  exit(0);
+}
+
+static void convert_to_periods_rndup(int *store) {
+  *store += period_seconds-1;
+  *store /= period_seconds;
+}
+
+static int path_ends_slash(const char *specified) {
+  int l= strlen(specified);
+  assert(l);
+  return specified[l-1] == '/';
+}
+
+static int innduct_fatal_cleanup(void) { return 12; } /* used for libinn die */
+
+int main(int argc, char **argv) {
+  /* set up libinn logging */
+  message_program_name= "innduct";
+  message_fatal_cleanup= innduct_fatal_cleanup;
+
+#define INNLOGSET_CALL(fn, pfx, sysloglevel)   \
+  message_handlers_##fn(1, duct_log_##fn);
+  INNLOGSETS(INNLOGSET_CALL)
+
+  if (!argv[1]) {
+    printusage(stderr);
+    exit(8);
+  }
+
+  parse_options(innduct_options, &argv);
+
+  /* arguments */
+
+  sitename= *argv++;
+  if (!sitename) badusage("need site name argument");
+
+  if (*argv) remote_host= *argv++;
+  else remote_host= sitename;
+  
+  if (*argv) badusage("too many non-option arguments");
+
+  /* defaults */
+
+  int r= innconf_read(inndconffile);
+  if (!r) badusage("could not read inn.conf");
+
+  if (!remote_host) remote_host= sitename;
+
+  if (nocheck_thresh < 0 || nocheck_thresh > 100)
+    badusage("nocheck threshold percentage must be between 0..100");
+  nocheck_thresh *= 0.01;
+
+  if (nocheck_decay < 0.1)
+    badusage("nocheck decay articles must be at least 0.1");
+  nocheck_decay= pow(0.5, 1.0/nocheck_decay);
+
+  convert_to_periods_rndup(&reconnect_delay_periods);
+  convert_to_periods_rndup(&flushfail_retry_periods);
+  convert_to_periods_rndup(&backlog_retry_minperiods);
+  convert_to_periods_rndup(&backlog_spontrescan_periods);
+  convert_to_periods_rndup(&spontaneous_flush_periods);
+  convert_to_periods_rndup(&max_separated_periods);
+  convert_to_periods_rndup(&need_activity_periods);
+  convert_to_periods_rndup(&lowvol_periods);
+
+  if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
+    badusage("bad input data ratio must be between 0..100");
+  max_bad_data_ratio *= 0.01;
+
+  if (!path_run)
+    path_run= innconf->pathrun;
+
+  if (!feedfile) feedfile= sitename;
+  if (!feedfile[0]) badusage("feed filename, if specified, must be nonempty");
+  if (path_ends_slash(feedfile))
+    feedfile= xasprintf("%s%s", feedfile, sitename);
+  if (feedfile[0] != '/')
+    feedfile= xasprintf("%s/%s", innconf->pathoutgoing, feedfile);
+
+  if (!path_cli) {
+    path_cli_dir= "innduct";
+  } else if (!path_cli[0] || !strcmp(path_cli,"none")) {
+    path_cli= 0; /* ok, don't then */
+  } else if (path_ends_slash(path_cli)) {
+    path_cli_dir= xasprintf("%.*s", strlen(path_cli)-1, path_cli);
+  }
+  if (path_cli_dir)
+    path_cli= xasprintf("%s/%s", path_cli_dir, sitename);
+
+  if (max_queue_per_ipf<0)
+    max_queue_per_ipf= max_queue_per_conn * 2;
+
+  const char *feedfile_forbidden= "?*[~#";
+  int c;
+  while ((c= *feedfile_forbidden++))
+    if (strchr(feedfile, c))
+      badusage("feed filename may not contain metacharacter %c",c);
+
+  int i;
+  lowvol_perperiod= xcalloc(sizeof(*lowvol_perperiod), lowvol_periods);
+  for (i=0; i<lowvol_periods; i++) {
+    lowvol_perperiod[i]= lowvol_thresh;
+    lowvol_total += lowvol_thresh;
+  }
+  lowvol_total -= lowvol_thresh;
+
+  /* set things up */
+
+  path_lock=        xasprintf("%s_lock",      feedfile);
+  path_flushing=    xasprintf("%s_flushing",  feedfile);
+  path_defer=       xasprintf("%s_defer",     feedfile);
+  path_dump=        xasprintf("%s_dump",      feedfile);
+  globpat_backlog=  xasprintf("%s_backlog*",  feedfile);
+
+  oop_source_sys *sysloop= oop_sys_new();
+  if (!sysloop) syscrash("could not create liboop event loop");
+  loop= (oop_source*)sysloop;
+
+  LIST_INIT(conns);
+
+  if (interactive < 1) {
+    for (i=3; i<255; i++)
+      /* do this now before we open syslog, etc. */
+      close(i);
+  }
+
+  logv_prefix= xasprintf("%s| ", sitename);
+  if (interactive < 2) {
+    openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
+    logv_use_syslog= 1;
+  }
+
+  if (interactive < 1) {
+    int null= open("/dev/null",O_RDWR);
+    if (null<0) sysdie("failed to open /dev/null");
+    dup2(null,0);
+    dup2(null,1);
+    dup2(null,2);
+    xclose(null, "/dev/null original fd",0);
+
+    pid_t child1= xfork("daemonise first fork");
+    if (child1) _exit(0);
+
+    pid_t sid= setsid();
+    if (sid == -1) sysdie("setsid failed");
+
+    pid_t child2= xfork("daemonise second fork");
+    if (child2) _exit(0);
+  }
+
+  self_pid= getpid();
+  if (self_pid==-1) syscrash("getpid");
+
+  r= chdir(path_run);
+  if (r) sysdie("could not chdir to pathrun %s", path_run);
+
+  statemc_lock();
+
+  init_signals();
+
+  notice("starting");
+
+  int val= 1;
+  r= SMsetup(SM_PREOPEN, &val); if (!r) warn("SMsetup SM_PREOPEN failed");
+  r= SMinit(); if (!r) die("storage manager initialisation (SMinit) failed");
+
+  if (interactive >= 2)
+    cli_stdio();
+
+  cli_init();
+
+  int filemon_ok= 0;
+  if (!try_filemon) {
+    notice("filemon: suppressed by command line option, polling");
+  } else {
+    filemon_ok= filemon_method_init();
+    if (!filemon_ok)
+      warn("filemon: no file monitoring available, polling");
+  }
+  if (!filemon_ok)
+    every(filepoll_seconds,0,filepoll);
+
+  every(period_seconds,1,period);
+
+  statemc_init();
+
+  /* let's go */
+
+  void *run= oop_sys_run(sysloop);
+  assert(run == OOP_ERROR);
+  syscrash("event loop failed");
+}