X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=0d4d05b0ad5dea68d39c8b7460e9d03f98c47b43;hb=76eef73f36c7ab8c3cbc7f41ca58310994f7cb96;hp=f3be2b7613f07014e5174e4c274b9c70765c0611;hpb=e7e211b5ea4799a7b962988c055442e6cec6fdfe;p=inn-innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index f3be2b7..0d4d05b 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -56,6 +56,10 @@ also we have to start a new backlog file every (some interval) + TODO for backlog file inputs + - all code to search for and open these files + - write proper algorithm comment + @@ -252,7 +256,7 @@ struct Article { char messageid[1]; }; -typedef struct { +typedef struct InputFile { /* This is an instance of struct oop_readable */ struct oop_readable readable; /* first */ oop_readable_call *readable_callback; @@ -265,6 +269,8 @@ typedef struct { oop_read *rd; long inprogress; /* no. of articles read but not processed */ off_t offset; + + Counts counts; } InputFile; typedef enum { @@ -272,10 +278,8 @@ typedef enum { sm_NORMAL, sm_FLUSHING, sm_FLUSHFAIL, - sm_SEPARATED1, - sm_SEPARATED2, /* must follow SEPARATED2 - see feedfile_eof */ - sm_DROPPING1, - sm_DROPPING2, /* must follow DROPPING1 - see feedfile_eof */ + sm_SEPARATED, + sm_DROPPING, } StateMachineState; struct Conn { @@ -303,7 +307,7 @@ static char *path_ductlock, *path_duct, *path_ductdefer; static StateMachineState sms; static FILE *defer; -static InputFile *main_input_file, *old_input_file; +static InputFile *main_input_file, *old_input_file, *backlog_input_file; static int sm_period_counter; @@ -821,7 +825,7 @@ static void conn_make_some_xmits(Conn *conn) { art->sent= 1; LIST_ADDTAIL(conn->sent, art); - counts[art->checked].sent++; + art->ipf->counts[art->checked].sent++; } else { /* check it */ @@ -834,7 +838,7 @@ static void conn_make_some_xmits(Conn *conn) { XMIT_LITERAL("\r\n"); LIST_ADDTAIL(conn->sent, art); - counts[art->checked].offered++; + art->ipf->counts[art->checked].offered++; } } } @@ -905,7 +909,7 @@ static void update_nocheck(int accepted) { static void article_done(Connection *conn, Article *art, int whichcount) { *count++; - counts.articles[art->checked][whichcount]++; + art->ipf->counts.articles[art->checked][whichcount]++; if (whichcount == RC_accepted) update_nocheck(1); else if (whichcount == RC_unwanted) update_nocheck(0); @@ -932,7 +936,7 @@ static void article_done(Connection *conn, Article *art, int whichcount) { assert(ipf->inprogress >= 0); if (!ipf->inprogress) - loop->on_time(loop, OOP_TIME_NOW, statemc_check_oldinput_done, 0); + loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, ipf); free(art); } @@ -1048,13 +1052,25 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, static void feedfile_eof(InputFile *ipf) { assert(ipf != main_input_file); /* promised by tailing_try_read */ + + inputfile_tailing_stop(ipf); + + if (ipf == backlog_input_file) { + assert(ipf->fd >= 0); + if (close(ipf->fd)) sysdie("could not close backlog file %s", ipf->path); + ipf->fd= -1; + return; + } + assert(ipf == old_input_file); - if (sms==sm_SEPARATED1) SMS(SEPARATED2, 0, "eof on old feed file"); - else if (sms==sm_DROPPING1) SMS(DROPPING2, 0, "eof on dead feed file"); - else abort(); - inputfile_tailing_stop(ipf); + assert(ipf->fd >= 0); + if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); + ipf->fd= -1; + + assert(sms==sm_SEPARATED || sms==sm_DROPPING); + if (main_input_file) inputfile_tailing_start(main_input_file); } @@ -1085,7 +1101,11 @@ static void close_input_file(InputFile *ipf) { assert(!ipf->rd); /* must have had inputfile_tailing_stop */ assert(!ipf->inprogress); /* no dangling pointers pointing here */ - if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); + if (ipf->fd >= 0) + if (close(ipf->fd)) sysdie("could not close input file %s", ipf->path); + + fixme maybe free ipf->path; + free(ipf); } @@ -1203,8 +1223,17 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, return r; } if (!r) { - if (ipf==main_input_file) { errno=EAGAIN; return -1; } - assert(sms==sm_SEPARATED1 || sms==sm_DROPPING1); + if (ipf==main_input_file) { + errno=EAGAIN; + return -1; + } else if (ipf==old_input_file) { + assert(ipf->fd>=0); + assert(sms==sm_SEPARATED || sms==sm_DROPPING); + } else if (ipf==backlog_input_file) { + assert(ipf->fd>=0); + } else { + abort(); + } } return r; } @@ -1387,7 +1416,8 @@ static void inputfile_tailing_stop(InputFile *ipf) { | | open F \ | V V | ============= ============ - | SEPARATED1 DROPPING1 + | SEPARATED/ DROPPING/ + | old->fd>=0 old->fd>=0 | [Separated] [Dropping] | main F idle main none | old D tail old D tail @@ -1395,12 +1425,13 @@ static void inputfile_tailing_stop(InputFile *ipf) { | | | ^ | EOF ON D | EOF ON D | V V - | ============= ============ - | SEPARATED2 DROPPING2 + | =============== =============== + | SEPARATED/ DROPPING/ + | old->fd==-1 old->fd==-1 | [Finishing] [Dropping] | main F tail main none - | old D idle old D idle - | ============= ============ + | old D closed old D closed + | =============== =============== | | | | | ALL D PROCESSED | ALL D PROCESSED | V install defer as backlog V install defer as backlog @@ -1539,7 +1570,7 @@ static void statemc_init(void) { spawn_inndcomm_flush(); /* => Flushing, sets sms to sm_FLUSHING */ } else { /* F!=D => Separated */ - SMS(SEPARATED1, 0, "found both old and current feed files"); + SMS(SEPARATED, 0, "found both old and current feed files"); startup_set_input_file(file_d); } } else { /*!file_d*/ @@ -1579,13 +1610,26 @@ static void startup_set_input_file(InputFile *f) { inputfile_tailing_start(f); } -static void *statemc_check_oldinput_done(oop_source *lp, - struct timeval now, void *u) { +static void *statemc_check_input_done(oop_source *lp, + struct timeval now, void *ipf_v) { + InputFile *ipf= ipf_v; struct stat stab; - int done= (sms==sm_SEPARATED2 || sms==sm_DROPPING2) - && old_input_file->inprogress; - if (!done) return; + if (ipf->inprogress) return; /* new article in the meantime */ + if (ipf->fd >= 0); return; /* not had EOF */ + + if (ipf == backlog_input_file) { + notice_processed(ipf,"backlog file",ipf->path); + if (unlink(ipf->path)) + sysdie("could not unlink done backlog file %s", ipf->path); + close_input_file(ipf); + fixme trigger search for new backlog file; + } + + assert(ipf == old_input_file); + assert(sms==sm_SEPARATED || sms==sm_DROPPING); + + notice_processed(ipf,"feed file",0); r= fstat(fileno(defer), &stab); if (r) sysdie("check defer file %s", path_defer); @@ -1606,7 +1650,7 @@ static void *statemc_check_oldinput_done(oop_source *lp, if (unlink(path_duct)) sysdie("could not unlink old duct file %s", path_duct); - if (sms==sm_DROPPING2) { + if (sms==sm_DROPPING) { notice("feed dropped and our work is complete" " (but check for backlog files)"); exit(0); @@ -1654,7 +1698,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { warn("feed has been dropped by innd, finishing up"); old_input_file= main_input_file; main_input_file= 0; - SMS(DROPPING1, 0, "dropped by innd"); + SMS(DROPPING, 0, "dropped by innd"); return OOP_CONTINUE; case 0: @@ -1662,7 +1706,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { main_input_file= open_input_file(feedfile); if (!main_input_file) die("flush succeeded but feedfile %s does not exist!", feedfile); - SMS(SEPARATED1, 0, "feed file missing"); + SMS(SEPARATED, 0, "feed file missing"); return OOP_CONTINUE; default: