chiark / gitweb /
WIP. New outbound data stuff looks good
authorIan Jackson <ian@liberator.(none)>
Tue, 9 Feb 2010 21:13:34 +0000 (21:13 +0000)
committerIan Jackson <ian@liberator.(none)>
Tue, 9 Feb 2010 21:13:34 +0000 (21:13 +0000)
backends/innduct.c

index 1b8875ca4c252ff82b483b8967d42fbb1cfe9bcc..2855c412a0f6c2113042c75d1107197b7615b6e0 100644 (file)
@@ -76,10 +76,10 @@ static const char *remote_host;
 struct Article {
   char *mid;
   int midlen;
-  int checked;
+  int checked, sentbody;
 };
 
-#define CONNBUFSZ 16384
+#define CONNIOVS 128
 
 #define CN "<%d> "
 
@@ -88,21 +88,22 @@ typedef struct Conn Conn;
 typedef enum {
   Malloc, Const, Artdata;
 } XmitKind;
+
 typedef struct {
   XmitKind kind;
   union {
     char *malloc_tofree;
+    ARTHANDLE *sm_art;
   } info;
 } XmitDetails;
 
 struct Conn {
   ISNODE(Conn);
   int fd, max_queue, stream;
-  LIST(Article) queue; /* not yet told peer, or said TAKETHIS */
+  LIST(Article) queue; /* not yet told peer, or CHECK said send it */
   LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */
-  Article send; /* partially transmitted */
-  struct iovec *xmit;
-  XmitDetails *xmitd;
+  struct iovec xmit[CONNIOVS];
+  XmitDetails xmitd[CONNIOVS];
   int xmitu;
 };
 
@@ -246,7 +247,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
   LIST_ADDHEAD(idle, conn);
   notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
   connect_attempt_discard();
-  process_queue();
+  check_master_queue();
   return 0;
 
  x:
@@ -361,34 +362,48 @@ static void connect_start() {
 
 /*========== overall control of article flow ==========*/
  
+static void conn_check_work(Conn *conn);
+
 static void check_master_queue(void) {
   if (!queue.count)
     return;
 
-  if (working.head) {
-    conn_assign_one_article(&working);
-  } else if (idle.head) {
-    conn_assign_one_article(&idle);
-  } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
-            !connecting_child && !connect_delay) {
-    connect_delay= reconnect_delay_periods;
-    connect_start();
+  Conn *last_assigned=0;
+  for (;;) {
+    if (working.head) {
+      conn_assign_one_article(&working, &last_assigned);
+    } else if (idle.head) {
+      conn_assign_one_article(&idle, &last_assigned);
+    } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
+              !connecting_child && !connect_delay) {
+      connect_delay= reconnect_delay_periods;
+      connect_start();
+    } else {
+      break;
+    }
   }
+  conn_check_work(last_assigned);
 } 
-
-static int conn_total_queued_articles(Conn *conn) {
-  return conn->sent.count + !!conn->send + conn->queue.count;
-}
  
-static void conn_assign_one_article(LIST(Conn) *connlist) {
+static void conn_assign_one_article(LIST(Conn) *connlist,
+                                   Conn **last_assigned) {
   Conn *conn= connlist->head;
 
   LIST_REMOVE(*connlist, conn);
   Article *art= LIST_REMHEAD(queue);
   LIST_ADDTAIL(conn->queue, art);
   LIST_ADD(*conn_determine_right_list(conn), conn);
-  
-  check_conn_work(conn);
+
+  /* This slightly odd arrangement is so that we call conn_check_work
+   * once after filling the queue for a new connection in
+   * check_master_queue, rather than for each article. */
+  if (conn != *last_assigned && *last_assigned)
+    conn_check_work(*last_assigned);
+  *last_assigned= conn;
+}
+
+static int conn_total_queued_articles(Conn *conn) {
+  return conn->sent.count + conn->queue.count;
 }
 
 static LIST(Conn) *conn_determine_right_list(Conn *conn) { 
@@ -399,42 +414,35 @@ static LIST(Conn) *conn_determine_right_list(Conn *conn) {
   return &working;
 }
 
-static void check_conn_work(Conn *conn)  {
-  void *rp;
+static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
+  check_conn_work(u);
+}
+  
+static void conn_check_work(Conn *conn)  {
+  void *rp= 0;
   for (;;) {
     conn_make_some_xmits(conn);
+    if (!conn->xmitu) {
+      loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+      return;
+    }
 
     void *rp= conn_write_some_xmits(conn);
-    if (!rp) {
-      loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+    if (rp==OOP_CONTINUE) {
+      loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
       return;
-    } else if (rp==OOP_CONTINUE) {
-      loop->on_fd(loop, conn->fd, OOP_WRITE;)
- else if (rp==OOP_HALT) {
+    } else if (rp==OOP_HALT) {
       return;
+    } else if (!rp) {
+      /* transmitted everything */
+    } else {
+      abort();
     }
-    
-
-      if (
-
-    while (
-    
+  }
 }
  
 /*========== article transmission ==========*/
 
-static void *conn_writeable() {
-  for (;;) {
-    
-    if (!conn->xmitu) {
-      perhaps_transmit_on(conn);
-      if (!conn->xmitu) {
-       unlink from readable;
-       break;
-      }
-    }
-
-
 static void *conn_write_some_xmits(Conn *conn) {
   /* return values:
    *      0:            nothing more to write, no need to call us again
@@ -475,34 +483,45 @@ static void *conn_write_some_xmits(Conn *conn) {
 
 static void conn_make_some_xmits(Conn *conn) {
   for (;;) {
-    if (conn->send) {
-      do something about this article text;
-      continue;
-    }
-
-    if (conn->xmitu+3 > conn->xmita)
-      /* no space for a CHECK even */
+    if (conn->xmitu+5 > CONNIOVS)
       break;
 
     Article *art= LIST_REMHEAD(queue);
     if (!art) break;
 
     if (art->checked || conn->nocheck) {
+      /* actually send it */
+
+      ARTHANDLE *artdata= SMretrieve(somehow);
+      
       if (conn->stream) {
-       XMIT_LITERAL("TAKETHIS ");
-       xmit_noalloc(art->mid, art->midlen);
-       XMIT_LITERAL("\r\n");
+       if (artdata) {
+         XMIT_LITERAL("TAKETHIS ");
+         xmit_noalloc(art->mid, art->midlen);
+         XMIT_LITERAL("\r\n");
+         xmit_artbody(artdata);
+       }
       } else {
        /* we got 235 from IHAVE */
+       if (artdata) {
+         xmit_artbody(artdata);
+       } else {
+         XMIT_LITERAL(".\r\n");
+       }
       }
-      conn->send= art;
+      art->sent= 1;
+      LIST_ADDTAIL(conn->sent, art);
+
     } else {
+      /* check it */
+      
       if (conn->stream)
        XMIT_LITERAL("IHAVE ");
       else
        XMIT_LITERAL("CHECK ");
       xmit_noalloc(art->mid, art->midlen);
       XMIT_LITERAL("\r\n");
+
       LIST_ADDTAIL(conn->sent, art);
     }
   }
@@ -513,10 +532,10 @@ static void conn_make_some_xmits(Conn *conn) {
       if (conn->queue.checked || conn->nocheck) {
        
 
-      && conn->xmitu+3 <= xmita) {
+      && conn->xmitu+3 <= CONNIOVS) {
       if (
       XMIT("
-    if (conn->xmitu < xmita
+    if (conn->xmitu < CONNIOVS
       
 
   if (!queue