X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=b829841613d0aa010d7ce43bdd0e0934479a5fe6;hp=047d99e6f9f3563903ac17e2310fa1470f93066e;hb=df2fee3940b22c3b173e077bbd0c83007b23e794;hpb=a319a3c12dc51eebaf5305db1b6c150af68d5e35 diff --git a/backends/innduct.c b/backends/innduct.c index 047d99e..b829841 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -319,6 +319,7 @@ static void article_done(Article *art, int whichcount); static void check_assign_articles(void); static void queue_check_input_done(void); +static void check_reading_pause_resume(InputFile *ipf); static void statemc_check_flushing_done(void); static void statemc_check_backlog_done(void); @@ -335,6 +336,9 @@ static char *debug_report_ipf(InputFile *ipf); static void inputfile_reading_start(InputFile *ipf); static void inputfile_reading_stop(InputFile *ipf); +static void inputfile_reading_pause(InputFile *ipf); +static void inputfile_reading_resume(InputFile *ipf); + /* pause and resume are idempotent, and no-op if not done _reading_start */ static void filemon_start(InputFile *ipf); static void filemon_stop(InputFile *ipf); @@ -362,6 +366,7 @@ static int max_queue_per_conn=200; static int target_max_feedfile_size=100000; static int period_seconds=60; static int filepoll_seconds=5; +static int max_queue_per_ipf=-1; static int connection_setup_timeout=200; static int inndcomm_flush_timeout=100; @@ -450,14 +455,14 @@ struct InputFile { oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ off_t offset; - int skippinglong; + int skippinglong, paused; ArticleList queue; long inprogress; /* includes queue.count and also articles in conns */ long autodefer; /* -1 means not doing autodefer */ int counts[art_MaxState][RCI_max]; - int readcount_ok, readcount_blank, readcount_err; + int readcount_ok, readcount_blank, readcount_err, count_nooffer_missing; char path[]; }; @@ -1134,6 +1139,7 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { requeue[art->state]++; if (art->state==art_Unsolicited) art->state= art_Unchecked; LIST_ADDTAIL(art->ipf->queue,art); + check_reading_pause_resume(art->ipf); } int i; @@ -1446,7 +1452,11 @@ static void connect_start(void) { static Article *dequeue_from(int peek, InputFile *ipf) { if (!ipf) return 0; if (peek) return LIST_HEAD(ipf->queue); - else return LIST_REMHEAD(ipf->queue); + + Article *art= LIST_REMHEAD(ipf->queue); + if (!art) return 0; + check_reading_pause_resume(ipf); + return art; } static Article *dequeue(int peek) { @@ -1527,7 +1537,14 @@ static void conn_maybe_write(Conn *conn) { } } -/*---------- expiry and deferral ----------*/ +/*---------- expiry, flow control and deferral ----------*/ + +static void check_reading_pause_resume(InputFile *ipf) { + if (ipf->queue.count >= max_queue_per_ipf) + inputfile_reading_pause(ipf); + else + inputfile_reading_resume(ipf); +} static void article_defer(Article *art /* not on a queue */, int whichcount) { open_defer(); @@ -1543,7 +1560,7 @@ static int article_check_expired(Article *art /* must be queued, not conn */) { LIST_REMOVE(art->ipf->queue, art); art->missing= 1; - art->ipf->counts[art_Unchecked][RC_missing]++; + art->ipf->count_nooffer_missing++; article_done(art,-1); return 1; } @@ -1556,6 +1573,7 @@ static void inputfile_queue_check_expired(InputFile *ipf) { int exp= article_check_expired(art); if (!exp) break; } + check_reading_pause_resume(ipf); } static void article_autodefer(InputFile *ipf, Article *art) { @@ -1570,12 +1588,16 @@ static int has_article_in(const ArticleList *al, InputFile *ipf) { return 0; } -static void autodefer_input_file(InputFile *ipf) { - ipf->autodefer= 0; - +static void autodefer_input_file_articles(InputFile *ipf) { Article *art; while ((art= LIST_REMHEAD(ipf->queue))) article_autodefer(ipf, art); +} + +static void autodefer_input_file(InputFile *ipf) { + ipf->autodefer= 0; + + autodefer_input_file_articles(ipf); if (ipf->inprogress) { Conn *walk; @@ -1593,8 +1615,11 @@ static void autodefer_input_file(InputFile *ipf) { found: connfail(walk, "connection is stuck or crawling," " and we need to finish flush"); + autodefer_input_file_articles(ipf); } } + + check_reading_pause_resume(ipf); } /*========== article transmission ==========*/ @@ -1815,7 +1840,8 @@ static void update_nocheck(int accepted) { } static void article_done(Article *art, int whichcount) { - if (!art->missing) art->ipf->counts[art->state][whichcount]++; + if (whichcount>=0 && !art->missing) + art->ipf->counts[art->state][whichcount]++; if (whichcount == RC_accepted) update_nocheck(1); else if (whichcount == RC_unwanted) update_nocheck(0); @@ -1915,11 +1941,12 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, switch (code) { - case 400: PEERBADMSG("peer stopped accepting articles"); default: PEERBADMSG("peer sent unexpected message"); - case 503: - if (conn_busy) PEERBADMSG("peer timed us out"); + case 400: + if (conn_busy) + PEERBADMSG("peer timed us out or stopped accepting articles"); + notice("C%d idle connection closed by peer", conn->fd); LIST_REMOVE(conns,conn); conn_dispose(conn); @@ -2100,7 +2127,8 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->offset >= target_max_feedfile_size) statemc_start_flush("feed file size"); - check_assign_articles(); + check_assign_articles(); /* may destroy conn but that's OK */ + check_reading_pause_resume(ipf); return OOP_CONTINUE; } @@ -2251,7 +2279,7 @@ static void filemon_method_dump_info(FILE *f) { DUMPV("%d",,filemon_inotify_fd); DUMPV("%d",,filemon_inotify_wdmax); for (i=0; ird) return; + if (!ipf->paused) return; + + int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, + feedfile_got_article,ipf, feedfile_read_err, ipf); + if (r) sysdie("unable start reading feedfile %s",ipf->path); + + ipf->paused= 0; +} + +static void inputfile_reading_pause(InputFile *ipf) { + if (!ipf->rd) return; + if (ipf->paused) return; + oop_rd_cancel(ipf->rd); + ipf->paused= 1; +} + static void inputfile_reading_start(InputFile *ipf) { assert(!ipf->rd); ipf->readable.on_readable= tailing_on_readable; @@ -2315,14 +2361,13 @@ static void inputfile_reading_start(InputFile *ipf) { ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0); assert(ipf->rd); - int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, - feedfile_got_article,ipf, feedfile_read_err, ipf); - if (r) sysdie("unable start reading feedfile %s",ipf->path); + ipf->paused= 1; + inputfile_reading_resume(ipf); } static void inputfile_reading_stop(InputFile *ipf) { assert(ipf->rd); - oop_rd_cancel(ipf->rd); + inputfile_reading_pause(ipf); oop_rd_delete(ipf->rd); ipf->rd= 0; assert(!ipf->filemon); /* we shouldn't be monitoring it now */ @@ -2623,12 +2668,12 @@ static void notice_processed(InputFile *ipf, int completed, : xasprintf("%s",""); info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s" - " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)" + " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)" RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) , completed?"completed":"processed", what, spec, ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, - inprog, autodefer, + inprog, autodefer, ipf->count_nooffer_missing, CNT(Unchecked,sent) + CNT(Unsolicited,sent) , CNT(Unchecked,sent), CNT(Unsolicited,sent), CNT(Wanted,accepted) + CNT(Unsolicited,accepted) @@ -3192,12 +3237,13 @@ static char *debug_report_ipf(InputFile *ipf) { const char *slash= strrchr(ipf->path,'/'); const char *path= slash ? slash+1 : ipf->path; - return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s", + return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s", ipf, path, ipf->queue.count, ipf->inprogress, ipf->autodefer, (long)ipf->offset, ipf->fd, ipf->rd ? "" : ",!rd", - ipf->skippinglong ? "*skiplong" : ""); + ipf->skippinglong ? "*skiplong" : "", + ipf->rd && ipf->paused ? "*paused" : ""); } static void period(void) { @@ -3207,11 +3253,11 @@ static void period(void) { debug("PERIOD" " sms=%s[%d] conns=%d until_connect=%d" - " input_files main:%s flushing:%s backlog:%s" + " input_files main:%s flushing:%s backlog:%s[%d]" " children connecting=%ld inndcomm=%ld" , sms_names[sms], until_flush, conns.count, until_connect, - dipf_main, dipf_flushing, dipf_backlog, + dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan, (long)connecting_child, (long)inndcomm_child ); @@ -3534,6 +3580,7 @@ static const Option innduct_options[]= { {0,"max-connections", "N", &max_connections, op_integer }, {0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer }, +{0,"max-queue-per-file", "N", &max_queue_per_ipf, op_integer }, {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer }, {0,"period-interval", "TIME", &period_seconds, op_seconds }, @@ -3628,6 +3675,9 @@ int main(int argc, char **argv) { feedfile= xasprintf("%s%s",feedfile,sitename); } + if (max_queue_per_ipf<0) + max_queue_per_ipf= max_queue_per_conn * 2; + const char *feedfile_forbidden= "?*[~#"; int c; while ((c= *feedfile_forbidden++))