chiark / gitweb /
get sense of ferror check right
[innduct.git] / backends / innduct.c
index acbae608ad4ecef16452814f886d56ef3a565d8d..b02e9d1f98b4e87142f862b3813240f42b0c0422 100644 (file)
@@ -1,27 +1,11 @@
 /*
- * bugs
- *
-
- [740] <sit> info: processed feedfile(null) read=4(+0bl,+6err) offered=5(ch5,nc0) accepted=0(ch0+nc0) unwanted=0(0id+0bd+0nc) rejected=0(0id+0bd+0nc) deferred=0(0id+0bd+0nc) missing=2(0id+2bd+0nc) connretry=0(0id+0bd+0nc)
-
-    (null) ?!
-
-   also unwanted should be nonzero I think
-
-   %d(  is too hard to read
-
- [740] <sit> warning: corrupted file: /home/ian/things/Innfeed/inn2-2.4.5/fee, offset 349: line partially blanked: in `                                                  @050000002D130000006A0000000000000000@ <mi'..
-
-
  * todo
- *  - actually do something with readable on control master
- *  - option for realsockdir
- *  - option for filepoll
- *  - option for no inotify
+ *  - some per-conn info thing for control
  *  - manpage: document control master stuff
- *  - manpage: innconf is used for communicating with innd
- *  - debug this:
- *      build-lfs/backends/innduct --no-daemon -f `pwd`/fee sit dom
+ *  - admin-initiated flush
+ *
+ * debugging rune:
+ *  build-lfs/backends/innduct --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
  */
 
 /*
@@ -319,6 +303,8 @@ static void period(void);
 static void open_defer(void);
 static void close_defer(void);
 static void search_backlog_file(void);
+static void preterminate(void);
+static void defraise(int signo);
 
 static void inputfile_reading_start(InputFile *ipf);
 static void inputfile_reading_stop(InputFile *ipf);
@@ -339,7 +325,7 @@ static oop_rd_call peer_rd_err, peer_rd_ok;
 static const char *sitename, *remote_host;
 static const char *feedfile, *realsockdir="/tmp/innduct.control";
 static int quiet_multiple=0;
-static int become_daemon=1;
+static int become_daemon=1, try_filemon=1;
 static int try_stream=1;
 static int port=119;
 static const char *inndconffile;
@@ -348,6 +334,7 @@ static int max_connections=10;
 static int max_queue_per_conn=200;
 static int target_max_feedfile_size=100000;
 static int period_seconds=60;
+static int filepoll_seconds=5;
 
 static int connection_setup_timeout=200;
 static int inndcomm_flush_timeout=100;
@@ -387,7 +374,7 @@ typedef enum {      /* in queue                 in conn->sent             */
   RCN(missing)                                 \
   RCN(connretry)
 
-#define RCI_TRIPLE_FMT_BASE "%d(%did+%dbd+%dnc)"
+#define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)"
 #define RCI_TRIPLE_VALS_BASE(counts,x)         \
        counts[art_Unchecked] x                 \
        + counts[art_Wanted] x                  \
@@ -505,7 +492,7 @@ static int sm_period_counter;
 /* initialisation to 0 is good */
 static int until_connect, until_backlog_nextscan;
 static double accept_proportion;
-static int nocheck, nocheck_reported;
+static int nocheck, nocheck_reported, in_child;
 
 /* for simulation, debugging, etc. */
 int simulate_flush= -1;
@@ -545,6 +532,7 @@ static void logv(int sysloglevel, const char *pfx, int errnoval,
 #define diewrap(fn, pfx, sysloglevel, err, estatus)            \
   static void fn(const char *fmt, ...) NORET_PRINTF(1,2);      \
   static void fn(const char *fmt, ...) {                       \
+    preterminate();                                            \
     VA;                                                                \
     logv(sysloglevel, pfx, err, fmt, al);                      \
     exit(estatus);                                             \
@@ -669,6 +657,11 @@ static time_t xtime(void) {
   return now;
 }
 
+static void xsigaction(int s, const struct sigaction *sa) {
+  int r= sigaction(s,sa,0);
+  if (r) sysdie("sigaction failed for \"%s\"", strsignal(s));
+}
+
 static void xgettimeofday(struct timeval *tv_r) {
   int r= gettimeofday(tv_r,0);
   if (r) sysdie("gettimeofday(2) failed");
@@ -802,14 +795,28 @@ CCMD(help) {
 CCMD(period) { period(); }
 CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
 CCMD(setint) { *(int*)c->xdata= c->xval; }
+CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
+
+CCMD(stop) {
+  preterminate();
+  notice("terminating (CTRL%d)",cc->fd);
+  defraise(SIGTERM);
+  abort();
+}
 
 static const ControlCommand control_commands[]= {
   { "h",             ccmd_help },
   { "p",             ccmd_period },
+  { "stop",          ccmd_stop },
+
+#define POKES(cmd,func)                                                        \
+  { cmd "sm",        func,           &sm_period_counter,       1 },    \
+  { cmd "conn",      func,           &until_connect,           0 },    \
+  { cmd "blscan",    func,           &until_backlog_nextscan,  0 },
+POKES("prod ", ccmd_setint_period)
+POKES("next ", ccmd_setint)
+
   { "pretend flush", ccmd_setintarg, &simulate_flush             },
-  { "poke sm",       ccmd_setint,    &sm_period_counter,       1 },
-  { "poke conn",     ccmd_setint,    &until_connect,           0 },
-  { "poke blscan",   ccmd_setint,    &until_backlog_nextscan,  0 },
   { "wedge blscan",  ccmd_setint,    &until_backlog_nextscan, -1 },
   { 0 }
 };
@@ -964,7 +971,7 @@ static void control_init(void) {
       uid_t self= geteuid();
       if (!S_ISDIR(stab.st_mode) ||
          stab.st_uid != self ||
-         stab.st_mode & 0077) {
+         stab.st_mode & 0007) {
        warn("no control socket, because real socket directory"
             " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
             !!S_ISDIR(stab.st_mode),
@@ -1674,6 +1681,11 @@ static void article_done(Conn *conn, Article *art, int whichcount) {
 
   while (art->blanklen) {
     static const char spaces[]=
+      "                                                                "
+      "                                                                "
+      "                                                                "
+      "                                                                "
+      "                                                                "
       "                                                                "
       "                                                                "
       "                                                                "
@@ -1719,15 +1731,18 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
     return OOP_CONTINUE;
   }
 
+  int conn_busy=
+    conn->waiting.count ||
+    conn->priority.count ||
+    conn->sent.count ||
+    conn->xmitu;
+
   if (conn->quitting) {
     if (code!=205 && code!=503) {
       connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
     } else {
-      notice("C%d idle connection closed", conn->fd);
-      assert(!conn->waiting.count);
-      assert(!conn->priority.count);
-      assert(!conn->sent.count);
-      assert(!conn->xmitu);
+      notice("C%d idle connection closed by us", conn->fd);
+      assert(!conn_busy);
       LIST_REMOVE(conns,conn);
       conn_dispose(conn);
     }
@@ -1737,25 +1752,36 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
   conn->since_activity= 0;
   Article *art;
 
-#define GET_ARTICLE(musthavesent)                                          \
-  art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
-  if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */
+#define GET_ARTICLE(musthavesent) do{                                        \
+    art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
+    if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */      \
+  }while(0) 
 
-#define ARTICLE_DEALTWITH(streaming,musthavesent,how)          \
-  code_streaming= (streaming);                                 \
-  GET_ARTICLE(musthavesent);                                   \
-  article_done(conn, art, RC_##how);  break;
+#define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{      \
+    code_streaming= (streaming);                               \
+    GET_ARTICLE(musthavesent);                                 \
+    article_done(conn, art, RC_##how);                         \
+    goto dealtwith;                                            \
+  }while(0)
 
-#define PEERBADMSG(m) connfail(conn, m ": %s", sani);  return OOP_CONTINUE
+#define PEERBADMSG(m) do {                                     \
+    connfail(conn, m ": %s", sani);  return OOP_CONTINUE;      \
+  }while(0)
 
   int code_streaming= 0;
 
   switch (code) {
 
   case 400: PEERBADMSG("peer stopped accepting articles");
-  case 503: PEERBADMSG("peer timed us out");
   default:  PEERBADMSG("peer sent unexpected message");
 
+  case 503:
+    if (conn_busy) PEERBADMSG("peer timed us out");
+    notice("C%d idle connection closed by peer", conn->fd);
+    LIST_REMOVE(conns,conn);
+    conn_dispose(conn);
+    return OOP_CONTINUE;
+
   case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
   case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
 
@@ -1787,6 +1813,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
     break;
 
   }
+dealtwith:
 
   conn_maybe_write(conn);
   check_assign_articles();
@@ -1917,7 +1944,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd,
   art->midlen= midlen;
   art->ipf= ipf;  ipf->inprogress++;
   art->token= TextToToken(tokentextbuf);
-  art->offset= ipf->offset;
+  art->offset= old_offset;
   art->blanklen= recsz;
   strcpy(art->messageid, space+1);
   LIST_ADDTAIL(queue, art);
@@ -1995,10 +2022,10 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
 
 /*---------- filemon implemented with inotify ----------*/
 
-#if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
+#if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
 #define HAVE_FILEMON
 
-#include <linux/inotify.h>
+#include <sys/inotify.h>
 
 static int filemon_inotify_fd;
 static int filemon_inotify_wdmax;
@@ -2014,7 +2041,7 @@ static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
 
   if (wd >= filemon_inotify_wdmax) {
     int newmax= wd+2;
-    filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf,
+    filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf,
                                 sizeof(*filemon_inotify_wd2ipf) * newmax);
     memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
           sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
@@ -2033,7 +2060,7 @@ static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
   int wd= pf->wd;
   debug("filemon inotify stopfile %p wd=%d", ipf, wd);
-  int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
+  int r= inotify_rm_watch(filemon_inotify_fd, wd);
   if (r) sysdie("inotify_rm_watch");
   filemon_inotify_wd2ipf[wd]= 0;
 }
@@ -2052,7 +2079,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd,
       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
     }
     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
-    debug("filemon inotify readable read %p wd=%p", iev.wd, ipf);
+    debug("filemon inotify readable read %p wd=%d", ipf, iev.wd);
     filemon_callback(ipf);
   }
   return OOP_CONTINUE;
@@ -2061,11 +2088,11 @@ static void *filemon_inotify_readable(oop_source *lp, int fd,
 static int filemon_method_init(void) {
   filemon_inotify_fd= inotify_init();
   if (filemon_inotify_fd<0) {
-    syswarn("could not initialise inotify: inotify_init failed");
+    syswarn("filemon/inotify: inotify_init failed");
     return 0;
   }
-  set nonblock;
-  loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
+  xsetnonblock(filemon_inotify_fd, 1);
+  loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0);
 
   debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
   return 1;
@@ -2079,7 +2106,10 @@ static int filemon_method_init(void) {
 
 struct Filemon_Perfile { int dummy; };
 
-static int filemon_method_init(void) { return 0; }
+static int filemon_method_init(void) {
+  warn("filemon/dummy: no filemon method compiled in");
+  return 0;
+}
 static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
 
@@ -2400,20 +2430,26 @@ static int inputfile_is_done(InputFile *ipf) {
   return 1;
 }
 
-static void notice_processed(InputFile *ipf, const char *what,
-                            const char *spec) {
+static void notice_processed(InputFile *ipf, int completed,
+                            const char *what, const char *spec) {
+  if (!ipf) return; /* allows preterminate to be lazy */
+
 #define RCI_NOTHING(x) /* nothing */
 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
 
 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
 
-  info("processed %s%s read=%d(+%dbl,+%derr)"
-       " offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)"
+  char *inprog= completed
+    ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
+    : xasprintf(" inprogress=%ld", ipf->inprogress);
+
+  info("%s %s%s read=%d (+bl=%d,+err=%d)%s"
+       " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
        ,
-       what, spec,
-       ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
+       completed?"completed":"processed", what, spec,
+       ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, inprog,
        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
@@ -2421,6 +2457,8 @@ static void notice_processed(InputFile *ipf, const char *what,
        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
        );
 
+  free(inprog);
+
 #undef CNT
 }
 
@@ -2433,7 +2471,7 @@ static void statemc_check_backlog_done(void) {
   const char *under= strchr(slash, '_');
   const char *rest= under ? under+1 : leaf;
   if (!strncmp(rest,"backlog",7)) rest += 7;
-  notice_processed(ipf,"backlog:",rest);
+  notice_processed(ipf,1,"backlog ",rest);
 
   close_input_file(ipf);
   if (unlink(ipf->path)) {
@@ -2455,7 +2493,7 @@ static void statemc_check_flushing_done(void) {
 
   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
 
-  notice_processed(ipf,"feedfile",0);
+  notice_processed(ipf,1,"feedfile","");
 
   close_defer();
 
@@ -2467,7 +2505,7 @@ static void statemc_check_flushing_done(void) {
 
   if (sms==sm_SEPARATED) {
     notice("flush complete");
-    SMS(NORMAL, 0, "flush complete");
+    SMS(NORMAL, spontaneous_flush_periods, "flush complete");
   } else if (sms==sm_DROPPING) {
     SMS(DROPPED, 0, "old flush complete");
     search_backlog_file();
@@ -2656,6 +2694,7 @@ static void search_backlog_file(void) {
     debug("backlog scan: none");
 
     if (sms==sm_DROPPED) {
+      preterminate();
       notice("feed dropped and our work is complete");
 
       int r= unlink(path_control);
@@ -2702,6 +2741,74 @@ static void search_backlog_file(void) {
   return;
 }
 
+/*---------- shutdown and signal handling ----------*/
+
+static void preterminate(void) {
+  if (in_child) return;
+  notice_processed(main_input_file,0,"feedfile","");
+  notice_processed(flushing_input_file,0,"flushing file","");
+  if (backlog_input_file)
+    notice_processed(backlog_input_file,0, "backlog file ",
+                    backlog_input_file->path);
+}
+
+static int signal_self_pipe[2];
+static sig_atomic_t terminate_sig_flag;
+
+static void defraise(int signo) {
+  struct sigaction sa;
+  memset(&sa,0,sizeof(sa));
+  sa.sa_handler= SIG_DFL;
+  xsigaction(signo,&sa);
+  raise(signo);
+}
+
+static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
+  assert(fd=signal_self_pipe[0]);
+  char buf[PIPE_BUF];
+  int r= read(signal_self_pipe[0], buf, sizeof(buf));
+  if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
+  if (r==0) die("eof on signal self pipe");
+  if (terminate_sig_flag) {
+    preterminate();
+    notice("terminating (%s)", strsignal(terminate_sig_flag));
+    defraise(terminate_sig_flag);
+    abort();
+  }
+  return OOP_CONTINUE;
+}
+
+static void sigarrived_handler(int signum) {
+  static char x;
+  switch (signum) {
+  case SIGINT: case SIGTERM:
+    if (!terminate_sig_flag) terminate_sig_flag= signum;
+    break;
+  default:
+    abort();
+  }
+  write(signal_self_pipe[1],&x,1);
+}
+
+static void init_signals(void) {
+  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+    sysdie("could not ignore SIGPIPE");
+
+  if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
+
+  xsetnonblock(signal_self_pipe[0],1);
+  xsetnonblock(signal_self_pipe[1],1);
+
+  struct sigaction sa;
+  memset(&sa,0,sizeof(sa));
+  sa.sa_handler= sigarrived_handler;
+  sa.sa_flags= SA_RESTART;
+  xsigaction(SIGTERM,&sa);
+  xsigaction(SIGINT,&sa);
+
+  on_fd_read_except(signal_self_pipe[0], sigarrived_event);
+}
+
 /*========== flushing the feed ==========*/
 
 static pid_t inndcomm_child;
@@ -2844,6 +2951,8 @@ static void postfork_stdio(FILE *f, const char *what, const char *what2) {
 }
 
 static void postfork(void) {
+  in_child= 1;
+
   if (signal(SIGPIPE, SIG_DFL) == SIG_ERR)
     sysdie("(in child) failed to reset SIGPIPE");
 
@@ -2905,7 +3014,7 @@ static char *debug_report_ipf(InputFile *ipf) {
   return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s",
                   ipf, path,
                   ipf->inprogress, (long)ipf->offset,
-                  ipf->fd, ipf->rd ? "+" : "");
+                  ipf->fd, ipf->rd ? "" : ",!rd");
 }
 
 static void period(void) {
@@ -2915,7 +3024,7 @@ static void period(void) {
 
   debug("PERIOD"
        " sms=%s[%d] conns=%d queue=%d until_connect=%d"
-       " input_files main:%s old:%s flushing:%s"
+       " input_files main:%s flushing:%s backlog:%s"
        " children connecting=%ld inndcomm=%ld"
        ,
        sms_names[sms], sm_period_counter,
@@ -3108,8 +3217,10 @@ static const Option innduct_options[]= {
 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
 {0,"no-daemon",          0,       &become_daemon,            op_setint, 0   },
 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
+{0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
 {'P',"port",             "PORT",  &port,                     op_integer     },
+{0,"ctrl-sock-dir",      0,       &realsockdir,              op_string      },
 {0,"help",               0,       0,                         help           },
 
 {0,"max-connections",    "N",     &max_connections,          op_integer     },
@@ -3117,8 +3228,9 @@ static const Option innduct_options[]= {
 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
 
-{0,"connection-timeout", "TIME",  &connection_setup_timeout, op_seconds     },
-{0,"stuck-flush-timeout","TIME",  &inndcomm_flush_timeout,   op_seconds     },
+{0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
+{0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
+{0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
 
 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
@@ -3223,9 +3335,6 @@ int main(int argc, char **argv) {
   if (!sysloop) sysdie("could not create liboop event loop");
   loop= (oop_source*)sysloop;
 
-  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
-    sysdie("could not ignore SIGPIPE");
-
   LIST_INIT(conns);
   LIST_INIT(queue);
 
@@ -3258,6 +3367,8 @@ int main(int argc, char **argv) {
 
   statemc_lock();
 
+  init_signals();
+
   notice("starting");
 
   if (!become_daemon)
@@ -3265,10 +3376,16 @@ int main(int argc, char **argv) {
 
   control_init();
 
-  if (!filemon_method_init()) {
-    warn("no file monitoring available, polling");
-    every(5,0,filepoll);
+  int filemon_ok= 0;
+  if (!try_filemon) {
+    notice("filemon: suppressed by command line option, polling");
+  } else {
+    filemon_ok= filemon_method_init();
+    if (!filemon_ok)
+      warn("filemon: no file monitoring available, polling");
   }
+  if (!filemon_ok)
+    every(filepoll_seconds,0,filepoll);
 
   every(period_seconds,1,period);