chiark / gitweb /
wip read backlog files ourselves; see TODO near blather in comment at top
[inn-innduct.git] / backends / innduct.c
index f3be2b7613f07014e5174e4c274b9c70765c0611..0d4d05b0ad5dea68d39c8b7460e9d03f98c47b43 100644 (file)
 
  also we have to start a new backlog file every (some interval)
 
+ TODO for backlog file inputs
+ - all code to search for and open these files
+ - write proper algorithm comment
+
 
 
  
@@ -252,7 +256,7 @@ struct Article {
   char messageid[1];
 };
 
-typedef struct {
+typedef struct InputFile {
   /* This is an instance of struct oop_readable */
   struct oop_readable readable; /* first */
   oop_readable_call *readable_callback;
@@ -265,6 +269,8 @@ typedef struct {
   oop_read *rd;
   long inprogress; /* no. of articles read but not processed */
   off_t offset;
+
+  Counts counts;
 } InputFile;
 
 typedef enum {
@@ -272,10 +278,8 @@ typedef enum {
   sm_NORMAL,
   sm_FLUSHING,
   sm_FLUSHFAIL,
-  sm_SEPARATED1,
-  sm_SEPARATED2, /* must follow SEPARATED2 - see feedfile_eof */
-  sm_DROPPING1,
-  sm_DROPPING2, /* must follow DROPPING1 - see feedfile_eof */
+  sm_SEPARATED,
+  sm_DROPPING,
 } StateMachineState;
 
 struct Conn {
@@ -303,7 +307,7 @@ static char *path_ductlock, *path_duct, *path_ductdefer;
 
 static StateMachineState sms;
 static FILE *defer;
-static InputFile *main_input_file, *old_input_file;
+static InputFile *main_input_file, *old_input_file, *backlog_input_file;
 static int sm_period_counter;
 
 
@@ -821,7 +825,7 @@ static void conn_make_some_xmits(Conn *conn) {
       art->sent= 1;
       LIST_ADDTAIL(conn->sent, art);
 
-      counts[art->checked].sent++;
+      art->ipf->counts[art->checked].sent++;
 
     } else {
       /* check it */
@@ -834,7 +838,7 @@ static void conn_make_some_xmits(Conn *conn) {
       XMIT_LITERAL("\r\n");
 
       LIST_ADDTAIL(conn->sent, art);
-      counts[art->checked].offered++;
+      art->ipf->counts[art->checked].offered++;
     }
   }
 }
@@ -905,7 +909,7 @@ static void update_nocheck(int accepted) {
 
 static void article_done(Connection *conn, Article *art, int whichcount) {
   *count++;
-  counts.articles[art->checked][whichcount]++;
+  art->ipf->counts.articles[art->checked][whichcount]++;
   if (whichcount == RC_accepted) update_nocheck(1);
   else if (whichcount == RC_unwanted) update_nocheck(0);
 
@@ -932,7 +936,7 @@ static void article_done(Connection *conn, Article *art, int whichcount) {
   assert(ipf->inprogress >= 0);
 
   if (!ipf->inprogress)
-    loop->on_time(loop, OOP_TIME_NOW, statemc_check_oldinput_done, 0);
+    loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, ipf);
 
   free(art);
 }
@@ -1048,13 +1052,25 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
 
 static void feedfile_eof(InputFile *ipf) {
   assert(ipf != main_input_file); /* promised by tailing_try_read */
+
+  inputfile_tailing_stop(ipf);
+
+  if (ipf == backlog_input_file) {
+    assert(ipf->fd >= 0);
+    if (close(ipf->fd)) sysdie("could not close backlog file %s", ipf->path);
+    ipf->fd= -1;
+    return;
+  }
+
   assert(ipf == old_input_file);
 
-  if (sms==sm_SEPARATED1) SMS(SEPARATED2, 0, "eof on old feed file");
-  else if (sms==sm_DROPPING1) SMS(DROPPING2, 0, "eof on dead feed file");
-  else abort();
-  
   inputfile_tailing_stop(ipf);
+  assert(ipf->fd >= 0);
+  if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path);
+  ipf->fd= -1;
+
+  assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+  
   if (main_input_file)
     inputfile_tailing_start(main_input_file);
 }
@@ -1085,7 +1101,11 @@ static void close_input_file(InputFile *ipf) {
   assert(!ipf->rd); /* must have had inputfile_tailing_stop */
   assert(!ipf->inprogress); /* no dangling pointers pointing here */
 
-  if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path);
+  if (ipf->fd >= 0)
+    if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path);
+
+  fixme maybe free ipf->path;
+
   free(ipf);
 }
 
@@ -1203,8 +1223,17 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
       return r;
     }
     if (!r) {
-      if (ipf==main_input_file) { errno=EAGAIN; return -1; }
-      assert(sms==sm_SEPARATED1 || sms==sm_DROPPING1);
+      if (ipf==main_input_file) {
+       errno=EAGAIN;
+       return -1;
+      } else if (ipf==old_input_file) {
+       assert(ipf->fd>=0);
+       assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+      } else if (ipf==backlog_input_file) {
+       assert(ipf->fd>=0);
+      } else {
+       abort();
+      }
     }
     return r;
   }
@@ -1387,7 +1416,8 @@ static void inputfile_tailing_stop(InputFile *ipf) {
      |          | open F                       \
      |          V                               V
      |     =============                     ============
-     |      SEPARATED1                        DROPPING1
+     |      SEPARATED/                        DROPPING/
+     |      old->fd>=0                        old->fd>=0
      |     [Separated]                       [Dropping]
      |      main F idle                       main none
      |      old  D tail                       old  D tail
@@ -1395,12 +1425,13 @@ static void inputfile_tailing_stop(InputFile *ipf) {
      |          |                                 |
      ^          | EOF ON D                        | EOF ON D
      |          V                                 V
-     |     =============                     ============
-     |      SEPARATED2                        DROPPING2
+     |     ===============                   ===============
+     |      SEPARATED/                        DROPPING/
+     |      old->fd==-1                       old->fd==-1
      |     [Finishing]                       [Dropping]
      |      main F tail                              main none
-     |      old  D idle                       old  D idle
-     |     =============                            ============
+     |      old  D closed                     old  D closed
+     |     ===============                          ===============
      |          |                               |
      |          | ALL D PROCESSED                | ALL D PROCESSED
      |          V install defer as backlog       V install defer as backlog
@@ -1539,7 +1570,7 @@ static void statemc_init(void) {
       spawn_inndcomm_flush(); /* => Flushing, sets sms to sm_FLUSHING */
     } else {
       /* F!=D => Separated */
-      SMS(SEPARATED1, 0, "found both old and current feed files");
+      SMS(SEPARATED, 0, "found both old and current feed files");
       startup_set_input_file(file_d);
     }
   } else { /*!file_d*/
@@ -1579,13 +1610,26 @@ static void startup_set_input_file(InputFile *f) {
   inputfile_tailing_start(f);
 }
 
-static void *statemc_check_oldinput_done(oop_source *lp,
-                                        struct timeval now, void *u) {
+static void *statemc_check_input_done(oop_source *lp,
+                                     struct timeval now, void *ipf_v) {
+  InputFile *ipf= ipf_v;
   struct stat stab;
 
-  int done= (sms==sm_SEPARATED2 || sms==sm_DROPPING2)
-         && old_input_file->inprogress;
-  if (!done) return;
+  if (ipf->inprogress) return; /* new article in the meantime */
+  if (ipf->fd >= 0); return; /* not had EOF */
+
+  if (ipf == backlog_input_file) {
+    notice_processed(ipf,"backlog file",ipf->path);
+    if (unlink(ipf->path))
+      sysdie("could not unlink done backlog file %s", ipf->path);
+    close_input_file(ipf);
+    fixme trigger search for new backlog file;
+  }
+
+  assert(ipf == old_input_file);
+  assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+
+  notice_processed(ipf,"feed file",0);
 
   r= fstat(fileno(defer), &stab);
   if (r) sysdie("check defer file %s", path_defer);
@@ -1606,7 +1650,7 @@ static void *statemc_check_oldinput_done(oop_source *lp,
   if (unlink(path_duct))
     sysdie("could not unlink old duct file %s", path_duct);
 
-  if (sms==sm_DROPPING2) {
+  if (sms==sm_DROPPING) {
     notice("feed dropped and our work is complete"
           " (but check for backlog files)");
     exit(0);
@@ -1654,7 +1698,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
       warn("feed has been dropped by innd, finishing up");
       old_input_file= main_input_file;
       main_input_file= 0;
-      SMS(DROPPING1, 0, "dropped by innd");
+      SMS(DROPPING, 0, "dropped by innd");
       return OOP_CONTINUE;
 
     case 0:
@@ -1662,7 +1706,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
       main_input_file= open_input_file(feedfile);
       if (!main_input_file)
        die("flush succeeded but feedfile %s does not exist!", feedfile);
-      SMS(SEPARATED1, 0, "feed file missing");
+      SMS(SEPARATED, 0, "feed file missing");
       return OOP_CONTINUE;
 
     default: