chiark
/
gitweb
/
~ian
/
inn-innduct.git
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
f8e8102
)
WIP. New outbound data stuff looks good
author
Ian Jackson
<ian@liberator.(none)>
Tue, 9 Feb 2010 21:13:34 +0000
(21:13 +0000)
committer
Ian Jackson
<ian@liberator.(none)>
Tue, 9 Feb 2010 21:13:34 +0000
(21:13 +0000)
backends/innduct.c
patch
|
blob
|
history
diff --git
a/backends/innduct.c
b/backends/innduct.c
index 1b8875ca4c252ff82b483b8967d42fbb1cfe9bcc..2855c412a0f6c2113042c75d1107197b7615b6e0 100644
(file)
--- a/
backends/innduct.c
+++ b/
backends/innduct.c
@@
-76,10
+76,10
@@
static const char *remote_host;
struct Article {
char *mid;
int midlen;
struct Article {
char *mid;
int midlen;
- int checked;
+ int checked
, sentbody
;
};
};
-#define CONN
BUFSZ 16384
+#define CONN
IOVS 128
#define CN "<%d> "
#define CN "<%d> "
@@
-88,21
+88,22
@@
typedef struct Conn Conn;
typedef enum {
Malloc, Const, Artdata;
} XmitKind;
typedef enum {
Malloc, Const, Artdata;
} XmitKind;
+
typedef struct {
XmitKind kind;
union {
char *malloc_tofree;
typedef struct {
XmitKind kind;
union {
char *malloc_tofree;
+ ARTHANDLE *sm_art;
} info;
} XmitDetails;
struct Conn {
ISNODE(Conn);
int fd, max_queue, stream;
} info;
} XmitDetails;
struct Conn {
ISNODE(Conn);
int fd, max_queue, stream;
- LIST(Article) queue; /* not yet told peer, or
said TAKETHIS
*/
+ LIST(Article) queue; /* not yet told peer, or
CHECK said send it
*/
LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */
LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */
- Article send; /* partially transmitted */
- struct iovec *xmit;
- XmitDetails *xmitd;
+ struct iovec xmit[CONNIOVS];
+ XmitDetails xmitd[CONNIOVS];
int xmitu;
};
int xmitu;
};
@@
-246,7
+247,7
@@
static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
LIST_ADDHEAD(idle, conn);
notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
connect_attempt_discard();
LIST_ADDHEAD(idle, conn);
notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
connect_attempt_discard();
-
process
_queue();
+
check_master
_queue();
return 0;
x:
return 0;
x:
@@
-361,34
+362,48
@@
static void connect_start() {
/*========== overall control of article flow ==========*/
/*========== overall control of article flow ==========*/
+static void conn_check_work(Conn *conn);
+
static void check_master_queue(void) {
if (!queue.count)
return;
static void check_master_queue(void) {
if (!queue.count)
return;
- if (working.head) {
- conn_assign_one_article(&working);
- } else if (idle.head) {
- conn_assign_one_article(&idle);
- } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
- !connecting_child && !connect_delay) {
- connect_delay= reconnect_delay_periods;
- connect_start();
+ Conn *last_assigned=0;
+ for (;;) {
+ if (working.head) {
+ conn_assign_one_article(&working, &last_assigned);
+ } else if (idle.head) {
+ conn_assign_one_article(&idle, &last_assigned);
+ } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
+ !connecting_child && !connect_delay) {
+ connect_delay= reconnect_delay_periods;
+ connect_start();
+ } else {
+ break;
+ }
}
}
+ conn_check_work(last_assigned);
}
}
-
-static int conn_total_queued_articles(Conn *conn) {
- return conn->sent.count + !!conn->send + conn->queue.count;
-}
-static void conn_assign_one_article(LIST(Conn) *connlist) {
+static void conn_assign_one_article(LIST(Conn) *connlist,
+ Conn **last_assigned) {
Conn *conn= connlist->head;
LIST_REMOVE(*connlist, conn);
Article *art= LIST_REMHEAD(queue);
LIST_ADDTAIL(conn->queue, art);
LIST_ADD(*conn_determine_right_list(conn), conn);
Conn *conn= connlist->head;
LIST_REMOVE(*connlist, conn);
Article *art= LIST_REMHEAD(queue);
LIST_ADDTAIL(conn->queue, art);
LIST_ADD(*conn_determine_right_list(conn), conn);
-
- check_conn_work(conn);
+
+ /* This slightly odd arrangement is so that we call conn_check_work
+ * once after filling the queue for a new connection in
+ * check_master_queue, rather than for each article. */
+ if (conn != *last_assigned && *last_assigned)
+ conn_check_work(*last_assigned);
+ *last_assigned= conn;
+}
+
+static int conn_total_queued_articles(Conn *conn) {
+ return conn->sent.count + conn->queue.count;
}
static LIST(Conn) *conn_determine_right_list(Conn *conn) {
}
static LIST(Conn) *conn_determine_right_list(Conn *conn) {
@@
-399,42
+414,35
@@
static LIST(Conn) *conn_determine_right_list(Conn *conn) {
return &working;
}
return &working;
}
-static void check_conn_work(Conn *conn) {
- void *rp;
+static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
+ check_conn_work(u);
+}
+
+static void conn_check_work(Conn *conn) {
+ void *rp= 0;
for (;;) {
conn_make_some_xmits(conn);
for (;;) {
conn_make_some_xmits(conn);
+ if (!conn->xmitu) {
+ loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+ return;
+ }
void *rp= conn_write_some_xmits(conn);
void *rp= conn_write_some_xmits(conn);
- if (
!rp
) {
- loop->
cancel_fd(loop, conn->fd, OOP_WRITE
);
+ if (
rp==OOP_CONTINUE
) {
+ loop->
on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn
);
return;
return;
- } else if (rp==OOP_CONTINUE) {
- loop->on_fd(loop, conn->fd, OOP_WRITE;)
- else if (rp==OOP_HALT) {
+ } else if (rp==OOP_HALT) {
return;
return;
+ } else if (!rp) {
+ /* transmitted everything */
+ } else {
+ abort();
}
}
-
-
- if (
-
- while (
-
+ }
}
/*========== article transmission ==========*/
}
/*========== article transmission ==========*/
-static void *conn_writeable() {
- for (;;) {
-
- if (!conn->xmitu) {
- perhaps_transmit_on(conn);
- if (!conn->xmitu) {
- unlink from readable;
- break;
- }
- }
-
-
static void *conn_write_some_xmits(Conn *conn) {
/* return values:
* 0: nothing more to write, no need to call us again
static void *conn_write_some_xmits(Conn *conn) {
/* return values:
* 0: nothing more to write, no need to call us again
@@
-475,34
+483,45
@@
static void *conn_write_some_xmits(Conn *conn) {
static void conn_make_some_xmits(Conn *conn) {
for (;;) {
static void conn_make_some_xmits(Conn *conn) {
for (;;) {
- if (conn->send) {
- do something about this article text;
- continue;
- }
-
- if (conn->xmitu+3 > conn->xmita)
- /* no space for a CHECK even */
+ if (conn->xmitu+5 > CONNIOVS)
break;
Article *art= LIST_REMHEAD(queue);
if (!art) break;
if (art->checked || conn->nocheck) {
break;
Article *art= LIST_REMHEAD(queue);
if (!art) break;
if (art->checked || conn->nocheck) {
+ /* actually send it */
+
+ ARTHANDLE *artdata= SMretrieve(somehow);
+
if (conn->stream) {
if (conn->stream) {
- XMIT_LITERAL("TAKETHIS ");
- xmit_noalloc(art->mid, art->midlen);
- XMIT_LITERAL("\r\n");
+ if (artdata) {
+ XMIT_LITERAL("TAKETHIS ");
+ xmit_noalloc(art->mid, art->midlen);
+ XMIT_LITERAL("\r\n");
+ xmit_artbody(artdata);
+ }
} else {
/* we got 235 from IHAVE */
} else {
/* we got 235 from IHAVE */
+ if (artdata) {
+ xmit_artbody(artdata);
+ } else {
+ XMIT_LITERAL(".\r\n");
+ }
}
}
- conn->send= art;
+ art->sent= 1;
+ LIST_ADDTAIL(conn->sent, art);
+
} else {
} else {
+ /* check it */
+
if (conn->stream)
XMIT_LITERAL("IHAVE ");
else
XMIT_LITERAL("CHECK ");
xmit_noalloc(art->mid, art->midlen);
XMIT_LITERAL("\r\n");
if (conn->stream)
XMIT_LITERAL("IHAVE ");
else
XMIT_LITERAL("CHECK ");
xmit_noalloc(art->mid, art->midlen);
XMIT_LITERAL("\r\n");
+
LIST_ADDTAIL(conn->sent, art);
}
}
LIST_ADDTAIL(conn->sent, art);
}
}
@@
-513,10
+532,10
@@
static void conn_make_some_xmits(Conn *conn) {
if (conn->queue.checked || conn->nocheck) {
if (conn->queue.checked || conn->nocheck) {
- && conn->xmitu+3 <=
xmita
) {
+ && conn->xmitu+3 <=
CONNIOVS
) {
if (
XMIT("
if (
XMIT("
- if (conn->xmitu <
xmita
+ if (conn->xmitu <
CONNIOVS
if (!queue
if (!queue