chiark / gitweb /
options for everything
[inn-innduct.git] / backends / innduct.c
index 8e5181a84e9fc797684d20b7446f7163ee7f68bb..8e599f56026835595780ad3a28725754699fd06f 100644 (file)
@@ -306,34 +306,43 @@ static void filemon_callback(InputFile *ipf);
 static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
 static void connfail(Conn *conn, const char *fmt, ...)         PRINTF(2,3);
 
+static const oop_rd_style peer_rd_style;
+static oop_rd_call peer_rd_err, peer_rd_ok;
+
 /*----- configuration options -----*/
 
-static const char *sitename, *feedfile, *pathoutgoing;
-static const char *remote_host;
-static int quiet_multiple=0, become_daemon=1;
+static const char *sitename, *remote_host;
+static const char *feedfile;
+static int quiet_multiple=0;
+static int become_daemon=1;
+static int try_stream=1;
+static int port=119;
+static const char *inndconffile;
 
-static int max_connections=10, max_queue_per_conn=200;
+static int max_connections=10;
+static int max_queue_per_conn=200;
 static int target_max_feedfile_size=100000;
-
 static int period_seconds=60;
 
-static double max_bad_data_ratio= 0.01;
+static int connection_setup_timeout=200;
+static int inndcomm_flush_timeout=100;
+
+static double nocheck_thresh= 95.0; /* converted from percentage by main */
+static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
+
+/* all these are initialised to seconds, and converted to periods in main */
+static int reconnect_delay_periods=1000;
+static int flushfail_retry_periods=1000;
+static int backlog_retry_minperiods=50;
+static int backlog_spontrescan_periods=300;
+static int spontaneous_flush_periods=100000;
+static int need_activity_periods=1000;
+
+static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
 static int max_bad_data_initial= 30;
   /* in one corrupt 4096-byte block the number of newlines has
    * mean 16 and standard deviation 3.99.  30 corresponds to z=+3.5 */
 
-static int connection_setup_timeout=200, port=119, try_stream=1;
-static int inndcomm_flush_timeout=100;
-static int reconnect_delay_periods, flushfail_retry_periods, open_wait_periods;
-static int backlog_retry_minperiods, backlog_spontaneous_rescan_periods;
-static int spontaneous_flush_periods, need_activity_periods;
-static const char *inndconffile;
-
-static double nocheck_thresh_pct= 95.0;
-static double nocheck_thresh; /* computed in main from _pct */
-static double nocheck_decay_articles= 100; /* converted to _decay */
-static double nocheck_decay; /* computed in main from _articles */
-
 
 /*----- statistics -----*/
 
@@ -439,6 +448,7 @@ static const char *sms_names[]= {
 struct Conn {
   ISNODE(Conn);
   int fd; /* may be 0, meaning closed (during construction/destruction) */
+  oop_read *rd; /* likewise */
   int max_queue, stream, quitting;
   int since_activity; /* periods */
   ArticleList waiting; /* not yet told peer */
@@ -623,6 +633,7 @@ static void xunlink(const char *path, const char *what) {
 static time_t xtime(void) {
   time_t now= time(0);
   if (now==-1) sysdie("time(2) failed");
+  return now;
 }
 
 static void check_isreg(const struct stat *stab, const char *path,
@@ -655,13 +666,6 @@ static void xlstat_isreg(const char *path, struct stat *stab,
   check_isreg(stab, path, what);
 }
 
-static void setnonblock(int fd, int nonblocking) {
-  int r= fcntl(fd, F_GETFL);  if (r<0) sysdie("setnonblocking fcntl F_GETFL");
-  if (nonblocking) r |= O_NONBLOCK;
-  else r &= ~O_NONBLOCK;
-  r= fcntl(fd, F_SETFL, r);  if (r<0) sysdie("setnonblocking fcntl F_SETFL");
-}
-
 static int samefile(const struct stat *a, const struct stat *b) {
   assert(S_ISREG(a->st_mode));
   assert(S_ISREG(b->st_mode));
@@ -700,6 +704,11 @@ static void conn_closefd(Conn *conn, const char *msgprefix) {
 
 static void conn_dispose(Conn *conn) {
   if (!conn) return;
+  if (conn->rd) {
+    oop_rd_cancel(conn->rd);
+    oop_rd_delete_kill(conn->rd);
+    conn->rd= 0;
+  }
   if (conn->fd) {
     loop->cancel_fd(loop, conn->fd, OOP_WRITE);
     loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
@@ -875,10 +884,11 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
     goto x;
   }
 
-#define CHK(field, val)                                                          \
-  if (h->cmsg_##field != val) {                                                  \
-    die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
-    goto x;                                                              \
+#define CHK(field, val)                                                         \
+  if (h->cmsg_##field != val) {                                                 \
+    die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
+       h->cmsg_##field, val);                                           \
+    goto x;                                                             \
   }
   CHK(level, SOL_SOCKET);
   CHK(type,  SCM_RIGHTS);
@@ -888,7 +898,6 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
   if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs");
 
   memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
-  loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
 
   int status;
   pid_t got= waitpid(connecting_child, &status, 0);
@@ -909,8 +918,16 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
   LIST_INIT(conn->waiting);
   LIST_INIT(conn->priority);
   LIST_INIT(conn->sent);
-  setnonblock(conn->fd, 1);
   conn->max_queue= conn->stream ? max_queue_per_conn : 1;
+
+  loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
+  conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
+  if (!conn->fd) sysdie("oop_rd_new_fd (fd=%d)",conn->fd);
+  int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
+                    &peer_rd_ok, conn,
+                    &peer_rd_err, conn);
+  if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd);
+
   notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain");
   LIST_ADDHEAD(conns, conn);
 
@@ -921,6 +938,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
  x:
   conn_dispose(conn);
   connect_attempt_discard();
+  return OOP_CONTINUE;
 }
 
 static int allow_connect_start(void) {
@@ -970,7 +988,7 @@ static void connect_start(void) {
       assert(l>=1);
       if (buf[-1]!='\n')
        fatal("connect: response to MODE STREAM is too long: %.100s...",
-           remote_host, sanitise(buf));
+             sanitise(buf));
       l--;  if (l>0 && buf[l-1]=='\r') l--;
       buf[l]= 0;
       char *ep;
@@ -1021,7 +1039,7 @@ static void check_assign_articles(void) {
       break;
 
     Conn *walk, *use=0;
-    int spare, inqueue;
+    int spare=0, inqueue=0;
 
     /* Find a connection to offer this article.  We prefer a busy
      * connection to an idle one, provided it's not full.  We take the
@@ -1063,7 +1081,6 @@ static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
 }
 
 static void conn_maybe_write(Conn *conn)  {
-  void *rp= 0;
   for (;;) {
     conn_make_some_xmits(conn);
     if (!conn->xmitu) {
@@ -1143,7 +1160,7 @@ static void *conn_write_some_xmits(Conn *conn) {
        rs -= vp->iov_len;
        xmit_free(dp);
       } else {
-       vp->iov_base += rs;
+       vp->iov_base= (char*)vp->iov_base + rs;
        vp->iov_len -= rs;
       }
     }
@@ -1217,7 +1234,7 @@ static const oop_rd_style peer_rd_style= {
   OOP_RD_SHORTREC_FORBID
 };
 
-static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev,
+static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
                         const char *errmsg, int errnoval,
                         const char *data, size_t recsz, void *conn_v) {
   Conn *conn= conn_v;
@@ -1316,7 +1333,8 @@ static void article_done(Conn *conn, Article *art, int whichcount) {
     if (r==-1) {
       if (errno==EINTR) continue;
       sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
-            art->messageid, art->blanklen, art->offset, ipf->path);
+            art->messageid, art->blanklen,
+            (unsigned long)art->offset, ipf->path);
     }
     assert(r>=0 && r<=w);
     art->blanklen -= w;
@@ -1331,7 +1349,7 @@ static void article_done(Conn *conn, Article *art, int whichcount) {
     queue_check_input_done();
 }
 
-static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
+static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
                        const char *errmsg, int errnoval,
                        const char *data, size_t recsz, void *conn_v) {
   Conn *conn= conn_v;
@@ -1515,7 +1533,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd,
 
   if (ipf->skippinglong) {
     if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
-    return;
+    return OOP_CONTINUE;
   }
   if (ev==OOP_RD_LONG) {
     ipf->skippinglong= 1;
@@ -2111,10 +2129,12 @@ static void statemc_setstate(StateMachineState newsms, int periods,
 
   const char *xtra= "";
   switch (sms) {
-  case sm_FLUSHING: sm_FLUSHFAILED:
+  case sm_FLUSHING:
+  case sm_FLUSHFAILED:
     if (!main_input_file) xtra= "-ABSENT";
     break;
-  case sm_SEPARATED: case sm_DROPPING:
+  case sm_SEPARATED:
+  case sm_DROPPING:
     xtra= flushing_input_file->rd ? "-1" : "-2";
     break;
   default:;
@@ -2225,7 +2245,7 @@ static void search_backlog_file(void) {
   int r, i;
   struct stat stab;
   const char *oldest_path=0;
-  time_t oldest_mtime, now;
+  time_t oldest_mtime=0, now;
 
   if (backlog_input_file) return;
 
@@ -2267,8 +2287,6 @@ static void search_backlog_file(void) {
           " nonzero (error?) return value %d", globpat_backlog, r);
   }
 
-  globfree(&gl);
-
   if (!oldest_path) {
     debug("backlog scan: none");
 
@@ -2277,8 +2295,8 @@ static void search_backlog_file(void) {
       xunlink(path_lock, "lockfile for old feed");
       exit(0);
     }
-    until_backlog_nextscan= backlog_spontaneous_rescan_periods;
-    return;
+    until_backlog_nextscan= backlog_spontrescan_periods;
+    goto xfree;
   }
 
   now= xtime();
@@ -2291,22 +2309,26 @@ static void search_backlog_file(void) {
 
     backlog_input_file= open_input_file(oldest_path);
     if (!backlog_input_file) {
-      warn("backlog file %s vanished as we opened it", backlog_input_file);
+      warn("backlog file %s vanished as we opened it", oldest_path);
+      globfree(&gl);
       goto try_again;
     }
     inputfile_reading_start(backlog_input_file);
     until_backlog_nextscan= -1;
-    return;
+    goto xfree;
   }
 
   until_backlog_nextscan= age_deficiency / period_seconds;
 
-  if (backlog_spontaneous_rescan_periods >= 0 &&
-      until_backlog_nextscan > backlog_spontaneous_rescan_periods)
-    until_backlog_nextscan= backlog_spontaneous_rescan_periods;
+  if (backlog_spontrescan_periods >= 0 &&
+      until_backlog_nextscan > backlog_spontrescan_periods)
+    until_backlog_nextscan= backlog_spontrescan_periods;
 
   debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
        age, age_deficiency, until_backlog_nextscan, oldest_path);
+
+ xfree:
+  globfree(&gl);
   return;
 }
 
@@ -2380,6 +2402,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
 
  failed:
   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
+  return OOP_CONTINUE;
 }
 
 static void inndcommfail(const char *what) {
@@ -2457,40 +2480,51 @@ static void postfork(void) {
   static struct timeval what##_timeout = { interval_sec, interval_usec };    \
   static void what##_schedule(void);                                        \
   static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \
-    { body }                                                                \
+    body;                                                                   \
     what##_schedule();                                                      \
+    return OOP_CONTINUE;                                                    \
   }                                                                         \
   static void what##_schedule(void) {                                       \
     loop->on_time(loop, what##_timeout, what##_timedout, 0);                \
   }
 
-EVERY(filepoll, 5,0, {
+EVERY(filepoll, 5,0, ({
   if (main_input_file && main_input_file->readable_callback)
     filemon_callback(main_input_file);
-});
+}));
+
+static char *debug_report_ipf(InputFile *ipf) {
+  if (!ipf) return xasprintf("-");
+
+  const char *slash= strrchr(ipf->path,'/');
+  const char *path= slash ? slash+1 : ipf->path;
 
-#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s"
-#define DEBUG_IPF(wh)                                          \
-  wh##_input_file, debug_ipf_path(wh##_input_file),            \
-  wh##_input_file->inprogress, (long)wh##_input_file->offset,  \
-  wh##_input_file->fd, wh##_input_file->rd ? "+" : ""
-static const char *debug_ipf_path(InputFile *ipf) {
-  char *slash= strrchr(ipf->path,'/');
-  return slash ? slash+1 : ipf->path;
+  return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s",
+                  ipf, path,
+                  ipf->inprogress, (long)ipf->offset,
+                  ipf->fd, ipf->rd ? "+" : "");
 }
 
-EVERY(period, -1,0, {
+EVERY(period, -1,0, ({
+  char *dipf_main=     debug_report_ipf(main_input_file);
+  char *dipf_flushing= debug_report_ipf(flushing_input_file);
+  char *dipf_backlog=  debug_report_ipf(backlog_input_file);
+
   debug("PERIOD"
        " sms=%s[%d] conns=%d queue=%d until_connect=%d"
-       " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_IPF(flushing)
-       " children connecting=%ld inndcomm_child"
+       " input_files main:%s old:%s flushing:%s"
+       " children connecting=%ld inndcomm_child=%ld"
        ,
        sms_names[sms], sm_period_counter,
-       queue.count, conns.count, until_connect,
-       DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing),
+         conns.count, queue.count, until_connect,
+       dipf_main, dipf_flushing, dipf_backlog,
        (long)connecting_child, (long)inndcomm_child
        );
 
+  free(dipf_main);
+  free(dipf_flushing);
+  free(dipf_backlog);
+
   if (until_connect) until_connect--;
 
   poll_backlog_file();
@@ -2498,7 +2532,7 @@ EVERY(period, -1,0, {
   statemc_period_poll();
   check_assign_articles();
   check_idle_conns();
-});
+}));
 
 
 /*========== option parsing ==========*/
@@ -2506,7 +2540,7 @@ EVERY(period, -1,0, {
 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
 static void vbadusage(const char *fmt, va_list al) {
   char *m= xvasprintf(fmt,al);
-  fprintf(stderr, "bad usage: %s\n%s"
+  fprintf(stderr, "bad usage: %s\n"
          "say --help for help, or read the manpage\n",
          m);
   if (become_daemon)
@@ -2599,7 +2633,7 @@ static void print_options(const Option *options, FILE *f) {
   for (o=options; o->shrt || o->lng; o++) {
     char shrt[2] = { o->shrt, 0 };
     char *optspec= xasprintf("%s%s%s%s%s",
-                            o->shrt ? "-" : "", o->shrt,
+                            o->shrt ? "-" : "", shrt,
                             o->shrt && o->lng ? "|" : "",
                             DELIMPERHAPS("--", o->lng));
     fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
@@ -2653,13 +2687,6 @@ static void op_seconds(const Option *o, const char *val) {
   *store= v;
 }
 
-static void op_periods_rndup(const Option *o, const char *val) {
-  int *store= o->store;
-  op_seconds(o,val);
-  *store += period_seconds-1;
-  *store /= period_seconds;
-}
-
 static void op_setint(const Option *o, const char *val) {
   int *store= o->store;
   *store= o->intval;
@@ -2670,26 +2697,35 @@ static void op_setint(const Option *o, const char *val) {
 static void help(const Option *o, const char *val);
 
 static const Option innduct_options[]= {
-{'f',"feedfile",         "F",   &feedfile,             op_string           },
-{'q',"quiet-multiple",   0,     &quiet_multiple,       op_setint,   1    },
-{0,"help",               0,                            help             },
-
-{0,"max-connections",    "N",   &max_connections,      op_integer          },
-{0,"max-queue-per-conn", "N",   &max_queue_per_conn,   op_integer          },
-
+{'f',"feedfile",         "F",     &feedfile,                 op_string      },
+{'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
+{0,"no-daemon",          0,       &become_daemon,            op_setint, 0   },
+{0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
+{0,"inndconf",           "F",     &inndconffile,             op_string      },
+{'P',"port",             "PORT",  &port,                     op_integer     },
+{0,"help",               0,       0,                         help           },
+
+{0,"max-connections",    "N",     &max_connections,          op_integer     },
+{0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
+{0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
+{0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
+
+{0,"connection-timeout", "TIME",  &connection_setup_timeout, op_seconds     },
+{0,"stuck-flush-timeout","TIME",  &inndcomm_flush_timeout,   op_seconds     },
+
+{0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
+{0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
+
+{0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
+{0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
+{0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
+{0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
+{0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
+{0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
+
+{0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
+{0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
 
-{0,"streaming",          0,     &try_stream,           op_setint,  1     },
-{0,"no-streaming",       0,     &try_stream,           op_setint,  0     },
-{'P',"port",             "PORT",&port,                 op_integer          },
-{0,"inndconf",           "F",   &inndconffile,         op_string           },
-{0,"no-daemon",          0,     &become_daemon,        op_setint,  0     },
-
-{0,"no-check-proportion","PERCENT", &nocheck_thresh_pct,      op_double },
-{0,"no-check-filter",    "ARTICLES", &nocheck_decay_articles, op_double },
-
-{0,"reconnect-interval",  "TIME", &reconnect_delay_periods, op_periods_rndup },
-{0,"flush-retry-interval","TIME", &flushfail_retry_periods, op_periods_rndup },
-{0,"inndcomm-timeout",    "TIME",  &inndcomm_flush_timeout,   op_seconds },
 {0,0}
 };
 
@@ -2708,6 +2744,11 @@ static void help(const Option *o, const char *val) {
   exit(0);
 }
 
+static void convert_to_periods_rndup(int *store) {
+  *store += period_seconds-1;
+  *store /= period_seconds;
+}
+
 int main(int argc, char **argv) {
   if (!argv[1]) {
     printusage(stderr);
@@ -2727,24 +2768,33 @@ int main(int argc, char **argv) {
 
   if (!remote_host) remote_host= sitename;
 
-  if (nocheck_thresh_pct < 0 || nocheck_thresh_pct > 100)
+  if (nocheck_thresh < 0 || nocheck_thresh > 100)
     badusage("nocheck threshold percentage must be between 0..100");
-  nocheck_thresh= nocheck_thresh_pct * 0.01;
+  nocheck_thresh *= 0.01;
 
-  if (nocheck_decay_articles < 0.1)
+  if (nocheck_decay < 0.1)
     badusage("nocheck decay articles must be at least 0.1");
-  nocheck_decay= 1 - 1/nocheck_decay_articles;
-
-  if (!pathoutgoing)
-    pathoutgoing= innconf->pathoutgoing;
-  innconf_read(inndconffile);
-
-  if (!feedfile)
-    feedfile= xasprintf("%s/%s",pathoutgoing,sitename);
-  else if (!feedfile[0])
+  nocheck_decay= 1 - 1.0/nocheck_decay;
+
+  convert_to_periods_rndup(&reconnect_delay_periods);
+  convert_to_periods_rndup(&flushfail_retry_periods);
+  convert_to_periods_rndup(&backlog_retry_minperiods);
+  convert_to_periods_rndup(&backlog_spontrescan_periods);
+  convert_to_periods_rndup(&spontaneous_flush_periods);
+  convert_to_periods_rndup(&need_activity_periods);
+
+  if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
+    badusage("bad input data ratio must be between 0..100");
+  max_bad_data_ratio *= 0.01;
+
+  if (!feedfile) {
+    innconf_read(inndconffile);
+    feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename);
+  } else if (!feedfile[0]) {
     badusage("feed filename must be nonempty");
-  else if (feedfile[strlen(feedfile)-1]=='/')
+  } else if (feedfile[strlen(feedfile)-1]=='/') {
     feedfile= xasprintf("%s%s",feedfile,sitename);
+  }
 
   const char *feedfile_forbidden= "?*[~#";
   int c;