chiark / gitweb /
Inputfile machinery. Now do state machine
[innduct.git] / backends / innduct.c
index 610504e8d84c0dc858d424d3ca890f753b7a3cc8..51b423303fb4023f3008554e64db0a6965d2afb1 100644 (file)
  * Four files full of
  *    token article
  *
- *   site.name_ductlock        lock taken out by innduct
- * F site.name                 written by innd
- * D site.name_duct            moved aside by innduct, for flush
- *   site.name_deferwork       431'd articles, still being written
- *   site.name_defergo_<inum>  431'd articles, ready for innxmit
- *   site.name_deferlock       lock taken out by innxmit wrapper
- *
- *
- *
- * OVERALL STATES:
+ *   site.name_duct.lock       lock preventing multiple ducts
+ *                                holder of this lock is "duct"
+ * F site.name                 main feed file
+ *                                opened/created, then written, by innd
+ *                                read by duct
+ *                                unlinked by duct
+ *                                tokens blanked out by duct when processed
+ * D site.name_duct            temporary feed file during flush (or crash)
+ *                                hardlink created by duct
+ *                                unlinked by duct
+ *   site.name_duct.defer      431'd articles, still being written,
+ *                                created, written, used by duct
+ *   site.name_backlog.lock    lock taken out by innxmit wrapper
+ *                                holder and its child are "xmit"
+ *   site.name_backlog_<inum>  431'd articles, ready for innxmit
+ *                                created (link/mv) by duct
+ *                                read by xmit
+ *                                unlinked by xmit
+ *   site.name_backlog_<letters> eg
+ *   site.name_backlog_manual
+ *                             anything the sysadmin likes (eg, feed files
+ *                             from old feeds to be merged into this one)
+ *                                created (link/mv) by admin
+ *                                read by xmit
+ *                                unlinked by xmit
+
+
+   OVERALL STATES:
+
+                                                               START
+                                                                  |
+                                                             check D, F
+                                                                  |
+                          <--------------------------------------'|
+        Nothing                            F, D both ENOENT       |
+         F: ENOENT                                                       |
+         D: ENOENT                                                       |
+         duct: not not reading anything                           |
+           |                                                     |
+           |`---------------------.                               |
+           |                      | duct times out waiting for F  |
+           V  innd creates F      | duct exits                    |
+           |                      V                               |
+        Noduct                    GO TO Dropped                   |
+         F: innd writing                                         |
+         D: ENOENT                                               |
+         duct: not running or not reading anything                |
+           |                                                     |
+           |                                                     |
+     ,-->--+                   <---------------------------------'|
+     |     |  duct opens F                         F exists       |
+     |     |                                              D ENOENT       |
+     |     V                                                     |
+     |  Normal                                                    |
+     |   F: innd writing, duct reading                            |
+     |   D: ENOENT                                                |
+     |     |                                                      |
+     |     |  duct decides time to flush                          |
+     |     |  duct makes hardlink                                 |
+     |     |                                                      |
+     |     V                            <------------------------'|
+     |  Hardlinked                                  F==D          |
+     |   F == D: innd writing, duct reading         both exist    |
+     ^     |                                                      |
+     |     |  duct unlinks F                                             |
+     |     V                                                     |
+     |  Moved                               <----+------------<--'|
+     |   F: ENOENT                               |  F ENOENT      |
+     |   D: innd writing, duct reading           |  D exists      |
+     |     |                                     |               |
+     |     |  duct requests flush of feed        |               |
+     |     |   (others can too, harmlessly)      |               |
+     |     V                                    |                |
+     |  Flushing                                |                |
+     |   F: ENOENT                              |                |
+     |   D: innd flushing, duct reading                 |                |
+     |     |                                            |                |
+     |     |   inndcomm flush fails                     |                |
+     |     |`-------------------------->---------'               |
+     |     |                                                             |
+     |     |   inndcomm reports no such site                             |
+     |     |`---------------------------------------------------- | -.
+     |     |                                                     |  |
+     |     |  innd finishes writing D, creates F                 |  |
+     |     |  inndcomm reports flush successful                          |  |
+     |     |                                                             |  |
+     |     V                                                     |  |
+     |  Separated                                <----------------'  |
+     |   F: innd writing                            F!=D             /
+     |   D: duct reading                             both exist     /
+     |     |                                                       /
+     |     |  duct gets to the end of D                           /
+     |     |  duct opens F too                                          /
+     |     V                                                    /
+     |  Finishing                                              /
+     |   F: innd writing, duct reading                        |
+     |   D: duct finishing                                    V
+     |     |                                                   Dropping
+     |     |  duct finishes processing D                        F: ENOENT
+     |     V  duct unlinks D                                    D: duct reading
+     |     |                                                 |
+     `--<--'                                                 | duct finishes
+                                                                     |  processing D
+                                                              | duct unlinks D
+                                                              | duct exits
+                                                              V
+                                                               Dropped
+                                                        F: ENOENT
+                                                        D: ENOENT
+                                                        duct not running
+
+   "duct reading" means innduct is reading the file but also
+   overwriting processed tokens.
+
  *
- *                                                  START
- *   ,-->--.                                           |
- *   |     |                                         stat D
- *   |     |                                         /   |
- *   |     |                                  ENOENT/    |exists
- *   |     V                            <----------'     |
- *   |  Normal                                         stat F
- *   |   F: innd writing, duct reading                  /|\
- *   |   D: ENOENT                                     / | \
- *   |     |                                          /  |  \
- *   |     |  duct decides time to flush      same   /   |   |
- *   |     |  duct makes hardlink             as D  /    |   |
- *   |     |                                       /     |   |
- *   |     V                            <---------'      |   |
- *   |  Hardlinked                                       |   |
- *   |   F == D: innd writing, duct reading              |   |
- *   ^     |                                             |   |
- *   |     |  duct unlinks F                            /    |
- *   |     V                                  ENOENT   /     |
- *   |  Moved                            <------------'      |
- *   |   F: ENOENT                                           |
- *   |   D: innd writing, duct reading                       |
- *   |     |                                                 |
- *   |     |  duct flushes feed                              |
- *   |     |   (others can too, harmlessly)                  |
- *   |     V                                                 |
- *   |  Separated                          <-----------------'
- *   |   F: innd writing                        different to D
- *   |   D: duct reading
- *   |     |
- *   |     V  duct completes processing of D
- *   |     |  duct unlinks D
- *   |     |
- *   `--<--'
  *
  */
 
+#define PERIOD_SECONDS 60
+
+static char *feedfile;
 static int max_connections, max_queue_per_conn;
 static int connection_setup_timeout, port, try_stream;
 static const char *remote_host;
@@ -76,23 +149,42 @@ static const char *remote_host;
 struct Article {
   char *mid;
   int midlen;
-  int nocheck; /* also used when CHECK says yes please */
+  int checked, sentbody;
+  fd and offset for blanking token or mid;
 };
 
-#define CONNBUFSZ 16384
+#define CONNIOVS 128
 
 #define CN "<%d> "
 
 typedef struct Conn Conn;
+
+typedef enum {
+  Malloc, Const, Artdata;
+} XmitKind;
+
+typedef struct {
+  XmitKind kind;
+  union {
+    char *malloc_tofree;
+    ARTHANDLE *sm_art;
+  } info;
+} XmitDetails;
+
 struct Conn {
   ISNODE(Conn);
   int fd, max_queue, stream;
-  LIST(Article) queue;
-  Article *tosend; /* points into queue */
-  char circ_buf[CONNBUFSZ];
-  unsigned circ_read, circ_write;
+  LIST(Article) queue; /* not yet told peer, or CHECK said send it */
+  LIST(Article) sent; /* offered/transmitted - in xmit or waiting reply */
+  struct iovec xmit[CONNIOVS];
+  XmitDetails xmitd[CONNIOVS];
+  int xmitu;
 };
 
+static int filemon_init(void);
+static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path);
+static void filemon_callback(void);
+
 
 #define CHILD_ESTATUS_STREAM   4
 #define CHILD_ESTATUS_NOSTREAM 5
@@ -105,7 +197,6 @@ static LIST(Article) *queue;
 
 static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
 
-
 /*========== making new connections ==========*/
 
 static int connecting_sockets[2]= {-1,-1};
@@ -165,7 +256,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
   Conn *conn= 0;
 
   conn= xcalloc(sizeof(*conn));
-  
+
   DECL_MSG_CMSG(msg);
   struct cmsghdr *h= 0;
   ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
@@ -233,7 +324,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
   LIST_ADDHEAD(idle, conn);
   notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
   connect_attempt_discard();
-  process_queue();
+  check_master_queue();
   return 0;
 
  x:
@@ -347,65 +438,607 @@ static void connect_start() {
 }
 
 /*========== overall control of article flow ==========*/
-static void process_queue() {
+
+static void conn_check_work(Conn *conn);
+
+static void check_master_queue(void) {
+  try reading current feed file;
+
   if (!queue.count)
     return;
 
-  if (working.head) {
-    transmit(working.head);
-  } else if (idle.head) {
-    transmit(idle.head);
-  } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
-            !connecting_child && !connect_delay) {
-    connect_delay= reconnect_delay_periods;
-    connect_start();
+  Conn *last_assigned=0;
+  for (;;) {
+    if (working.head) {
+      conn_assign_one_article(&working, &last_assigned);
+    } else if (idle.head) {
+      conn_assign_one_article(&idle, &last_assigned);
+    } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
+              !connecting_child && !connect_delay) {
+      connect_delay= reconnect_delay_periods;
+      connect_start();
+    } else {
+      break;
+    }
   }
-} 
-/*========== article transmission ==========*/
+  conn_check_work(last_assigned);
+}
+
+static void conn_assign_one_article(LIST(Conn) *connlist,
+                                   Conn **last_assigned) {
+  Conn *conn= connlist->head;
+
+  LIST_REMOVE(*connlist, conn);
+  Article *art= LIST_REMHEAD(queue);
+  LIST_ADDTAIL(conn->queue, art);
+  LIST_ADD(*conn_determine_right_list(conn), conn);
+
+  /* This slightly odd arrangement is so that we call conn_check_work
+   * once after filling the queue for a new connection in
+   * check_master_queue, rather than for each article. */
+  if (conn != *last_assigned && *last_assigned)
+    conn_check_work(*last_assigned);
+  *last_assigned= conn;
+}
+
+static int conn_total_queued_articles(Conn *conn) {
+  return conn->sent.count + conn->queue.count;
+}
+
+static LIST(Conn) *conn_determine_right_list(Conn *conn) {
+  int inqueue= conn_total_queued_articles(conn);
+  assert(inqueue <= max_queue);
+  if (inqueue == 0) return &idle;
+  if (inqueue == conn->max_queue) return &full;
+  return &working;
+}
+
+static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
+  check_conn_work(u);
+  return OOP_CONTINUE;
+}
 
-static void *conn_writeable() {
+static void conn_check_work(Conn *conn)  {
+  void *rp= 0;
   for (;;) {
-    int circ_used= circ_write - circ_read;
-    if (circ_used < 0) circ_used += CONNBUFSZ;
-    writeable_moredata(conn, CONNBUFSZ-1 - circ_used);
-
-    if (conn->circ_read == conn->circ_write)
-      return OOP_CONTINUE;
-
-    struct iovec iov[2];
-    int niov= 1;
-    iov[0].iov_base= conn->circ_buf + conn->circ_read;
-    if (conn->circ_read > conn->circ_write) { /* wrapped */
-      iov[0].iov_len= CONNBUFSZ - conn->circ_read;
-      iov[1].iov_base= conn->circ_buf;
-      iov[1].iov_len= conn->circ_write;
-      if (niov[1].iov_len) niov= 2;
+    conn_make_some_xmits(conn);
+    if (!conn->xmitu) {
+      loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+      return;
+    }
+
+    void *rp= conn_write_some_xmits(conn);
+    if (rp==OOP_CONTINUE) {
+      loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
+      return;
+    } else if (rp==OOP_HALT) {
+      return;
+    } else if (!rp) {
+      /* transmitted everything */
     } else {
-      iov[0].iov_len= conn->circ_write - conn->circ_read;
+      abort();
     }
-    ssize_t rs= writev(conn->fd, &iov, niov);
+  }
+}
+
+/*========== article transmission ==========*/
+
+static void *conn_write_some_xmits(Conn *conn) {
+  /* return values:
+   *      0:            nothing more to write, no need to call us again
+   *      OOP_CONTINUE: more to write but fd not writeable
+   *      OOP_HALT:     disaster, have destroyed conn
+   */
+  for (;;) {
+    int count= conn->xmitu;
+    if (!count) return 0;
+
+    if (count > IOV_MAX) count= IOV_MAX;
+    ssize_t rs= writev(conn->fd, conn->xmit, count);
     if (rs < 0) {
       if (errno == EAGAIN) return OOP_CONTINUE;
       syswarn(CN "write failed", conn->fd);
       conn_failed(conn);
-      return OOP_CONTINUE;
+      return OOP_HALT;
     }
     assert(rs > 0);
 
-    conn->circ_read += rs;
-    if (conn->circ_read > CONNBUFSZ)
-      conn->circ_read -= CONNBUFSZ;
+    for (done=0; rs && done<xmitu; done++) {
+      struct iovec *vp= &conn->xmit[done];
+      XmitDetails *dp= &conn->xmitd[done];
+      if (rs > vp->iov_len) {
+       rs -= vp->iov_len;
+       xmit_free(dp);
+      } else {
+       vp->iov_base += rs;
+       vp->iov_len -= rs;
+      }
+    }
+    int newu= conn->xmitu - done;
+    memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
+    memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
+    conn->xmitu= newu;
+  }
+}
+
+static void conn_make_some_xmits(Conn *conn) {
+  for (;;) {
+    if (conn->xmitu+5 > CONNIOVS)
+      break;
+
+    Article *art= LIST_REMHEAD(queue);
+    if (!art) break;
+
+    if (art->checked || conn->nocheck) {
+      /* actually send it */
+
+      ARTHANDLE *artdata= SMretrieve(somehow);
+
+      if (conn->stream) {
+       if (artdata) {
+         XMIT_LITERAL("TAKETHIS ");
+         xmit_noalloc(art->mid, art->midlen);
+         XMIT_LITERAL("\r\n");
+         xmit_artbody(artdata);
+       }
+      } else {
+       /* we got 235 from IHAVE */
+       if (artdata) {
+         xmit_artbody(artdata);
+       } else {
+         XMIT_LITERAL(".\r\n");
+       }
+      }
+      art->sent= 1;
+      LIST_ADDTAIL(conn->sent, art);
+
+    } else {
+      /* check it */
+
+      if (conn->stream)
+       XMIT_LITERAL("IHAVE ");
+      else
+       XMIT_LITERAL("CHECK ");
+      xmit_noalloc(art->mid, art->midlen);
+      XMIT_LITERAL("\r\n");
+
+      LIST_ADDTAIL(conn->sent, art);
+    }
+  }
+}
+
+/*========== responses from peer ==========*/
+
+static const oop_rd_style peer_rd_style= {
+  OOP_RD_DELIM_STRIP, '\n',
+  OOP_RD_NUL_FORBID,
+  OOP_RD_SHORTREC_FORBID
+};
+
+static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
+                       const char *errmsg, int errnoval,
+                       const char *data, size_t recsz, void *conn_v) {
+  Conn *conn= conn_v;
+
+  if (ev == OOP_RD_EOF) {
+    warn("unexpected EOF from peer");
+    conn_failed(conn);
+    return;
+  }
+  assert(ev == OOP_RD_OK);
+
+  char *ep;
+  unsigned long code= strtoul(data, &ep, 10);
+  if (ep != data+3 || *ep != ' ' || data[0]=='0') {
+    char sanibuf[100];
+    const char *p= data;
+    char *q= sanibuf;
+    *q++= '`';
+    for (;;) {
+      if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; }
+      int c= *p++;
+      if (!c) { *q++= '\''; break; }
+      if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
+      sprintf(q,"\\x%02x",c);
+      q += 4;
+    }
+    warn("badly formatted response from peer: %s", sanibuf);
+    conn_failed(conn);
+    return;
+  }
+
+  if (conn->quitting) {
+    if (code!=205) {
+      warn("peer gave failure response to QUIT: %s", sani);
+      conn_failed(conn);
+      return;
+    }
+    conn close ok;
+    return;
+  }
+
+  switch (code) {
+  case 438: /* CHECK says they have it */
+  case 435: /* IHAVE says they have it */
+    ARTICLE_DEALTWITH(1,unwanted);
+    break;
+
+  case 238: /* CHECK says send it */
+  case 335: /* IHAVE says send it */
+    count_checkedwanted++;
+    Article *art= LIST_REMHEAD(conn->sent);
+    art->checked= 1;
+    LIST_ADDTAIL(conn->queue);
+    break;
+
+  case 235: /* IHAVE says thanks */
+  case 239: /* TAKETHIS says thanks */
+    ARTICLE_DEALTWITH(1,accepted);
+    break;
+
+  case 439: /* TAKETHIS says rejected */
+  case 437: /* IHAVE says rejected */
+    ARTICLE_DEALTWITH(1,rejected);
+    break;
+
+  case 431: /* CHECK or TAKETHIS says try later */
+  case 436: /* IHAVE says try later */
+    ARTICLE_DEALTWITH(0,deferred);
+    break;
+
+  case 400: warn("peer has stopped accepting articles: %s", sani); goto failed;
+  case 503: warn("peer timed us out: %s", sani);                   goto failed;
+  default:  warn("peer sent unexpected message: %s", sani);
+  failed:
+    conn_failed(conn);
+    return OOP_CONTINUE;;
+  }
+
+  return OOP_CONTINUE;
+}
+
+/*========== monitoring of input file ==========*/
+
+/*---------- tailing input file ----------*/
+
+static void filemon_start(InputFile *ipf) {
+  assert(!ipf->filemon);
+
+  ipf->filemon= xmalloc(sizeof(*ipf->filemon));
+  memset(ipf->filemon, 0, sizeof(*ipf->filemon));
+  filemon_method_startfile(ipf, ipf->filemon);
+}
+static void filemon_stop(InputFile *ipf) {
+  if (!ipf->filemon) return;
+  filemon_method_stopfile(ipf, ipf->filemon);
+  free(ipf->filemon);
+  ipf->filemon= 0;
+}
+
+static void filemon_callback(InputFile *ipf) {
+  ipf->readable_callback(ipf->readable_callback_user);
+}
+static void *tailing_rable_call_time(oop_source *loop, struct timeval tv,
+                                    void *user) {
+  InputFile *ipf= user;
+  return ipf->readable_callback(ipf->readable_callback_user);
+}
+
+static void on_cancel(struct oop_readable *rable) {
+  InputFile *ipf= (void*)rable;
+
+  if (ipf->filemon) filemon_stopfile(ipf);
+  loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+}
+
+static int tailing_on_readable(struct oop_readable *rable,
+                               oop_readable_call *cb, void *user) {
+  InputFile *ipf= (void*)rable;
+
+  tailing_on_cancel(rable);
+  ipf->readable_callback= cb;
+  ipf->readable_callback_user= user;
+  filemon_startfile(ipf);
+
+  loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+  return 0;
+}
+
+static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
+                               size_t length) {
+  InputFile *ipf= (void*)rable;
+  for (;;) {
+    ssize_t r= read(ipf->fd, buffer, length);
+    if (!r && ipf==main_input_file) { errno=EAGAIN; return -1; }
+    if (r==-1 && errno==EINTR) continue;
+    return r;
+  }
+}
+/*---------- filemon implemented with inotify ----------*/
+
+#if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
+#define HAVE_FILEMON
+
+#include <linux/inotify.h>
+
+static int filemon_inotify_fd;
+static int filemon_inotify_wdmax;
+static InputFile **filemon_inotify_wd2ipf;
+
+typedef struct Filemon_Perfile {
+  int wd;
+} Filemon_Inotify_Perfile;
+
+static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) {
+  int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
+  if (wd < 0) sysdie("inotify_add_watch %s", ipf->path);
+
+  if (wd >= filemon_inotify_wdmax) {
+    int newmax= wd+2;
+    filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf,
+                                sizeof(*filemon_inotify_wd2ipf) * newmax);
+    memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
+          sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
+    filemon_inotify_wdmax= newmax;
+  }
+
+  assert(!filemon_inotify_wd2ipf[wd]);
+  filemon_inotify_wd2ipf[wd]= ipf;
+
+  pf->wd= wd;
+}
+
+static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) {
+  int wd= pf->wd;
+  int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
+  if (r) sysdie("inotify_rm_watch");
+  filemon_inotify_wd2ipf[wd]= 0;
+}
+
+static void *filemon_inotify_readable(oop_source *lp, int fd,
+                                     oop_event e, void *u) {
+  struct inotify_event iev;
+  for (;;) {
+    int r= read(filemon_inotify_fd, &iev, sizeof(iev));
+    if (r==-1) {
+      if (errno==EAGAIN) break;
+      sysdie("read from inotify master");
+    } else if (r==sizeof(iev)) {
+      assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
+    } else {
+      die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
+    }
+    InputFile *ipf= filemon_inotify_wd2ipf[iev.wd];
+    filemon_callback(ipf);
   }
+  return OOP_CONTINUE;
 }
 
+static int filemon_method_init(void) {
+  filemon_inotify_fd= inotify_init();
+  if (filemon_inotify_fd<0) {
+    syswarn("could not initialise inotify: inotify_init failed");
+    return 0;
+  }
+  set nonblock;
+  loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
 
+  return 1;
+}
+
+#endif /* HAVE_INOTIFY && !HAVE_FILEMON *//
+
+/*---------- filemon dummy implementation ----------*/
+
+#if !defined(HAVE_FILEMON)
+
+typedef struct Filemon_Perfile { int dummy; } Filemon_Dummy_Perfile;
+
+static int filemon_method_init(void) { return 0; }
+static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
+static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
+
+#endif /* !HAVE_FILEMON */
+
+/*---------- interface to start and stop an input file ----------*/
+
+static const oop_rd_style feedfile_rdstyle= {
+  OOP_RD_DELIM_STRIP, '\n',
+  OOP_RD_NUL_FORBID,
+  OOP_RD_SHORTREC_EOF,
+};
  
-static void transmit(Conn *conn) {
-  assert(conn->queue.count < max_queue);
-  
+static void inputfile_tailing_start(InputFile *ipf) {
+  assert(!ipf->fd);
+  ipf->readable->on_readable= tailing_on_readable;
+  ipf->readable->on_cancel=   tailing_on_cancel;
+  ipf->readable->try_read=    tailing_try_read;
+  ipf->readable->delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
+  ipf->readable->delete_kill= 0;
+
+  ipf->readable_callback= 0;
+  ipf->readable_callback_user= 0;
+
+  ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
+  assert(ipf->fd);
+
+  int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
+                    feedfile_got_article,ipf, feedfile_problem,ipf);
+  if (r) sysdie("unable start reading feedfile %s",ipf->path);
+}
+
+static void inputfile_tailing_stop(InputFile *ipf) {
+  assert(ipf->fd);
+  oop_rd_delete(ipf->rd);
+  ipf->rd= 0;
+  assert(!ipf->filemon); /* we shouldn't be monitoring it now */
+}
+
+/*========== interaction with innd ==========*/
+
+/* See official state diagram at top of file.  We implement
+ * this as follows:
+ *
+          ================
+           WAITING
+          [Nothing/Noduct]
+           poll for F
+          ================
+               |
+               |     TIMEOUT
+               |`--------------------------.
+               |                           | install defer as backlog
+     ,--------->|                           | exit
+     |          | OPEN F SUCCEEDS           V
+     |          V                         =========
+     |     ========                        (ESRCH)
+     |      NORMAL                        [Dropped]
+     |     [Normal]                       =========
+     |      read F
+     |     ========
+     |          |
+     |          | F IS SO BIG WE SHOULD FLUSH
+     ^          | hardlink F to D
+     |     [Hardlinked]
+     |          | unlink F
+     |          | our handle onto F is now onto D
+     |     [Moved]
+     |          |
+     |          |<---------------------------------------------------.
+     |          |                                                    |
+     |          | spawn inndcomm flush                               |
+     |          V                                                    |
+     |     ==========                                                |
+     |      FLUSHING                                                 |
+     |     [Flushing]                                                |
+     |      read D                                                   |
+     |     ==========                                                |
+     |          |                                                    |
+     |          |   INNDCOMM FLUSH FAILS                             ^
+     |          |`----------------------->--------.                  |
+     |          |                                 |                  |
+     |          |   NO SUCH SITE                  V                  |
+     ^          |`----------------.            =========             |
+     |          |                 |            FLUSHFAIL             |
+     |          |                 V            [Moved]               |
+     |          |            ==========        read D                |
+     |          |             DROPPING         =========             |
+     |          |            [Dropping]           |                  |
+     |          |             read D              | TIME TO RETRY    |
+     |          |            ==========           `------------------'
+     |          | FLUSH OK        |
+     |          | open F          | AT EOF OF D AND ALL PROCESSED
+     |          V                 | install defer as backlog
+     |     ===========            | unlink D
+     |      SEPARATED             | exit
+     |     [Separated]            V
+     |      read D            ==========
+     |     ===========         (ESRCH)
+     |          |             [Droppped]
+     |          |             ==========
+     |          V
+     |          | AT EOF OF D
+     ^          |
+     |     ===========
+     |      FINISHING
+     |     [Finishing]
+     |      read F
+     |      write D
+     |     ===========
+     |          |
+     |          | ALL D PROCESSED
+     |          | install defer as backlog
+     |          | start new defer
+     ^          V unlink D
+     |          | close D
+     |          |
+     `----------'
+
+ *
+ */
+
+static char *path_ductlock, *path_duct, *path_ductdefer;
+
+typedef struct {
+  /* This is an instance of struct oop_readable */
+  struct oop_readable readable; /* first */
+  oop_readable_call *readable_callback;
+  void *readable_callback_user;
+
+  int fd;
+  const char *path; /* ptr copy of path_<foo> or feedfile */
+  struct Filemon_Perfile *filemon;
+
+  oop_read *rd;
+  long inprogress; /* no. of articles read but not processed */
+} InputFile;
+
+static void statemc_init(void) {
+  path_ductlock=  xasprintf("%s_duct.lock",  feedfile);
+  path_duct=      xasprintf("%s_duct",       feedfile);
+  path_ductdefer= xasprintf("%s_duct.defer", feedfile);
+
+  int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600);
+  if (lockfd<0) sysdie("open lockfile %s", path_ductlock);
+
+  struct flock fl;
+  memset(&fl,0,sizeof(fl));
+  fl.l_type= F_WRLCK;
+  fl.l_whence= SEEK_SET;
+  r= fcntl(lockfd, F_SETLK, &fl);
+  if (r==-1) {
+    if (errno==EACCES || errno==EAGAIN)
+      die("another duct holds the lockfile");
+    sysdie("fcntl F_SETLK lockfile %s", path_ductlock);
+  }
+}
+
+static void statemc_poll(void) {
+  if (tailing_fd>=0) return;
+
+  int d_fd= open(path_duct, O_RDWR);
+  if (d_fd<0)
+    if (errno!=ENOENT) sysdie("open duct file %s", path_duct);
+
+  int f_fd= open(feedfile, O_RDWR);
+  if (f_fd<0)
+    if (errno!=ENOENT) sysdie("open feed file %s", feedfile);
+
+  if (d_fd<0) {
+    if (f_fd>=0)
+      start_tailing(f_fd);
+    return;
+  }
+
+
+
+/*========== main program ==========*/
+
+#define EVERY(what, interval, body)                                         \
+  static const struct timeval what##_timeout = { 5, 0 };                    \
+  static void what##_schedule(void);                                        \
+  static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \
+    { body }                                                                \
+    what##_schedule();                                                      \
+  }                                                                         \
+  static void what##_schedule(void) {                                       \
+    loop->on_time(loop, what##_timeout, what##_timedout, 0);                \
+  }
+
+EVERY(filepoll, {5,0}, { check_master_queue(); })
+
+EVERY(period, {PERIOD_SECONDS,0}, {
+  if (connect_delay) connect_delay--;
+  statemc_poll();
+  check_master_queue();
+});
 
 main {
   ignore sigpipe;
+  if (!filemon_init())
+    filepoll_schedule();
+  period_schedule();
 };