From: Ian Jackson Date: Mon, 3 May 2010 01:14:11 +0000 (+0100) Subject: pause and resume reading as appropriate X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=commitdiff_plain;h=ca15e8f462c06b832f2ece41417a0a4cf0b5854a pause and resume reading as appropriate --- diff --git a/backends/innduct.c b/backends/innduct.c index 047d99e..897ec37 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -319,6 +319,7 @@ static void article_done(Article *art, int whichcount); static void check_assign_articles(void); static void queue_check_input_done(void); +static void check_reading_pause_resume(InputFile *ipf); static void statemc_check_flushing_done(void); static void statemc_check_backlog_done(void); @@ -335,6 +336,9 @@ static char *debug_report_ipf(InputFile *ipf); static void inputfile_reading_start(InputFile *ipf); static void inputfile_reading_stop(InputFile *ipf); +static void inputfile_reading_pause(InputFile *ipf); +static void inputfile_reading_resume(InputFile *ipf); + /* pause and resume are idempotent, and no-op if not done _reading_start */ static void filemon_start(InputFile *ipf); static void filemon_stop(InputFile *ipf); @@ -362,6 +366,7 @@ static int max_queue_per_conn=200; static int target_max_feedfile_size=100000; static int period_seconds=60; static int filepoll_seconds=5; +static int max_queue_per_ipf=-1; static int connection_setup_timeout=200; static int inndcomm_flush_timeout=100; @@ -450,7 +455,7 @@ struct InputFile { oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ off_t offset; - int skippinglong; + int skippinglong, paused; ArticleList queue; long inprogress; /* includes queue.count and also articles in conns */ @@ -1134,6 +1139,7 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { requeue[art->state]++; if (art->state==art_Unsolicited) art->state= art_Unchecked; LIST_ADDTAIL(art->ipf->queue,art); + check_reading_pause_resume(art->ipf); } int i; @@ -1446,7 +1452,11 @@ static void connect_start(void) { static Article *dequeue_from(int peek, InputFile *ipf) { if (!ipf) return 0; if (peek) return LIST_HEAD(ipf->queue); - else return LIST_REMHEAD(ipf->queue); + + Article *art= LIST_REMHEAD(ipf->queue); + if (!art) return 0; + check_reading_pause_resume(ipf); + return art; } static Article *dequeue(int peek) { @@ -1527,7 +1537,14 @@ static void conn_maybe_write(Conn *conn) { } } -/*---------- expiry and deferral ----------*/ +/*---------- expiry, flow control and deferral ----------*/ + +static void check_reading_pause_resume(InputFile *ipf) { + if (ipf->queue.count >= max_queue_per_ipf) + inputfile_reading_pause(ipf); + else + inputfile_reading_resume(ipf); +} static void article_defer(Article *art /* not on a queue */, int whichcount) { open_defer(); @@ -1556,6 +1573,7 @@ static void inputfile_queue_check_expired(InputFile *ipf) { int exp= article_check_expired(art); if (!exp) break; } + check_reading_pause_resume(ipf); } static void article_autodefer(InputFile *ipf, Article *art) { @@ -1570,12 +1588,16 @@ static int has_article_in(const ArticleList *al, InputFile *ipf) { return 0; } -static void autodefer_input_file(InputFile *ipf) { - ipf->autodefer= 0; - +static void autodefer_input_file_articles(InputFile *ipf) { Article *art; while ((art= LIST_REMHEAD(ipf->queue))) article_autodefer(ipf, art); +} + +static void autodefer_input_file(InputFile *ipf) { + ipf->autodefer= 0; + + autodefer_input_file_articles(ipf); if (ipf->inprogress) { Conn *walk; @@ -1593,8 +1615,11 @@ static void autodefer_input_file(InputFile *ipf) { found: connfail(walk, "connection is stuck or crawling," " and we need to finish flush"); + autodefer_input_file_articles(ipf); } } + + check_reading_pause_resume(ipf); } /*========== article transmission ==========*/ @@ -1815,7 +1840,8 @@ static void update_nocheck(int accepted) { } static void article_done(Article *art, int whichcount) { - if (!art->missing) art->ipf->counts[art->state][whichcount]++; + if (whichcount>=0 && !art->missing) + art->ipf->counts[art->state][whichcount]++; if (whichcount == RC_accepted) update_nocheck(1); else if (whichcount == RC_unwanted) update_nocheck(0); @@ -2100,7 +2126,8 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->offset >= target_max_feedfile_size) statemc_start_flush("feed file size"); - check_assign_articles(); + check_assign_articles(); /* may destroy conn but that's OK */ + check_reading_pause_resume(ipf); return OOP_CONTINUE; } @@ -2301,6 +2328,24 @@ static const oop_rd_style feedfile_rdstyle= { OOP_RD_SHORTREC_LONG, }; +static void inputfile_reading_resume(InputFile *ipf) { + if (!ipf->rd) return; + if (!ipf->paused) return; + + int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, + feedfile_got_article,ipf, feedfile_read_err, ipf); + if (r) sysdie("unable start reading feedfile %s",ipf->path); + + ipf->paused= 0; +} + +static void inputfile_reading_pause(InputFile *ipf) { + if (!ipf->rd) return; + if (ipf->paused) return; + oop_rd_cancel(ipf->rd); + ipf->paused= 1; +} + static void inputfile_reading_start(InputFile *ipf) { assert(!ipf->rd); ipf->readable.on_readable= tailing_on_readable; @@ -2315,14 +2360,13 @@ static void inputfile_reading_start(InputFile *ipf) { ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0); assert(ipf->rd); - int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, - feedfile_got_article,ipf, feedfile_read_err, ipf); - if (r) sysdie("unable start reading feedfile %s",ipf->path); + ipf->paused= 1; + inputfile_reading_resume(ipf); } static void inputfile_reading_stop(InputFile *ipf) { assert(ipf->rd); - oop_rd_cancel(ipf->rd); + inputfile_reading_pause(ipf); oop_rd_delete(ipf->rd); ipf->rd= 0; assert(!ipf->filemon); /* we shouldn't be monitoring it now */ @@ -3192,12 +3236,13 @@ static char *debug_report_ipf(InputFile *ipf) { const char *slash= strrchr(ipf->path,'/'); const char *path= slash ? slash+1 : ipf->path; - return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s", + return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s", ipf, path, ipf->queue.count, ipf->inprogress, ipf->autodefer, (long)ipf->offset, ipf->fd, ipf->rd ? "" : ",!rd", - ipf->skippinglong ? "*skiplong" : ""); + ipf->skippinglong ? "*skiplong" : "", + ipf->rd && ipf->paused ? "*paused" : ""); } static void period(void) { @@ -3207,11 +3252,11 @@ static void period(void) { debug("PERIOD" " sms=%s[%d] conns=%d until_connect=%d" - " input_files main:%s flushing:%s backlog:%s" + " input_files main:%s flushing:%s backlog:%s[%d]" " children connecting=%ld inndcomm=%ld" , sms_names[sms], until_flush, conns.count, until_connect, - dipf_main, dipf_flushing, dipf_backlog, + dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan, (long)connecting_child, (long)inndcomm_child ); @@ -3534,6 +3579,7 @@ static const Option innduct_options[]= { {0,"max-connections", "N", &max_connections, op_integer }, {0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer }, +{0,"max-queue-per-file", "N", &max_queue_per_ipf, op_integer }, {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer }, {0,"period-interval", "TIME", &period_seconds, op_seconds }, @@ -3628,6 +3674,9 @@ int main(int argc, char **argv) { feedfile= xasprintf("%s%s",feedfile,sitename); } + if (max_queue_per_ipf<0) + max_queue_per_ipf= max_queue_per_conn * 2; + const char *feedfile_forbidden= "?*[~#"; int c; while ((c= *feedfile_forbidden++)) diff --git a/doc/man/innduct.8 b/doc/man/innduct.8 index 21705a8..b6c3145 100644 --- a/doc/man/innduct.8 +++ b/doc/man/innduct.8 @@ -143,14 +143,20 @@ The default is .BR 10 . There is no global limit on the number of connections. .TP -.BI \-\-max-queue-per-conn= max +.BI \-\-max-queue-per-conn= per-conn-max Restricts the maximum number of outstanding articles queued on any -particular connection +particular connection to .IR max . (Non-streaming connections can only handle one article at a time.) The default is .BR 200 . .TP +.BI \-\-max-queue-per-file= max +Restricts the maximum number articles read into core from any one +input file to +.IR max . +The default is twice the value of per-conn-max. +.TP .BI \-\-feedfile-flush-size= bytes Specifies that innduct should flush the feed and start a new feedfile when the existing feedfile size exceeds