From 3cfc1f3827207ca05205357a5cacd710eed6c861 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Fri, 12 Feb 2010 18:12:43 +0000 Subject: [PATCH] WIP - incoming message processing --- backends/innduct.c | 126 ++++++++++++++++++++++++++++++--------------- 1 file changed, 85 insertions(+), 41 deletions(-) diff --git a/backends/innduct.c b/backends/innduct.c index 2855c41..65417c9 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -77,6 +77,7 @@ struct Article { char *mid; int midlen; int checked, sentbody; + fd and offset for blanking token or mid; }; #define CONNIOVS 128 @@ -101,7 +102,7 @@ struct Conn { ISNODE(Conn); int fd, max_queue, stream; LIST(Article) queue; /* not yet told peer, or CHECK said send it */ - LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */ + LIST(Article) sent; /* offered/transmitted - in xmit or waiting reply */ struct iovec xmit[CONNIOVS]; XmitDetails xmitd[CONNIOVS]; int xmitu; @@ -416,6 +417,7 @@ static LIST(Conn) *conn_determine_right_list(Conn *conn) { static void *conn_writeable(oop_source *l, int fd, int ev, void *u) { check_conn_work(u); + return OOP_CONTINUE; } static void conn_check_work(Conn *conn) { @@ -526,55 +528,97 @@ static void conn_make_some_xmits(Conn *conn) { } } } - - - if (conn->queue.head) { - if (conn->queue.checked || conn->nocheck) { - - - && conn->xmitu+3 <= CONNIOVS) { - if ( - XMIT(" - if (conn->xmitu < CONNIOVS - - if (!queue - +/*========== responses from peer ==========*/ +static const oop_rd_style peer_rd_style= { + OOP_RD_DELIM_STRIP, '\n', + OOP_RD_NUL_FORBID, + OOP_RD_SHORTREC_FORBID +}; +static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *conn_v) { + Conn *conn= conn_v; - - int circ_used= circ_write - circ_read; - if (circ_used < 0) circ_used += CONNBUFSZ; - writeable_moredata(conn, CONNBUFSZ-1 - circ_used); - - if (conn->circ_read == conn->circ_write) - return OOP_CONTINUE; - - struct iovec iov[2]; - int niov= 1; - iov[0].iov_base= conn->circ_buf + conn->circ_read; - if (conn->circ_read > conn->circ_write) { /* wrapped */ - iov[0].iov_len= CONNBUFSZ - conn->circ_read; - iov[1].iov_base= conn->circ_buf; - iov[1].iov_len= conn->circ_write; - if (niov[1].iov_len) niov= 2; - } else { - iov[0].iov_len= conn->circ_write - conn->circ_read; - } - ssize_t rs= writev(conn->fd, &iov, niov); - if (rs < 0) { - + if (ev == OOP_RD_EOF) { + warn("unexpected EOF from peer"); + conn_failed(conn); + return; + } + assert(ev == OOP_RD_OK); + + char *ep; + unsigned long code= strtoul(data, &ep, 10); + if (ep != data+3 || *ep != ' ' || data[0]=='0') { + char sanibuf[100]; + const char *p= data; + char *q= sanibuf; + *q++= '`'; + for (;;) { + if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; } + int c= *p++; + if (!c) { *q++= '\''; break; } + if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; } + sprintf(q,"\\x%02x",c); + q += 4; } - assert(rs > 0); + warn("badly formatted response from peer: %s", sanibuf); + conn_failed(conn); + return; + } - conn->circ_read += rs; - if (conn->circ_read > CONNBUFSZ) - conn->circ_read -= CONNBUFSZ; + if (conn->quitting) { + if (code!=205) { + warn("peer gave failure response to QUIT: %s", sani); + conn_failed(conn); + return; + } + conn close ok; + return; } -} + switch (code) { + case 438: /* CHECK says they have it */ + case 435: /* IHAVE says they have it */ + ARTICLE_DEALTWITH(1,unwanted); + break; + + case 238: /* CHECK says send it */ + case 335: /* IHAVE says send it */ + count_checkedwanted++; + Article *art= LIST_REMHEAD(conn->sent); + art->checked= 1; + LIST_ADDTAIL(conn->queue); + break; + + case 235: /* IHAVE says thanks */ + case 239: /* TAKETHIS says thanks */ + ARTICLE_DEALTWITH(1,accepted); + break; + + case 439: /* TAKETHIS says rejected */ + case 437: /* IHAVE says rejected */ + ARTICLE_DEALTWITH(1,rejected); + break; + + case 431: /* CHECK or TAKETHIS says try later */ + case 436: /* IHAVE says try later */ + ARTICLE_DEALTWITH(0,deferred); + break; + + case 400: warn("peer has stopped accepting articles: %s", sani); goto failed; + case 503: warn("peer timed us out: %s", sani); goto failed; + default: warn("peer sent unexpected message: %s", sani); + failed: + conn_failed(conn); + return OOP_CONTINUE;; + } + return OOP_CONTINUE; +} + main { ignore sigpipe; }; -- 2.30.2