chiark / gitweb /
WIP before any changes resulting from reading SM API stuff
authorIan Jackson <ian@liberator.(none)>
Sun, 29 Nov 2009 23:21:47 +0000 (23:21 +0000)
committerIan Jackson <ian@liberator.(none)>
Sun, 29 Nov 2009 23:21:47 +0000 (23:21 +0000)
backends/innduct.c

index 610504e..43d4dd2 100644 (file)
@@ -76,7 +76,7 @@ static const char *remote_host;
 struct Article {
   char *mid;
   int midlen;
-  int nocheck; /* also used when CHECK says yes please */
+  int checked;
 };
 
 #define CONNBUFSZ 16384
@@ -84,13 +84,26 @@ struct Article {
 #define CN "<%d> "
 
 typedef struct Conn Conn;
+
+typedef enum {
+  Malloc, Const, Artdata;
+} XmitKind;
+typedef struct {
+  XmitKind kind;
+  union {
+    char *malloc_tofree;
+  } 
+} XmitDetails;
+
 struct Conn {
   ISNODE(Conn);
   int fd, max_queue, stream;
-  LIST(Article) queue;
-  Article *tosend; /* points into queue */
-  char circ_buf[CONNBUFSZ];
-  unsigned circ_read, circ_write;
+  LIST(Article) sent; /* in xmit or transmitted */
+  Article send; /* partially transmitted */
+  LIST(Article) queue; /* not yet in xmit */
+  struct iovec *xmit;
+  XmitDetails *xmitd;
+  int xmitu;
 };
 
 
@@ -367,6 +380,110 @@ static void process_queue() {
 
 static void *conn_writeable() {
   for (;;) {
+    if (!conn->xmitu) {
+      perhaps_transmit_on(conn);
+      if (!conn->xmitu) {
+       unlink from readable;
+       break;
+      }
+    }
+
+    int count= conn->xmitu;
+    if (count > IOV_MAX) count= IOV_MAX;
+    ssize_t rs= writev(conn->fd, conn->xmit, count);
+    if (rs < 0) {
+      if (errno == EAGAIN) break;
+      syswarn(CN "write failed", conn->fd);
+      conn_failed(conn);
+      break;
+    }
+    assert(rs > 0);
+
+    for (done=0; rs && done<xmitu; done++) {
+      struct iovec *vp= &conn->xmit[done];
+      XmitDetails *dp= &conn->xmitd[done];
+      if (rs > vp->iov_len) {
+       rs -= vp->iov_len;
+       xmit_free(dp);
+      } else {
+       vp->iov_base += rs;
+       vp->iov_len -= rs;
+      }
+    }
+    int newu= conn->xmitu - done;
+    memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
+    memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
+    conn->xmitu= newu;
+  }
+  
+  return OOP_CONTINUE;
+}
+
+static void transmit(Conn *conn) {
+  int inqueue= conn->sent.count + !!conn->send + conn->queue.count;
+  assert(inqueue < max_queue);
+  Article *art= LIST_REMHEAD(queue);
+  LIST_ADDTAIL(conn->queue, art);
+  if (inqueue == 0) {
+    LIST_REMOVE(idle, conn);
+    if (conn->max_queue==1) LIST_ADD(full, conn);
+    else LIST_ADD(working, conn);
+  } else if (inqueue == max_queue-1) {
+    LIST_REMOVE(working, conn);
+    LIST_ADD(full, conn);
+  }
+  perhaps_transmit_on(conn);
+}
+
+static void perhaps_transmit_on(Conn *conn) {
+  for (;;) {
+    if (conn->send) {
+      /* do something about this article text */
+      continue;
+    }
+    if (conn->xmitu+3 > xmita)
+      /* no space for a CHECK even */
+      return;
+
+    Article *art= LIST_REMHEAD(conn->queue);
+    if (art) return;
+
+    if (art->checked || conn->nocheck) {
+      if (conn->stream) {
+       XMIT_LITERAL("TAKETHIS ");
+       xmit_noalloc(art->mid, art->midlen);
+       XMIT_LITERAL("\r\n");
+      } else {
+       /* we got 235 */
+      }
+      conn->send= art;
+    } else {
+      if (conn->stream) XMIT_LITERAL("IHAVE ");
+      else XMIT_LITERAL("CHECK ");
+      xmit_noalloc(art->mid, art->midlen);
+      XMIT_LITERAL("\r\n");
+      LIST_ADDTAIL(conn->sent, art);
+    }
+  }
+}
+  
+    
+    if (conn->queue.head) {
+      if (conn->queue.checked || conn->nocheck) {
+       
+
+      && conn->xmitu+3 <= xmita) {
+      if (
+      XMIT("
+    if (conn->xmitu < xmita
+      
+
+  if (!queue
+  
+
+
+
     int circ_used= circ_write - circ_read;
     if (circ_used < 0) circ_used += CONNBUFSZ;
     writeable_moredata(conn, CONNBUFSZ-1 - circ_used);
@@ -387,10 +504,7 @@ static void *conn_writeable() {
     }
     ssize_t rs= writev(conn->fd, &iov, niov);
     if (rs < 0) {
-      if (errno == EAGAIN) return OOP_CONTINUE;
-      syswarn(CN "write failed", conn->fd);
-      conn_failed(conn);
-      return OOP_CONTINUE;
+      
     }
     assert(rs > 0);
 
@@ -401,11 +515,6 @@ static void *conn_writeable() {
 }
 
 
-static void transmit(Conn *conn) {
-  assert(conn->queue.count < max_queue);
-  
-
 main {
   ignore sigpipe;
 };