+/*
+ * innduct
+ * tailing reliable realtime streaming feeder for inn
+ * xmit.c - transmitting checks and articles, flow control, expiry
+ *
+ * Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
+ * and contributors; see LICENCE.txt.
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#include "innduct.h"
+
+const char *const artstate_names[]=
+ { "Unchecked", "Wanted", "Unsolicited", 0 };
+
/*---------- assigning articles to conns, and transmitting ----------*/
static Article *dequeue_from(int peek, InputFile *ipf) {
return 0;
}
-static void check_assign_articles(void) {
+static void conn_inqueue_spare(const Conn *conn,
+ int *inqueue_r, int *spare_r) {
+ int inqueue= conn->sent.count + conn->priority.count + conn->waiting.count;
+ int spare= conn->max_queue - inqueue;
+ if (inqueue_r) *inqueue_r= inqueue;
+ if (spare_r) *spare_r= spare;
+}
+
+void check_assign_articles(void) {
for (;;) {
if (!dequeue(1))
break;
Conn *walk, *use=0;
- int spare=0, inqueue=0;
/* Find a connection to offer this article. We prefer a busy
* connection to an idle one, provided it's not full. We take the
*/
FOR_CONN(walk) {
if (walk->quitting) continue;
- inqueue= walk->sent.count + walk->priority.count
- + walk->waiting.count;
- spare= walk->max_queue - inqueue;
+ int inqueue, spare;
+ conn_inqueue_spare(walk, &inqueue, &spare);
assert(inqueue <= max_queue_per_conn);
assert(spare >= 0);
if (inqueue==0) /*idle*/ { if (!use) use= walk; }
else if (spare>0) /*working*/ { use= walk; break; }
}
if (use) {
+ int inqueue, spare;
+ conn_inqueue_spare(use, &inqueue, &spare);
if (!inqueue) use->since_activity= 0; /* reset idle counter */
while (spare>0) {
Article *art= dequeue(0);
return OOP_CONTINUE;
}
-static void conn_maybe_write(Conn *conn) {
+void conn_maybe_write(Conn *conn) {
for (;;) {
conn_make_some_xmits(conn);
if (!conn->xmitu) {
* pause/resume inputfile tailing
*/
-static void check_reading_pause_resume(InputFile *ipf) {
+void check_reading_pause_resume(InputFile *ipf) {
if (ipf->queue.count >= max_queue_per_ipf)
inputfile_reading_pause(ipf);
else
inputfile_reading_resume(ipf);
}
-static void article_defer(Article *art /* not on a queue */, int whichcount) {
+void article_defer(Article *art /* not on a queue */, int whichcount) {
open_defer();
if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
|| fflush(defer))
article_done(art, whichcount);
}
-static int article_check_expired(Article *art /* must be queued, not conn */) {
+int article_check_expired(Article *art /* must be queued, not conn */) {
ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
if (artdata) { SMfreearticle(artdata); return 0; }
LIST_REMOVE(art->ipf->queue, art);
art->missing= 1;
- art->ipf->count_nooffer_missing++;
+ art->ipf->counts.events[nooffer_missing]++;
article_done(art,-1);
return 1;
}
for (;;) {
Article *art= LIST_HEAD(ipf->queue);
+ if (!art) break;
int expd= article_check_expired(art);
if (!expd) break;
}
check_reading_pause_resume(ipf);
}
-static void article_autodefer(InputFile *ipf, Article *art) {
+void article_autodefer(InputFile *ipf, Article *art) {
ipf->autodefer++;
article_defer(art,-1);
}
article_autodefer(ipf, art);
}
-static void autodefer_input_file(InputFile *ipf) {
+void autodefer_input_file(InputFile *ipf) {
static const char *const abandon= "stuck";
ipf->autodefer= 0;
d->info.sm_art= ah;
}
-static void xmit_free(XmitDetails *d) {
+void xmit_free(XmitDetails *d) {
switch (d->kind) {
case xk_Artdata: SMfreearticle(d->info.sm_art); break;
case xk_Const: break;
}
}
-static void *conn_write_some_xmits(Conn *conn) {
+void *conn_write_some_xmits(Conn *conn) {
/* return values:
* 0: nothing more to write, no need to call us again
* OOP_CONTINUE: more to write but fd not writeable
}
}
-static void conn_make_some_xmits(Conn *conn) {
+void conn_make_some_xmits(Conn *conn) {
for (;;) {
if (conn->xmitu+5 > CONNIOVS)
break;
(abort(),-1);
if (!artdata) art->missing= 1;
- art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
+ art->ipf->counts.results[art->state][ artdata ? RC_sent : RC_missing ]++;
if (conn->stream) {
if (artdata) {
XMIT_LITERAL("\r\n");
assert(art->state == art_Unchecked);
- art->ipf->counts[art->state][RC_sent]++;
+ art->ipf->counts.results[art->state][RC_sent]++;
LIST_ADDTAIL(conn->sent, art);
}
}