X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=b86b1bef644a0b085560dac13cddb5a827ca4980;hb=2ab46c4a237b9d9614dd655fcf0078b68fd4581b;hp=047d99e6f9f3563903ac17e2310fa1470f93066e;hpb=a319a3c12dc51eebaf5305db1b6c150af68d5e35;p=inn-innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index 047d99e..b86b1be 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,28 +1,38 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +/* + * todo + * specify perms of /tmp/innduct.control + */ + /* * debugging rune: * build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost */ -/*-- -flow control notes -to ensure articles go away eventually -separate queue for each input file - queue expiry - every period, check head of backlog queue for expiry with SMretrieve - if too old: discard, and check next article - also check every backlog article as we read it - flush expiry - after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping - one-off: eat queued articles from flushing and write them to defer - one-off: connfail all connections which have any articles from flushing - newly read articles from flushing go straight to defer - this should take care of it and get us out of this state -to avoid filling up ram needlessly - input control - limit number of queued articles for each ipf - pause/resume inputfile tailing ---*/ - /* * Newsfeeds file entries should look like this: * host.name.of.site[/exclude,exclude,...]\ @@ -313,12 +323,14 @@ static void statemc_setstate(StateMachineState newsms, int periods, static void statemc_start_flush(const char *why); /* Normal => Flushing */ static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ -static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */ +static int trigger_flush_ok(const char *why /* 0 means timeout */); + /* => Flushing,FLUSHING, ret 1; or ret 0 */ static void article_done(Article *art, int whichcount); static void check_assign_articles(void); static void queue_check_input_done(void); +static void check_reading_pause_resume(InputFile *ipf); static void statemc_check_flushing_done(void); static void statemc_check_backlog_done(void); @@ -335,6 +347,9 @@ static char *debug_report_ipf(InputFile *ipf); static void inputfile_reading_start(InputFile *ipf); static void inputfile_reading_stop(InputFile *ipf); +static void inputfile_reading_pause(InputFile *ipf); +static void inputfile_reading_resume(InputFile *ipf); + /* pause and resume are idempotent, and no-op if not done _reading_start */ static void filemon_start(InputFile *ipf); static void filemon_stop(InputFile *ipf); @@ -362,6 +377,7 @@ static int max_queue_per_conn=200; static int target_max_feedfile_size=100000; static int period_seconds=60; static int filepoll_seconds=5; +static int max_queue_per_ipf=-1; static int connection_setup_timeout=200; static int inndcomm_flush_timeout=100; @@ -450,14 +466,14 @@ struct InputFile { oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ off_t offset; - int skippinglong; + int skippinglong, paused; ArticleList queue; long inprogress; /* includes queue.count and also articles in conns */ long autodefer; /* -1 means not doing autodefer */ int counts[art_MaxState][RCI_max]; - int readcount_ok, readcount_blank, readcount_err; + int readcount_ok, readcount_blank, readcount_err, count_nooffer_missing; char path[]; }; @@ -549,7 +565,7 @@ static void logv(int sysloglevel, const char *pfx, int errnoval, const char *fmt, va_list al) PRINTF(5,0); static void logv(int sysloglevel, const char *pfx, int errnoval, const char *fmt, va_list al) { - char msgbuf[256]; /* NB do not call xvasprintf here or you'll recurse */ + char msgbuf[1024]; /* NB do not call xvasprintf here or you'll recurse */ vsnprintf(msgbuf,sizeof(msgbuf), fmt,al); msgbuf[sizeof(msgbuf)-1]= 0; @@ -840,7 +856,7 @@ CCMD(help) { } CCMD(flush) { - int ok= trigger_flush_ok(); + int ok= trigger_flush_ok("manual request"); if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]); } @@ -1134,6 +1150,7 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { requeue[art->state]++; if (art->state==art_Unsolicited) art->state= art_Unchecked; LIST_ADDTAIL(art->ipf->queue,art); + check_reading_pause_resume(art->ipf); } int i; @@ -1446,7 +1463,11 @@ static void connect_start(void) { static Article *dequeue_from(int peek, InputFile *ipf) { if (!ipf) return 0; if (peek) return LIST_HEAD(ipf->queue); - else return LIST_REMHEAD(ipf->queue); + + Article *art= LIST_REMHEAD(ipf->queue); + if (!art) return 0; + check_reading_pause_resume(ipf); + return art; } static Article *dequeue(int peek) { @@ -1527,7 +1548,34 @@ static void conn_maybe_write(Conn *conn) { } } -/*---------- expiry and deferral ----------*/ +/*---------- expiry, flow control and deferral ----------*/ + +/* + * flow control notes + * to ensure articles go away eventually + * separate queue for each input file + * queue expiry + * every period, check head of backlog queue for expiry with SMretrieve + * if too old: discard, and check next article + * also check every backlog article as we read it + * flush expiry + * after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping + * one-off: eat queued articles from flushing and write them to defer + * one-off: connfail all connections which have any articles from flushing + * newly read articles from flushing go straight to defer + * this should take care of it and get us out of this state + * to avoid filling up ram needlessly + * input control + * limit number of queued articles for each ipf + * pause/resume inputfile tailing + */ + +static void check_reading_pause_resume(InputFile *ipf) { + if (ipf->queue.count >= max_queue_per_ipf) + inputfile_reading_pause(ipf); + else + inputfile_reading_resume(ipf); +} static void article_defer(Article *art /* not on a queue */, int whichcount) { open_defer(); @@ -1543,7 +1591,7 @@ static int article_check_expired(Article *art /* must be queued, not conn */) { LIST_REMOVE(art->ipf->queue, art); art->missing= 1; - art->ipf->counts[art_Unchecked][RC_missing]++; + art->ipf->count_nooffer_missing++; article_done(art,-1); return 1; } @@ -1556,6 +1604,7 @@ static void inputfile_queue_check_expired(InputFile *ipf) { int exp= article_check_expired(art); if (!exp) break; } + check_reading_pause_resume(ipf); } static void article_autodefer(InputFile *ipf, Article *art) { @@ -1570,12 +1619,16 @@ static int has_article_in(const ArticleList *al, InputFile *ipf) { return 0; } -static void autodefer_input_file(InputFile *ipf) { - ipf->autodefer= 0; - +static void autodefer_input_file_articles(InputFile *ipf) { Article *art; while ((art= LIST_REMHEAD(ipf->queue))) article_autodefer(ipf, art); +} + +static void autodefer_input_file(InputFile *ipf) { + ipf->autodefer= 0; + + autodefer_input_file_articles(ipf); if (ipf->inprogress) { Conn *walk; @@ -1593,8 +1646,11 @@ static void autodefer_input_file(InputFile *ipf) { found: connfail(walk, "connection is stuck or crawling," " and we need to finish flush"); + autodefer_input_file_articles(ipf); } } + + check_reading_pause_resume(ipf); } /*========== article transmission ==========*/ @@ -1815,7 +1871,8 @@ static void update_nocheck(int accepted) { } static void article_done(Article *art, int whichcount) { - if (!art->missing) art->ipf->counts[art->state][whichcount]++; + if (whichcount>=0 && !art->missing) + art->ipf->counts[art->state][whichcount]++; if (whichcount == RC_accepted) update_nocheck(1); else if (whichcount == RC_unwanted) update_nocheck(0); @@ -1915,11 +1972,12 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, switch (code) { - case 400: PEERBADMSG("peer stopped accepting articles"); default: PEERBADMSG("peer sent unexpected message"); - case 503: - if (conn_busy) PEERBADMSG("peer timed us out"); + case 400: + if (conn_busy) + PEERBADMSG("peer timed us out or stopped accepting articles"); + notice("C%d idle connection closed by peer", conn->fd); LIST_REMOVE(conns,conn); conn_dispose(conn); @@ -2100,7 +2158,8 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->offset >= target_max_feedfile_size) statemc_start_flush("feed file size"); - check_assign_articles(); + check_assign_articles(); /* may destroy conn but that's OK */ + check_reading_pause_resume(ipf); return OOP_CONTINUE; } @@ -2251,7 +2310,7 @@ static void filemon_method_dump_info(FILE *f) { DUMPV("%d",,filemon_inotify_fd); DUMPV("%d",,filemon_inotify_wdmax); for (i=0; ird) return; + if (!ipf->paused) return; + + int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, + feedfile_got_article,ipf, feedfile_read_err, ipf); + if (r) sysdie("unable start reading feedfile %s",ipf->path); + + ipf->paused= 0; +} + +static void inputfile_reading_pause(InputFile *ipf) { + if (!ipf->rd) return; + if (ipf->paused) return; + oop_rd_cancel(ipf->rd); + ipf->paused= 1; +} + static void inputfile_reading_start(InputFile *ipf) { assert(!ipf->rd); ipf->readable.on_readable= tailing_on_readable; @@ -2315,14 +2392,13 @@ static void inputfile_reading_start(InputFile *ipf) { ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0); assert(ipf->rd); - int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, - feedfile_got_article,ipf, feedfile_read_err, ipf); - if (r) sysdie("unable start reading feedfile %s",ipf->path); + ipf->paused= 1; + inputfile_reading_resume(ipf); } static void inputfile_reading_stop(InputFile *ipf) { assert(ipf->rd); - oop_rd_cancel(ipf->rd); + inputfile_reading_pause(ipf); oop_rd_delete(ipf->rd); ipf->rd= 0; assert(!ipf->filemon); /* we shouldn't be monitoring it now */ @@ -2565,20 +2641,21 @@ static void statemc_start_flush(const char *why) { /* Normal => Flushing */ spawn_inndcomm_flush(why); /* => Flushing FLUSHING */ } -static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */ +static int trigger_flush_ok(const char *why) { switch (sms) { case sm_NORMAL: - statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */ - return 1; + statemc_start_flush(why ? why : "periodic"); + return 1; /* Normal => Flushing; => FLUSHING */ case sm_FLUSHFAILED: - spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */ - return 1; + spawn_inndcomm_flush(why ? why : "retry"); + return 1; /* Moved => Flushing; => FLUSHING */ case sm_SEPARATED: case sm_DROPPING: - warn("took too long to complete old feedfile after flush, autodeferring"); + warn("abandoning old feedfile after flush (%s), autodeferring", + why ? why : "took too long to complete"); assert(flushing_input_file); autodefer_input_file(flushing_input_file); return 1; @@ -2594,7 +2671,7 @@ static void statemc_period_poll(void) { assert(until_flush>=0); if (until_flush) return; - int ok= trigger_flush_ok(); + int ok= trigger_flush_ok(0); assert(ok); } @@ -2623,12 +2700,12 @@ static void notice_processed(InputFile *ipf, int completed, : xasprintf("%s",""); info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s" - " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)" + " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)" RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) , completed?"completed":"processed", what, spec, ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, - inprog, autodefer, + inprog, autodefer, ipf->count_nooffer_missing, CNT(Unchecked,sent) + CNT(Unsolicited,sent) , CNT(Unchecked,sent), CNT(Unsolicited,sent), CNT(Wanted,accepted) + CNT(Unsolicited,accepted) @@ -3192,12 +3269,13 @@ static char *debug_report_ipf(InputFile *ipf) { const char *slash= strrchr(ipf->path,'/'); const char *path= slash ? slash+1 : ipf->path; - return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s", + return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s", ipf, path, ipf->queue.count, ipf->inprogress, ipf->autodefer, (long)ipf->offset, ipf->fd, ipf->rd ? "" : ",!rd", - ipf->skippinglong ? "*skiplong" : ""); + ipf->skippinglong ? "*skiplong" : "", + ipf->rd && ipf->paused ? "*paused" : ""); } static void period(void) { @@ -3207,11 +3285,11 @@ static void period(void) { debug("PERIOD" " sms=%s[%d] conns=%d until_connect=%d" - " input_files main:%s flushing:%s backlog:%s" + " input_files main:%s flushing:%s backlog:%s[%d]" " children connecting=%ld inndcomm=%ld" , sms_names[sms], until_flush, conns.count, until_connect, - dipf_main, dipf_flushing, dipf_backlog, + dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan, (long)connecting_child, (long)inndcomm_child ); @@ -3259,6 +3337,7 @@ static void dump_input_file(FILE *f, const ControlCommand *c, DUMPV("%d", ipf->,readcount_ok); DUMPV("%d", ipf->,readcount_blank); DUMPV("%d", ipf->,readcount_err); + DUMPV("%d", ipf->,count_nooffer_missing); } fprintf(f,"\n"); if (ipf) { @@ -3534,6 +3613,7 @@ static const Option innduct_options[]= { {0,"max-connections", "N", &max_connections, op_integer }, {0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer }, +{0,"max-queue-per-file", "N", &max_queue_per_ipf, op_integer }, {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer }, {0,"period-interval", "TIME", &period_seconds, op_seconds }, @@ -3628,6 +3708,9 @@ int main(int argc, char **argv) { feedfile= xasprintf("%s%s",feedfile,sitename); } + if (max_queue_per_ipf<0) + max_queue_per_ipf= max_queue_per_conn * 2; + const char *feedfile_forbidden= "?*[~#"; int c; while ((c= *feedfile_forbidden++)) @@ -3682,6 +3765,10 @@ int main(int argc, char **argv) { notice("starting"); + int val= 1; + r= SMsetup(SM_PREOPEN, &val); if (!r) warn("SMsetup SM_PREOPEN failed"); + r= SMinit(); if (!r) die("storage manager initialisation (SMinit) failed"); + if (!become_daemon) control_stdio();