XmitKind kind;
union {
char *malloc_tofree;
- }
+ } info;
} XmitDetails;
struct Conn {
ISNODE(Conn);
int fd, max_queue, stream;
- LIST(Article) sent; /* in xmit or transmitted */
+ LIST(Article) queue; /* not yet told peer, or said TAKETHIS */
+ LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */
Article send; /* partially transmitted */
- LIST(Article) queue; /* not yet in xmit */
struct iovec *xmit;
XmitDetails *xmitd;
int xmitu;
/*========== overall control of article flow ==========*/
-static void process_queue() {
+static void check_master_queue(void) {
if (!queue.count)
return;
if (working.head) {
- transmit(working.head);
+ conn_assign_one_article(&working);
} else if (idle.head) {
- transmit(idle.head);
+ conn_assign_one_article(&idle);
} else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
!connecting_child && !connect_delay) {
connect_delay= reconnect_delay_periods;
connect_start();
}
}
+
+static int conn_total_queued_articles(Conn *conn) {
+ return conn->sent.count + !!conn->send + conn->queue.count;
+}
+
+static void conn_assign_one_article(LIST(Conn) *connlist) {
+ Conn *conn= connlist->head;
+
+ LIST_REMOVE(*connlist, conn);
+ Article *art= LIST_REMHEAD(queue);
+ LIST_ADDTAIL(conn->queue, art);
+ LIST_ADD(*conn_determine_right_list(conn), conn);
+
+ check_conn_work(conn);
+}
+
+static LIST(Conn) *conn_determine_right_list(Conn *conn) {
+ int inqueue= conn_total_queued_articles(conn);
+ assert(inqueue <= max_queue);
+ if (inqueue == 0) return &idle;
+ if (inqueue == conn->max_queue) return &full;
+ return &working;
+}
+
+static void check_conn_work(Conn *conn) {
+ void *rp;
+ for (;;) {
+ conn_make_some_xmits(conn);
+
+ void *rp= conn_write_some_xmits(conn);
+ if (!rp) {
+ loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+ return;
+ } else if (rp==OOP_CONTINUE) {
+ loop->on_fd(loop, conn->fd, OOP_WRITE;)
+ else if (rp==OOP_HALT) {
+ return;
+ }
+
+
+ if (
+
+ while (
+
+}
/*========== article transmission ==========*/
static void *conn_writeable() {
for (;;) {
+
if (!conn->xmitu) {
perhaps_transmit_on(conn);
if (!conn->xmitu) {
}
}
+
+static void *conn_write_some_xmits(Conn *conn) {
+ /* return values:
+ * 0: nothing more to write, no need to call us again
+ * OOP_CONTINUE: more to write but fd not writeable
+ * OOP_HALT: disaster, have destroyed conn
+ */
+ for (;;) {
int count= conn->xmitu;
+ if (!count) return 0;
+
if (count > IOV_MAX) count= IOV_MAX;
ssize_t rs= writev(conn->fd, conn->xmit, count);
if (rs < 0) {
- if (errno == EAGAIN) break;
+ if (errno == EAGAIN) return OOP_CONTINUE;
syswarn(CN "write failed", conn->fd);
conn_failed(conn);
- break;
+ return OOP_HALT;
}
assert(rs > 0);
memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
conn->xmitu= newu;
}
-
- return OOP_CONTINUE;
-}
-
-static void transmit(Conn *conn) {
- int inqueue= conn->sent.count + !!conn->send + conn->queue.count;
- assert(inqueue < max_queue);
- Article *art= LIST_REMHEAD(queue);
- LIST_ADDTAIL(conn->queue, art);
- if (inqueue == 0) {
- LIST_REMOVE(idle, conn);
- if (conn->max_queue==1) LIST_ADD(full, conn);
- else LIST_ADD(working, conn);
- } else if (inqueue == max_queue-1) {
- LIST_REMOVE(working, conn);
- LIST_ADD(full, conn);
- }
- perhaps_transmit_on(conn);
}
-static void perhaps_transmit_on(Conn *conn) {
+static void conn_make_some_xmits(Conn *conn) {
for (;;) {
if (conn->send) {
- /* do something about this article text */
+ do something about this article text;
continue;
}
- if (conn->xmitu+3 > xmita)
+
+ if (conn->xmitu+3 > conn->xmita)
/* no space for a CHECK even */
- return;
+ break;
- Article *art= LIST_REMHEAD(conn->queue);
- if (art) return;
+ Article *art= LIST_REMHEAD(queue);
+ if (!art) break;
if (art->checked || conn->nocheck) {
if (conn->stream) {
xmit_noalloc(art->mid, art->midlen);
XMIT_LITERAL("\r\n");
} else {
- /* we got 235 */
+ /* we got 235 from IHAVE */
}
conn->send= art;
} else {
- if (conn->stream) XMIT_LITERAL("IHAVE ");
- else XMIT_LITERAL("CHECK ");
+ if (conn->stream)
+ XMIT_LITERAL("IHAVE ");
+ else
+ XMIT_LITERAL("CHECK ");
xmit_noalloc(art->mid, art->midlen);
XMIT_LITERAL("\r\n");
LIST_ADDTAIL(conn->sent, art);
}
}
}
-
+
if (conn->queue.head) {
if (conn->queue.checked || conn->nocheck) {