From 61af0833343b7a0bafd7aa86f9c0bec420134368 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Mon, 3 May 2010 00:27:23 +0100 Subject: [PATCH] queue for each input file --- backends/innduct.c | 61 ++++++++++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 21 deletions(-) diff --git a/backends/innduct.c b/backends/innduct.c index 18ce508..befc3af 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -448,10 +448,12 @@ struct InputFile { Filemon_Perfile *filemon; oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ - long inprogress; /* no. of articles read but not processed */ off_t offset; int skippinglong; + ArticleList queue; + long inprogress; /* includes queue.count and also articles in conns */ + int counts[art_MaxState][RCI_max]; int readcount_ok, readcount_blank, readcount_err; char path[]; @@ -507,7 +509,6 @@ struct Conn { /* main initialises */ static oop_source *loop; static ConnList conns; -static ArticleList queue; static char *path_lock, *path_flushing, *path_defer; static char *path_control, *path_dump; static char *globpat_backlog; @@ -1120,12 +1121,17 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { memset(requeue,0,sizeof(requeue)); Article *art; - while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); - while ((art= LIST_REMHEAD(conn->waiting))) LIST_ADDTAIL(queue, art); + + while ((art= LIST_REMHEAD(conn->priority))) + LIST_ADDTAIL(art->ipf->queue, art); + + while ((art= LIST_REMHEAD(conn->waiting))) + LIST_ADDTAIL(art->ipf->queue, art); + while ((art= LIST_REMHEAD(conn->sent))) { requeue[art->state]++; if (art->state==art_Unsolicited) art->state= art_Unchecked; - LIST_ADDTAIL(queue,art); + LIST_ADDTAIL(art->ipf->queue,art); } int i; @@ -1435,9 +1441,23 @@ static void connect_start(void) { /*---------- assigning articles to conns, and transmitting ----------*/ +static Article *dequeue_from(int peek, InputFile *ipf) { + if (!ipf) return 0; + if (peek) return LIST_HEAD(ipf->queue); + else return LIST_REMHEAD(ipf->queue); +} + +static Article *dequeue(int peek) { + Article *art; + art= dequeue_from(peek, flushing_input_file); if (art) return art; + art= dequeue_from(peek, backlog_input_file); if (art) return art; + art= dequeue_from(peek, main_input_file); if (art) return art; + return 0; +} + static void check_assign_articles(void) { for (;;) { - if (!queue.count) + if (!dequeue(1)) break; Conn *walk, *use=0; @@ -1462,7 +1482,7 @@ static void check_assign_articles(void) { if (use) { if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { - Article *art= LIST_REMHEAD(queue); + Article *art= dequeue(0); if (!art) break; LIST_ADDTAIL(use->waiting, art); spare--; @@ -1903,6 +1923,7 @@ static InputFile *open_input_file(const char *path) { ipf->fd= fd; strcpy(ipf->path, path); + LIST_INIT(ipf->queue); return ipf; } @@ -2000,7 +2021,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, art->offset= old_offset; art->blanklen= recsz; strcpy(art->messageid, space+1); - LIST_ADDTAIL(queue, art); + LIST_ADDTAIL(ipf->queue, art); if (sms==sm_NORMAL && ipf==main_input_file && ipf->offset >= target_max_feedfile_size) @@ -3081,9 +3102,9 @@ static char *debug_report_ipf(InputFile *ipf) { const char *slash= strrchr(ipf->path,'/'); const char *path= slash ? slash+1 : ipf->path; - return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s%s", + return xasprintf("%p/%s:queue=%d,ip=%ld,off=%ld,fd=%d%s%s", ipf, path, - ipf->inprogress, (long)ipf->offset, + ipf->queue.count, ipf->inprogress, (long)ipf->offset, ipf->fd, ipf->rd ? "" : ",!rd", ipf->skippinglong ? "*skiplong" : ""); @@ -3095,12 +3116,11 @@ static void period(void) { char *dipf_backlog= debug_report_ipf(backlog_input_file); debug("PERIOD" - " sms=%s[%d] conns=%d queue=%d until_connect=%d" + " sms=%s[%d] conns=%d until_connect=%d" " input_files main:%s flushing:%s backlog:%s" " children connecting=%ld inndcomm=%ld" , - sms_names[sms], until_flush, - conns.count, queue.count, until_connect, + sms_names[sms], until_flush, conns.count, until_connect, dipf_main, dipf_flushing, dipf_backlog, (long)connecting_child, (long)inndcomm_child ); @@ -3138,7 +3158,8 @@ static void dump_article_list(FILE *f, const ControlCommand *c, } } -static void dump_input_file(FILE *f, InputFile *ipf, const char *wh) { +static void dump_input_file(FILE *f, const ControlCommand *c, + InputFile *ipf, const char *wh) { char *dipf= debug_report_ipf(ipf); fprintf(f,"input %s %s", wh, dipf); free(dipf); @@ -3159,6 +3180,8 @@ static void dump_input_file(FILE *f, InputFile *ipf, const char *wh) { wh, *statename RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL)); } + fprintf(f,"input %s queue", wh); + dump_article_list(f,c,&ipf->queue); } } @@ -3191,9 +3214,9 @@ CCMD(dump) { fprintf(f,"filemon "); filemon_method_dump_info(f); - dump_input_file(f, main_input_file, "main" ); - dump_input_file(f, flushing_input_file, "flushing"); - dump_input_file(f, backlog_input_file, "backlog" ); + dump_input_file(f,c, main_input_file, "main" ); + dump_input_file(f,c, flushing_input_file, "flushing"); + dump_input_file(f,c, backlog_input_file, "backlog" ); fprintf(f,"conns count=%d\n", conns.count); @@ -3215,7 +3238,6 @@ CCMD(dump) { const struct iovec *iv= &conn->xmit[i]; const XmitDetails *xd= &conn->xmitd[i]; char *dinfo; - long diff; switch (xd->kind) { case xk_Const: dinfo= xasprintf("Const"); break; case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break; @@ -3228,8 +3250,6 @@ CCMD(dump) { } } - fprintf(f,"queue"); dump_article_list(f,c,&queue); - fprintf(f,"paths"); DUMPV("%s", , path_lock); DUMPV("%s", , path_flushing); @@ -3535,7 +3555,6 @@ int main(int argc, char **argv) { loop= (oop_source*)sysloop; LIST_INIT(conns); - LIST_INIT(queue); if (become_daemon) { int i; -- 2.30.2