chiark / gitweb /
WIP tidy up, etc, rename old_input_file to flushing_input_file
[innduct.git] / backends / innduct.c
index f0cbf3496c830ad177967cbfb83fa2252359dfb0..8b4b2859359150848e8cccd79a842ed1ddb38ba2 100644 (file)
@@ -269,15 +269,25 @@ typedef struct InputFile {
   char path[];
 } InputFile;
 
+#define SMS_LIST(X)                            \
+  X(WAITING)                                   \
+  X(NORMAL)                                    \
+  X(FLUSHING)                                  \
+  X(FLUSHFAIL)                                 \
+  X(SEPARATED)                                 \
+  X(DROPPING)
+
 typedef enum {
-  sm_WAITING,
-  sm_NORMAL,
-  sm_FLUSHING,
-  sm_FLUSHFAIL,
-  sm_SEPARATED,
-  sm_DROPPING,
+#define SMS_DEF_ENUM(s) sm_##s,
+  SMS_LIST(SMS_DEF_ENUM)
 } StateMachineState;
 
+static const char *sms_names[]= {
+#define SMS_DEF_NAME(s) #s ,
+  SMS_LIST(SMS_DEF_NAME)
+  0
+};
+
 struct Conn {
   ISNODE(Conn);
   int fd, max_queue, stream;
@@ -291,7 +301,6 @@ struct Conn {
 
 /*----- operational variables -----*/
 
-static int since_connect_attempt;
 static int nconns;
 static LIST(Conn) idle, working, full;
 static LIST(Article) *queue;
@@ -303,7 +312,7 @@ static char *path_lock, *path_flushing, *path_defer;
 
 static StateMachineState sms;
 static FILE *defer;
-static InputFile *main_input_file, *old_input_file, *backlog_input_file;
+static InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
 static int sm_period_counter;
 
 
@@ -462,6 +471,7 @@ logwrap(warn,     " warning",  LOG_WARN,   -1,    0);
 
 logwrap(notice,   "",          LOG_NOTICE, -1,    0);
 logwrap(info,     " info",     LOG_INFO,   -1,    0);
+logwrap(debug,    " debug",    LOG_DEBUG,  -1,    0);
 
 
 /*========== making new connections ==========*/
@@ -1093,7 +1103,7 @@ static void feedfile_eof(InputFile *ipf) {
     return;
   }
 
-  assert(ipf == old_input_file);
+  assert(ipf == flushing_input_file);
 
   inputfile_tailing_stop(ipf);
   assert(ipf->fd >= 0);
@@ -1175,7 +1185,9 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd,
   }
   ipf->offset += recsz + 1;
 
-  if (sms==sm_NORMAL && ipf->offset >= flush_threshold) {
+  if (sms==sm_NORMAL && ipf==main_input_file &&
+      (ipf->offset >= flush_threshold || !until_spontaneous_flush) {
+
     notice("starting flush (%lu >= %lu)",
           (unsigned long)ipf->offset, (unsigned long)flush_threshold);
 
@@ -1255,7 +1267,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
       if (ipf==main_input_file) {
        errno=EAGAIN;
        return -1;
-      } else if (ipf==old_input_file) {
+      } else if (ipf==flushing_input_file) {
        assert(ipf->fd>=0);
        assert(sms==sm_SEPARATED || sms==sm_DROPPING);
       } else if (ipf==backlog_input_file) {
@@ -1299,11 +1311,15 @@ static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
   assert(!filemon_inotify_wd2ipf[wd]);
   filemon_inotify_wd2ipf[wd]= ipf;
 
+  debug("filemon inotify startfile %p wd=%d wdmax=%d",
+       ipf, wd, filemon_inotify_wdmax);
+
   pf->wd= wd;
 }
 
 static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
   int wd= pf->wd;
+  debug("filemon inotify stopfile %p wd=%d", ipf, wd);
   int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
   if (r) sysdie("inotify_rm_watch");
   filemon_inotify_wd2ipf[wd]= 0;
@@ -1323,6 +1339,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd,
       die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
     }
     InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
+    debug("filemon inotify readable read %p wd=%p", iev.wd, ipf);
     filemon_callback(ipf);
   }
   return OOP_CONTINUE;
@@ -1337,6 +1354,7 @@ static int filemon_method_init(void) {
   set nonblock;
   loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
 
+  debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd);
   return 1;
 }
 
@@ -1598,6 +1616,7 @@ static void statemc_waiting_poll(void) {
 static void startup_set_input_file(InputFile *f) {
   assert(!main_input_file);
   main_input_file= f;
+  until_spontaneous_flush= spontaneous_flush_periods;
   inputfile_tailing_start(f);
 }
 
@@ -1624,7 +1643,7 @@ static void *statemc_check_input_done(oop_source *lp,
     return;
   }
 
-  assert(ipf == old_input_file);
+  assert(ipf == flushing_input_file);
   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
 
   notice_processed(ipf,"feed file",0);
@@ -1647,8 +1666,8 @@ static void *statemc_check_input_done(oop_source *lp,
 
   open_defer();
 
-  close_input_file(old_input_file);
-  old_input_file= 0;
+  close_input_file(flushing_input_file);
+  flushing_input_file= 0;
 
   notice("flush complete");
   SMS(NORMAL, 0, "flush complete");
@@ -1847,7 +1866,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
   loop->cancel_fd(fd);
   close(fd);
 
-  assert(!old_input_file);
+  assert(!flushing_input_file);
 
   if (WIFEXITED(status)) {
     switch (WEXITSTATUS(status)) {
@@ -1857,16 +1876,17 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
 
     case INNDCOMMCHILD_ESTATUS_NONESUCH:
       warn("feed has been dropped by innd, finishing up");
-      old_input_file= main_input_file;
+      flushing_input_file= main_input_file;
       main_input_file= 0;
       SMS(DROPPING, 0, "dropped by innd");
       return OOP_CONTINUE;
 
     case 0:
-      old_input_file= main_input_file;
+      flushing_input_file= main_input_file;
       main_input_file= open_input_file(feedfile);
       if (!main_input_file)
        die("flush succeeded but feedfile %s does not exist!", feedfile);
+      until_spontaneous_flush= spontaneous_flush_periods;
       SMS(SEPARATED, 0, "feed file missing");
       return OOP_CONTINUE;
 
@@ -1950,7 +1970,7 @@ static void postfork(const char *what) {
     sysdie("%s child: failed to reset SIGPIPE");
 
   postfork_inputfile(main_input_file);
-  postfork_inputfile(old_input_file);
+  postfork_inputfile(flushing_input_file);
   postfork_conns(idle.head);
   postfork_conns(working.head);
   postfork_conns(full.head);
@@ -1972,9 +1992,33 @@ EVERY(filepoll, {5,0}, {
   if (main_input_file && main_input_file->readable_callback)
     filemon_callback(main_input_file);
 });
+#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s" \
+#define DEBUG_IPF(sh)                                          \
+  wh##_input_file, debug_ipf_path(wh##_input_file),            \
+  wh##_input_file->inprogress, (long)wh##_input_file->offset,  \
+  wh##_input_file->fd, wh##_input_file->rd ? "+" : ""
+static const char *debug_ipf_path(InputFile *ipf) {
+  char *slash= strrchr(ipf->path,'/');
+  return slash ? slash+1 : ipf->path;
+}
 
 EVERY(period, {PERIOD_SECONDS,0}, {
+  debug("PERIOD"
+       " sms=%s queue=%d sm_period_counter=%d"
+       " connect_delay=%d until_spontaneous_flush=%d"
+       " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_FMT(flushing)
+       " conns idle=%d working=%d full=%d"
+       " children connecting=%ld inndcomm_child"
+       ,
+       sms_names[sms], queue.count, sm_period_counter,
+       connect_delay, until_spontaneous_flush,
+       DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing),
+       idle.count, working.count, full.count,
+       (long)connecting_child, (long)inndcomm_child
+       );
   if (connect_delay) connect_delay--;
+  if (until_spontaneous_flush) until_spontaneous_flush--;
   poll_backlog_file();
   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
   statemc_poll();
@@ -2155,11 +2199,11 @@ int main(int argc, char **argv) {
   else if (feedfile[strlen(feedfile)-1]=='/')
     feedfile= xasprintf("%s%s",feedfile,sitename);
 
-  const char *feedfile_forbidden= "?*[";
+  const char *feedfile_forbidden= "?*[~#";
   int c;
   while ((c= *feedfile_forbidden++))
     if (strchr(feedfile, c))
-      badusage("feed filename may not contain glob metacharacter %c",c);
+      badusage("feed filename may not contain metacharacter %c",c);
 
   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
     sysdie("could not ignore SIGPIPE");