From 76eef73f36c7ab8c3cbc7f41ca58310994f7cb96 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Sun, 11 Apr 2010 12:55:45 +0100 Subject: [PATCH] wip read backlog files ourselves; see TODO near blather in comment at top --- backends/innduct.c | 57 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 13 deletions(-) diff --git a/backends/innduct.c b/backends/innduct.c index 6a6e51a..0d4d05b 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -56,6 +56,10 @@ also we have to start a new backlog file every (some interval) + TODO for backlog file inputs + - all code to search for and open these files + - write proper algorithm comment + @@ -252,7 +256,7 @@ struct Article { char messageid[1]; }; -typedef struct { +typedef struct InputFile { /* This is an instance of struct oop_readable */ struct oop_readable readable; /* first */ oop_readable_call *readable_callback; @@ -265,6 +269,8 @@ typedef struct { oop_read *rd; long inprogress; /* no. of articles read but not processed */ off_t offset; + + Counts counts; } InputFile; typedef enum { @@ -301,7 +307,7 @@ static char *path_ductlock, *path_duct, *path_ductdefer; static StateMachineState sms; static FILE *defer; -static InputFile *main_input_file, *old_input_file; +static InputFile *main_input_file, *old_input_file, *backlog_input_file; static int sm_period_counter; @@ -819,7 +825,7 @@ static void conn_make_some_xmits(Conn *conn) { art->sent= 1; LIST_ADDTAIL(conn->sent, art); - counts[art->checked].sent++; + art->ipf->counts[art->checked].sent++; } else { /* check it */ @@ -832,7 +838,7 @@ static void conn_make_some_xmits(Conn *conn) { XMIT_LITERAL("\r\n"); LIST_ADDTAIL(conn->sent, art); - counts[art->checked].offered++; + art->ipf->counts[art->checked].offered++; } } } @@ -903,7 +909,7 @@ static void update_nocheck(int accepted) { static void article_done(Connection *conn, Article *art, int whichcount) { *count++; - counts.articles[art->checked][whichcount]++; + art->ipf->counts.articles[art->checked][whichcount]++; if (whichcount == RC_accepted) update_nocheck(1); else if (whichcount == RC_unwanted) update_nocheck(0); @@ -930,7 +936,7 @@ static void article_done(Connection *conn, Article *art, int whichcount) { assert(ipf->inprogress >= 0); if (!ipf->inprogress) - loop->on_time(loop, OOP_TIME_NOW, statemc_check_oldinput_done, 0); + loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, ipf); free(art); } @@ -1046,6 +1052,16 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, static void feedfile_eof(InputFile *ipf) { assert(ipf != main_input_file); /* promised by tailing_try_read */ + + inputfile_tailing_stop(ipf); + + if (ipf == backlog_input_file) { + assert(ipf->fd >= 0); + if (close(ipf->fd)) sysdie("could not close backlog file %s", ipf->path); + ipf->fd= -1; + return; + } + assert(ipf == old_input_file); inputfile_tailing_stop(ipf); @@ -1085,8 +1101,11 @@ static void close_input_file(InputFile *ipf) { assert(!ipf->rd); /* must have had inputfile_tailing_stop */ assert(!ipf->inprogress); /* no dangling pointers pointing here */ - if (ipf->fd>=0) + if (ipf->fd >= 0) if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); + + fixme maybe free ipf->path; + free(ipf); } @@ -1591,14 +1610,26 @@ static void startup_set_input_file(InputFile *f) { inputfile_tailing_start(f); } -static void *statemc_check_oldinput_done(oop_source *lp, - struct timeval now, void *u) { +static void *statemc_check_input_done(oop_source *lp, + struct timeval now, void *ipf_v) { + InputFile *ipf= ipf_v; struct stat stab; - int done= (sms==sm_SEPARATED || sms==sm_DROPPING) - && old_input_file->fd==-1 - && !old_input_file->inprogress; - if (!done) return; + if (ipf->inprogress) return; /* new article in the meantime */ + if (ipf->fd >= 0); return; /* not had EOF */ + + if (ipf == backlog_input_file) { + notice_processed(ipf,"backlog file",ipf->path); + if (unlink(ipf->path)) + sysdie("could not unlink done backlog file %s", ipf->path); + close_input_file(ipf); + fixme trigger search for new backlog file; + } + + assert(ipf == old_input_file); + assert(sms==sm_SEPARATED || sms==sm_DROPPING); + + notice_processed(ipf,"feed file",0); r= fstat(fileno(defer), &stab); if (r) sysdie("check defer file %s", path_defer); -- 2.30.2