chiark / gitweb /
WIP before new iov queue
authorIan Jackson <ian@liberator.relativity.greenend.org.uk>
Sun, 29 Nov 2009 17:07:13 +0000 (17:07 +0000)
committerIan Jackson <ian@liberator.relativity.greenend.org.uk>
Sun, 29 Nov 2009 17:07:13 +0000 (17:07 +0000)
backends/innduct.c

index 9be77ee..610504e 100644 (file)
 /*
+ * Four files full of
+ *    token article
+ *
+ *   site.name_ductlock        lock taken out by innduct
+ * F site.name                 written by innd
+ * D site.name_duct            moved aside by innduct, for flush
+ *   site.name_deferwork       431'd articles, still being written
+ *   site.name_defergo_<inum>  431'd articles, ready for innxmit
+ *   site.name_deferlock       lock taken out by innxmit wrapper
+ *
+ *
+ *
+ * OVERALL STATES:
+ *
+ *                                                  START
+ *   ,-->--.                                           |
+ *   |     |                                         stat D
+ *   |     |                                         /   |
+ *   |     |                                  ENOENT/    |exists
+ *   |     V                            <----------'     |
+ *   |  Normal                                         stat F
+ *   |   F: innd writing, duct reading                  /|\
+ *   |   D: ENOENT                                     / | \
+ *   |     |                                          /  |  \
+ *   |     |  duct decides time to flush      same   /   |   |
+ *   |     |  duct makes hardlink             as D  /    |   |
+ *   |     |                                       /     |   |
+ *   |     V                            <---------'      |   |
+ *   |  Hardlinked                                       |   |
+ *   |   F == D: innd writing, duct reading              |   |
+ *   ^     |                                             |   |
+ *   |     |  duct unlinks F                            /    |
+ *   |     V                                  ENOENT   /     |
+ *   |  Moved                            <------------'      |
+ *   |   F: ENOENT                                           |
+ *   |   D: innd writing, duct reading                       |
+ *   |     |                                                 |
+ *   |     |  duct flushes feed                              |
+ *   |     |   (others can too, harmlessly)                  |
+ *   |     V                                                 |
+ *   |  Separated                          <-----------------'
+ *   |   F: innd writing                        different to D
+ *   |   D: duct reading
+ *   |     |
+ *   |     V  duct completes processing of D
+ *   |     |  duct unlinks D
+ *   |     |
+ *   `--<--'
+ *
  */
 
-static int max_connections, articles_per_connect_attempt;
+static int max_connections, max_queue_per_conn;
 static int connection_setup_timeout, port, try_stream;
 static const char *remote_host;
 
+#define ISNODE(T)    T *next, *back;
+#define LIST(T)      struct { T *head, *tail, *tailpred; int count; }
+
+#define NODE(n) ((struct node*)&(n)->head)
+
+#define LIST_ADDHEAD(l,n)                                              \
+ (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++)
+#define LIST_ADDTAIL(l,n)                                              \
+ (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++)
+
+#define LIST_REMHEAD(l)                                                          \
+ ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0)
+#define LIST_REMTAIL(l)                                                          \
+ ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0)
+#define LIST_REMOVE(l,n)                       \
+ (list_remove(NODE((n))), (void)(l).count--)
+#define LIST_INSERT(l,n,pred) \
+ (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++)
+
 struct Article {
+  char *mid;
+  int midlen;
+  int nocheck; /* also used when CHECK says yes please */
 };
 
+#define CONNBUFSZ 16384
+
+#define CN "<%d> "
+
 typedef struct Conn Conn;
 struct Conn {
-  Conn *next, *back;
-  int fd;
-  Article *queue;
+  ISNODE(Conn);
+  int fd, max_queue, stream;
+  LIST(Article) queue;
+  Article *tosend; /* points into queue */
+  char circ_buf[CONNBUFSZ];
+  unsigned circ_read, circ_write;
 };
 
 
-#define CHILD_ESTATUS_STREAM   1
-#define CHILD_ESTATUS_NOSTREAM 2
+#define CHILD_ESTATUS_STREAM   4
+#define CHILD_ESTATUS_NOSTREAM 5
 
 static int since_connect_attempt;
 static int nconns;
-static struct { Conn *head, *tail } idle, working, full;
+static LIST(Conn) idle, working, full;
+
+static LIST(Article) *queue;
+
+static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
+
+
+/*========== making new connections ==========*/
 
-static Conn *connecting;
 static int connecting_sockets[2]= {-1,-1};
 static pid_t connecting_child;
 
-static Article *currentart;
+static void report_child_status(const char *what, int status) {
+  if (WIFEXITED(status)) {
+    int es= WEXITSTATUS(status);
+    if (es)
+      warn("%s: child died with error exit status %d",es);
+  } else if (WIFSIGNALED(status)) {
+    int sig= WTERMSIG(status);
+    const char *sigstr= strsignal(sig);
+    const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
+    if (sigstr)
+      warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
+    else
+      warn("%s: child died due to unknown fatal signal %d%s",
+          what, sig, coredump);
+  } else {
+    warn("%s: child died with unknown wait status %d", status);
+  }
+}
 
-static void start_connecting() {
-  r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
-  if (r) { syswarn("cannot create socketpair for connect child"); goto x; }
+static void connect_attempt_discard(void) {
+  if (connecting_sockets[0]) {
+    cancel_fd(loop, connecting_sockets[0], OOP_READ);
+    cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION);
+  }
+  perhaps_close(&connecting_sockets[0]);
+  perhaps_close(&connecting_sockets[1]);
+
+  if (connecting_child) {
+    int status;
+    r= kill(connecting_child, SIGKILL);
+    if (r) sysdie("cannot kill connect child");
+
+    pid_t got= waitpid(connecting_child, &status, WNOHANG);
+    if (got==-1) sysdie("cannot reap connect child");
+
+    if (!(WIFEXITED(status) ||
+         (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) {
+      report_child_status("connect"
+    }
+    connecting_child= 0;
+  }
+}
+
+#define PREP_DECL_MSG_CMSG(msg)                        \
+  struct msghdr msg;                           \
+  memset(&msg,0,sizeof(msg));                  \
+  char msg##cbuf[CMSG_SPACE(sizeof(fd))];      \
+  msg.msg_control= msg##cbuf;                  \
+  msg.msg_controllen= sizeof(msg##cbuf);
+
+static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
+  Conn *conn= 0;
+
+  conn= xcalloc(sizeof(*conn));
   
+  DECL_MSG_CMSG(msg);
+  struct cmsghdr *h= 0;
+  ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
+  if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
+  if (!h) {
+    int status;
+    pid_t got= waitpid(connecting_child, &status, WNOHANG);
+    if (got != -1) {
+      assert(got==connecting_child);
+      connecting_child= 0;
+      if (WIFEXITED(status) &&
+         (WEXITSTATUS(status) != 0
+          WEXITSTATUS(status) != CHILD_ESTATUS_STREAM &&
+          WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) {
+       /* child already reported the problem */
+      } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) {
+       warn("connect: connection attempt timed out");
+      } else if (!WIFEXITED(status)) {
+       report_child_status("connect", status);
+       /* that's probably the root cause then */
+      }
+    } else {
+      /* child is still running apparently, report the socket problem */
+      if (rs < 0)
+       syswarn("connect: read from child socket failed");
+      else if (e == OOP_EXCEPTIONN)
+       warn("connect: unexpected exception on child socket");
+      else
+       warn("connect: unexpected EOF on child socket");
+    }
+    goto x;
+  }
+
+#define CHK(field, val)                                                          \
+  if (h->cmsg_##field != val) {                                                  \
+    die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
+    goto x;                                                              \
+  }
+  CHK(level, SOL_SOCKET);
+  CHK(type,  SCM_RIGHTS);
+  CHK(len,   CMSG_LEN(sizeof(conn-b>fd)));
+#undef CHK
+
+  if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; }
+
+  memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
+
+  pid_t got= waitpid(connecting_child, &status, 0);
+  if (got==-1) sysdie("connect: real wait for child");
+  assert(got == connecting_child);
+  connecting_child= 0;
+
+  if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
+  int es= WEXITSTATUS(status);
+  switch (es) {
+  case CHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
+  case CHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
+  default:
+    die("connect: child gave unexpected exit status %d", es);
+  }
+
+  set nonblocking;
+
+  /* Phew! */
+  LIST_ADDHEAD(idle, conn);
+  notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
+  connect_attempt_discard();
+  process_queue();
+  return 0;
+
+ x:
+  if (conn) {
+    perhaps_close(&conn->fd);
+    free(conn);
+  }
+  connect_attempt_discard();
+}
+
+static void connect_start() {
+  assert(!connecting_sockets[0]);
+  assert(!connecting_sockets[1]);
+  assert(!connecting_child);
+
+  notice("starting connection attempt");
+
+  r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
+  if (r) { syswarn("connect: cannot create socketpair for child"); goto x; }
+
   connecting_child= fork();
-  if (connecting_child==-1) { syswarn("cannot fork for connect"); goto x; }
-  
+  if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; }
+
   if (!connecting_child) {
     FILE *cn_from, *cn_to;
     char buf[NNTP_STRLEN+100];
     int exitstatus= CHILD_ESTATUS_NOSTREAM;
 
+    put sigpipe back;
+    close unwanted fds;
+
+    r= close(connecting_sockets[0]);
+    if (r) sysdie("connect: close parent socket in child");
+
     alarm(connection_setup_timeout);
     if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) {
       if (buf[0]) {
        sanitise_inplace(buf);
-       die("%s: connection rejected: %s", remote_host, buf);
+       die("connect: rejected: %s", buf);
       } else {
-       sysdie("%s: connection attempt failed", remote_host);
+       sysdie("connect: connection attempt failed");
       }
     }
     if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0)
-      sysdie("%s: authentication failed", remote_host);
+      sysdie("connect: authentication failed");
     if (try_stream) {
       if (fputs("MODE STREAM\r\n", cn_to) ||
          fflush(cn_to))
-       sysdie("%s: could not send MODE STREAM", remote_host);
+       sysdie("connect: could not send MODE STREAM");
       buf[sizeof(buf)-1]= 0;
       if (!fgets(buf, sizeof(buf)-1, cn_from)) {
        if (ferror(cn_from))
-         sysdie("%s: could not read response to MODE STREAM", remote_host);
+         sysdie("connect: could not read response to MODE STREAM");
        else
-         die("%s: connection close in response to MODE STREAM", remote_host);
+         die("connect: connection close in response to MODE STREAM");
       }
       int l= strlen(buf);
       assert(l>=1);
       if (buf[-1]!='\n') {
        sanitise_inplace(buf);
-       die("%s: response to MODE STREAM is too long: %.100s...",
+       die("connect: response to MODE STREAM is too long: %.100s...",
            remote_host, buf);
       }
       l--;  if (l>0 && buf[1-]=='\r') l--;
@@ -76,7 +303,7 @@ static void start_connecting() {
       int rcode= strtoul(buf,&ep,10);
       if (ep != buf[3]) {
        sanitise_inplace(buf);
-       die("%s: bad response to MODE STREAM: %.50s", remote_host, buf);
+       die("connect: bad response to MODE STREAM: %.50s", buf);
       }
       switch (rcode) {
       case 203:
@@ -87,70 +314,98 @@ static void start_connecting() {
        break;
       default:
        sanitise_inplace(buf);
-       warn("%s: bad response to MODE STREAM: %.50s", remote_host, buf);
+       warn("connect: unexpected response to MODE STREAM: %.50s", buf);
        exitstatus= 2;
        break;
       }
     }
     int fd= fileno(cn_from);
 
-    char cmsgbuf[CMSG_SPACE(sizeof(fd))];
-    struct msghdr msg;
-    memset(&msg,0,sizeof(msg));
-    msg.msg_control= cmsgbuf;
-    msg.msg_controllen= sizeof(cmsgbuf);
-
+    PREP_DECL_MSG_CMSG(msg);
     struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
     cmsg->cmsg_level= SOL_SOCKET;
     cmsg->cmsg_type=  SCM_RIGHTS;
     cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
     memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
-    
+
     msg.msg_controllen= cmsg->cmsg_len;
-    r= sendmsg(childs_socket, &msg, 0);
-    if (r) sysdie("%s: sendmsg failed for new connection", remote_host);
+    r= sendmsg(connecting_sockets[1], &msg, 0);
+    if (r) sysdie("sendmsg failed for new connection");
 
     _exit(exitstatus);
   }
 
-  on_fd(loop, 
+  r= close(connecting_sockets[1]);  connecting_sockets[1]= 0;
+  if (r) sysdie("connect: close child socket in parent");
 
-      struct cmsghdr *cmsg;
-      
-      /* NNTPconnect inexplicably duplicates the fd but we don't care
-       * about that as we're going to exit shortly */
-
-      
-      
-
-       warn("
-
-       syswarn("
-      int e= errno;
-      sanitise(buf);
-      syswarn("
+  loop->on_fd(loop, connecting_sockets[0], OOP_READ,      connchild_event, 0);
+  loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0);
+  return OOP_CONTINUE;
 
  x:
-  kill_connection_attempt();
+  connect_attempt_discard();
 }
 
-static void kill_connection_attempt() {
-  fixme;
-  connecting_sockets[0]= connecting_sockets[1]= -1;
-  connecting_child= 0;
-}
+/*========== overall control of article flow ==========*/
  
-static void process_any_article() {
-  if (!currentart)
+static void process_queue() {
+  if (!queue.count)
     return;
-  
+
   if (working.head) {
     transmit(working.head);
   } else if (idle.head) {
     transmit(idle.head);
-  } else if (nconns < maxconns && !connecting_child &&
-            since_connect_attempt >= connect_attempt_limiter) {
-    since_connect_attempt= 0;
+  } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
+            !connecting_child && !connect_delay) {
+    connect_delay= reconnect_delay_periods;
     connect_start();
-  } 
+  }
+} 
+/*========== article transmission ==========*/
+
+static void *conn_writeable() {
+  for (;;) {
+    int circ_used= circ_write - circ_read;
+    if (circ_used < 0) circ_used += CONNBUFSZ;
+    writeable_moredata(conn, CONNBUFSZ-1 - circ_used);
+
+    if (conn->circ_read == conn->circ_write)
+      return OOP_CONTINUE;
+
+    struct iovec iov[2];
+    int niov= 1;
+    iov[0].iov_base= conn->circ_buf + conn->circ_read;
+    if (conn->circ_read > conn->circ_write) { /* wrapped */
+      iov[0].iov_len= CONNBUFSZ - conn->circ_read;
+      iov[1].iov_base= conn->circ_buf;
+      iov[1].iov_len= conn->circ_write;
+      if (niov[1].iov_len) niov= 2;
+    } else {
+      iov[0].iov_len= conn->circ_write - conn->circ_read;
+    }
+    ssize_t rs= writev(conn->fd, &iov, niov);
+    if (rs < 0) {
+      if (errno == EAGAIN) return OOP_CONTINUE;
+      syswarn(CN "write failed", conn->fd);
+      conn_failed(conn);
+      return OOP_CONTINUE;
+    }
+    assert(rs > 0);
+
+    conn->circ_read += rs;
+    if (conn->circ_read > CONNBUFSZ)
+      conn->circ_read -= CONNBUFSZ;
+  }
 }
+
+
+static void transmit(Conn *conn) {
+  assert(conn->queue.count < max_queue);
+  
+
+main {
+  ignore sigpipe;
+};