chiark / gitweb /
get sense of ferror check right
[innduct.git] / backends / innduct.c
index 6060ebd6240538dbcbe384b920194f3b67adea0e..b02e9d1f98b4e87142f862b3813240f42b0c0422 100644 (file)
@@ -1,18 +1,11 @@
 /*
- * warning if no inotify
- * inotify not working ?
- * some per-conn info thing for control
-
  * todo
- *  - actually do something with readable on control master
- *  - option for realsockdir
- *  - option for filepoll
- *  - option for no inotify
+ *  - some per-conn info thing for control
  *  - manpage: document control master stuff
- *  - manpage: innconf is used for communicating with innd
- *  - debug this:
- *      build-lfs/backends/innduct --no-daemon -f `pwd`/fee sit dom
+ *  - admin-initiated flush
+ *
+ * debugging rune:
+ *  build-lfs/backends/innduct --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
  */
 
 /*
@@ -310,6 +303,8 @@ static void period(void);
 static void open_defer(void);
 static void close_defer(void);
 static void search_backlog_file(void);
+static void preterminate(void);
+static void defraise(int signo);
 
 static void inputfile_reading_start(InputFile *ipf);
 static void inputfile_reading_stop(InputFile *ipf);
@@ -330,7 +325,7 @@ static oop_rd_call peer_rd_err, peer_rd_ok;
 static const char *sitename, *remote_host;
 static const char *feedfile, *realsockdir="/tmp/innduct.control";
 static int quiet_multiple=0;
-static int become_daemon=1;
+static int become_daemon=1, try_filemon=1;
 static int try_stream=1;
 static int port=119;
 static const char *inndconffile;
@@ -339,6 +334,7 @@ static int max_connections=10;
 static int max_queue_per_conn=200;
 static int target_max_feedfile_size=100000;
 static int period_seconds=60;
+static int filepoll_seconds=5;
 
 static int connection_setup_timeout=200;
 static int inndcomm_flush_timeout=100;
@@ -496,7 +492,7 @@ static int sm_period_counter;
 /* initialisation to 0 is good */
 static int until_connect, until_backlog_nextscan;
 static double accept_proportion;
-static int nocheck, nocheck_reported;
+static int nocheck, nocheck_reported, in_child;
 
 /* for simulation, debugging, etc. */
 int simulate_flush= -1;
@@ -536,6 +532,7 @@ static void logv(int sysloglevel, const char *pfx, int errnoval,
 #define diewrap(fn, pfx, sysloglevel, err, estatus)            \
   static void fn(const char *fmt, ...) NORET_PRINTF(1,2);      \
   static void fn(const char *fmt, ...) {                       \
+    preterminate();                                            \
     VA;                                                                \
     logv(sysloglevel, pfx, err, fmt, al);                      \
     exit(estatus);                                             \
@@ -660,6 +657,11 @@ static time_t xtime(void) {
   return now;
 }
 
+static void xsigaction(int s, const struct sigaction *sa) {
+  int r= sigaction(s,sa,0);
+  if (r) sysdie("sigaction failed for \"%s\"", strsignal(s));
+}
+
 static void xgettimeofday(struct timeval *tv_r) {
   int r= gettimeofday(tv_r,0);
   if (r) sysdie("gettimeofday(2) failed");
@@ -793,14 +795,28 @@ CCMD(help) {
 CCMD(period) { period(); }
 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
 CCMD(setint) { *(int*)c->xdata= c->xval; }
+CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
+
+CCMD(stop) {
+  preterminate();
+  notice("terminating (CTRL%d)",cc->fd);
+  defraise(SIGTERM);
+  abort();
+}
 
 static const ControlCommand control_commands[]= {
   { "h",             ccmd_help },
   { "p",             ccmd_period },
+  { "stop",          ccmd_stop },
+
+#define POKES(cmd,func)                                                        \
+  { cmd "sm",        func,           &sm_period_counter,       1 },    \
+  { cmd "conn",      func,           &until_connect,           0 },    \
+  { cmd "blscan",    func,           &until_backlog_nextscan,  0 },
+POKES("prod ", ccmd_setint_period)
+POKES("next ", ccmd_setint)
+
   { "pretend flush", ccmd_setintarg, &simulate_flush             },
-  { "poke sm",       ccmd_setint,    &sm_period_counter,       1 },
-  { "poke conn",     ccmd_setint,    &until_connect,           0 },
-  { "poke blscan",   ccmd_setint,    &until_backlog_nextscan,  0 },
   { "wedge blscan",  ccmd_setint,    &until_backlog_nextscan, -1 },
   { 0 }
 };
@@ -955,7 +971,7 @@ static void control_init(void) {
       uid_t self= geteuid();
       if (!S_ISDIR(stab.st_mode) ||
          stab.st_uid != self ||
-         stab.st_mode & 0077) {
+         stab.st_mode & 0007) {
        warn("no control socket, because real socket directory"
             " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
             !!S_ISDIR(stab.st_mode),
@@ -1715,15 +1731,18 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
     return OOP_CONTINUE;
   }
 
+  int conn_busy=
+    conn->waiting.count ||
+    conn->priority.count ||
+    conn->sent.count ||
+    conn->xmitu;
+
   if (conn->quitting) {
     if (code!=205 && code!=503) {
       connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
     } else {
-      notice("C%d idle connection closed", conn->fd);
-      assert(!conn->waiting.count);
-      assert(!conn->priority.count);
-      assert(!conn->sent.count);
-      assert(!conn->xmitu);
+      notice("C%d idle connection closed by us", conn->fd);
+      assert(!conn_busy);
       LIST_REMOVE(conns,conn);
       conn_dispose(conn);
     }
@@ -1733,25 +1752,36 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
   conn->since_activity= 0;
   Article *art;
 
-#define GET_ARTICLE(musthavesent)                                          \
-  art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
-  if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */
+#define GET_ARTICLE(musthavesent) do{                                        \
+    art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
+    if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
+  }while(0) 
 
-#define ARTICLE_DEALTWITH(streaming,musthavesent,how)          \
-  code_streaming= (streaming);                                 \
-  GET_ARTICLE(musthavesent);                                   \
-  article_done(conn, art, RC_##how);  break;
+#define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{      \
+    code_streaming= (streaming);                               \
+    GET_ARTICLE(musthavesent);                                 \
+    article_done(conn, art, RC_##how);                         \
+    goto dealtwith;                                            \
+  }while(0)
 
-#define PEERBADMSG(m) connfail(conn, m ": %s", sani);  return OOP_CONTINUE
+#define PEERBADMSG(m) do {                                     \
+    connfail(conn, m ": %s", sani);  return OOP_CONTINUE;      \
+  }while(0)
 
   int code_streaming= 0;
 
   switch (code) {
 
   case 400: PEERBADMSG("peer stopped accepting articles");
-  case 503: PEERBADMSG("peer timed us out");
   default:  PEERBADMSG("peer sent unexpected message");
 
+  case 503:
+    if (conn_busy) PEERBADMSG("peer timed us out");
+    notice("C%d idle connection closed by peer", conn->fd);
+    LIST_REMOVE(conns,conn);
+    conn_dispose(conn);
+    return OOP_CONTINUE;
+
   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
 
@@ -1783,6 +1813,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
     break;
 
   }
+dealtwith:
 
   conn_maybe_write(conn);
   check_assign_articles();
@@ -2048,7 +2079,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd,
       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
     }
     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
-    debug("filemon inotify readable read %d wd=%p", iev.wd, ipf);
+    debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
     filemon_callback(ipf);
   }
   return OOP_CONTINUE;
@@ -2399,20 +2430,26 @@ static int inputfile_is_done(InputFile *ipf) {
   return 1;
 }
 
-static void notice_processed(InputFile *ipf, const char *what,
-                            const char *spec) {
+static void notice_processed(InputFile *ipf, int completed,
+                            const char *what, const char *spec) {
+  if (!ipf) return; /* allows preterminate to be lazy */
+
 #define RCI_NOTHING(x) /* nothing */
 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
 
 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
 
-  info("processed %s%s read=%d (+bl=%d,+err=%d)"
+  char *inprog= completed
+    ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
+    : xasprintf(" inprogress=%ld", ipf->inprogress);
+
+  info("%s %s%s read=%d (+bl=%d,+err=%d)%s"
        " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
        ,
-       what, spec,
-       ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
+       completed?"completed":"processed", what, spec,
+       ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, inprog,
        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
@@ -2420,6 +2457,8 @@ static void notice_processed(InputFile *ipf, const char *what,
        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
        );
 
+  free(inprog);
+
 #undef CNT
 }
 
@@ -2432,7 +2471,7 @@ static void statemc_check_backlog_done(void) {
   const char *under= strchr(slash, '_');
   const char *rest= under ? under+1 : leaf;
   if (!strncmp(rest,"backlog",7)) rest += 7;
-  notice_processed(ipf,"backlog ",rest);
+  notice_processed(ipf,1,"backlog ",rest);
 
   close_input_file(ipf);
   if (unlink(ipf->path)) {
@@ -2454,7 +2493,7 @@ static void statemc_check_flushing_done(void) {
 
   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
 
-  notice_processed(ipf,"feedfile","");
+  notice_processed(ipf,1,"feedfile","");
 
   close_defer();
 
@@ -2655,6 +2694,7 @@ static void search_backlog_file(void) {
     debug("backlog scan: none");
 
     if (sms==sm_DROPPED) {
+      preterminate();
       notice("feed dropped and our work is complete");
 
       int r= unlink(path_control);
@@ -2701,6 +2741,74 @@ static void search_backlog_file(void) {
   return;
 }
 
+/*---------- shutdown and signal handling ----------*/
+
+static void preterminate(void) {
+  if (in_child) return;
+  notice_processed(main_input_file,0,"feedfile","");
+  notice_processed(flushing_input_file,0,"flushing file","");
+  if (backlog_input_file)
+    notice_processed(backlog_input_file,0, "backlog file ",
+                    backlog_input_file->path);
+}
+
+static int signal_self_pipe[2];
+static sig_atomic_t terminate_sig_flag;
+
+static void defraise(int signo) {
+  struct sigaction sa;
+  memset(&sa,0,sizeof(sa));
+  sa.sa_handler= SIG_DFL;
+  xsigaction(signo,&sa);
+  raise(signo);
+}
+
+static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
+  assert(fd=signal_self_pipe[0]);
+  char buf[PIPE_BUF];
+  int r= read(signal_self_pipe[0], buf, sizeof(buf));
+  if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
+  if (r==0) die("eof on signal self pipe");
+  if (terminate_sig_flag) {
+    preterminate();
+    notice("terminating (%s)", strsignal(terminate_sig_flag));
+    defraise(terminate_sig_flag);
+    abort();
+  }
+  return OOP_CONTINUE;
+}
+
+static void sigarrived_handler(int signum) {
+  static char x;
+  switch (signum) {
+  case SIGINT: case SIGTERM:
+    if (!terminate_sig_flag) terminate_sig_flag= signum;
+    break;
+  default:
+    abort();
+  }
+  write(signal_self_pipe[1],&x,1);
+}
+
+static void init_signals(void) {
+  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+    sysdie("could not ignore SIGPIPE");
+
+  if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
+
+  xsetnonblock(signal_self_pipe[0],1);
+  xsetnonblock(signal_self_pipe[1],1);
+
+  struct sigaction sa;
+  memset(&sa,0,sizeof(sa));
+  sa.sa_handler= sigarrived_handler;
+  sa.sa_flags= SA_RESTART;
+  xsigaction(SIGTERM,&sa);
+  xsigaction(SIGINT,&sa);
+
+  on_fd_read_except(signal_self_pipe[0], sigarrived_event);
+}
+
 /*========== flushing the feed ==========*/
 
 static pid_t inndcomm_child;
@@ -2843,6 +2951,8 @@ static void postfork_stdio(FILE *f, const char *what, const char *what2) {
 }
 
 static void postfork(void) {
+  in_child= 1;
+
   if (signal(SIGPIPE, SIG_DFL) == SIG_ERR)
     sysdie("(in child) failed to reset SIGPIPE");
 
@@ -3107,8 +3217,10 @@ static const Option innduct_options[]= {
 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
 {0,"no-daemon",          0,       &become_daemon,            op_setint, 0   },
 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
+{0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
 {'P',"port",             "PORT",  &port,                     op_integer     },
+{0,"ctrl-sock-dir",      0,       &realsockdir,              op_string      },
 {0,"help",               0,       0,                         help           },
 
 {0,"max-connections",    "N",     &max_connections,          op_integer     },
@@ -3116,8 +3228,9 @@ static const Option innduct_options[]= {
 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
 
-{0,"connection-timeout", "TIME",  &connection_setup_timeout, op_seconds     },
-{0,"stuck-flush-timeout","TIME",  &inndcomm_flush_timeout,   op_seconds     },
+{0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
+{0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
+{0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
 
 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
@@ -3222,9 +3335,6 @@ int main(int argc, char **argv) {
   if (!sysloop) sysdie("could not create liboop event loop");
   loop= (oop_source*)sysloop;
 
-  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
-    sysdie("could not ignore SIGPIPE");
-
   LIST_INIT(conns);
   LIST_INIT(queue);
 
@@ -3257,6 +3367,8 @@ int main(int argc, char **argv) {
 
   statemc_lock();
 
+  init_signals();
+
   notice("starting");
 
   if (!become_daemon)
@@ -3264,10 +3376,16 @@ int main(int argc, char **argv) {
 
   control_init();
 
-  if (!filemon_method_init()) {
-    warn("filemon: no file monitoring available, polling");
-    every(5,0,filepoll);
+  int filemon_ok= 0;
+  if (!try_filemon) {
+    notice("filemon: suppressed by command line option, polling");
+  } else {
+    filemon_ok= filemon_method_init();
+    if (!filemon_ok)
+      warn("filemon: no file monitoring available, polling");
   }
+  if (!filemon_ok)
+    every(filepoll_seconds,0,filepoll);
 
   every(period_seconds,1,period);