chiark / gitweb /
Cope with new NNTPconnect prototype
[innduct.git] / infile.c
index 223846eecc66635ab2d779b4b5f50b7fba4e7d1b..b8e01bcfc6f6ad200a19741f70bcfa14a2f7dd1a 100644 (file)
--- a/infile.c
+++ b/infile.c
@@ -1,3 +1,31 @@
+/*
+ *  innduct
+ *  tailing reliable realtime streaming feeder for inn
+ *  infile.c - monitoring and handling of input files
+ *
+ *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ * 
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ * 
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ * 
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ *  (I believe that when you compile and link this as part of the inn2
+ *  build, with the Makefile runes I have provided, all the libraries
+ *  and files which end up included in innduct are licence-compatible
+ *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
+ */
+
+#include "innduct.h"
+
 /*========== monitoring of input files ==========*/
 
 static void feedfile_eof(InputFile *ipf) {
@@ -15,7 +43,7 @@ static void feedfile_eof(InputFile *ipf) {
   }
 }
 
-static InputFile *open_input_file(const char *path) {
+InputFile *open_input_file(const char *path) {
   int fd= open(path, O_RDWR);
   if (fd<0) {
     if (errno==ENOENT) return 0;
@@ -31,10 +59,12 @@ static InputFile *open_input_file(const char *path) {
   LIST_INIT(ipf->queue);
   strcpy(ipf->path, path);
 
+  dbg("started input file %p %s", ipf, path);
+
   return ipf;
 }
 
-static void close_input_file(InputFile *ipf) { /* does not free */
+void close_input_file(InputFile *ipf) { /* does not free */
   assert(!ipf->readable_callback); /* must have had ->on_cancel */
   assert(!ipf->filemon); /* must have had inputfile_reading_stop */
   assert(!ipf->rd); /* must have had inputfile_reading_stop */
@@ -49,11 +79,13 @@ static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
                                   const char *data, const char *how) {
   warn("corrupted file: %s, offset %lu: %s: in %s",
        ipf->path, (unsigned long)offset, how, sanitise(data,-1));
-  ipf->readcount_err++;
-  if (ipf->readcount_err > max_bad_data_initial +
-      (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
+  ipf->counts.events[read_err]++;
+  if (ipf->counts.events[read_err] > max_bad_data_initial +
+      (ipf->counts.events[read_ok] + ipf->counts.events[read_blank])
+                                                  / max_bad_data_ratio)
     crash("too much garbage in input file!  (%d errs, %d ok, %d blank)",
-         ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
+         ipf->counts.events[read_err], ipf->counts.events[read_ok],
+         ipf->counts.events[read_blank]);
   return OOP_CONTINUE;
 }
 
@@ -101,7 +133,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd,
 
   if (data[0]==' ') {
     if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
-    ipf->readcount_blank++;
+    ipf->counts.events[read_blank]++;
     return OOP_CONTINUE;
   }
 
@@ -116,7 +148,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd,
   tokentextbuf[tokenlen]= 0;
   if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
 
-  ipf->readcount_ok++;
+  ipf->counts.events[read_ok]++;
 
   art= xmalloc(sizeof(*art) - 1 + midlen + 1);
   memset(art,0,sizeof(*art));
@@ -148,47 +180,57 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd,
 
 /*========== tailing input file ==========*/
 
+static void tailing_rable_on_time(InputFile *ipf);
+
 static void *tailing_rable_call_time(oop_source *lp, struct timeval tv,
                                     void *user) {
   /* lifetime of ipf here is OK because destruction will cause
    * on_cancel which will cancel this callback */
   InputFile *ipf= user;
 
-  dbg("**TRACT** ipf=%p called",ipf);
+  //dbg("**TRACT** ipf=%p called",ipf);
   if (!ipf->fake_readable) return OOP_CONTINUE;
 
   /* we just keep calling readable until our caller (oop_rd)
    * has called try_read, and try_read has found EOF so given EAGAIN */
-  dbg("**TRACT** ipf=%p reschedule",ipf);
-  loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+  //dbg("**TRACT** ipf=%p reschedule",ipf);
+  tailing_rable_on_time(ipf);
 
+  assert(ipf->readable_callback);
   return ipf->readable_callback(loop, &ipf->readable,
                                ipf->readable_callback_user);
 }
 
+static void tailing_rable_on_time(InputFile *ipf) {
+  loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+  loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+  /* on_time is not idempotent - it counts.   So we need this to make
+   * sure we only have one outstanding, as otherwise our cancel doesn't work */
+}
+
 static void tailing_on_cancel(struct oop_readable *rable) {
   InputFile *ipf= (void*)rable;
-  dbg("**TOR** ipf=%p on_cancel",ipf);
+  //dbg("**TOR** ipf=%p on_cancel",ipf);
 
   if (ipf->filemon) filemon_stop(ipf);
-  dbg("**TRACT** ipf=%p cancel",ipf);
+  //dbg("**TRACT** ipf=%p cancel",ipf);
   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
   ipf->readable_callback= 0;
 }
 
-static void tailing_make_readable(InputFile *ipf) {
-  dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf,
-      (void*)ipf?ipf->readable_callback:0);
+void tailing_make_readable(InputFile *ipf) {
+  //dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf,
+  //    (void*)ipf?ipf->readable_callback:0);
   if (!ipf || !ipf->readable_callback) /* so callers can be naive */
     return;
   ipf->fake_readable= 1;
-  loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+  tailing_rable_on_time(ipf);
 }
 
 static int tailing_on_readable(struct oop_readable *rable,
                                oop_readable_call *cb, void *user) {
   InputFile *ipf= (void*)rable;
-  dbg("**TOR** ipf=%p on_readable",ipf);
+  //dbg("**TOR** ipf=%p on_readable",ipf);
 
   tailing_on_cancel(rable);
   ipf->readable_callback= cb;
@@ -222,7 +264,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
        abort();
       }
     }
-    dbg("**TOR** ipf=%p try_read r=%d",ipf,r);
+    //dbg("**TOR** ipf=%p try_read r=%d",ipf,r);
     return r;
   }
 }
@@ -235,7 +277,7 @@ static const oop_rd_style feedfile_rdstyle= {
   OOP_RD_SHORTREC_LONG,
 };
 
-static void inputfile_reading_resume(InputFile *ipf) {
+void inputfile_reading_resume(InputFile *ipf) {
   if (!ipf->rd) return;
   if (!ipf->paused) return;
 
@@ -246,14 +288,14 @@ static void inputfile_reading_resume(InputFile *ipf) {
   ipf->paused= 0;
 }
 
-static void inputfile_reading_pause(InputFile *ipf) {
+void inputfile_reading_pause(InputFile *ipf) {
   if (!ipf->rd) return;
   if (ipf->paused) return;
   oop_rd_cancel(ipf->rd);
   ipf->paused= 1;
 }
 
-static void inputfile_reading_start(InputFile *ipf) {
+void inputfile_reading_start(InputFile *ipf) {
   assert(!ipf->rd);
   ipf->readable.on_readable= tailing_on_readable;
   ipf->readable.on_cancel=   tailing_on_cancel;
@@ -271,7 +313,7 @@ static void inputfile_reading_start(InputFile *ipf) {
   inputfile_reading_resume(ipf);
 }
 
-static void inputfile_reading_stop(InputFile *ipf) {
+void inputfile_reading_stop(InputFile *ipf) {
   assert(ipf->rd);
   inputfile_reading_pause(ipf);
   oop_rd_delete(ipf->rd);
@@ -284,3 +326,17 @@ void filepoll(void) {
   tailing_make_readable(flushing_input_file);
 }
 
+char *dbg_report_ipf(InputFile *ipf) {
+  if (!ipf) return masprintf("none");
+
+  const char *slash= strrchr(ipf->path,'/');
+  const char *path= slash ? slash+1 : ipf->path;
+
+  return masprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
+                  ipf, path,
+                  ipf->queue.count, ipf->inprogress, ipf->autodefer,
+                  (long)ipf->offset, ipf->fd,
+                  ipf->rd ? "" : ",!rd",
+                  ipf->skippinglong ? "*skiplong" : "",
+                  ipf->rd && ipf->paused ? "*paused" : "");
+}