chiark / gitweb /
WIP - incoming message processing
authorIan Jackson <ian@liberator.relativity.greenend.org.uk>
Fri, 12 Feb 2010 18:12:43 +0000 (18:12 +0000)
committerIan Jackson <ian@liberator.relativity.greenend.org.uk>
Fri, 12 Feb 2010 18:12:43 +0000 (18:12 +0000)
backends/innduct.c

index 2855c412a0f6c2113042c75d1107197b7615b6e0..65417c91aaac29eae95b4b3a135e138c00ebb9b3 100644 (file)
@@ -77,6 +77,7 @@ struct Article {
   char *mid;
   int midlen;
   int checked, sentbody;
   char *mid;
   int midlen;
   int checked, sentbody;
+  fd and offset for blanking token or mid;
 };
 
 #define CONNIOVS 128
 };
 
 #define CONNIOVS 128
@@ -101,7 +102,7 @@ struct Conn {
   ISNODE(Conn);
   int fd, max_queue, stream;
   LIST(Article) queue; /* not yet told peer, or CHECK said send it */
   ISNODE(Conn);
   int fd, max_queue, stream;
   LIST(Article) queue; /* not yet told peer, or CHECK said send it */
-  LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */
+  LIST(Article) sent; /* offered/transmitted - in xmit or waiting reply */
   struct iovec xmit[CONNIOVS];
   XmitDetails xmitd[CONNIOVS];
   int xmitu;
   struct iovec xmit[CONNIOVS];
   XmitDetails xmitd[CONNIOVS];
   int xmitu;
@@ -416,6 +417,7 @@ static LIST(Conn) *conn_determine_right_list(Conn *conn) {
 
 static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
   check_conn_work(u);
 
 static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
   check_conn_work(u);
+  return OOP_CONTINUE;
 }
   
 static void conn_check_work(Conn *conn)  {
 }
   
 static void conn_check_work(Conn *conn)  {
@@ -526,55 +528,97 @@ static void conn_make_some_xmits(Conn *conn) {
     }
   }
 }
     }
   }
 }
-    
-    if (conn->queue.head) {
-      if (conn->queue.checked || conn->nocheck) {
-       
-
-      && conn->xmitu+3 <= CONNIOVS) {
-      if (
-      XMIT("
-    if (conn->xmitu < CONNIOVS
-      
 
 
-  if (!queue
-  
+/*========== responses from peer ==========*/
 
 
+static const oop_rd_style peer_rd_style= {
+  OOP_RD_DELIM_STRIP, '\n',
+  OOP_RD_NUL_FORBID,
+  OOP_RD_SHORTREC_FORBID
+};
 
 
+static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
+                       const char *errmsg, int errnoval,
+                       const char *data, size_t recsz, void *conn_v) {
+  Conn *conn= conn_v;
 
 
-    int circ_used= circ_write - circ_read;
-    if (circ_used < 0) circ_used += CONNBUFSZ;
-    writeable_moredata(conn, CONNBUFSZ-1 - circ_used);
-
-    if (conn->circ_read == conn->circ_write)
-      return OOP_CONTINUE;
-
-    struct iovec iov[2];
-    int niov= 1;
-    iov[0].iov_base= conn->circ_buf + conn->circ_read;
-    if (conn->circ_read > conn->circ_write) { /* wrapped */
-      iov[0].iov_len= CONNBUFSZ - conn->circ_read;
-      iov[1].iov_base= conn->circ_buf;
-      iov[1].iov_len= conn->circ_write;
-      if (niov[1].iov_len) niov= 2;
-    } else {
-      iov[0].iov_len= conn->circ_write - conn->circ_read;
-    }
-    ssize_t rs= writev(conn->fd, &iov, niov);
-    if (rs < 0) {
-      
+  if (ev == OOP_RD_EOF) {
+    warn("unexpected EOF from peer");
+    conn_failed(conn);
+    return;
+  }
+  assert(ev == OOP_RD_OK);
+
+  char *ep;
+  unsigned long code= strtoul(data, &ep, 10);
+  if (ep != data+3 || *ep != ' ' || data[0]=='0') {
+    char sanibuf[100];
+    const char *p= data;
+    char *q= sanibuf;
+    *q++= '`';
+    for (;;) {
+      if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; }
+      int c= *p++;
+      if (!c) { *q++= '\''; break; }
+      if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
+      sprintf(q,"\\x%02x",c);
+      q += 4;
     }
     }
-    assert(rs > 0);
+    warn("badly formatted response from peer: %s", sanibuf);
+    conn_failed(conn);
+    return;
+  }
 
 
-    conn->circ_read += rs;
-    if (conn->circ_read > CONNBUFSZ)
-      conn->circ_read -= CONNBUFSZ;
+  if (conn->quitting) {
+    if (code!=205) {
+      warn("peer gave failure response to QUIT: %s", sani);
+      conn_failed(conn);
+      return;
+    }
+    conn close ok;
+    return;
   }
   }
-}
 
 
+  switch (code) {
+  case 438: /* CHECK says they have it */
+  case 435: /* IHAVE says they have it */
+    ARTICLE_DEALTWITH(1,unwanted);
+    break;
+
+  case 238: /* CHECK says send it */
+  case 335: /* IHAVE says send it */
+    count_checkedwanted++;
+    Article *art= LIST_REMHEAD(conn->sent);
+    art->checked= 1;
+    LIST_ADDTAIL(conn->queue);
+    break;
+
+  case 235: /* IHAVE says thanks */
+  case 239: /* TAKETHIS says thanks */
+    ARTICLE_DEALTWITH(1,accepted);
+    break;
+
+  case 439: /* TAKETHIS says rejected */
+  case 437: /* IHAVE says rejected */
+    ARTICLE_DEALTWITH(1,rejected);
+    break;
+
+  case 431: /* CHECK or TAKETHIS says try later */
+  case 436: /* IHAVE says try later */
+    ARTICLE_DEALTWITH(0,deferred);
+    break;
+
+  case 400: warn("peer has stopped accepting articles: %s", sani); goto failed;
+  case 503: warn("peer timed us out: %s", sani);                   goto failed;
+  default:  warn("peer sent unexpected message: %s", sani);
+  failed:
+    conn_failed(conn);
+    return OOP_CONTINUE;;
+  }
 
 
+  return OOP_CONTINUE;
+}  
 main {
   ignore sigpipe;
 };
 main {
   ignore sigpipe;
 };