chiark / gitweb /
wip split into multiple files and make compile
[innduct.git] / infile.c
diff --git a/infile.c b/infile.c
new file mode 100644 (file)
index 0000000..223846e
--- /dev/null
+++ b/infile.c
@@ -0,0 +1,286 @@
+/*========== monitoring of input files ==========*/
+
+static void feedfile_eof(InputFile *ipf) {
+  assert(ipf != main_input_file); /* promised by tailing_try_read */
+  inputfile_reading_stop(ipf);
+
+  if (ipf == flushing_input_file) {
+    assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+    if (main_input_file) inputfile_reading_start(main_input_file);
+    statemc_check_flushing_done();
+  } else if (ipf == backlog_input_file) {
+    statemc_check_backlog_done();
+  } else {
+    abort(); /* supposed to wait rather than get EOF on main input file */
+  }
+}
+
+static InputFile *open_input_file(const char *path) {
+  int fd= open(path, O_RDWR);
+  if (fd<0) {
+    if (errno==ENOENT) return 0;
+    sysdie("unable to open input file %s", path);
+  }
+  assert(fd>0);
+
+  InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
+  memset(ipf,0,sizeof(*ipf));
+
+  ipf->fd= fd;
+  ipf->autodefer= -1;
+  LIST_INIT(ipf->queue);
+  strcpy(ipf->path, path);
+
+  return ipf;
+}
+
+static void close_input_file(InputFile *ipf) { /* does not free */
+  assert(!ipf->readable_callback); /* must have had ->on_cancel */
+  assert(!ipf->filemon); /* must have had inputfile_reading_stop */
+  assert(!ipf->rd); /* must have had inputfile_reading_stop */
+  assert(!ipf->inprogress); /* no dangling pointers pointing here */
+  xclose_perhaps(&ipf->fd, "input file ", ipf->path);
+}
+
+
+/*---------- dealing with articles read in the input file ----------*/
+
+static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
+                                  const char *data, const char *how) {
+  warn("corrupted file: %s, offset %lu: %s: in %s",
+       ipf->path, (unsigned long)offset, how, sanitise(data,-1));
+  ipf->readcount_err++;
+  if (ipf->readcount_err > max_bad_data_initial +
+      (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
+    crash("too much garbage in input file!  (%d errs, %d ok, %d blank)",
+         ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
+  return OOP_CONTINUE;
+}
+
+static void *feedfile_read_err(oop_source *lp, oop_read *rd,
+                              oop_rd_event ev, const char *errmsg,
+                              int errnoval, const char *data, size_t recsz,
+                              void *ipf_v) {
+  InputFile *ipf= ipf_v;
+  assert(ev == OOP_RD_SYSTEM);
+  errno= errnoval;
+  syscrash("error reading input file: %s, offset %lu",
+          ipf->path, (unsigned long)ipf->offset);
+}
+
+static void *feedfile_got_article(oop_source *lp, oop_read *rd,
+                                 oop_rd_event ev, const char *errmsg,
+                                 int errnoval, const char *data, size_t recsz,
+                                 void *ipf_v) {
+  InputFile *ipf= ipf_v;
+  Article *art;
+  char tokentextbuf[sizeof(TOKEN)*2+3];
+
+  if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
+
+  off_t old_offset= ipf->offset;
+  ipf->offset += recsz + !!(ev == OOP_RD_OK);
+
+#define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
+
+  if (ev==OOP_RD_PARTREC)
+    feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
+    /* but process it anyway */
+
+  if (ipf->skippinglong) {
+    if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
+    return OOP_CONTINUE;
+  }
+  if (ev==OOP_RD_LONG) {
+    ipf->skippinglong= 1;
+    X_BAD_DATA("overly long line");
+  }
+
+  if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
+  if (!recsz) X_BAD_DATA("empty line");
+
+  if (data[0]==' ') {
+    if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
+    ipf->readcount_blank++;
+    return OOP_CONTINUE;
+  }
+
+  char *space= strchr(data,' ');
+  int tokenlen= space-data;
+  int midlen= (int)recsz-tokenlen-1;
+  if (midlen <= 2) X_BAD_DATA("no room for messageid");
+  if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
+
+  if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
+  memcpy(tokentextbuf, data, tokenlen);
+  tokentextbuf[tokenlen]= 0;
+  if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
+
+  ipf->readcount_ok++;
+
+  art= xmalloc(sizeof(*art) - 1 + midlen + 1);
+  memset(art,0,sizeof(*art));
+  art->state= art_Unchecked;
+  art->midlen= midlen;
+  art->ipf= ipf;  ipf->inprogress++;
+  art->token= TextToToken(tokentextbuf);
+  art->offset= old_offset;
+  art->blanklen= recsz;
+  strcpy(art->messageid, space+1);
+
+  if (ipf->autodefer >= 0) {
+    article_autodefer(ipf, art);
+  } else {
+    LIST_ADDTAIL(ipf->queue, art);
+
+    if (ipf==backlog_input_file)
+      article_check_expired(art);
+  }
+
+  if (sms==sm_NORMAL && ipf==main_input_file &&
+      ipf->offset >= target_max_feedfile_size)
+    statemc_start_flush("feed file size");
+
+  check_assign_articles(); /* may destroy conn but that's OK */
+  check_reading_pause_resume(ipf);
+  return OOP_CONTINUE;
+}
+
+/*========== tailing input file ==========*/
+
+static void *tailing_rable_call_time(oop_source *lp, struct timeval tv,
+                                    void *user) {
+  /* lifetime of ipf here is OK because destruction will cause
+   * on_cancel which will cancel this callback */
+  InputFile *ipf= user;
+
+  dbg("**TRACT** ipf=%p called",ipf);
+  if (!ipf->fake_readable) return OOP_CONTINUE;
+
+  /* we just keep calling readable until our caller (oop_rd)
+   * has called try_read, and try_read has found EOF so given EAGAIN */
+  dbg("**TRACT** ipf=%p reschedule",ipf);
+  loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+
+  return ipf->readable_callback(loop, &ipf->readable,
+                               ipf->readable_callback_user);
+}
+
+static void tailing_on_cancel(struct oop_readable *rable) {
+  InputFile *ipf= (void*)rable;
+  dbg("**TOR** ipf=%p on_cancel",ipf);
+
+  if (ipf->filemon) filemon_stop(ipf);
+  dbg("**TRACT** ipf=%p cancel",ipf);
+  loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+  ipf->readable_callback= 0;
+}
+
+static void tailing_make_readable(InputFile *ipf) {
+  dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf,
+      (void*)ipf?ipf->readable_callback:0);
+  if (!ipf || !ipf->readable_callback) /* so callers can be naive */
+    return;
+  ipf->fake_readable= 1;
+  loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+}
+
+static int tailing_on_readable(struct oop_readable *rable,
+                               oop_readable_call *cb, void *user) {
+  InputFile *ipf= (void*)rable;
+  dbg("**TOR** ipf=%p on_readable",ipf);
+
+  tailing_on_cancel(rable);
+  ipf->readable_callback= cb;
+  ipf->readable_callback_user= user;
+  filemon_start(ipf);
+  tailing_make_readable(ipf);
+  return 0;
+}
+
+static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
+                               size_t length) {
+  InputFile *ipf= (void*)rable;
+  for (;;) {
+    ssize_t r= read(ipf->fd, buffer, length);
+    if (r==-1) {
+      if (errno==EINTR) continue;
+      ipf->fake_readable= 0;
+      return r;
+    }
+    if (!r) {
+      if (ipf==main_input_file) {
+       errno=EAGAIN;
+       ipf->fake_readable= 0;
+       return -1;
+      } else if (ipf==flushing_input_file) {
+       assert(ipf->rd);
+       assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+      } else if (ipf==backlog_input_file) {
+       assert(ipf->rd);
+      } else {
+       abort();
+      }
+    }
+    dbg("**TOR** ipf=%p try_read r=%d",ipf,r);
+    return r;
+  }
+}
+
+/*---------- interface to start and stop an input file ----------*/
+
+static const oop_rd_style feedfile_rdstyle= {
+  OOP_RD_DELIM_STRIP, '\n',
+  OOP_RD_NUL_PERMIT,
+  OOP_RD_SHORTREC_LONG,
+};
+
+static void inputfile_reading_resume(InputFile *ipf) {
+  if (!ipf->rd) return;
+  if (!ipf->paused) return;
+
+  int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
+                    feedfile_got_article,ipf, feedfile_read_err, ipf);
+  if (r) syscrash("unable start reading feedfile %s",ipf->path);
+
+  ipf->paused= 0;
+}
+
+static void inputfile_reading_pause(InputFile *ipf) {
+  if (!ipf->rd) return;
+  if (ipf->paused) return;
+  oop_rd_cancel(ipf->rd);
+  ipf->paused= 1;
+}
+
+static void inputfile_reading_start(InputFile *ipf) {
+  assert(!ipf->rd);
+  ipf->readable.on_readable= tailing_on_readable;
+  ipf->readable.on_cancel=   tailing_on_cancel;
+  ipf->readable.try_read=    tailing_try_read;
+  ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
+  ipf->readable.delete_kill= 0;
+
+  ipf->readable_callback= 0;
+  ipf->readable_callback_user= 0;
+
+  ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
+  assert(ipf->rd);
+
+  ipf->paused= 1;
+  inputfile_reading_resume(ipf);
+}
+
+static void inputfile_reading_stop(InputFile *ipf) {
+  assert(ipf->rd);
+  inputfile_reading_pause(ipf);
+  oop_rd_delete(ipf->rd);
+  ipf->rd= 0;
+  assert(!ipf->filemon); /* we shouldn't be monitoring it now */
+}
+
+void filepoll(void) {
+  tailing_make_readable(main_input_file);
+  tailing_make_readable(flushing_input_file);
+}
+