From ccfd1e3f48e3b15590263606cf0c6b5317762180 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Sun, 29 Nov 2009 23:21:47 +0000 Subject: [PATCH] WIP before any changes resulting from reading SM API stuff --- backends/innduct.c | 137 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 123 insertions(+), 14 deletions(-) diff --git a/backends/innduct.c b/backends/innduct.c index 610504e..43d4dd2 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -76,7 +76,7 @@ static const char *remote_host; struct Article { char *mid; int midlen; - int nocheck; /* also used when CHECK says yes please */ + int checked; }; #define CONNBUFSZ 16384 @@ -84,13 +84,26 @@ struct Article { #define CN "<%d> " typedef struct Conn Conn; + +typedef enum { + Malloc, Const, Artdata; +} XmitKind; +typedef struct { + XmitKind kind; + union { + char *malloc_tofree; + } +} XmitDetails; + struct Conn { ISNODE(Conn); int fd, max_queue, stream; - LIST(Article) queue; - Article *tosend; /* points into queue */ - char circ_buf[CONNBUFSZ]; - unsigned circ_read, circ_write; + LIST(Article) sent; /* in xmit or transmitted */ + Article send; /* partially transmitted */ + LIST(Article) queue; /* not yet in xmit */ + struct iovec *xmit; + XmitDetails *xmitd; + int xmitu; }; @@ -367,6 +380,110 @@ static void process_queue() { static void *conn_writeable() { for (;;) { + if (!conn->xmitu) { + perhaps_transmit_on(conn); + if (!conn->xmitu) { + unlink from readable; + break; + } + } + + int count= conn->xmitu; + if (count > IOV_MAX) count= IOV_MAX; + ssize_t rs= writev(conn->fd, conn->xmit, count); + if (rs < 0) { + if (errno == EAGAIN) break; + syswarn(CN "write failed", conn->fd); + conn_failed(conn); + break; + } + assert(rs > 0); + + for (done=0; rs && donexmit[done]; + XmitDetails *dp= &conn->xmitd[done]; + if (rs > vp->iov_len) { + rs -= vp->iov_len; + xmit_free(dp); + } else { + vp->iov_base += rs; + vp->iov_len -= rs; + } + } + int newu= conn->xmitu - done; + memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit)); + memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd)); + conn->xmitu= newu; + } + + return OOP_CONTINUE; +} + +static void transmit(Conn *conn) { + int inqueue= conn->sent.count + !!conn->send + conn->queue.count; + assert(inqueue < max_queue); + Article *art= LIST_REMHEAD(queue); + LIST_ADDTAIL(conn->queue, art); + if (inqueue == 0) { + LIST_REMOVE(idle, conn); + if (conn->max_queue==1) LIST_ADD(full, conn); + else LIST_ADD(working, conn); + } else if (inqueue == max_queue-1) { + LIST_REMOVE(working, conn); + LIST_ADD(full, conn); + } + perhaps_transmit_on(conn); +} + +static void perhaps_transmit_on(Conn *conn) { + for (;;) { + if (conn->send) { + /* do something about this article text */ + continue; + } + if (conn->xmitu+3 > xmita) + /* no space for a CHECK even */ + return; + + Article *art= LIST_REMHEAD(conn->queue); + if (art) return; + + if (art->checked || conn->nocheck) { + if (conn->stream) { + XMIT_LITERAL("TAKETHIS "); + xmit_noalloc(art->mid, art->midlen); + XMIT_LITERAL("\r\n"); + } else { + /* we got 235 */ + } + conn->send= art; + } else { + if (conn->stream) XMIT_LITERAL("IHAVE "); + else XMIT_LITERAL("CHECK "); + xmit_noalloc(art->mid, art->midlen); + XMIT_LITERAL("\r\n"); + LIST_ADDTAIL(conn->sent, art); + } + } +} + + + if (conn->queue.head) { + if (conn->queue.checked || conn->nocheck) { + + + && conn->xmitu+3 <= xmita) { + if ( + XMIT(" + if (conn->xmitu < xmita + + + if (!queue + + + + + int circ_used= circ_write - circ_read; if (circ_used < 0) circ_used += CONNBUFSZ; writeable_moredata(conn, CONNBUFSZ-1 - circ_used); @@ -387,10 +504,7 @@ static void *conn_writeable() { } ssize_t rs= writev(conn->fd, &iov, niov); if (rs < 0) { - if (errno == EAGAIN) return OOP_CONTINUE; - syswarn(CN "write failed", conn->fd); - conn_failed(conn); - return OOP_CONTINUE; + } assert(rs > 0); @@ -401,11 +515,6 @@ static void *conn_writeable() { } - -static void transmit(Conn *conn) { - assert(conn->queue.count < max_queue); - - main { ignore sigpipe; }; -- 2.30.2