chiark / gitweb /
wip read backlog files ourselves; see TODO near blather in comment at top
authorIan Jackson <ian@liberator.(none)>
Sun, 11 Apr 2010 11:55:45 +0000 (12:55 +0100)
committerIan Jackson <ian@liberator.(none)>
Sun, 11 Apr 2010 12:13:10 +0000 (13:13 +0100)
backends/innduct.c

index 6a6e51a8a2172e1d30a007d2c7c8b1032c234448..0d4d05b0ad5dea68d39c8b7460e9d03f98c47b43 100644 (file)
 
  also we have to start a new backlog file every (some interval)
 
+ TODO for backlog file inputs
+ - all code to search for and open these files
+ - write proper algorithm comment
+
 
 
  
@@ -252,7 +256,7 @@ struct Article {
   char messageid[1];
 };
 
-typedef struct {
+typedef struct InputFile {
   /* This is an instance of struct oop_readable */
   struct oop_readable readable; /* first */
   oop_readable_call *readable_callback;
@@ -265,6 +269,8 @@ typedef struct {
   oop_read *rd;
   long inprogress; /* no. of articles read but not processed */
   off_t offset;
+
+  Counts counts;
 } InputFile;
 
 typedef enum {
@@ -301,7 +307,7 @@ static char *path_ductlock, *path_duct, *path_ductdefer;
 
 static StateMachineState sms;
 static FILE *defer;
-static InputFile *main_input_file, *old_input_file;
+static InputFile *main_input_file, *old_input_file, *backlog_input_file;
 static int sm_period_counter;
 
 
@@ -819,7 +825,7 @@ static void conn_make_some_xmits(Conn *conn) {
       art->sent= 1;
       LIST_ADDTAIL(conn->sent, art);
 
-      counts[art->checked].sent++;
+      art->ipf->counts[art->checked].sent++;
 
     } else {
       /* check it */
@@ -832,7 +838,7 @@ static void conn_make_some_xmits(Conn *conn) {
       XMIT_LITERAL("\r\n");
 
       LIST_ADDTAIL(conn->sent, art);
-      counts[art->checked].offered++;
+      art->ipf->counts[art->checked].offered++;
     }
   }
 }
@@ -903,7 +909,7 @@ static void update_nocheck(int accepted) {
 
 static void article_done(Connection *conn, Article *art, int whichcount) {
   *count++;
-  counts.articles[art->checked][whichcount]++;
+  art->ipf->counts.articles[art->checked][whichcount]++;
   if (whichcount == RC_accepted) update_nocheck(1);
   else if (whichcount == RC_unwanted) update_nocheck(0);
 
@@ -930,7 +936,7 @@ static void article_done(Connection *conn, Article *art, int whichcount) {
   assert(ipf->inprogress >= 0);
 
   if (!ipf->inprogress)
-    loop->on_time(loop, OOP_TIME_NOW, statemc_check_oldinput_done, 0);
+    loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, ipf);
 
   free(art);
 }
@@ -1046,6 +1052,16 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
 
 static void feedfile_eof(InputFile *ipf) {
   assert(ipf != main_input_file); /* promised by tailing_try_read */
+
+  inputfile_tailing_stop(ipf);
+
+  if (ipf == backlog_input_file) {
+    assert(ipf->fd >= 0);
+    if (close(ipf->fd)) sysdie("could not close backlog file %s", ipf->path);
+    ipf->fd= -1;
+    return;
+  }
+
   assert(ipf == old_input_file);
 
   inputfile_tailing_stop(ipf);
@@ -1085,8 +1101,11 @@ static void close_input_file(InputFile *ipf) {
   assert(!ipf->rd); /* must have had inputfile_tailing_stop */
   assert(!ipf->inprogress); /* no dangling pointers pointing here */
 
-  if (ipf->fd>=0)
+  if (ipf->fd >= 0)
     if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path);
+
+  fixme maybe free ipf->path;
+
   free(ipf);
 }
 
@@ -1591,14 +1610,26 @@ static void startup_set_input_file(InputFile *f) {
   inputfile_tailing_start(f);
 }
 
-static void *statemc_check_oldinput_done(oop_source *lp,
-                                        struct timeval now, void *u) {
+static void *statemc_check_input_done(oop_source *lp,
+                                     struct timeval now, void *ipf_v) {
+  InputFile *ipf= ipf_v;
   struct stat stab;
 
-  int done= (sms==sm_SEPARATED || sms==sm_DROPPING)
-         && old_input_file->fd==-1
-         && !old_input_file->inprogress;
-  if (!done) return;
+  if (ipf->inprogress) return; /* new article in the meantime */
+  if (ipf->fd >= 0); return; /* not had EOF */
+
+  if (ipf == backlog_input_file) {
+    notice_processed(ipf,"backlog file",ipf->path);
+    if (unlink(ipf->path))
+      sysdie("could not unlink done backlog file %s", ipf->path);
+    close_input_file(ipf);
+    fixme trigger search for new backlog file;
+  }
+
+  assert(ipf == old_input_file);
+  assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+
+  notice_processed(ipf,"feed file",0);
 
   r= fstat(fileno(defer), &stab);
   if (r) sysdie("check defer file %s", path_defer);