From 39e51ad7be00995b2bcabe7ae7f1b613035c4df4 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Tue, 9 Feb 2010 16:57:14 +0000 Subject: [PATCH 1/1] WIP --- backends/innduct.c | 113 ++++++++++++++++++++++++++++++--------------- 1 file changed, 77 insertions(+), 36 deletions(-) diff --git a/backends/innduct.c b/backends/innduct.c index 43d4dd2..1b8875c 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -92,15 +92,15 @@ typedef struct { XmitKind kind; union { char *malloc_tofree; - } + } info; } XmitDetails; struct Conn { ISNODE(Conn); int fd, max_queue, stream; - LIST(Article) sent; /* in xmit or transmitted */ + LIST(Article) queue; /* not yet told peer, or said TAKETHIS */ + LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */ Article send; /* partially transmitted */ - LIST(Article) queue; /* not yet in xmit */ struct iovec *xmit; XmitDetails *xmitd; int xmitu; @@ -361,25 +361,71 @@ static void connect_start() { /*========== overall control of article flow ==========*/ -static void process_queue() { +static void check_master_queue(void) { if (!queue.count) return; if (working.head) { - transmit(working.head); + conn_assign_one_article(&working); } else if (idle.head) { - transmit(idle.head); + conn_assign_one_article(&idle); } else if (nconns < maxconns && queue.count >= max_queue_per_conn && !connecting_child && !connect_delay) { connect_delay= reconnect_delay_periods; connect_start(); } } + +static int conn_total_queued_articles(Conn *conn) { + return conn->sent.count + !!conn->send + conn->queue.count; +} + +static void conn_assign_one_article(LIST(Conn) *connlist) { + Conn *conn= connlist->head; + + LIST_REMOVE(*connlist, conn); + Article *art= LIST_REMHEAD(queue); + LIST_ADDTAIL(conn->queue, art); + LIST_ADD(*conn_determine_right_list(conn), conn); + + check_conn_work(conn); +} + +static LIST(Conn) *conn_determine_right_list(Conn *conn) { + int inqueue= conn_total_queued_articles(conn); + assert(inqueue <= max_queue); + if (inqueue == 0) return &idle; + if (inqueue == conn->max_queue) return &full; + return &working; +} + +static void check_conn_work(Conn *conn) { + void *rp; + for (;;) { + conn_make_some_xmits(conn); + + void *rp= conn_write_some_xmits(conn); + if (!rp) { + loop->cancel_fd(loop, conn->fd, OOP_WRITE); + return; + } else if (rp==OOP_CONTINUE) { + loop->on_fd(loop, conn->fd, OOP_WRITE;) + else if (rp==OOP_HALT) { + return; + } + + + if ( + + while ( + +} /*========== article transmission ==========*/ static void *conn_writeable() { for (;;) { + if (!conn->xmitu) { perhaps_transmit_on(conn); if (!conn->xmitu) { @@ -388,14 +434,24 @@ static void *conn_writeable() { } } + +static void *conn_write_some_xmits(Conn *conn) { + /* return values: + * 0: nothing more to write, no need to call us again + * OOP_CONTINUE: more to write but fd not writeable + * OOP_HALT: disaster, have destroyed conn + */ + for (;;) { int count= conn->xmitu; + if (!count) return 0; + if (count > IOV_MAX) count= IOV_MAX; ssize_t rs= writev(conn->fd, conn->xmit, count); if (rs < 0) { - if (errno == EAGAIN) break; + if (errno == EAGAIN) return OOP_CONTINUE; syswarn(CN "write failed", conn->fd); conn_failed(conn); - break; + return OOP_HALT; } assert(rs > 0); @@ -415,38 +471,21 @@ static void *conn_writeable() { memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd)); conn->xmitu= newu; } - - return OOP_CONTINUE; -} - -static void transmit(Conn *conn) { - int inqueue= conn->sent.count + !!conn->send + conn->queue.count; - assert(inqueue < max_queue); - Article *art= LIST_REMHEAD(queue); - LIST_ADDTAIL(conn->queue, art); - if (inqueue == 0) { - LIST_REMOVE(idle, conn); - if (conn->max_queue==1) LIST_ADD(full, conn); - else LIST_ADD(working, conn); - } else if (inqueue == max_queue-1) { - LIST_REMOVE(working, conn); - LIST_ADD(full, conn); - } - perhaps_transmit_on(conn); } -static void perhaps_transmit_on(Conn *conn) { +static void conn_make_some_xmits(Conn *conn) { for (;;) { if (conn->send) { - /* do something about this article text */ + do something about this article text; continue; } - if (conn->xmitu+3 > xmita) + + if (conn->xmitu+3 > conn->xmita) /* no space for a CHECK even */ - return; + break; - Article *art= LIST_REMHEAD(conn->queue); - if (art) return; + Article *art= LIST_REMHEAD(queue); + if (!art) break; if (art->checked || conn->nocheck) { if (conn->stream) { @@ -454,19 +493,21 @@ static void perhaps_transmit_on(Conn *conn) { xmit_noalloc(art->mid, art->midlen); XMIT_LITERAL("\r\n"); } else { - /* we got 235 */ + /* we got 235 from IHAVE */ } conn->send= art; } else { - if (conn->stream) XMIT_LITERAL("IHAVE "); - else XMIT_LITERAL("CHECK "); + if (conn->stream) + XMIT_LITERAL("IHAVE "); + else + XMIT_LITERAL("CHECK "); xmit_noalloc(art->mid, art->midlen); XMIT_LITERAL("\r\n"); LIST_ADDTAIL(conn->sent, art); } } } - + if (conn->queue.head) { if (conn->queue.checked || conn->nocheck) { -- 2.30.2