From 8035a056375b9c5cb05fba8b0e8b81c2a5f13fa8 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Tue, 9 Feb 2010 21:13:34 +0000 Subject: [PATCH] WIP. New outbound data stuff looks good --- backends/innduct.c | 139 ++++++++++++++++++++++++++------------------- 1 file changed, 79 insertions(+), 60 deletions(-) diff --git a/backends/innduct.c b/backends/innduct.c index 1b8875c..2855c41 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -76,10 +76,10 @@ static const char *remote_host; struct Article { char *mid; int midlen; - int checked; + int checked, sentbody; }; -#define CONNBUFSZ 16384 +#define CONNIOVS 128 #define CN "<%d> " @@ -88,21 +88,22 @@ typedef struct Conn Conn; typedef enum { Malloc, Const, Artdata; } XmitKind; + typedef struct { XmitKind kind; union { char *malloc_tofree; + ARTHANDLE *sm_art; } info; } XmitDetails; struct Conn { ISNODE(Conn); int fd, max_queue, stream; - LIST(Article) queue; /* not yet told peer, or said TAKETHIS */ + LIST(Article) queue; /* not yet told peer, or CHECK said send it */ LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */ - Article send; /* partially transmitted */ - struct iovec *xmit; - XmitDetails *xmitd; + struct iovec xmit[CONNIOVS]; + XmitDetails xmitd[CONNIOVS]; int xmitu; }; @@ -246,7 +247,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { LIST_ADDHEAD(idle, conn); notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain"); connect_attempt_discard(); - process_queue(); + check_master_queue(); return 0; x: @@ -361,34 +362,48 @@ static void connect_start() { /*========== overall control of article flow ==========*/ +static void conn_check_work(Conn *conn); + static void check_master_queue(void) { if (!queue.count) return; - if (working.head) { - conn_assign_one_article(&working); - } else if (idle.head) { - conn_assign_one_article(&idle); - } else if (nconns < maxconns && queue.count >= max_queue_per_conn && - !connecting_child && !connect_delay) { - connect_delay= reconnect_delay_periods; - connect_start(); + Conn *last_assigned=0; + for (;;) { + if (working.head) { + conn_assign_one_article(&working, &last_assigned); + } else if (idle.head) { + conn_assign_one_article(&idle, &last_assigned); + } else if (nconns < maxconns && queue.count >= max_queue_per_conn && + !connecting_child && !connect_delay) { + connect_delay= reconnect_delay_periods; + connect_start(); + } else { + break; + } } + conn_check_work(last_assigned); } - -static int conn_total_queued_articles(Conn *conn) { - return conn->sent.count + !!conn->send + conn->queue.count; -} -static void conn_assign_one_article(LIST(Conn) *connlist) { +static void conn_assign_one_article(LIST(Conn) *connlist, + Conn **last_assigned) { Conn *conn= connlist->head; LIST_REMOVE(*connlist, conn); Article *art= LIST_REMHEAD(queue); LIST_ADDTAIL(conn->queue, art); LIST_ADD(*conn_determine_right_list(conn), conn); - - check_conn_work(conn); + + /* This slightly odd arrangement is so that we call conn_check_work + * once after filling the queue for a new connection in + * check_master_queue, rather than for each article. */ + if (conn != *last_assigned && *last_assigned) + conn_check_work(*last_assigned); + *last_assigned= conn; +} + +static int conn_total_queued_articles(Conn *conn) { + return conn->sent.count + conn->queue.count; } static LIST(Conn) *conn_determine_right_list(Conn *conn) { @@ -399,42 +414,35 @@ static LIST(Conn) *conn_determine_right_list(Conn *conn) { return &working; } -static void check_conn_work(Conn *conn) { - void *rp; +static void *conn_writeable(oop_source *l, int fd, int ev, void *u) { + check_conn_work(u); +} + +static void conn_check_work(Conn *conn) { + void *rp= 0; for (;;) { conn_make_some_xmits(conn); + if (!conn->xmitu) { + loop->cancel_fd(loop, conn->fd, OOP_WRITE); + return; + } void *rp= conn_write_some_xmits(conn); - if (!rp) { - loop->cancel_fd(loop, conn->fd, OOP_WRITE); + if (rp==OOP_CONTINUE) { + loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn); return; - } else if (rp==OOP_CONTINUE) { - loop->on_fd(loop, conn->fd, OOP_WRITE;) - else if (rp==OOP_HALT) { + } else if (rp==OOP_HALT) { return; + } else if (!rp) { + /* transmitted everything */ + } else { + abort(); } - - - if ( - - while ( - + } } /*========== article transmission ==========*/ -static void *conn_writeable() { - for (;;) { - - if (!conn->xmitu) { - perhaps_transmit_on(conn); - if (!conn->xmitu) { - unlink from readable; - break; - } - } - - static void *conn_write_some_xmits(Conn *conn) { /* return values: * 0: nothing more to write, no need to call us again @@ -475,34 +483,45 @@ static void *conn_write_some_xmits(Conn *conn) { static void conn_make_some_xmits(Conn *conn) { for (;;) { - if (conn->send) { - do something about this article text; - continue; - } - - if (conn->xmitu+3 > conn->xmita) - /* no space for a CHECK even */ + if (conn->xmitu+5 > CONNIOVS) break; Article *art= LIST_REMHEAD(queue); if (!art) break; if (art->checked || conn->nocheck) { + /* actually send it */ + + ARTHANDLE *artdata= SMretrieve(somehow); + if (conn->stream) { - XMIT_LITERAL("TAKETHIS "); - xmit_noalloc(art->mid, art->midlen); - XMIT_LITERAL("\r\n"); + if (artdata) { + XMIT_LITERAL("TAKETHIS "); + xmit_noalloc(art->mid, art->midlen); + XMIT_LITERAL("\r\n"); + xmit_artbody(artdata); + } } else { /* we got 235 from IHAVE */ + if (artdata) { + xmit_artbody(artdata); + } else { + XMIT_LITERAL(".\r\n"); + } } - conn->send= art; + art->sent= 1; + LIST_ADDTAIL(conn->sent, art); + } else { + /* check it */ + if (conn->stream) XMIT_LITERAL("IHAVE "); else XMIT_LITERAL("CHECK "); xmit_noalloc(art->mid, art->midlen); XMIT_LITERAL("\r\n"); + LIST_ADDTAIL(conn->sent, art); } } @@ -513,10 +532,10 @@ static void conn_make_some_xmits(Conn *conn) { if (conn->queue.checked || conn->nocheck) { - && conn->xmitu+3 <= xmita) { + && conn->xmitu+3 <= CONNIOVS) { if ( XMIT(" - if (conn->xmitu < xmita + if (conn->xmitu < CONNIOVS if (!queue -- 2.30.2