chiark / gitweb /
WIP before rethink reading-two-files-at-once
[inn-innduct.git] / backends / innduct.c
index 9be77ee3a701ab25f3660163209edd195468a0b3..cf8fe6f82ffb0be75060f64b90752c45b39c4598 100644 (file)
 /*
+ * Four files full of
+ *    token article
+ *
+ *   site.name_duct.lock       lock preventing multiple ducts
+ *                                holder of this lock is "duct"
+ * F site.name                 main feed file
+ *                                opened/created, then written, by innd
+ *                                read by duct
+ *                                unlinked by duct
+ *                                tokens blanked out by duct when processed
+ * D site.name_duct            temporary feed file during flush (or crash)
+ *                                hardlink created by duct
+ *                                unlinked by duct
+ *   site.name_duct.defer      431'd articles, still being written,
+ *                                created, written, used by duct
+ *   site.name_backlog.lock    lock taken out by innxmit wrapper
+ *                                holder and its child are "xmit"
+ *   site.name_backlog_<inum>  431'd articles, ready for innxmit
+ *                                created (link/mv) by duct
+ *                                read by xmit
+ *                                unlinked by xmit
+ *   site.name_backlog_<letters> eg
+ *   site.name_backlog_manual
+ *                             anything the sysadmin likes (eg, feed files
+ *                             from old feeds to be merged into this one)
+ *                                created (link/mv) by admin
+ *                                read by xmit
+ *                                unlinked by xmit
+
+
+   OVERALL STATES:
+
+                                                               START
+                                                                  |
+                                                             check D, F
+                                                                  |
+                          <--------------------------------------'|
+        Nothing                            F, D both ENOENT       |
+         F: ENOENT                                                       |
+         D: ENOENT                                                       |
+         duct: not not reading anything                           |
+           |                                                     |
+           |`---------------------.                               |
+           |                      | duct times out waiting for F  |
+           V  innd creates F      | duct exits                    |
+           |                      V                               |
+        Noduct                    GO TO Dropped                   |
+         F: innd writing                                         |
+         D: ENOENT                                               |
+         duct: not running or not reading anything                |
+           |                                                     |
+           |                                                     |
+     ,-->--+                   <---------------------------------'|
+     |     |  duct opens F                         F exists       |
+     |     |                                              D ENOENT       |
+     |     V                                                     |
+     |  Normal                                                    |
+     |   F: innd writing, duct reading                            |
+     |   D: ENOENT                                                |
+     |     |                                                      |
+     |     |  duct decides time to flush                          |
+     |     |  duct makes hardlink                                 |
+     |     |                                                      |
+     |     V                            <------------------------'|
+     |  Hardlinked                                  F==D          |
+     |   F == D: innd writing, duct reading         both exist    |
+     ^     |                                                      |
+     |     |  duct unlinks F                                             |
+     |     V                                                     |
+     |  Moved                               <----+------------<--'|
+     |   F: ENOENT                               |  F ENOENT      |
+     |   D: innd writing, duct reading           |  D exists      |
+     |     |                                     |               |
+     |     |  duct requests flush of feed        |               |
+     |     |   (others can too, harmlessly)      |               |
+     |     V                                    |                |
+     |  Flushing                                |                |
+     |   F: ENOENT                              |                |
+     |   D: innd flushing, duct reading                 |                |
+     |     |                                            |                |
+     |     |   inndcomm flush fails                     |                |
+     |     |`-------------------------->---------'               |
+     |     |                                                             |
+     |     |   inndcomm reports no such site                             |
+     |     |`---------------------------------------------------- | -.
+     |     |                                                     |  |
+     |     |  innd finishes writing D, creates F                 |  |
+     |     |  inndcomm reports flush successful                          |  |
+     |     |  duct opens F too                                           |  |
+     |     V                                                     |  |
+     |  Flushed                                  <----------------'  |
+     |   F: innd writing, duct reading              F!=D            /
+     |   D: duct reading                             both exist    /
+     |     |                                                      /
+     |     |  duct gets to the end of D                          /
+     |     V  duct finishes processing D                        /
+     |     |  duct unlinks D                                   /
+     |     |                                                  |
+     `--<--'                                                  V
+                                                               Dropping
+                                                                F: ENOENT
+                                                                D: duct reading
+                                                             |
+                                                             | duct finishes
+                                                             |  processing D
+                                                              | duct unlinks D
+                                                              | duct exits
+                                                              V
+                                                               Dropped
+                                                        F: ENOENT
+                                                        D: ENOENT
+                                                        duct not running
+
+   "duct reading" means innduct is reading the file but also
+   overwriting processed tokens.
+
+ *
+ *
  */
 
-static int max_connections, articles_per_connect_attempt;
+#define PERIOD_SECONDS 60
+
+static char *feedfile;
+static int max_connections, max_queue_per_conn;
 static int connection_setup_timeout, port, try_stream;
 static const char *remote_host;
 
+#define ISNODE(T)    T *next, *back;
+#define LIST(T)      struct { T *head, *tail, *tailpred; int count; }
+
+#define NODE(n) ((struct node*)&(n)->head)
+
+#define LIST_ADDHEAD(l,n)                                              \
+ (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++)
+#define LIST_ADDTAIL(l,n)                                              \
+ (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++)
+
+#define LIST_REMHEAD(l)                                                          \
+ ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0)
+#define LIST_REMTAIL(l)                                                          \
+ ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0)
+#define LIST_REMOVE(l,n)                       \
+ (list_remove(NODE((n))), (void)(l).count--)
+#define LIST_INSERT(l,n,pred) \
+ (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++)
+
 struct Article {
+  char *mid;
+  int midlen;
+  int checked, sentbody;
+  fd and offset for blanking token or mid;
 };
 
+#define CONNIOVS 128
+
+#define CN "<%d> "
+
 typedef struct Conn Conn;
+
+typedef enum {
+  Malloc, Const, Artdata;
+} XmitKind;
+
+typedef struct {
+  XmitKind kind;
+  union {
+    char *malloc_tofree;
+    ARTHANDLE *sm_art;
+  } info;
+} XmitDetails;
+
 struct Conn {
-  Conn *next, *back;
-  int fd;
-  Article *queue;
+  ISNODE(Conn);
+  int fd, max_queue, stream;
+  LIST(Article) queue; /* not yet told peer, or CHECK said send it */
+  LIST(Article) sent; /* offered/transmitted - in xmit or waiting reply */
+  struct iovec xmit[CONNIOVS];
+  XmitDetails xmitd[CONNIOVS];
+  int xmitu;
 };
 
+static int filemon_init(void);
+static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path);
+static void filemon_callback(void);
+
 
-#define CHILD_ESTATUS_STREAM   1
-#define CHILD_ESTATUS_NOSTREAM 2
+#define CHILD_ESTATUS_STREAM   4
+#define CHILD_ESTATUS_NOSTREAM 5
 
 static int since_connect_attempt;
 static int nconns;
-static struct { Conn *head, *tail } idle, working, full;
+static LIST(Conn) idle, working, full;
+
+static LIST(Article) *queue;
+
+static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
+
+/*========== making new connections ==========*/
 
-static Conn *connecting;
 static int connecting_sockets[2]= {-1,-1};
 static pid_t connecting_child;
 
-static Article *currentart;
+static void report_child_status(const char *what, int status) {
+  if (WIFEXITED(status)) {
+    int es= WEXITSTATUS(status);
+    if (es)
+      warn("%s: child died with error exit status %d",es);
+  } else if (WIFSIGNALED(status)) {
+    int sig= WTERMSIG(status);
+    const char *sigstr= strsignal(sig);
+    const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
+    if (sigstr)
+      warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
+    else
+      warn("%s: child died due to unknown fatal signal %d%s",
+          what, sig, coredump);
+  } else {
+    warn("%s: child died with unknown wait status %d", status);
+  }
+}
+
+static void connect_attempt_discard(void) {
+  if (connecting_sockets[0]) {
+    cancel_fd(loop, connecting_sockets[0], OOP_READ);
+    cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION);
+  }
+  perhaps_close(&connecting_sockets[0]);
+  perhaps_close(&connecting_sockets[1]);
+
+  if (connecting_child) {
+    int status;
+    r= kill(connecting_child, SIGKILL);
+    if (r) sysdie("cannot kill connect child");
+
+    pid_t got= waitpid(connecting_child, &status, WNOHANG);
+    if (got==-1) sysdie("cannot reap connect child");
+
+    if (!(WIFEXITED(status) ||
+         (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) {
+      report_child_status("connect"
+    }
+    connecting_child= 0;
+  }
+}
+
+#define PREP_DECL_MSG_CMSG(msg)                        \
+  struct msghdr msg;                           \
+  memset(&msg,0,sizeof(msg));                  \
+  char msg##cbuf[CMSG_SPACE(sizeof(fd))];      \
+  msg.msg_control= msg##cbuf;                  \
+  msg.msg_controllen= sizeof(msg##cbuf);
+
+static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
+  Conn *conn= 0;
+
+  conn= xcalloc(sizeof(*conn));
+
+  DECL_MSG_CMSG(msg);
+  struct cmsghdr *h= 0;
+  ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
+  if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
+  if (!h) {
+    int status;
+    pid_t got= waitpid(connecting_child, &status, WNOHANG);
+    if (got != -1) {
+      assert(got==connecting_child);
+      connecting_child= 0;
+      if (WIFEXITED(status) &&
+         (WEXITSTATUS(status) != 0
+          WEXITSTATUS(status) != CHILD_ESTATUS_STREAM &&
+          WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) {
+       /* child already reported the problem */
+      } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) {
+       warn("connect: connection attempt timed out");
+      } else if (!WIFEXITED(status)) {
+       report_child_status("connect", status);
+       /* that's probably the root cause then */
+      }
+    } else {
+      /* child is still running apparently, report the socket problem */
+      if (rs < 0)
+       syswarn("connect: read from child socket failed");
+      else if (e == OOP_EXCEPTIONN)
+       warn("connect: unexpected exception on child socket");
+      else
+       warn("connect: unexpected EOF on child socket");
+    }
+    goto x;
+  }
+
+#define CHK(field, val)                                                          \
+  if (h->cmsg_##field != val) {                                                  \
+    die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
+    goto x;                                                              \
+  }
+  CHK(level, SOL_SOCKET);
+  CHK(type,  SCM_RIGHTS);
+  CHK(len,   CMSG_LEN(sizeof(conn-b>fd)));
+#undef CHK
+
+  if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; }
+
+  memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
+
+  pid_t got= waitpid(connecting_child, &status, 0);
+  if (got==-1) sysdie("connect: real wait for child");
+  assert(got == connecting_child);
+  connecting_child= 0;
+
+  if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
+  int es= WEXITSTATUS(status);
+  switch (es) {
+  case CHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
+  case CHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
+  default:
+    die("connect: child gave unexpected exit status %d", es);
+  }
+
+  set nonblocking;
+
+  /* Phew! */
+  LIST_ADDHEAD(idle, conn);
+  notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
+  connect_attempt_discard();
+  check_master_queue();
+  return 0;
+
+ x:
+  if (conn) {
+    perhaps_close(&conn->fd);
+    free(conn);
+  }
+  connect_attempt_discard();
+}
+
+static void connect_start() {
+  assert(!connecting_sockets[0]);
+  assert(!connecting_sockets[1]);
+  assert(!connecting_child);
+
+  notice("starting connection attempt");
 
-static void start_connecting() {
   r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
-  if (r) { syswarn("cannot create socketpair for connect child"); goto x; }
-  
+  if (r) { syswarn("connect: cannot create socketpair for child"); goto x; }
+
   connecting_child= fork();
-  if (connecting_child==-1) { syswarn("cannot fork for connect"); goto x; }
-  
+  if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; }
+
   if (!connecting_child) {
     FILE *cn_from, *cn_to;
     char buf[NNTP_STRLEN+100];
     int exitstatus= CHILD_ESTATUS_NOSTREAM;
 
+    put sigpipe back;
+    close unwanted fds;
+
+    r= close(connecting_sockets[0]);
+    if (r) sysdie("connect: close parent socket in child");
+
     alarm(connection_setup_timeout);
     if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) {
       if (buf[0]) {
        sanitise_inplace(buf);
-       die("%s: connection rejected: %s", remote_host, buf);
+       die("connect: rejected: %s", buf);
       } else {
-       sysdie("%s: connection attempt failed", remote_host);
+       sysdie("connect: connection attempt failed");
       }
     }
     if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0)
-      sysdie("%s: authentication failed", remote_host);
+      sysdie("connect: authentication failed");
     if (try_stream) {
       if (fputs("MODE STREAM\r\n", cn_to) ||
          fflush(cn_to))
-       sysdie("%s: could not send MODE STREAM", remote_host);
+       sysdie("connect: could not send MODE STREAM");
       buf[sizeof(buf)-1]= 0;
       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
        if (ferror(cn_from))
-         sysdie("%s: could not read response to MODE STREAM", remote_host);
+         sysdie("connect: could not read response to MODE STREAM");
        else
-         die("%s: connection close in response to MODE STREAM", remote_host);
+         die("connect: connection close in response to MODE STREAM");
       }
       int l= strlen(buf);
       assert(l>=1);
       if (buf[-1]!='\n') {
        sanitise_inplace(buf);
-       die("%s: response to MODE STREAM is too long: %.100s...",
+       die("connect: response to MODE STREAM is too long: %.100s...",
            remote_host, buf);
       }
       l--;  if (l>0 && buf[1-]=='\r') l--;
@@ -76,7 +393,7 @@ static void start_connecting() {
       int rcode= strtoul(buf,&ep,10);
       if (ep != buf[3]) {
        sanitise_inplace(buf);
-       die("%s: bad response to MODE STREAM: %.50s", remote_host, buf);
+       die("connect: bad response to MODE STREAM: %.50s", buf);
       }
       switch (rcode) {
       case 203:
@@ -87,70 +404,550 @@ static void start_connecting() {
        break;
       default:
        sanitise_inplace(buf);
-       warn("%s: bad response to MODE STREAM: %.50s", remote_host, buf);
+       warn("connect: unexpected response to MODE STREAM: %.50s", buf);
        exitstatus= 2;
        break;
       }
     }
     int fd= fileno(cn_from);
 
-    char cmsgbuf[CMSG_SPACE(sizeof(fd))];
-    struct msghdr msg;
-    memset(&msg,0,sizeof(msg));
-    msg.msg_control= cmsgbuf;
-    msg.msg_controllen= sizeof(cmsgbuf);
-
+    PREP_DECL_MSG_CMSG(msg);
     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
     cmsg->cmsg_level= SOL_SOCKET;
     cmsg->cmsg_type=  SCM_RIGHTS;
     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
-    
+
     msg.msg_controllen= cmsg->cmsg_len;
-    r= sendmsg(childs_socket, &msg, 0);
-    if (r) sysdie("%s: sendmsg failed for new connection", remote_host);
+    r= sendmsg(connecting_sockets[1], &msg, 0);
+    if (r) sysdie("sendmsg failed for new connection");
 
     _exit(exitstatus);
   }
 
-  on_fd(loop, 
+  r= close(connecting_sockets[1]);  connecting_sockets[1]= 0;
+  if (r) sysdie("connect: close child socket in parent");
 
-      struct cmsghdr *cmsg;
-      
-      /* NNTPconnect inexplicably duplicates the fd but we don't care
-       * about that as we're going to exit shortly */
+  loop->on_fd(loop, connecting_sockets[0], OOP_READ,      connchild_event, 0);
+  loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0);
+  return OOP_CONTINUE;
 
-      
-      
+ x:
+  connect_attempt_discard();
+}
 
-       warn("
+/*========== overall control of article flow ==========*/
 
-       syswarn("
-      int e= errno;
-      sanitise(buf);
-      syswarn("
+static void conn_check_work(Conn *conn);
 
- x:
-  kill_connection_attempt();
+static void check_master_queue(void) {
+  try reading current feed file;
+
+  if (!queue.count)
+    return;
+
+  Conn *last_assigned=0;
+  for (;;) {
+    if (working.head) {
+      conn_assign_one_article(&working, &last_assigned);
+    } else if (idle.head) {
+      conn_assign_one_article(&idle, &last_assigned);
+    } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
+              !connecting_child && !connect_delay) {
+      connect_delay= reconnect_delay_periods;
+      connect_start();
+    } else {
+      break;
+    }
+  }
+  conn_check_work(last_assigned);
 }
 
-static void kill_connection_attempt() {
-  fixme;
-  connecting_sockets[0]= connecting_sockets[1]= -1;
-  connecting_child= 0;
+static void conn_assign_one_article(LIST(Conn) *connlist,
+                                   Conn **last_assigned) {
+  Conn *conn= connlist->head;
+
+  LIST_REMOVE(*connlist, conn);
+  Article *art= LIST_REMHEAD(queue);
+  LIST_ADDTAIL(conn->queue, art);
+  LIST_ADD(*conn_determine_right_list(conn), conn);
+
+  /* This slightly odd arrangement is so that we call conn_check_work
+   * once after filling the queue for a new connection in
+   * check_master_queue, rather than for each article. */
+  if (conn != *last_assigned && *last_assigned)
+    conn_check_work(*last_assigned);
+  *last_assigned= conn;
+}
+
+static int conn_total_queued_articles(Conn *conn) {
+  return conn->sent.count + conn->queue.count;
+}
+
+static LIST(Conn) *conn_determine_right_list(Conn *conn) {
+  int inqueue= conn_total_queued_articles(conn);
+  assert(inqueue <= max_queue);
+  if (inqueue == 0) return &idle;
+  if (inqueue == conn->max_queue) return &full;
+  return &working;
+}
+
+static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
+  check_conn_work(u);
+  return OOP_CONTINUE;
+}
+
+static void conn_check_work(Conn *conn)  {
+  void *rp= 0;
+  for (;;) {
+    conn_make_some_xmits(conn);
+    if (!conn->xmitu) {
+      loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+      return;
+    }
+
+    void *rp= conn_write_some_xmits(conn);
+    if (rp==OOP_CONTINUE) {
+      loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
+      return;
+    } else if (rp==OOP_HALT) {
+      return;
+    } else if (!rp) {
+      /* transmitted everything */
+    } else {
+      abort();
+    }
+  }
+}
+
+/*========== article transmission ==========*/
+
+static void *conn_write_some_xmits(Conn *conn) {
+  /* return values:
+   *      0:            nothing more to write, no need to call us again
+   *      OOP_CONTINUE: more to write but fd not writeable
+   *      OOP_HALT:     disaster, have destroyed conn
+   */
+  for (;;) {
+    int count= conn->xmitu;
+    if (!count) return 0;
+
+    if (count > IOV_MAX) count= IOV_MAX;
+    ssize_t rs= writev(conn->fd, conn->xmit, count);
+    if (rs < 0) {
+      if (errno == EAGAIN) return OOP_CONTINUE;
+      syswarn(CN "write failed", conn->fd);
+      conn_failed(conn);
+      return OOP_HALT;
+    }
+    assert(rs > 0);
+
+    for (done=0; rs && done<xmitu; done++) {
+      struct iovec *vp= &conn->xmit[done];
+      XmitDetails *dp= &conn->xmitd[done];
+      if (rs > vp->iov_len) {
+       rs -= vp->iov_len;
+       xmit_free(dp);
+      } else {
+       vp->iov_base += rs;
+       vp->iov_len -= rs;
+      }
+    }
+    int newu= conn->xmitu - done;
+    memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
+    memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
+    conn->xmitu= newu;
+  }
+}
+
+static void conn_make_some_xmits(Conn *conn) {
+  for (;;) {
+    if (conn->xmitu+5 > CONNIOVS)
+      break;
+
+    Article *art= LIST_REMHEAD(queue);
+    if (!art) break;
+
+    if (art->checked || conn->nocheck) {
+      /* actually send it */
+
+      ARTHANDLE *artdata= SMretrieve(somehow);
+
+      if (conn->stream) {
+       if (artdata) {
+         XMIT_LITERAL("TAKETHIS ");
+         xmit_noalloc(art->mid, art->midlen);
+         XMIT_LITERAL("\r\n");
+         xmit_artbody(artdata);
+       }
+      } else {
+       /* we got 235 from IHAVE */
+       if (artdata) {
+         xmit_artbody(artdata);
+       } else {
+         XMIT_LITERAL(".\r\n");
+       }
+      }
+      art->sent= 1;
+      LIST_ADDTAIL(conn->sent, art);
+
+    } else {
+      /* check it */
+
+      if (conn->stream)
+       XMIT_LITERAL("IHAVE ");
+      else
+       XMIT_LITERAL("CHECK ");
+      xmit_noalloc(art->mid, art->midlen);
+      XMIT_LITERAL("\r\n");
+
+      LIST_ADDTAIL(conn->sent, art);
+    }
+  }
 }
-static void process_any_article() {
-  if (!currentart)
+
+/*========== responses from peer ==========*/
+
+static const oop_rd_style peer_rd_style= {
+  OOP_RD_DELIM_STRIP, '\n',
+  OOP_RD_NUL_FORBID,
+  OOP_RD_SHORTREC_FORBID
+};
+
+static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
+                       const char *errmsg, int errnoval,
+                       const char *data, size_t recsz, void *conn_v) {
+  Conn *conn= conn_v;
+
+  if (ev == OOP_RD_EOF) {
+    warn("unexpected EOF from peer");
+    conn_failed(conn);
+    return;
+  }
+  assert(ev == OOP_RD_OK);
+
+  char *ep;
+  unsigned long code= strtoul(data, &ep, 10);
+  if (ep != data+3 || *ep != ' ' || data[0]=='0') {
+    char sanibuf[100];
+    const char *p= data;
+    char *q= sanibuf;
+    *q++= '`';
+    for (;;) {
+      if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; }
+      int c= *p++;
+      if (!c) { *q++= '\''; break; }
+      if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
+      sprintf(q,"\\x%02x",c);
+      q += 4;
+    }
+    warn("badly formatted response from peer: %s", sanibuf);
+    conn_failed(conn);
     return;
-  
-  if (working.head) {
-    transmit(working.head);
-  } else if (idle.head) {
-    transmit(idle.head);
-  } else if (nconns < maxconns && !connecting_child &&
-            since_connect_attempt >= connect_attempt_limiter) {
-    since_connect_attempt= 0;
-    connect_start();
-  } 
+  }
+
+  if (conn->quitting) {
+    if (code!=205) {
+      warn("peer gave failure response to QUIT: %s", sani);
+      conn_failed(conn);
+      return;
+    }
+    conn close ok;
+    return;
+  }
+
+  switch (code) {
+  case 438: /* CHECK says they have it */
+  case 435: /* IHAVE says they have it */
+    ARTICLE_DEALTWITH(1,unwanted);
+    break;
+
+  case 238: /* CHECK says send it */
+  case 335: /* IHAVE says send it */
+    count_checkedwanted++;
+    Article *art= LIST_REMHEAD(conn->sent);
+    art->checked= 1;
+    LIST_ADDTAIL(conn->queue);
+    break;
+
+  case 235: /* IHAVE says thanks */
+  case 239: /* TAKETHIS says thanks */
+    ARTICLE_DEALTWITH(1,accepted);
+    break;
+
+  case 439: /* TAKETHIS says rejected */
+  case 437: /* IHAVE says rejected */
+    ARTICLE_DEALTWITH(1,rejected);
+    break;
+
+  case 431: /* CHECK or TAKETHIS says try later */
+  case 436: /* IHAVE says try later */
+    ARTICLE_DEALTWITH(0,deferred);
+    break;
+
+  case 400: warn("peer has stopped accepting articles: %s", sani); goto failed;
+  case 503: warn("peer timed us out: %s", sani);                   goto failed;
+  default:  warn("peer sent unexpected message: %s", sani);
+  failed:
+    conn_failed(conn);
+    return OOP_CONTINUE;;
+  }
+
+  return OOP_CONTINUE;
 }
+
+/*========== monitoring of input file ==========*/
+
+/*---------- tailing input file ----------*/
+
+
+
+/*---------- filemon implemented with inotify ----------*/
+
+#if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON)
+#define HAVE_FILEMON
+
+#include <linux/inotify.h>
+
+static int filemon_inotify_fd;
+static int filemon_inotify_wdmax;
+static InputFile **filemon_inotify_wd2ipf;
+
+typedef struct Filemon_Perfile {
+  int wd;
+} Filemon_Inotify_Perfile;
+
+static void filemon_startfile(InputFile *ipf) {
+  int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY);
+  if (wd < 0) sysdie("inotify_add_watch %s", ipf->path);
+
+  if (wd >= filemon_inotify_wdmax) {
+    int newmax= wd+2;
+    filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf,
+                                sizeof(*filemon_inotify_wd2ipf) * newmax);
+    memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0,
+          sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax));
+    filemon_inotify_wdmax= newmax;
+  }
+
+  assert(!filemon_inotify_wd2ipf[wd]);
+  filemon_inotify_wd2ipf[wd]= ipf;
+
+  assert(!ipf->filemon);
+  ipf->filemon= xmalloc(sizeof(*ipf->filemon));
+  ipf->filemon->wd= wd;
+}
+
+static void filemon_stopfile(InputFile *ipf) {
+  int wd= ipf->filemon->wd;
+  int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd);
+  if (r) sysdie("inotify_rm_watch");
+  filemon_inotify_wd2ipf[wd]= 0;
+  free(ipf->filemon);
+  ipf->filemon= 0;
+}
+
+static void *filemon_inotify_readable(oop_source *lp, int fd,
+                                     oop_event e, void *u) {
+  struct inotify_event iev;
+  for (;;) {
+    int r= read(filemon_inotify_fd, &iev, sizeof(iev));
+    if (r==-1) {
+      if (errno==EAGAIN) break;
+      sysdie("read from inotify master");
+    } else if (r==sizeof(iev)) {
+      assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax);
+    } else {
+      die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev));
+    }
+    filemon_callback(filemon_inotify_wd2ipf[iev.wd]);
+  }
+  return OOP_CONTINUE;
+}
+
+static int filemon_init(void) {
+  filemon_inotify_fd= inotify_init();
+  if (filemon_inotify_fd<0) {
+    syswarn("could not initialise inotify: inotify_init failed");
+    return 0;
+  }
+  set nonblock;
+  loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable);
+
+  return 1;
+}
+
+#endif /* HAVE_INOTIFY && !HAVE_FILEMON *//
+
+/*---------- filemon dummy implementation ----------*/
+
+#if !defined(HAVE_FILEMON)
+
+static int filemon_init(void) { return 0; }
+static void filemon_startfile(InputFile *ipf) { }
+static void filemon_stopfile(InputFile *ipf) { }
+
+#endif
+
+/*========== interaction with innd ==========*/
+
+/* See official state diagram at top of file.  We implement
+ * this as follows:
+ *
+          ================
+           WAITING
+          [Nothing/Noduct]
+           poll for F
+          ================
+               |
+               |     TIMEOUT
+               |`--------------------------.
+               |                           | install defer as backlog
+     ,--------->|                           | exit
+     |          | OPEN F SUCCEEDS           V
+     |          V                         =========
+     |     ========                        (ESRCH)
+     |      NORMAL                        [Dropped]
+     |     [Normal]                       =========
+     |      read F
+     |     ========
+     |          |
+     |          | F IS SO BIG WE SHOULD FLUSH
+     ^          | hardlink F to D
+     |     [Hardlinked]
+     |          | unlink F
+     |          | our handle onto F is now onto D
+     |     [Moved]
+     |          |
+     |          |<---------------------------------------------------.
+     |          |                                                    |
+     |          | spawn inndcomm flush                               |
+     |          V                                                    |
+     |     ==========                                                |
+     |      FLUSHING                                                 |
+     |     [Flushing]                                                |
+     |      read D                                                   |
+     |     ==========                                                |
+     |          |                                                    |
+     |          |   INNDCOMM FLUSH FAILS                             ^
+     |          |`----------------------->--------.                  |
+     |          |                                 |                  |
+     |          |   NO SUCH SITE                  V                  |
+     ^          |`----------------.            =========             |
+     |          |                 |            FLUSHFAIL             |
+     |          |                 V            [Moved]               |
+     |          |            ==========        read D                |
+     |          |             DROPPING         =========             |
+     |          |            [Dropping]           |                  |
+     |          |             read D              | TIME TO RETRY    |
+     |          |            ==========           `------------------'
+     |          | FLUSH OK        |
+     |          | open F          | AT EOF OF D AND ALL PROCESSED
+     |          V                 | install defer as backlog
+     |     ==========             | unlink D
+     |      FLUSHED               | exit
+     |     [Flushed]              V
+     |      read D, F         ==========
+     |     ==========          (ESRCH)
+     |          |             [Droppped]
+     |          |             ==========
+     |          |
+     |          | AT EOF OF D AND ALL D PROCESSED
+     ^          V unlink D
+     |          | close D
+     |          | install defer as backlog
+     |          | start new defer
+     |          |
+     `----------'
+
+ *
+ *  duct state
+ *   WAITING
+ *   NORMAL
+ *   FLUSHING
+ *   FLUSHED
+ *   FLUSHFAIL
+ *   DROPPING
+ */
+
+static char *path_ductlock, *path_duct, *path_ductdefer;
+
+typedef struct {
+  /* This is an instance of struct oop_readable */
+  struct oop_readable readable;
+  oop_readable_call *readable_callback;
+
+  int fd;
+  const char *path; /* ptr copy of path_<foo> or feedfile */
+  struct Filemon_Perfile *filemon;
+
+  oop_read *rd;
+  long inprogress; /* no. of articles read but not processed */
+} InputFile;
+
+static void statemc_init(void) {
+  path_ductlock=  xasprintf("%s_duct.lock",  feedfile);
+  path_duct=      xasprintf("%s_duct",       feedfile);
+  path_ductdefer= xasprintf("%s_duct.defer", feedfile);
+
+  int lockfd= open(path_ductlock, O_CREAT|O_RDWR, 0600);
+  if (lockfd<0) sysdie("open lockfile %s", path_ductlock);
+
+  struct flock fl;
+  memset(&fl,0,sizeof(fl));
+  fl.l_type= F_WRLCK;
+  fl.l_whence= SEEK_SET;
+  r= fcntl(lockfd, F_SETLK, &fl);
+  if (r==-1) {
+    if (errno==EACCES || errno==EAGAIN)
+      die("another duct holds the lockfile");
+    sysdie("fcntl F_SETLK lockfile %s", path_ductlock);
+  }
+}
+
+static void statemc_poll(void) {
+  if (tailing_fd>=0) return;
+
+  int d_fd= open(path_duct, O_RDWR);
+  if (d_fd<0)
+    if (errno!=ENOENT) sysdie("open duct file %s", path_duct);
+
+  int f_fd= open(feedfile, O_RDWR);
+  if (f_fd<0)
+    if (errno!=ENOENT) sysdie("open feed file %s", feedfile);
+
+  if (d_fd<0) {
+    if (f_fd>=0)
+      start_tailing(f_fd);
+    return;
+  }
+
+
+
+/*========== main program ==========*/
+
+#define EVERY(what, interval, body)                                         \
+  static const struct timeval what##_timeout = { 5, 0 };                    \
+  static void what##_schedule(void);                                        \
+  static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \
+    { body }                                                                \
+    what##_schedule();                                                      \
+  }                                                                         \
+  static void what##_schedule(void) {                                       \
+    loop->on_time(loop, what##_timeout, what##_timedout, 0);                \
+  }
+
+EVERY(filepoll, {5,0}, { check_master_queue(); })
+
+EVERY(period, {PERIOD_SECONDS,0}, {
+  if (connect_delay) connect_delay--;
+  statemc_poll();
+  check_master_queue();
+});
+
+main {
+  ignore sigpipe;
+  if (!filemon_init())
+    filepoll_schedule();
+  period_schedule();
+};