chiark / gitweb /
report to debug changes in nocheck
[inn-innduct.git] / backends / innduct.c
index f64a453f621f205be0fed7eea366f652eff34b8d..89e15b63f7e2929d18925d7e0df3a13d111b1e89 100644 (file)
@@ -259,7 +259,6 @@ typedef struct InputFile {
   void *readable_callback_user;
 
   int fd;
-  const char *path; /* ptr copy of path_<foo> or feedfile */
   struct Filemon_Perfile *filemon;
 
   oop_read *rd;
@@ -267,17 +266,28 @@ typedef struct InputFile {
   off_t offset;
 
   Counts counts;
+  char path[];
 } InputFile;
 
+#define SMS_LIST(X)                            \
+  X(WAITING)                                   \
+  X(NORMAL)                                    \
+  X(FLUSHING)                                  \
+  X(FLUSHFAIL)                                 \
+  X(SEPARATED)                                 \
+  X(DROPPING)
+
 typedef enum {
-  sm_WAITING,
-  sm_NORMAL,
-  sm_FLUSHING,
-  sm_FLUSHFAIL,
-  sm_SEPARATED,
-  sm_DROPPING,
+#define SMS_DEF_ENUM(s) sm_##s,
+  SMS_LIST(SMS_DEF_ENUM)
 } StateMachineState;
 
+static const char *sms_names[]= {
+#define SMS_DEF_NAME(s) #s ,
+  SMS_LIST(SMS_DEF_NAME)
+  0
+};
+
 struct Conn {
   ISNODE(Conn);
   int fd, max_queue, stream;
@@ -291,7 +301,6 @@ struct Conn {
 
 /*----- operational variables -----*/
 
-static int since_connect_attempt;
 static int nconns;
 static LIST(Conn) idle, working, full;
 static LIST(Article) *queue;
@@ -303,7 +312,7 @@ static char *path_lock, *path_flushing, *path_defer;
 
 static StateMachineState sms;
 static FILE *defer;
-static InputFile *main_input_file, *old_input_file, *backlog_input_file;
+static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
 static int sm_period_counter;
 
 
@@ -462,6 +471,7 @@ logwrap(warn,     " warning",  LOG_WARN,   -1,    0);
 
 logwrap(notice,   "",          LOG_NOTICE, -1,    0);
 logwrap(info,     " info",     LOG_INFO,   -1,    0);
+logwrap(debug,    " debug",    LOG_DEBUG,  -1,    0);
 
 
 /*========== making new connections ==========*/
@@ -938,11 +948,14 @@ static Article *article_reply_check(Connection *conn, const char *response,
 static void update_nocheck(int accepted) {
   accept_proportion *= accept_decay;
   accept_proportion += accepted;
-  nocheck= accept_proportion >= nocheck_thresh;
-  if (nocheck && !nocheck_reported) {
+  int new_nocheck= accept_proportion >= nocheck_thresh;
+  if (new_nocheck && !nocheck_reported) {
     notice("entering nocheck mode for the first time");
     nocheck_reported= 1;
+  } else if (new_nocheck != nockech) {
+    debug("nocheck mode %s", new_nocheck ? "start" : "stop");
   }
+  nocheck= new_nocheck;
 }
 
 static void article_done(Connection *conn, Article *art, int whichcount) {
@@ -1093,7 +1106,7 @@ static void feedfile_eof(InputFile *ipf) {
     return;
   }
 
-  assert(ipf == old_input_file);
+  assert(ipf == flushing_input_file);
 
   inputfile_tailing_stop(ipf);
   assert(ipf->fd >= 0);
@@ -1113,7 +1126,7 @@ static InputFile *open_input_file(const char *path) {
     sysfatal("unable to open input file %s", path);
   }
 
-  InputFile *ipf= xmalloc(sizeof(InputFile));
+  InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
   memset(ipf,0,sizeof(*ipf));
 
   ipf->readable.on_readable= tailing_on_readable;
@@ -1121,7 +1134,7 @@ static InputFile *open_input_file(const char *path) {
   ipf->readable.try_read=    tailing_try_read;
 
   ipf->fd= fd;
-  ipf->path= path;
+  strcpy(ipf->path, path);
 
   return ipf;
 }
@@ -1135,8 +1148,6 @@ static void close_input_file(InputFile *ipf) {
   if (ipf->fd >= 0)
     if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path);
 
-  fixme maybe free ipf->path;
-
   free(ipf);
 }
 
@@ -1177,7 +1188,9 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd,
   }
   ipf->offset += recsz + 1;
 
-  if (sms==sm_NORMAL && ipf->offset >= flush_threshold) {
+  if (sms==sm_NORMAL && ipf==main_input_file &&
+      (ipf->offset >= flush_threshold || !until_spontaneous_flush) {
+
     notice("starting flush (%lu >= %lu)",
           (unsigned long)ipf->offset, (unsigned long)flush_threshold);
 
@@ -1257,7 +1270,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
       if (ipf==main_input_file) {
        errno=EAGAIN;
        return -1;
-      } else if (ipf==old_input_file) {
+      } else if (ipf==flushing_input_file) {
        assert(ipf->fd>=0);
        assert(sms==sm_SEPARATED || sms==sm_DROPPING);
       } else if (ipf==backlog_input_file) {
@@ -1301,11 +1314,15 @@ static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
   assert(!filemon_inotify_wd2ipf[wd]);
   filemon_inotify_wd2ipf[wd]= ipf;
 
+  debug("filemon inotify startfile %p wd=%d wdmax=%d",
+       ipf, wd, filemon_inotify_wdmax);
+
   pf->wd= wd;
 }
 
 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
   int wd= pf->wd;
+  debug("filemon inotify stopfile %p wd=%d", ipf, wd);
   int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
   if (r) sysdie("inotify_rm_watch");
   filemon_inotify_wd2ipf[wd]= 0;
@@ -1325,6 +1342,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd,
       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
     }
     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
+    debug("filemon inotify readable read %p wd=%p", iev.wd, ipf);
     filemon_callback(ipf);
   }
   return OOP_CONTINUE;
@@ -1339,6 +1357,7 @@ static int filemon_method_init(void) {
   set nonblock;
   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
 
+  debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
   return 1;
 }
 
@@ -1449,7 +1468,7 @@ static void inputfile_tailing_stop(InputFile *ipf) {
      |          V                               V
      |     =============                     ============
      |      SEPARATED/                        DROPPING/
-     |      old->fd>=0                        old->fd>=0
+     |      flsh->fd>=0                       flsh->fd>=0
      |     [Separated]                       [Dropping]
      |      main F idle                       main none
      |      old  D tail                       old  D tail
@@ -1459,7 +1478,7 @@ static void inputfile_tailing_stop(InputFile *ipf) {
      |          V                                 V
      |     ===============                   ===============
      |      SEPARATED/                        DROPPING/
-     |      old->fd==-1                       old->fd==-1
+     |      flsh->fd==-1                      flsh->fd==-1
      |     [Finishing]                       [Dropping]
      |      main F tail                              main none
      |      old  D closed                     old  D closed
@@ -1599,6 +1618,7 @@ static void statemc_waiting_poll(void) {
 static void startup_set_input_file(InputFile *f) {
   assert(!main_input_file);
   main_input_file= f;
+  until_spontaneous_flush= spontaneous_flush_periods;
   inputfile_tailing_start(f);
 }
 
@@ -1625,7 +1645,7 @@ static void *statemc_check_input_done(oop_source *lp,
     return;
   }
 
-  assert(ipf == old_input_file);
+  assert(ipf == flushing_input_file);
   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
 
   notice_processed(ipf,"feed file",0);
@@ -1648,8 +1668,8 @@ static void *statemc_check_input_done(oop_source *lp,
 
   open_defer();
 
-  close_input_file(old_input_file);
-  old_input_file= 0;
+  close_input_file(flushing_input_file);
+  flushing_input_file= 0;
 
   notice("flush complete");
   SMS(NORMAL, 0, "flush complete");
@@ -1741,14 +1761,14 @@ static void close_defer(void) {
     sysdie("could not unlink old defer link %s to backlog file %s",
           path_defer, backlog);
 
-  if (backlog_nextscan_periods < 0 ||
-      backlog_nextscan_periods > backlog_retry_minperiods + 1)
-    backlog_nextscan_periods= backlog_retry_minperiods + 1;
+  if (until_backlog_nextscan < 0 ||
+      until_backlog_nextscan > backlog_retry_minperiods + 1)
+    until_backlog_nextscan= backlog_retry_minperiods + 1;
 }
 
 static void poll_backlog_file(void) {
-  if (backlog_nextscan_periods < 0) return;
-  if (backlog_nextscan_periods-- > 0) return;
+  if (until_backlog_nextscan < 0) return;
+  if (until_backlog_nextscan-- > 0) return;
   search_backlog_file();
 }
 
@@ -1805,7 +1825,7 @@ static int search_backlog_file(void) {
 
   if (!oldest_path) {
     debug("backlog scan: none");
-    backlog_nextscan_periods= backlog_spontaneous_rescan_periods;
+    until_backlog_nextscan= backlog_spontaneous_rescan_periods;
     return 0;
   }
 
@@ -1817,24 +1837,24 @@ static int search_backlog_file(void) {
     debug("backlog scan: found age=%f deficiency=%ld oldest=%s",
          age, age_deficiency, oldest_path);
 
-    backlog_input_file= open_input_file();
+    backlog_input_file= open_input_file(oldest_path);
     if (!backlog_input_file) {
       warn("backlog file %s vanished as we opened it", backlog_input_file);
       goto try_again;
     }
     inputfile_tailing_start(backlog_input_file);
-    backlog_nextscan_periods= -1;
+    until_backlog_nextscan= -1;
     return 1;
   }
 
-  backlog_nextscan_periods= age_deficiency / PERIOD_SECONDS;
+  until_backlog_nextscan= age_deficiency / PERIOD_SECONDS;
 
   if (backlog_spontaneous_rescan_periods >= 0 &&
-      backlog_nextscan_periods > backlog_spontaneous_rescan_periods)
-    backlog_nextscan_periods= backlog_spontaneous_rescan_periods;
+      until_backlog_nextscan > backlog_spontaneous_rescan_periods)
+    until_backlog_nextscan= backlog_spontaneous_rescan_periods;
 
   debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
-       age, age_deficiency, backlog_nextscan_periods, oldest_path);
+       age, age_deficiency, until_backlog_nextscan, oldest_path);
   return 2;
 }
 
@@ -1848,7 +1868,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
   loop->cancel_fd(fd);
   close(fd);
 
-  assert(!old_input_file);
+  assert(!flushing_input_file);
 
   if (WIFEXITED(status)) {
     switch (WEXITSTATUS(status)) {
@@ -1858,16 +1878,17 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
 
     case INNDCOMMCHILD_ESTATUS_NONESUCH:
       warn("feed has been dropped by innd, finishing up");
-      old_input_file= main_input_file;
+      flushing_input_file= main_input_file;
       main_input_file= 0;
       SMS(DROPPING, 0, "dropped by innd");
       return OOP_CONTINUE;
 
     case 0:
-      old_input_file= main_input_file;
+      flushing_input_file= main_input_file;
       main_input_file= open_input_file(feedfile);
       if (!main_input_file)
        die("flush succeeded but feedfile %s does not exist!", feedfile);
+      until_spontaneous_flush= spontaneous_flush_periods;
       SMS(SEPARATED, 0, "feed file missing");
       return OOP_CONTINUE;
 
@@ -1951,7 +1972,7 @@ static void postfork(const char *what) {
     sysdie("%s child: failed to reset SIGPIPE");
 
   postfork_inputfile(main_input_file);
-  postfork_inputfile(old_input_file);
+  postfork_inputfile(flushing_input_file);
   postfork_conns(idle.head);
   postfork_conns(working.head);
   postfork_conns(full.head);
@@ -1973,9 +1994,33 @@ EVERY(filepoll, {5,0}, {
   if (main_input_file && main_input_file->readable_callback)
     filemon_callback(main_input_file);
 });
+#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s" \
+#define DEBUG_IPF(sh)                                          \
+  wh##_input_file, debug_ipf_path(wh##_input_file),            \
+  wh##_input_file->inprogress, (long)wh##_input_file->offset,  \
+  wh##_input_file->fd, wh##_input_file->rd ? "+" : ""
+static const char *debug_ipf_path(InputFile *ipf) {
+  char *slash= strrchr(ipf->path,'/');
+  return slash ? slash+1 : ipf->path;
+}
 
 EVERY(period, {PERIOD_SECONDS,0}, {
+  debug("PERIOD"
+       " sms=%s queue=%d sm_period_counter=%d"
+       " connect_delay=%d until_spontaneous_flush=%d"
+       " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_FMT(flushing)
+       " conns idle=%d working=%d full=%d"
+       " children connecting=%ld inndcomm_child"
+       ,
+       sms_names[sms], queue.count, sm_period_counter,
+       connect_delay, until_spontaneous_flush,
+       DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing),
+       idle.count, working.count, full.count,
+       (long)connecting_child, (long)inndcomm_child
+       );
   if (connect_delay) connect_delay--;
+  if (until_spontaneous_flush) until_spontaneous_flush--;
   poll_backlog_file();
   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
   statemc_poll();
@@ -2156,11 +2201,11 @@ int main(int argc, char **argv) {
   else if (feedfile[strlen(feedfile)-1]=='/')
     feedfile= xasprintf("%s%s",feedfile,sitename);
 
-  const char *feedfile_forbidden= "?*[";
+  const char *feedfile_forbidden= "?*[~#";
   int c;
   while ((c= *feedfile_forbidden++))
     if (strchr(feedfile, c))
-      badusage("feed filename may not contain glob metacharacter %c",c);
+      badusage("feed filename may not contain metacharacter %c",c);
 
   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
     sysdie("could not ignore SIGPIPE");