struct Article {
char *mid;
int midlen;
- int nocheck; /* also used when CHECK says yes please */
+ int checked;
};
#define CONNBUFSZ 16384
#define CN "<%d> "
typedef struct Conn Conn;
+
+typedef enum {
+ Malloc, Const, Artdata;
+} XmitKind;
+typedef struct {
+ XmitKind kind;
+ union {
+ char *malloc_tofree;
+ } info;
+} XmitDetails;
+
struct Conn {
ISNODE(Conn);
int fd, max_queue, stream;
- LIST(Article) queue;
- Article *tosend; /* points into queue */
- char circ_buf[CONNBUFSZ];
- unsigned circ_read, circ_write;
+ LIST(Article) queue; /* not yet told peer, or said TAKETHIS */
+ LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */
+ Article send; /* partially transmitted */
+ struct iovec *xmit;
+ XmitDetails *xmitd;
+ int xmitu;
};
/*========== overall control of article flow ==========*/
-static void process_queue() {
+static void check_master_queue(void) {
if (!queue.count)
return;
if (working.head) {
- transmit(working.head);
+ conn_assign_one_article(&working);
} else if (idle.head) {
- transmit(idle.head);
+ conn_assign_one_article(&idle);
} else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
!connecting_child && !connect_delay) {
connect_delay= reconnect_delay_periods;
connect_start();
}
}
+
+static int conn_total_queued_articles(Conn *conn) {
+ return conn->sent.count + !!conn->send + conn->queue.count;
+}
+
+static void conn_assign_one_article(LIST(Conn) *connlist) {
+ Conn *conn= connlist->head;
+
+ LIST_REMOVE(*connlist, conn);
+ Article *art= LIST_REMHEAD(queue);
+ LIST_ADDTAIL(conn->queue, art);
+ LIST_ADD(*conn_determine_right_list(conn), conn);
+
+ check_conn_work(conn);
+}
+
+static LIST(Conn) *conn_determine_right_list(Conn *conn) {
+ int inqueue= conn_total_queued_articles(conn);
+ assert(inqueue <= max_queue);
+ if (inqueue == 0) return &idle;
+ if (inqueue == conn->max_queue) return &full;
+ return &working;
+}
+
+static void check_conn_work(Conn *conn) {
+ void *rp;
+ for (;;) {
+ conn_make_some_xmits(conn);
+
+ void *rp= conn_write_some_xmits(conn);
+ if (!rp) {
+ loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+ return;
+ } else if (rp==OOP_CONTINUE) {
+ loop->on_fd(loop, conn->fd, OOP_WRITE;)
+ else if (rp==OOP_HALT) {
+ return;
+ }
+
+
+ if (
+
+ while (
+
+}
/*========== article transmission ==========*/
static void *conn_writeable() {
for (;;) {
+
+ if (!conn->xmitu) {
+ perhaps_transmit_on(conn);
+ if (!conn->xmitu) {
+ unlink from readable;
+ break;
+ }
+ }
+
+
+static void *conn_write_some_xmits(Conn *conn) {
+ /* return values:
+ * 0: nothing more to write, no need to call us again
+ * OOP_CONTINUE: more to write but fd not writeable
+ * OOP_HALT: disaster, have destroyed conn
+ */
+ for (;;) {
+ int count= conn->xmitu;
+ if (!count) return 0;
+
+ if (count > IOV_MAX) count= IOV_MAX;
+ ssize_t rs= writev(conn->fd, conn->xmit, count);
+ if (rs < 0) {
+ if (errno == EAGAIN) return OOP_CONTINUE;
+ syswarn(CN "write failed", conn->fd);
+ conn_failed(conn);
+ return OOP_HALT;
+ }
+ assert(rs > 0);
+
+ for (done=0; rs && done<xmitu; done++) {
+ struct iovec *vp= &conn->xmit[done];
+ XmitDetails *dp= &conn->xmitd[done];
+ if (rs > vp->iov_len) {
+ rs -= vp->iov_len;
+ xmit_free(dp);
+ } else {
+ vp->iov_base += rs;
+ vp->iov_len -= rs;
+ }
+ }
+ int newu= conn->xmitu - done;
+ memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
+ memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
+ conn->xmitu= newu;
+ }
+}
+
+static void conn_make_some_xmits(Conn *conn) {
+ for (;;) {
+ if (conn->send) {
+ do something about this article text;
+ continue;
+ }
+
+ if (conn->xmitu+3 > conn->xmita)
+ /* no space for a CHECK even */
+ break;
+
+ Article *art= LIST_REMHEAD(queue);
+ if (!art) break;
+
+ if (art->checked || conn->nocheck) {
+ if (conn->stream) {
+ XMIT_LITERAL("TAKETHIS ");
+ xmit_noalloc(art->mid, art->midlen);
+ XMIT_LITERAL("\r\n");
+ } else {
+ /* we got 235 from IHAVE */
+ }
+ conn->send= art;
+ } else {
+ if (conn->stream)
+ XMIT_LITERAL("IHAVE ");
+ else
+ XMIT_LITERAL("CHECK ");
+ xmit_noalloc(art->mid, art->midlen);
+ XMIT_LITERAL("\r\n");
+ LIST_ADDTAIL(conn->sent, art);
+ }
+ }
+}
+
+
+ if (conn->queue.head) {
+ if (conn->queue.checked || conn->nocheck) {
+
+
+ && conn->xmitu+3 <= xmita) {
+ if (
+ XMIT("
+ if (conn->xmitu < xmita
+
+
+ if (!queue
+
+
+
+
+
int circ_used= circ_write - circ_read;
if (circ_used < 0) circ_used += CONNBUFSZ;
writeable_moredata(conn, CONNBUFSZ-1 - circ_used);
}
ssize_t rs= writev(conn->fd, &iov, niov);
if (rs < 0) {
- if (errno == EAGAIN) return OOP_CONTINUE;
- syswarn(CN "write failed", conn->fd);
- conn_failed(conn);
- return OOP_CONTINUE;
+
}
assert(rs > 0);
}
-
-static void transmit(Conn *conn) {
- assert(conn->queue.count < max_queue);
-
-
main {
ignore sigpipe;
};