chiark / gitweb /
WIP
[inn-innduct.git] / backends / innduct.c
index 43d4dd2cdcf05df3c22709037ef5ab8e93b2547c..1b8875ca4c252ff82b483b8967d42fbb1cfe9bcc 100644 (file)
@@ -92,15 +92,15 @@ typedef struct {
   XmitKind kind;
   union {
     char *malloc_tofree;
-  } 
+  } info;
 } XmitDetails;
 
 struct Conn {
   ISNODE(Conn);
   int fd, max_queue, stream;
-  LIST(Article) sent; /* in xmit or transmitted */
+  LIST(Article) queue; /* not yet told peer, or said TAKETHIS */
+  LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */
   Article send; /* partially transmitted */
-  LIST(Article) queue; /* not yet in xmit */
   struct iovec *xmit;
   XmitDetails *xmitd;
   int xmitu;
@@ -361,25 +361,71 @@ static void connect_start() {
 
 /*========== overall control of article flow ==========*/
  
-static void process_queue() {
+static void check_master_queue(void) {
   if (!queue.count)
     return;
 
   if (working.head) {
-    transmit(working.head);
+    conn_assign_one_article(&working);
   } else if (idle.head) {
-    transmit(idle.head);
+    conn_assign_one_article(&idle);
   } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
             !connecting_child && !connect_delay) {
     connect_delay= reconnect_delay_periods;
     connect_start();
   }
 } 
+
+static int conn_total_queued_articles(Conn *conn) {
+  return conn->sent.count + !!conn->send + conn->queue.count;
+}
+static void conn_assign_one_article(LIST(Conn) *connlist) {
+  Conn *conn= connlist->head;
+
+  LIST_REMOVE(*connlist, conn);
+  Article *art= LIST_REMHEAD(queue);
+  LIST_ADDTAIL(conn->queue, art);
+  LIST_ADD(*conn_determine_right_list(conn), conn);
+  
+  check_conn_work(conn);
+}
+
+static LIST(Conn) *conn_determine_right_list(Conn *conn) { 
+  int inqueue= conn_total_queued_articles(conn);
+  assert(inqueue <= max_queue);
+  if (inqueue == 0) return &idle;
+  if (inqueue == conn->max_queue) return &full;
+  return &working;
+}
+
+static void check_conn_work(Conn *conn)  {
+  void *rp;
+  for (;;) {
+    conn_make_some_xmits(conn);
+
+    void *rp= conn_write_some_xmits(conn);
+    if (!rp) {
+      loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+      return;
+    } else if (rp==OOP_CONTINUE) {
+      loop->on_fd(loop, conn->fd, OOP_WRITE;)
+ else if (rp==OOP_HALT) {
+      return;
+    }
+    
+
+      if (
+
+    while (
+    
+}
  
 /*========== article transmission ==========*/
 
 static void *conn_writeable() {
   for (;;) {
+    
     if (!conn->xmitu) {
       perhaps_transmit_on(conn);
       if (!conn->xmitu) {
@@ -388,14 +434,24 @@ static void *conn_writeable() {
       }
     }
 
+
+static void *conn_write_some_xmits(Conn *conn) {
+  /* return values:
+   *      0:            nothing more to write, no need to call us again
+   *      OOP_CONTINUE: more to write but fd not writeable
+   *      OOP_HALT:     disaster, have destroyed conn
+   */
+  for (;;) {
     int count= conn->xmitu;
+    if (!count) return 0;
+  
     if (count > IOV_MAX) count= IOV_MAX;
     ssize_t rs= writev(conn->fd, conn->xmit, count);
     if (rs < 0) {
-      if (errno == EAGAIN) break;
+      if (errno == EAGAIN) return OOP_CONTINUE;
       syswarn(CN "write failed", conn->fd);
       conn_failed(conn);
-      break;
+      return OOP_HALT;
     }
     assert(rs > 0);
 
@@ -415,38 +471,21 @@ static void *conn_writeable() {
     memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
     conn->xmitu= newu;
   }
-  
-  return OOP_CONTINUE;
-}
-
-static void transmit(Conn *conn) {
-  int inqueue= conn->sent.count + !!conn->send + conn->queue.count;
-  assert(inqueue < max_queue);
-  Article *art= LIST_REMHEAD(queue);
-  LIST_ADDTAIL(conn->queue, art);
-  if (inqueue == 0) {
-    LIST_REMOVE(idle, conn);
-    if (conn->max_queue==1) LIST_ADD(full, conn);
-    else LIST_ADD(working, conn);
-  } else if (inqueue == max_queue-1) {
-    LIST_REMOVE(working, conn);
-    LIST_ADD(full, conn);
-  }
-  perhaps_transmit_on(conn);
 }
 
-static void perhaps_transmit_on(Conn *conn) {
+static void conn_make_some_xmits(Conn *conn) {
   for (;;) {
     if (conn->send) {
-      /* do something about this article text */
+      do something about this article text;
       continue;
     }
-    if (conn->xmitu+3 > xmita)
+
+    if (conn->xmitu+3 > conn->xmita)
       /* no space for a CHECK even */
-      return;
+      break;
 
-    Article *art= LIST_REMHEAD(conn->queue);
-    if (art) return;
+    Article *art= LIST_REMHEAD(queue);
+    if (!art) break;
 
     if (art->checked || conn->nocheck) {
       if (conn->stream) {
@@ -454,19 +493,21 @@ static void perhaps_transmit_on(Conn *conn) {
        xmit_noalloc(art->mid, art->midlen);
        XMIT_LITERAL("\r\n");
       } else {
-       /* we got 235 */
+       /* we got 235 from IHAVE */
       }
       conn->send= art;
     } else {
-      if (conn->stream) XMIT_LITERAL("IHAVE ");
-      else XMIT_LITERAL("CHECK ");
+      if (conn->stream)
+       XMIT_LITERAL("IHAVE ");
+      else
+       XMIT_LITERAL("CHECK ");
       xmit_noalloc(art->mid, art->midlen);
       XMIT_LITERAL("\r\n");
       LIST_ADDTAIL(conn->sent, art);
     }
   }
 }
-  
     
     if (conn->queue.head) {
       if (conn->queue.checked || conn->nocheck) {