X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=c4d42e1300e2f5101005b37b6cc9e37924a98b2c;hp=613a85d6bde8d6344ef35fbcad0677b76d62ea5e;hb=fd6ed72868d60a901ac6dfc312c0a1640a8146c3;hpb=c6028aa66d8e254a4c52ae376f9f51179a638547 diff --git a/backends/innduct.c b/backends/innduct.c index 613a85d..c4d42e1 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,13 +1,28 @@ /* - * todo - * - abolish xk_Malloc - * - rename sm_period_counter as it's just about flushes - * - manpage: document control master stuff - * * debugging rune: * build-lfs/backends/innduct --connection-timeout=30 --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost */ +/*-- +flow control notes +to ensure articles go away eventually +separate queue for each input file + queue expiry + every period, check head of backlog queue for expiry with SMretrieve + if too old: discard, and check next article + also check every backlog article as we read it + flush expiry + after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping + one-off: eat queued articles from flushing and write them to defer + one-off: connfail all connections which have any articles from flushing + newly read articles from flushing go straight to defer + this should take care of it and get us out of this state +to avoid filling up ram needlessly + input control + limit number of queued articles for each ipf + pause/resume inputfile tailing +--*/ + /* * Newsfeeds file entries should look like this: * host.name.of.site[/exclude,exclude,...]\ @@ -300,7 +315,7 @@ static void statemc_start_flush(const char *why); /* Normal => Flushing */ static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ static int trigger_flush_ok(void); /* => Flushing,FLUSHING, ret 1; or ret 0 */ -static void article_done(Conn *conn, Article *art, int whichcount); +static void article_done(Article *art, int whichcount); static void check_assign_articles(void); static void queue_check_input_done(void); @@ -410,13 +425,12 @@ typedef enum { #define CONNIOVS 128 typedef enum { - xk_Malloc, xk_Const, xk_Artdata + xk_Const, xk_Artdata } XmitKind; struct XmitDetails { XmitKind kind; union { - char *malloc_tofree; ARTHANDLE *sm_art; } info; }; @@ -434,10 +448,12 @@ struct InputFile { Filemon_Perfile *filemon; oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ - long inprogress; /* no. of articles read but not processed */ off_t offset; int skippinglong; + ArticleList queue; + long inprogress; /* includes queue.count and also articles in conns */ + int counts[art_MaxState][RCI_max]; int readcount_ok, readcount_blank, readcount_err; char path[]; @@ -493,7 +509,6 @@ struct Conn { /* main initialises */ static oop_source *loop; static ConnList conns; -static ArticleList queue; static char *path_lock, *path_flushing, *path_defer; static char *path_control, *path_dump; static char *globpat_backlog; @@ -501,7 +516,7 @@ static pid_t self_pid; /* statemc_init initialises */ static StateMachineState sms; -static int sm_period_counter; +static int until_flush; static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; static FILE *defer; @@ -818,6 +833,8 @@ CCMD(help) { const ControlCommand *ccmd; for (ccmd=control_commands; ccmd->cmd; ccmd++) fprintf(cc->out, " %s\n", ccmd->cmd); + fputs("NB: permissible arguments are not shown above." + " Not all commands listed are safe. See innduct(8).\n", cc->out); } CCMD(flush) { @@ -850,11 +867,11 @@ static const ControlCommand control_commands[]= { { "p", ccmd_period }, #define POKES(cmd,func) \ - { cmd "sm", func, &sm_period_counter, 1 }, \ + { cmd "flush", func, &until_flush, 1 }, \ { cmd "conn", func, &until_connect, 0 }, \ { cmd "blscan", func, &until_backlog_nextscan, 0 }, -POKES("prod ", ccmd_setint_period) POKES("next ", ccmd_setint) +POKES("prod ", ccmd_setint_period) { "pretend flush", ccmd_setintarg, &simulate_flush }, { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 }, @@ -1104,12 +1121,17 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { memset(requeue,0,sizeof(requeue)); Article *art; - while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); + + while ((art= LIST_REMHEAD(conn->priority))) + LIST_ADDTAIL(art->ipf->queue, art); + + while ((art= LIST_REMHEAD(conn->waiting))) + LIST_ADDTAIL(art->ipf->queue, art); + while ((art= LIST_REMHEAD(conn->sent))) { requeue[art->state]++; if (art->state==art_Unsolicited) art->state= art_Unchecked; - LIST_ADDTAIL(queue,art); + LIST_ADDTAIL(art->ipf->queue,art); } int i; @@ -1419,9 +1441,23 @@ static void connect_start(void) { /*---------- assigning articles to conns, and transmitting ----------*/ +static Article *dequeue_from(int peek, InputFile *ipf) { + if (!ipf) return 0; + if (peek) return LIST_HEAD(ipf->queue); + else return LIST_REMHEAD(ipf->queue); +} + +static Article *dequeue(int peek) { + Article *art; + art= dequeue_from(peek, flushing_input_file); if (art) return art; + art= dequeue_from(peek, backlog_input_file); if (art) return art; + art= dequeue_from(peek, main_input_file); if (art) return art; + return 0; +} + static void check_assign_articles(void) { for (;;) { - if (!queue.count) + if (!dequeue(1)) break; Conn *walk, *use=0; @@ -1446,7 +1482,7 @@ static void check_assign_articles(void) { if (use) { if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { - Article *art= LIST_REMHEAD(queue); + Article *art= dequeue(0); if (!art) break; LIST_ADDTAIL(use->waiting, art); spare--; @@ -1489,6 +1525,27 @@ static void conn_maybe_write(Conn *conn) { } } +static int article_check_expired(Article *art /* must be queued, not conn */) { + ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT); + if (artdata) { SMfreearticle(artdata); return 0; } + + LIST_REMOVE(art->ipf->queue, art); + art->missing= 1; + art->ipf->counts[art_Unchecked][RC_missing]++; + article_done(art,-1); + return 1; +} + +static void inputfile_queue_check_expired(InputFile *ipf) { + if (!ipf) return; + + for (;;) { + Article *art= LIST_HEAD(ipf->queue); + int exp= article_check_expired(art); + if (!exp) break; + } +} + /*========== article transmission ==========*/ static XmitDetails *xmit_core(Conn *conn, const char *data, int len, @@ -1513,7 +1570,6 @@ static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) { static void xmit_free(XmitDetails *d) { switch (d->kind) { - case xk_Malloc: free(d->info.malloc_tofree); break; case xk_Artdata: SMfreearticle(d->info.sm_art); break; case xk_Const: break; default: abort(); @@ -1587,7 +1643,7 @@ static void conn_make_some_xmits(Conn *conn) { XMIT_LITERAL("\r\n"); xmit_artbody(conn, artdata); } else { - article_done(conn, art, -1); + article_done(art, -1); continue; } } else { @@ -1708,7 +1764,7 @@ static void update_nocheck(int accepted) { nocheck= new_nocheck; } -static void article_done(Conn *conn, Article *art, int whichcount) { +static void article_done(Article *art, int whichcount) { if (!art->missing) art->ipf->counts[art->state][whichcount]++; if (whichcount == RC_accepted) update_nocheck(1); @@ -1797,7 +1853,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, #define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \ code_streaming= (streaming); \ GET_ARTICLE(musthavesent); \ - article_done(conn, art, RC_##how); \ + article_done(art, RC_##how); \ goto dealtwith; \ }while(0) @@ -1846,7 +1902,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0 || fflush(defer)) sysfatal("write to defer file %s",path_defer); - article_done(conn, art, RC_deferred); + article_done(art, RC_deferred); break; } @@ -1888,6 +1944,7 @@ static InputFile *open_input_file(const char *path) { ipf->fd= fd; strcpy(ipf->path, path); + LIST_INIT(ipf->queue); return ipf; } @@ -1985,7 +2042,10 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, art->offset= old_offset; art->blanklen= recsz; strcpy(art->messageid, space+1); - LIST_ADDTAIL(queue, art); + LIST_ADDTAIL(ipf->queue, art); + + if (ipf==backlog_input_file) + article_check_expired(art); if (sms==sm_NORMAL && ipf==main_input_file && ipf->offset >= target_max_feedfile_size) @@ -2442,7 +2502,7 @@ static void statemc_start_flush(const char *why) { /* Normal => Flushing */ why, (unsigned long)(main_input_file ? main_input_file->offset : 0), (unsigned long)target_max_feedfile_size, - sm_period_counter); + until_flush); int r= link(feedfile, path_flushing); if (r) sysfatal("link feedfile %s to flushing file %s", @@ -2469,11 +2529,11 @@ static int trigger_flush_ok(void) { /* => Flushing,FLUSHING, ret 1; or ret 0 */ } static void statemc_period_poll(void) { - if (!sm_period_counter) return; - sm_period_counter--; - assert(sm_period_counter>=0); + if (!until_flush) return; + until_flush--; + assert(until_flush>=0); - if (sm_period_counter) return; + if (until_flush) return; int ok= trigger_flush_ok(); assert(ok); } @@ -2583,7 +2643,7 @@ static void queue_check_input_done(void) { static void statemc_setstate(StateMachineState newsms, int periods, const char *forlog, const char *why) { sms= newsms; - sm_period_counter= periods; + until_flush= periods; const char *xtra= ""; switch (sms) { @@ -3066,9 +3126,9 @@ static char *debug_report_ipf(InputFile *ipf) { const char *slash= strrchr(ipf->path,'/'); const char *path= slash ? slash+1 : ipf->path; - return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s%s", + return xasprintf("%p/%s:queue=%d,ip=%ld,off=%ld,fd=%d%s%s", ipf, path, - ipf->inprogress, (long)ipf->offset, + ipf->queue.count, ipf->inprogress, (long)ipf->offset, ipf->fd, ipf->rd ? "" : ",!rd", ipf->skippinglong ? "*skiplong" : ""); @@ -3080,12 +3140,11 @@ static void period(void) { char *dipf_backlog= debug_report_ipf(backlog_input_file); debug("PERIOD" - " sms=%s[%d] conns=%d queue=%d until_connect=%d" + " sms=%s[%d] conns=%d until_connect=%d" " input_files main:%s flushing:%s backlog:%s" " children connecting=%ld inndcomm=%ld" , - sms_names[sms], sm_period_counter, - conns.count, queue.count, until_connect, + sms_names[sms], until_flush, conns.count, until_connect, dipf_main, dipf_flushing, dipf_backlog, (long)connecting_child, (long)inndcomm_child ); @@ -3096,6 +3155,7 @@ static void period(void) { if (until_connect) until_connect--; + inputfile_queue_check_expired(backlog_input_file); poll_backlog_file(); if (!backlog_input_file) close_defer(); /* want to start on a new backlog */ statemc_period_poll(); @@ -3123,7 +3183,8 @@ static void dump_article_list(FILE *f, const ControlCommand *c, } } -static void dump_input_file(FILE *f, InputFile *ipf, const char *wh) { +static void dump_input_file(FILE *f, const ControlCommand *c, + InputFile *ipf, const char *wh) { char *dipf= debug_report_ipf(ipf); fprintf(f,"input %s %s", wh, dipf); free(dipf); @@ -3144,6 +3205,8 @@ static void dump_input_file(FILE *f, InputFile *ipf, const char *wh) { wh, *statename RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL)); } + fprintf(f,"input %s queue", wh); + dump_article_list(f,c,&ipf->queue); } } @@ -3155,7 +3218,7 @@ CCMD(dump) { fprintf(f,"general"); DUMPV("%s", sms_names,[sms]); - DUMPV("%d", ,sm_period_counter); + DUMPV("%d", ,until_flush); DUMPV("%ld", (long),self_pid); DUMPV("%p", , defer); DUMPV("%d", , until_connect); @@ -3176,9 +3239,9 @@ CCMD(dump) { fprintf(f,"filemon "); filemon_method_dump_info(f); - dump_input_file(f, main_input_file, "main" ); - dump_input_file(f, flushing_input_file, "flushing"); - dump_input_file(f, backlog_input_file, "backlog" ); + dump_input_file(f,c, main_input_file, "main" ); + dump_input_file(f,c, flushing_input_file, "flushing"); + dump_input_file(f,c, backlog_input_file, "backlog" ); fprintf(f,"conns count=%d\n", conns.count); @@ -3200,18 +3263,9 @@ CCMD(dump) { const struct iovec *iv= &conn->xmit[i]; const XmitDetails *xd= &conn->xmitd[i]; char *dinfo; - long diff; switch (xd->kind) { - case xk_Malloc: - diff= xd->info.malloc_tofree - (char*)iv->iov_base; - dinfo= xasprintf("M%5ld", diff); - break; - case xk_Const: - dinfo= xasprintf("Const"); - break; - case xk_Artdata: - dinfo= xasprintf("A%p", xd->info.sm_art); - break; + case xk_Const: dinfo= xasprintf("Const"); break; + case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break; default: abort(); } @@ -3221,8 +3275,6 @@ CCMD(dump) { } } - fprintf(f,"queue"); dump_article_list(f,c,&queue); - fprintf(f,"paths"); DUMPV("%s", , path_lock); DUMPV("%s", , path_flushing); @@ -3528,7 +3580,6 @@ int main(int argc, char **argv) { loop= (oop_source*)sysloop; LIST_INIT(conns); - LIST_INIT(queue); if (become_daemon) { int i;