chiark / gitweb /
changelog: Finalise 2.2
[innduct.git] / xmit.c
diff --git a/xmit.c b/xmit.c
index dd909e41128a20d6e6df89b0ba4a1a1fba8b67d6..96e599af02eaceea39fb79864a5ab2cb9468dd37 100644 (file)
--- a/xmit.c
+++ b/xmit.c
@@ -1,3 +1,18 @@
+/*
+ *  innduct
+ *  tailing reliable realtime streaming feeder for inn
+ *  xmit.c - transmitting checks and articles, flow control, expiry
+ *
+ *  Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
+ *  and contributors; see LICENCE.txt.
+ *  SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#include "innduct.h"
+
+const char *const artstate_names[]=
+  { "Unchecked", "Wanted", "Unsolicited", 0 };
+
 /*---------- assigning articles to conns, and transmitting ----------*/
 
 static Article *dequeue_from(int peek, InputFile *ipf) {
@@ -18,13 +33,20 @@ static Article *dequeue(int peek) {
   return 0;
 }
 
-static void check_assign_articles(void) {
+static void conn_inqueue_spare(const Conn *conn,
+                              int *inqueue_r, int *spare_r) {
+  int inqueue= conn->sent.count + conn->priority.count + conn->waiting.count;
+  int spare= conn->max_queue - inqueue;
+  if (inqueue_r) *inqueue_r= inqueue;
+  if (spare_r) *spare_r= spare;
+}
+
+void check_assign_articles(void) {
   for (;;) {
     if (!dequeue(1))
       break;
 
     Conn *walk, *use=0;
-    int spare=0, inqueue=0;
 
     /* Find a connection to offer this article.  We prefer a busy
      * connection to an idle one, provided it's not full.  We take the
@@ -34,15 +56,16 @@ static void check_assign_articles(void) {
      */
     FOR_CONN(walk) {
       if (walk->quitting) continue;
-      inqueue= walk->sent.count + walk->priority.count
-            + walk->waiting.count;
-      spare= walk->max_queue - inqueue;
+      int inqueue, spare;
+      conn_inqueue_spare(walk, &inqueue, &spare);
       assert(inqueue <= max_queue_per_conn);
       assert(spare >= 0);
       if (inqueue==0) /*idle*/ { if (!use) use= walk; }
       else if (spare>0) /*working*/ { use= walk; break; }
     }
     if (use) {
+      int inqueue, spare;
+      conn_inqueue_spare(use, &inqueue, &spare);
       if (!inqueue) use->since_activity= 0; /* reset idle counter */
       while (spare>0) {
        Article *art= dequeue(0);
@@ -66,7 +89,7 @@ static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
   return OOP_CONTINUE;
 }
 
-static void conn_maybe_write(Conn *conn)  {
+void conn_maybe_write(Conn *conn) {
   for (;;) {
     conn_make_some_xmits(conn);
     if (!conn->xmitu) {
@@ -114,14 +137,14 @@ static void conn_maybe_write(Conn *conn)  {
  *     pause/resume inputfile tailing
  */
 
-static void check_reading_pause_resume(InputFile *ipf) {
+void check_reading_pause_resume(InputFile *ipf) {
   if (ipf->queue.count >= max_queue_per_ipf)
     inputfile_reading_pause(ipf);
   else
     inputfile_reading_resume(ipf);
 }
 
-static void article_defer(Article *art /* not on a queue */, int whichcount) {
+void article_defer(Article *art /* not on a queue */, int whichcount) {
   open_defer();
   if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
       || fflush(defer))
@@ -129,13 +152,13 @@ static void article_defer(Article *art /* not on a queue */, int whichcount) {
   article_done(art, whichcount);
 }
 
-static int article_check_expired(Article *art /* must be queued, not conn */) {
+int article_check_expired(Article *art /* must be queued, not conn */) {
   ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
   if (artdata) { SMfreearticle(artdata); return 0; }
 
   LIST_REMOVE(art->ipf->queue, art);
   art->missing= 1;
-  art->ipf->count_nooffer_missing++;
+  art->ipf->counts.events[nooffer_missing]++;
   article_done(art,-1);
   return 1;
 }
@@ -145,13 +168,14 @@ void inputfile_queue_check_expired(InputFile *ipf) {
 
   for (;;) {
     Article *art= LIST_HEAD(ipf->queue);
+    if (!art) break;
     int expd= article_check_expired(art);
     if (!expd) break;
   }
   check_reading_pause_resume(ipf);
 }
 
-static void article_autodefer(InputFile *ipf, Article *art) {
+void article_autodefer(InputFile *ipf, Article *art) {
   ipf->autodefer++;
   article_defer(art,-1);
 }
@@ -169,7 +193,7 @@ static void autodefer_input_file_articles(InputFile *ipf) {
     article_autodefer(ipf, art);
 }
 
-static void autodefer_input_file(InputFile *ipf) {
+void autodefer_input_file(InputFile *ipf) {
   static const char *const abandon= "stuck";
   ipf->autodefer= 0;
 
@@ -220,7 +244,7 @@ static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
   d->info.sm_art= ah;
 }
 
-static void xmit_free(XmitDetails *d) {
+void xmit_free(XmitDetails *d) {
   switch (d->kind) {
   case xk_Artdata: SMfreearticle(d->info.sm_art); break;
   case xk_Const:                                  break;
@@ -228,7 +252,7 @@ static void xmit_free(XmitDetails *d) {
   }
 }
 
-static void *conn_write_some_xmits(Conn *conn) {
+void *conn_write_some_xmits(Conn *conn) {
   /* return values:
    *      0:            nothing more to write, no need to call us again
    *      OOP_CONTINUE: more to write but fd not writeable
@@ -270,7 +294,7 @@ static void *conn_write_some_xmits(Conn *conn) {
   }
 }
 
-static void conn_make_some_xmits(Conn *conn) {
+void conn_make_some_xmits(Conn *conn) {
   for (;;) {
     if (conn->xmitu+5 > CONNIOVS)
       break;
@@ -290,7 +314,7 @@ static void conn_make_some_xmits(Conn *conn) {
        (abort(),-1);
 
       if (!artdata) art->missing= 1;
-      art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
+      art->ipf->counts.results[art->state][ artdata ? RC_sent : RC_missing ]++;
 
       if (conn->stream) {
        if (artdata) {
@@ -324,7 +348,7 @@ static void conn_make_some_xmits(Conn *conn) {
       XMIT_LITERAL("\r\n");
 
       assert(art->state == art_Unchecked);
-      art->ipf->counts[art->state][RC_sent]++;
+      art->ipf->counts.results[art->state][RC_sent]++;
       LIST_ADDTAIL(conn->sent, art);
     }
   }