chiark / gitweb /
WIP
[inn-innduct.git] / backends / innduct.c
index 610504e8d84c0dc858d424d3ca890f753b7a3cc8..1b8875ca4c252ff82b483b8967d42fbb1cfe9bcc 100644 (file)
@@ -76,7 +76,7 @@ static const char *remote_host;
 struct Article {
   char *mid;
   int midlen;
-  int nocheck; /* also used when CHECK says yes please */
+  int checked;
 };
 
 #define CONNBUFSZ 16384
@@ -84,13 +84,26 @@ struct Article {
 #define CN "<%d> "
 
 typedef struct Conn Conn;
+
+typedef enum {
+  Malloc, Const, Artdata;
+} XmitKind;
+typedef struct {
+  XmitKind kind;
+  union {
+    char *malloc_tofree;
+  } info;
+} XmitDetails;
+
 struct Conn {
   ISNODE(Conn);
   int fd, max_queue, stream;
-  LIST(Article) queue;
-  Article *tosend; /* points into queue */
-  char circ_buf[CONNBUFSZ];
-  unsigned circ_read, circ_write;
+  LIST(Article) queue; /* not yet told peer, or said TAKETHIS */
+  LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */
+  Article send; /* partially transmitted */
+  struct iovec *xmit;
+  XmitDetails *xmitd;
+  int xmitu;
 };
 
 
@@ -348,25 +361,170 @@ static void connect_start() {
 
 /*========== overall control of article flow ==========*/
  
-static void process_queue() {
+static void check_master_queue(void) {
   if (!queue.count)
     return;
 
   if (working.head) {
-    transmit(working.head);
+    conn_assign_one_article(&working);
   } else if (idle.head) {
-    transmit(idle.head);
+    conn_assign_one_article(&idle);
   } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
             !connecting_child && !connect_delay) {
     connect_delay= reconnect_delay_periods;
     connect_start();
   }
 } 
+
+static int conn_total_queued_articles(Conn *conn) {
+  return conn->sent.count + !!conn->send + conn->queue.count;
+}
+static void conn_assign_one_article(LIST(Conn) *connlist) {
+  Conn *conn= connlist->head;
+
+  LIST_REMOVE(*connlist, conn);
+  Article *art= LIST_REMHEAD(queue);
+  LIST_ADDTAIL(conn->queue, art);
+  LIST_ADD(*conn_determine_right_list(conn), conn);
+  
+  check_conn_work(conn);
+}
+
+static LIST(Conn) *conn_determine_right_list(Conn *conn) { 
+  int inqueue= conn_total_queued_articles(conn);
+  assert(inqueue <= max_queue);
+  if (inqueue == 0) return &idle;
+  if (inqueue == conn->max_queue) return &full;
+  return &working;
+}
+
+static void check_conn_work(Conn *conn)  {
+  void *rp;
+  for (;;) {
+    conn_make_some_xmits(conn);
+
+    void *rp= conn_write_some_xmits(conn);
+    if (!rp) {
+      loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+      return;
+    } else if (rp==OOP_CONTINUE) {
+      loop->on_fd(loop, conn->fd, OOP_WRITE;)
+ else if (rp==OOP_HALT) {
+      return;
+    }
+    
+
+      if (
+
+    while (
+    
+}
  
 /*========== article transmission ==========*/
 
 static void *conn_writeable() {
   for (;;) {
+    
+    if (!conn->xmitu) {
+      perhaps_transmit_on(conn);
+      if (!conn->xmitu) {
+       unlink from readable;
+       break;
+      }
+    }
+
+
+static void *conn_write_some_xmits(Conn *conn) {
+  /* return values:
+   *      0:            nothing more to write, no need to call us again
+   *      OOP_CONTINUE: more to write but fd not writeable
+   *      OOP_HALT:     disaster, have destroyed conn
+   */
+  for (;;) {
+    int count= conn->xmitu;
+    if (!count) return 0;
+  
+    if (count > IOV_MAX) count= IOV_MAX;
+    ssize_t rs= writev(conn->fd, conn->xmit, count);
+    if (rs < 0) {
+      if (errno == EAGAIN) return OOP_CONTINUE;
+      syswarn(CN "write failed", conn->fd);
+      conn_failed(conn);
+      return OOP_HALT;
+    }
+    assert(rs > 0);
+
+    for (done=0; rs && done<xmitu; done++) {
+      struct iovec *vp= &conn->xmit[done];
+      XmitDetails *dp= &conn->xmitd[done];
+      if (rs > vp->iov_len) {
+       rs -= vp->iov_len;
+       xmit_free(dp);
+      } else {
+       vp->iov_base += rs;
+       vp->iov_len -= rs;
+      }
+    }
+    int newu= conn->xmitu - done;
+    memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
+    memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
+    conn->xmitu= newu;
+  }
+}
+
+static void conn_make_some_xmits(Conn *conn) {
+  for (;;) {
+    if (conn->send) {
+      do something about this article text;
+      continue;
+    }
+
+    if (conn->xmitu+3 > conn->xmita)
+      /* no space for a CHECK even */
+      break;
+
+    Article *art= LIST_REMHEAD(queue);
+    if (!art) break;
+
+    if (art->checked || conn->nocheck) {
+      if (conn->stream) {
+       XMIT_LITERAL("TAKETHIS ");
+       xmit_noalloc(art->mid, art->midlen);
+       XMIT_LITERAL("\r\n");
+      } else {
+       /* we got 235 from IHAVE */
+      }
+      conn->send= art;
+    } else {
+      if (conn->stream)
+       XMIT_LITERAL("IHAVE ");
+      else
+       XMIT_LITERAL("CHECK ");
+      xmit_noalloc(art->mid, art->midlen);
+      XMIT_LITERAL("\r\n");
+      LIST_ADDTAIL(conn->sent, art);
+    }
+  }
+}
+    
+    if (conn->queue.head) {
+      if (conn->queue.checked || conn->nocheck) {
+       
+
+      && conn->xmitu+3 <= xmita) {
+      if (
+      XMIT("
+    if (conn->xmitu < xmita
+      
+
+  if (!queue
+  
+
+
+
     int circ_used= circ_write - circ_read;
     if (circ_used < 0) circ_used += CONNBUFSZ;
     writeable_moredata(conn, CONNBUFSZ-1 - circ_used);
@@ -387,10 +545,7 @@ static void *conn_writeable() {
     }
     ssize_t rs= writev(conn->fd, &iov, niov);
     if (rs < 0) {
-      if (errno == EAGAIN) return OOP_CONTINUE;
-      syswarn(CN "write failed", conn->fd);
-      conn_failed(conn);
-      return OOP_CONTINUE;
+      
     }
     assert(rs > 0);
 
@@ -401,11 +556,6 @@ static void *conn_writeable() {
 }
 
 
-static void transmit(Conn *conn) {
-  assert(conn->queue.count < max_queue);
-  
-
 main {
   ignore sigpipe;
 };