X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=c4d42e1300e2f5101005b37b6cc9e37924a98b2c;hb=fd6ed72868d60a901ac6dfc312c0a1640a8146c3;hp=f1f8fd4a9ae6edb0ab54345bd6e45b5dac7c0568;hpb=780c1d174bed4a92d1e08f943ddcc77d8104b999;p=inn-innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index f1f8fd4..c4d42e1 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,12 +1,28 @@ /* - * todo - * - manpage: document control master stuff - * - admin-initiated flush - * * debugging rune: * build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost */ +/*-- +flow control notes +to ensure articles go away eventually +separate queue for each input file + queue expiry + every period, check head of backlog queue for expiry with SMretrieve + if too old: discard, and check next article + also check every backlog article as we read it + flush expiry + after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping + one-off: eat queued articles from flushing and write them to defer + one-off: connfail all connections which have any articles from flushing + newly read articles from flushing go straight to defer + this should take care of it and get us out of this state +to avoid filling up ram needlessly + input control + limit number of queued articles for each ipf + pause/resume inputfile tailing +--*/ + /* * Newsfeeds file entries should look like this: * host.name.of.site[/exclude,exclude,...]\ @@ -297,8 +313,9 @@ static void statemc_setstate(StateMachineState newsms, int periods, static void statemc_start_flush(const char *why); /* Normal => Flushing */ static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ +static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */ -static void article_done(Conn *conn, Article *art, int whichcount); +static void article_done(Article *art, int whichcount); static void check_assign_articles(void); static void queue_check_input_done(void); @@ -408,13 +425,12 @@ typedef enum { #define CONNIOVS 128 typedef enum { - xk_Malloc, xk_Const, xk_Artdata + xk_Const, xk_Artdata } XmitKind; struct XmitDetails { XmitKind kind; union { - char *malloc_tofree; ARTHANDLE *sm_art; } info; }; @@ -432,10 +448,12 @@ struct InputFile { Filemon_Perfile *filemon; oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ - long inprogress; /* no. of articles read but not processed */ off_t offset; int skippinglong; + ArticleList queue; + long inprogress; /* includes queue.count and also articles in conns */ + int counts[art_MaxState][RCI_max]; int readcount_ok, readcount_blank, readcount_err; char path[]; @@ -491,7 +509,6 @@ struct Conn { /* main initialises */ static oop_source *loop; static ConnList conns; -static ArticleList queue; static char *path_lock, *path_flushing, *path_defer; static char *path_control, *path_dump; static char *globpat_backlog; @@ -499,7 +516,7 @@ static pid_t self_pid; /* statemc_init initialises */ static StateMachineState sms; -static int sm_period_counter; +static int until_flush; static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; static FILE *defer; @@ -816,13 +833,14 @@ CCMD(help) { const ControlCommand *ccmd; for (ccmd=control_commands; ccmd->cmd; ccmd++) fprintf(cc->out, " %s\n", ccmd->cmd); + fputs("NB: permissible arguments are not shown above." + " Not all commands listed are safe. See innduct(8).\n", cc->out); } -CCMD(period) { period(); } -CCMD(setintarg) { *(int*)c->xdata= atoi(arg); } -CCMD(setint) { *(int*)c->xdata= c->xval; } -CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); } -CCMD(dump); +CCMD(flush) { + int ok= trigger_flush_ok(); + if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]); +} CCMD(stop) { preterminate(); @@ -831,19 +849,29 @@ CCMD(stop) { abort(); } +CCMD(dump); + +/* messing with our head: */ +CCMD(period) { period(); } +CCMD(setintarg) { *(int*)c->xdata= atoi(arg); } +CCMD(setint) { *(int*)c->xdata= c->xval; } +CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); } + static const ControlCommand control_commands[]= { { "h", ccmd_help }, - { "p", ccmd_period }, + { "flush", ccmd_flush }, { "stop", ccmd_stop }, { "dump q", ccmd_dump, 0,0 }, { "dump a", ccmd_dump, 0,1 }, + { "p", ccmd_period }, + #define POKES(cmd,func) \ - { cmd "sm", func, &sm_period_counter, 1 }, \ + { cmd "flush", func, &until_flush, 1 }, \ { cmd "conn", func, &until_connect, 0 }, \ { cmd "blscan", func, &until_backlog_nextscan, 0 }, -POKES("prod ", ccmd_setint_period) POKES("next ", ccmd_setint) +POKES("prod ", ccmd_setint_period) { "pretend flush", ccmd_setintarg, &simulate_flush }, { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 }, @@ -1093,12 +1121,17 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { memset(requeue,0,sizeof(requeue)); Article *art; - while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); + + while ((art= LIST_REMHEAD(conn->priority))) + LIST_ADDTAIL(art->ipf->queue, art); + + while ((art= LIST_REMHEAD(conn->waiting))) + LIST_ADDTAIL(art->ipf->queue, art); + while ((art= LIST_REMHEAD(conn->sent))) { requeue[art->state]++; if (art->state==art_Unsolicited) art->state= art_Unchecked; - LIST_ADDTAIL(queue,art); + LIST_ADDTAIL(art->ipf->queue,art); } int i; @@ -1408,9 +1441,23 @@ static void connect_start(void) { /*---------- assigning articles to conns, and transmitting ----------*/ +static Article *dequeue_from(int peek, InputFile *ipf) { + if (!ipf) return 0; + if (peek) return LIST_HEAD(ipf->queue); + else return LIST_REMHEAD(ipf->queue); +} + +static Article *dequeue(int peek) { + Article *art; + art= dequeue_from(peek, flushing_input_file); if (art) return art; + art= dequeue_from(peek, backlog_input_file); if (art) return art; + art= dequeue_from(peek, main_input_file); if (art) return art; + return 0; +} + static void check_assign_articles(void) { for (;;) { - if (!queue.count) + if (!dequeue(1)) break; Conn *walk, *use=0; @@ -1435,7 +1482,7 @@ static void check_assign_articles(void) { if (use) { if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { - Article *art= LIST_REMHEAD(queue); + Article *art= dequeue(0); if (!art) break; LIST_ADDTAIL(use->waiting, art); spare--; @@ -1478,6 +1525,27 @@ static void conn_maybe_write(Conn *conn) { } } +static int article_check_expired(Article *art /* must be queued, not conn */) { + ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT); + if (artdata) { SMfreearticle(artdata); return 0; } + + LIST_REMOVE(art->ipf->queue, art); + art->missing= 1; + art->ipf->counts[art_Unchecked][RC_missing]++; + article_done(art,-1); + return 1; +} + +static void inputfile_queue_check_expired(InputFile *ipf) { + if (!ipf) return; + + for (;;) { + Article *art= LIST_HEAD(ipf->queue); + int exp= article_check_expired(art); + if (!exp) break; + } +} + /*========== article transmission ==========*/ static XmitDetails *xmit_core(Conn *conn, const char *data, int len, @@ -1502,7 +1570,6 @@ static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) { static void xmit_free(XmitDetails *d) { switch (d->kind) { - case xk_Malloc: free(d->info.malloc_tofree); break; case xk_Artdata: SMfreearticle(d->info.sm_art); break; case xk_Const: break; default: abort(); @@ -1576,7 +1643,7 @@ static void conn_make_some_xmits(Conn *conn) { XMIT_LITERAL("\r\n"); xmit_artbody(conn, artdata); } else { - article_done(conn, art, -1); + article_done(art, -1); continue; } } else { @@ -1697,7 +1764,7 @@ static void update_nocheck(int accepted) { nocheck= new_nocheck; } -static void article_done(Conn *conn, Article *art, int whichcount) { +static void article_done(Article *art, int whichcount) { if (!art->missing) art->ipf->counts[art->state][whichcount]++; if (whichcount == RC_accepted) update_nocheck(1); @@ -1786,7 +1853,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \ code_streaming= (streaming); \ GET_ARTICLE(musthavesent); \ - article_done(conn, art, RC_##how); \ + article_done(art, RC_##how); \ goto dealtwith; \ }while(0) @@ -1835,7 +1902,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0 || fflush(defer)) sysfatal("write to defer file %s",path_defer); - article_done(conn, art, RC_deferred); + article_done(art, RC_deferred); break; } @@ -1877,6 +1944,7 @@ static InputFile *open_input_file(const char *path) { ipf->fd= fd; strcpy(ipf->path, path); + LIST_INIT(ipf->queue); return ipf; } @@ -1966,7 +2034,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->readcount_ok++; art= xmalloc(sizeof(*art) - 1 + midlen + 1); - memset(art,0,sizeof(art)); + memset(art,0,sizeof(*art)); art->state= art_Unchecked; art->midlen= midlen; art->ipf= ipf; ipf->inprogress++; @@ -1974,7 +2042,10 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, art->offset= old_offset; art->blanklen= recsz; strcpy(art->messageid, space+1); - LIST_ADDTAIL(queue, art); + LIST_ADDTAIL(ipf->queue, art); + + if (ipf==backlog_input_file) + article_check_expired(art); if (sms==sm_NORMAL && ipf==main_input_file && ipf->offset >= target_max_feedfile_size) @@ -2271,7 +2342,7 @@ static void inputfile_reading_stop(InputFile *ipf) { | flsh->rd!=0 | | flsh->rd!=0 | [Separated] | | [Dropping] | main F idle | | main none - | old D tail | | old D tail + | flsh D tail | | flsh D tail | ============= | | ============ | | | | install | ^ | EOF ON D | | defer | EOF ON D @@ -2281,7 +2352,7 @@ static void inputfile_reading_stop(InputFile *ipf) { | flsh->rd==0 | V flsh->rd==0 | [Finishing] | | [Dropping] | main F tail | `. main none - | old D closed | `. old D closed + | flsh D closed | `. flsh D closed | =============== V `. =============== | | `. | | | ALL D PROCESSED `. | ALL D PROCESSED @@ -2294,7 +2365,7 @@ static void inputfile_reading_stop(InputFile *ipf) { DROPPED [Dropped] main none - old none + flsh none some backlog ============== | @@ -2410,6 +2481,9 @@ static void statemc_init(void) { if (file_d) { debug("startup: F!=D => Separated"); startup_set_input_file(file_d); + flushing_input_file= main_input_file; + main_input_file= open_input_file(feedfile); + if (!main_input_file) die("feedfile vanished during startup"); SMS(SEPARATED, 0, "found both old and current feed files"); } else { debug("startup: F exists, D ENOENT => Normal"); @@ -2428,7 +2502,7 @@ static void statemc_start_flush(const char *why) { /* Normal => Flushing */ why, (unsigned long)(main_input_file ? main_input_file->offset : 0), (unsigned long)target_max_feedfile_size, - sm_period_counter); + until_flush); int r= link(feedfile, path_flushing); if (r) sysfatal("link feedfile %s to flushing file %s", @@ -2441,24 +2515,29 @@ static void statemc_start_flush(const char *why) { /* Normal => Flushing */ spawn_inndcomm_flush(why); /* => Flushing FLUSHING */ } -static void statemc_period_poll(void) { - if (!sm_period_counter) return; - sm_period_counter--; - assert(sm_period_counter>=0); - - if (sm_period_counter) return; +static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */ switch (sms) { case sm_NORMAL: statemc_start_flush("periodic"); /* Normal => Flushing; => FLUSHING */ - break; + return 1; case sm_FLUSHFAILED: spawn_inndcomm_flush("retry"); /* Moved => Flushing; => FLUSHING */ - break; + return 1; default: - abort(); + return 0; } } +static void statemc_period_poll(void) { + if (!until_flush) return; + until_flush--; + assert(until_flush>=0); + + if (until_flush) return; + int ok= trigger_flush_ok(); + assert(ok); +} + static int inputfile_is_done(InputFile *ipf) { if (!ipf) return 0; if (ipf->inprogress) return 0; /* new article in the meantime */ @@ -2564,7 +2643,7 @@ static void queue_check_input_done(void) { static void statemc_setstate(StateMachineState newsms, int periods, const char *forlog, const char *why) { sms= newsms; - sm_period_counter= periods; + until_flush= periods; const char *xtra= ""; switch (sms) { @@ -2782,7 +2861,7 @@ static void search_backlog_file(void) { static void preterminate(void) { if (in_child) return; notice_processed(main_input_file,0,"feedfile",""); - notice_processed(flushing_input_file,0,"flushing file",""); + notice_processed(flushing_input_file,0,"flushing",""); if (backlog_input_file) notice_processed(backlog_input_file,0, "backlog file ", backlog_input_file->path); @@ -3047,9 +3126,9 @@ static char *debug_report_ipf(InputFile *ipf) { const char *slash= strrchr(ipf->path,'/'); const char *path= slash ? slash+1 : ipf->path; - return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s%s", + return xasprintf("%p/%s:queue=%d,ip=%ld,off=%ld,fd=%d%s%s", ipf, path, - ipf->inprogress, (long)ipf->offset, + ipf->queue.count, ipf->inprogress, (long)ipf->offset, ipf->fd, ipf->rd ? "" : ",!rd", ipf->skippinglong ? "*skiplong" : ""); @@ -3061,12 +3140,11 @@ static void period(void) { char *dipf_backlog= debug_report_ipf(backlog_input_file); debug("PERIOD" - " sms=%s[%d] conns=%d queue=%d until_connect=%d" + " sms=%s[%d] conns=%d until_connect=%d" " input_files main:%s flushing:%s backlog:%s" " children connecting=%ld inndcomm=%ld" , - sms_names[sms], sm_period_counter, - conns.count, queue.count, until_connect, + sms_names[sms], until_flush, conns.count, until_connect, dipf_main, dipf_flushing, dipf_backlog, (long)connecting_child, (long)inndcomm_child ); @@ -3077,6 +3155,7 @@ static void period(void) { if (until_connect) until_connect--; + inputfile_queue_check_expired(backlog_input_file); poll_backlog_file(); if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_period_poll(); @@ -3104,7 +3183,8 @@ static void dump_article_list(FILE *f, const ControlCommand *c, } } -static void dump_input_file(FILE *f, InputFile *ipf, const char *wh) { +static void dump_input_file(FILE *f, const ControlCommand *c, + InputFile *ipf, const char *wh) { char *dipf= debug_report_ipf(ipf); fprintf(f,"input %s %s", wh, dipf); free(dipf); @@ -3125,6 +3205,8 @@ static void dump_input_file(FILE *f, InputFile *ipf, const char *wh) { wh, *statename RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL)); } + fprintf(f,"input %s queue", wh); + dump_article_list(f,c,&ipf->queue); } } @@ -3136,7 +3218,7 @@ CCMD(dump) { fprintf(f,"general"); DUMPV("%s", sms_names,[sms]); - DUMPV("%d", ,sm_period_counter); + DUMPV("%d", ,until_flush); DUMPV("%ld", (long),self_pid); DUMPV("%p", , defer); DUMPV("%d", , until_connect); @@ -3157,9 +3239,9 @@ CCMD(dump) { fprintf(f,"filemon "); filemon_method_dump_info(f); - dump_input_file(f, main_input_file, "main" ); - dump_input_file(f, flushing_input_file, "flushing"); - dump_input_file(f, backlog_input_file, "backlog" ); + dump_input_file(f,c, main_input_file, "main" ); + dump_input_file(f,c, flushing_input_file, "flushing"); + dump_input_file(f,c, backlog_input_file, "backlog" ); fprintf(f,"conns count=%d\n", conns.count); @@ -3181,18 +3263,9 @@ CCMD(dump) { const struct iovec *iv= &conn->xmit[i]; const XmitDetails *xd= &conn->xmitd[i]; char *dinfo; - long diff; switch (xd->kind) { - case xk_Malloc: - diff= xd->info.malloc_tofree - (char*)iv->iov_base; - dinfo= xasprintf("M%5ld", diff); - break; - case xk_Const: - dinfo= xasprintf("Const"); - break; - case xk_Artdata: - dinfo= xasprintf("A%p", xd->info.sm_art); - break; + case xk_Const: dinfo= xasprintf("Const"); break; + case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break; default: abort(); } @@ -3202,8 +3275,6 @@ CCMD(dump) { } } - fprintf(f,"queue"); dump_article_list(f,c,&queue); - fprintf(f,"paths"); DUMPV("%s", , path_lock); DUMPV("%s", , path_flushing); @@ -3509,7 +3580,6 @@ int main(int argc, char **argv) { loop= (oop_source*)sysloop; LIST_INIT(conns); - LIST_INIT(queue); if (become_daemon) { int i;