chiark / gitweb /
pause and resume reading as appropriate
authorIan Jackson <ian@liberator.relativity.greenend.org.uk>
Mon, 3 May 2010 01:14:11 +0000 (02:14 +0100)
committerIan Jackson <ian@liberator.relativity.greenend.org.uk>
Mon, 3 May 2010 01:14:11 +0000 (02:14 +0100)
backends/innduct.c
doc/man/innduct.8

index 047d99e..897ec37 100644 (file)
@@ -319,6 +319,7 @@ static void article_done(Article *art, int whichcount);
 
 static void check_assign_articles(void);
 static void queue_check_input_done(void);
+static void check_reading_pause_resume(InputFile *ipf);
 
 static void statemc_check_flushing_done(void);
 static void statemc_check_backlog_done(void);
@@ -335,6 +336,9 @@ static char *debug_report_ipf(InputFile *ipf);
 
 static void inputfile_reading_start(InputFile *ipf);
 static void inputfile_reading_stop(InputFile *ipf);
+static void inputfile_reading_pause(InputFile *ipf);
+static void inputfile_reading_resume(InputFile *ipf);
+  /* pause and resume are idempotent, and no-op if not done _reading_start */
 
 static void filemon_start(InputFile *ipf);
 static void filemon_stop(InputFile *ipf);
@@ -362,6 +366,7 @@ static int max_queue_per_conn=200;
 static int target_max_feedfile_size=100000;
 static int period_seconds=60;
 static int filepoll_seconds=5;
+static int max_queue_per_ipf=-1;
 
 static int connection_setup_timeout=200;
 static int inndcomm_flush_timeout=100;
@@ -450,7 +455,7 @@ struct InputFile {
 
   oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */
   off_t offset;
-  int skippinglong;
+  int skippinglong, paused;
 
   ArticleList queue;
   long inprogress; /* includes queue.count and also articles in conns */
@@ -1134,6 +1139,7 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) {
     requeue[art->state]++;
     if (art->state==art_Unsolicited) art->state= art_Unchecked;
     LIST_ADDTAIL(art->ipf->queue,art);
+    check_reading_pause_resume(art->ipf);
   }
 
   int i;
@@ -1446,7 +1452,11 @@ static void connect_start(void) {
 static Article *dequeue_from(int peek, InputFile *ipf) {
   if (!ipf) return 0;
   if (peek) return LIST_HEAD(ipf->queue);
-  else return LIST_REMHEAD(ipf->queue);
+
+  Article *art= LIST_REMHEAD(ipf->queue);
+  if (!art) return 0;
+  check_reading_pause_resume(ipf);
+  return art;
 }
 
 static Article *dequeue(int peek) {
@@ -1527,7 +1537,14 @@ static void conn_maybe_write(Conn *conn)  {
   }
 }
 
-/*---------- expiry and deferral ----------*/
+/*---------- expiry, flow control and deferral ----------*/
+
+static void check_reading_pause_resume(InputFile *ipf) {
+  if (ipf->queue.count >= max_queue_per_ipf)
+    inputfile_reading_pause(ipf);
+  else
+    inputfile_reading_resume(ipf);
+}
 
 static void article_defer(Article *art /* not on a queue */, int whichcount) {
   open_defer();
@@ -1556,6 +1573,7 @@ static void inputfile_queue_check_expired(InputFile *ipf) {
     int exp= article_check_expired(art);
     if (!exp) break;
   }
+  check_reading_pause_resume(ipf);
 }
 
 static void article_autodefer(InputFile *ipf, Article *art) {
@@ -1570,12 +1588,16 @@ static int has_article_in(const ArticleList *al, InputFile *ipf) {
   return 0;
 }
 
-static void autodefer_input_file(InputFile *ipf) {
-  ipf->autodefer= 0;
-
+static void autodefer_input_file_articles(InputFile *ipf) {
   Article *art;
   while ((art= LIST_REMHEAD(ipf->queue)))
     article_autodefer(ipf, art);
+}
+
+static void autodefer_input_file(InputFile *ipf) {
+  ipf->autodefer= 0;
+
+  autodefer_input_file_articles(ipf);
 
   if (ipf->inprogress) {
     Conn *walk;
@@ -1593,8 +1615,11 @@ static void autodefer_input_file(InputFile *ipf) {
     found:
       connfail(walk, "connection is stuck or crawling,"
               " and we need to finish flush");
+      autodefer_input_file_articles(ipf);
     }
   }
+
+  check_reading_pause_resume(ipf);
 }
 
 /*========== article transmission ==========*/
@@ -1815,7 +1840,8 @@ static void update_nocheck(int accepted) {
 }
 
 static void article_done(Article *art, int whichcount) {
-  if (!art->missing) art->ipf->counts[art->state][whichcount]++;
+  if (whichcount>=0 && !art->missing)
+    art->ipf->counts[art->state][whichcount]++;
 
   if (whichcount == RC_accepted) update_nocheck(1);
   else if (whichcount == RC_unwanted) update_nocheck(0);
@@ -2100,7 +2126,8 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd,
       ipf->offset >= target_max_feedfile_size)
     statemc_start_flush("feed file size");
 
-  check_assign_articles();
+  check_assign_articles(); /* may destroy conn but that's OK */
+  check_reading_pause_resume(ipf);
   return OOP_CONTINUE;
 }
 
@@ -2301,6 +2328,24 @@ static const oop_rd_style feedfile_rdstyle= {
   OOP_RD_SHORTREC_LONG,
 };
 
+static void inputfile_reading_resume(InputFile *ipf) {
+  if (!ipf->rd) return;
+  if (!ipf->paused) return;
+
+  int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
+                    feedfile_got_article,ipf, feedfile_read_err, ipf);
+  if (r) sysdie("unable start reading feedfile %s",ipf->path);
+
+  ipf->paused= 0;
+}
+
+static void inputfile_reading_pause(InputFile *ipf) {
+  if (!ipf->rd) return;
+  if (ipf->paused) return;
+  oop_rd_cancel(ipf->rd);
+  ipf->paused= 1;
+}
+
 static void inputfile_reading_start(InputFile *ipf) {
   assert(!ipf->rd);
   ipf->readable.on_readable= tailing_on_readable;
@@ -2315,14 +2360,13 @@ static void inputfile_reading_start(InputFile *ipf) {
   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
   assert(ipf->rd);
 
-  int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
-                    feedfile_got_article,ipf, feedfile_read_err, ipf);
-  if (r) sysdie("unable start reading feedfile %s",ipf->path);
+  ipf->paused= 1;
+  inputfile_reading_resume(ipf);
 }
 
 static void inputfile_reading_stop(InputFile *ipf) {
   assert(ipf->rd);
-  oop_rd_cancel(ipf->rd);
+  inputfile_reading_pause(ipf);
   oop_rd_delete(ipf->rd);
   ipf->rd= 0;
   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
@@ -3192,12 +3236,13 @@ static char *debug_report_ipf(InputFile *ipf) {
   const char *slash= strrchr(ipf->path,'/');
   const char *path= slash ? slash+1 : ipf->path;
 
-  return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s",
+  return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
                   ipf, path,
                   ipf->queue.count, ipf->inprogress, ipf->autodefer,
                   (long)ipf->offset, ipf->fd,
                   ipf->rd ? "" : ",!rd",
-                  ipf->skippinglong ? "*skiplong" : "");
+                  ipf->skippinglong ? "*skiplong" : "",
+                  ipf->rd && ipf->paused ? "*paused" : "");
 }
 
 static void period(void) {
@@ -3207,11 +3252,11 @@ static void period(void) {
 
   debug("PERIOD"
        " sms=%s[%d] conns=%d until_connect=%d"
-       " input_files main:%s flushing:%s backlog:%s"
+       " input_files main:%s flushing:%s backlog:%s[%d]"
        " children connecting=%ld inndcomm=%ld"
        ,
        sms_names[sms], until_flush, conns.count, until_connect,
-       dipf_main, dipf_flushing, dipf_backlog,
+       dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
        (long)connecting_child, (long)inndcomm_child
        );
 
@@ -3534,6 +3579,7 @@ static const Option innduct_options[]= {
 
 {0,"max-connections",    "N",     &max_connections,          op_integer     },
 {0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
+{0,"max-queue-per-file", "N",     &max_queue_per_ipf,        op_integer     },
 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
 
@@ -3628,6 +3674,9 @@ int main(int argc, char **argv) {
     feedfile= xasprintf("%s%s",feedfile,sitename);
   }
 
+  if (max_queue_per_ipf<0)
+    max_queue_per_ipf= max_queue_per_conn * 2;
+
   const char *feedfile_forbidden= "?*[~#";
   int c;
   while ((c= *feedfile_forbidden++))
index 21705a8..b6c3145 100644 (file)
@@ -143,14 +143,20 @@ The default is
 .BR 10 .
 There is no global limit on the number of connections.
 .TP
-.BI \-\-max-queue-per-conn= max
+.BI \-\-max-queue-per-conn= per-conn-max
 Restricts the maximum number of outstanding articles queued on any
-particular connection
+particular connection to
 .IR max .
 (Non-streaming connections can only handle one article at a time.)
 The default is
 .BR 200 .
 .TP
+.BI \-\-max-queue-per-file= max
+Restricts the maximum number articles read into core from any one
+input file to
+.IR max .
+The default is twice the value of per-conn-max.
+.TP
 .BI \-\-feedfile-flush-size= bytes
 Specifies that innduct should flush the feed and start a new feedfile
 when the existing feedfile size exceeds