From 777be8d20bd050e550db0e7cdb6ed68e18138c47 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Wed, 24 Mar 2010 00:16:45 +0000 Subject: [PATCH] WIP, before change sm timeout machinery to count down and general counter --- backends/innduct.c | 306 +++++++++++++++++++++++++++++++-------------- 1 file changed, 215 insertions(+), 91 deletions(-) diff --git a/backends/innduct.c b/backends/innduct.c index 2bc5075..54e6196 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -142,12 +142,16 @@ #define CONNCHILD_ESTATUS_STREAM 4 #define CONNCHILD_ESTATUS_NOSTREAM 5 +#define INNDCOMMCHILD_ESTATUS_FAIL 6 +#define INNDCOMMCHILD_ESTATUS_NONESUCH 7 + /*----- configuration options -----*/ -static char *feedfile; +static char *feedname, *feedfile; static int max_connections, max_queue_per_conn; static int connection_setup_timeout, port, try_stream; +static int inndcomm_flush_timeout; static const char *remote_host; static double accept_proportion; @@ -248,9 +252,10 @@ typedef enum { sm_NORMAL, sm_FLUSHING, sm_FLUSHFAIL, - sm_DROPPING, - sm_SEPARATED, - sm_FINISHING; + sm_SEPARATED1, + sm_SEPARATED2, /* must follow SEPARATED2 - see feedfile_eof */ + sm_DROPPING1, + sm_DROPPING2, /* must follow DROPPING1 - see feedfile_eof */ } StateMachineState; struct Conn { @@ -292,11 +297,23 @@ static void filemon_callback(void); static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } } +static pid_t xfork(const char *what) { + pid_t child; -/*========== making new connections ==========*/ + child= fork(); + if (child==-1) sysdie("cannot fork for %s",what); + if (!child) postfork(what); + return child; +} -static int connecting_sockets[2]= {-1,-1}; -static pid_t connecting_child; +static void on_fd_read_except(int fd, oop_call_fd callback) { + loop->on_fd(loop, fd, OOP_READ, callback, 0); + loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0); +} +static void cancel_fd_read_except(int fd) { + loop->cancel_fd(loop, fd, OOP_READ); + loop->cancel_fd(loop, fd, OOP_EXCEPTION); +} static void report_child_status(const char *what, int status) { if (WIFEXITED(status)) { @@ -317,27 +334,38 @@ static void report_child_status(const char *what, int status) { } } +static int xwaitpid(pid_t *pid, const char *what) { + int status; + + r= kill(*pid, SIGKILL); + if (r) sysdie("cannot kill %s child", what); + + pid_t got= waitpid(*pid, &status, WNOHANG); + if (got==-1) sysdie("cannot reap %s child", what); + + *pid= 0; + + return status; +} + +/*========== making new connections ==========*/ + +static int connecting_sockets[2]= {-1,-1}; +static pid_t connecting_child; + static void connect_attempt_discard(void) { - if (connecting_sockets[0]) { - cancel_fd(loop, connecting_sockets[0], OOP_READ); - cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION); - } + if (connecting_sockets[0]) + cancel_fd(connecting_sockets[0]); + perhaps_close(&connecting_sockets[0]); perhaps_close(&connecting_sockets[1]); if (connecting_child) { - int status; - r= kill(connecting_child, SIGKILL); - if (r) sysdie("cannot kill connect child"); - - pid_t got= waitpid(connecting_child, &status, WNOHANG); - if (got==-1) sysdie("cannot reap connect child"); + int status= xwaitpid(&connecting_child, "connect"); if (!(WIFEXITED(status) || - (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) { - report_child_status("connect" - } - connecting_child= 0; + (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) + report_child_status("connect", status); } } @@ -441,16 +469,13 @@ static void connect_start() { r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets); if (r) { syswarn("connect: cannot create socketpair for child"); goto x; } - connecting_child= fork(); - if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; } + connecting_child= xfork("connection"); if (!connecting_child) { FILE *cn_from, *cn_to; char buf[NNTP_STRLEN+100]; int exitstatus= CONNCHILD_ESTATUS_NOSTREAM; - postfork(); - r= close(connecting_sockets[0]); if (r) sysdie("connect: close parent socket in child"); @@ -524,8 +549,7 @@ static void connect_start() { r= close(connecting_sockets[1]); connecting_sockets[1]= 0; if (r) sysdie("connect: close child socket in parent"); - loop->on_fd(loop, connecting_sockets[0], OOP_READ, connchild_event, 0); - loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0); + on_fd_read_except(connecting_sockets[0], connchild_event); return OOP_CONTINUE; x: @@ -826,6 +850,9 @@ static void article_done(Connection *conn, Article *art, int whichcount) { ipf->inprogress--; assert(ipf->inprogress >= 0); + if (!ipf->inprogress) + loop->on_time(loop, OOP_TIME_NOW, statemc_check_oldinput_done, 0); + free(art); } @@ -935,16 +962,17 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, return OOP_CONTINUE; } - + /*========== monitoring of input files ==========*/ static void feedfile_eof(InputFile *ipf) { assert(ipf != main_input_file); /* promised by tailing_try_read */ assert(ipf == old_input_file); - assert(sms == sm_SEPARATED); - sms= sm_FINISHING; + assert(sms==sm_SEPARATED1 || sms==sm_DROPPING1); + sms++; inputfile_tailing_stop(ipf); - inputfile_tailing_start(main_input_file); + if (main_input_file) + inputfile_tailing_start(main_input_file); } static InputFile *open_input_file(const char *path) { @@ -956,14 +984,14 @@ static InputFile *open_input_file(const char *path) { InputFile *ipf= xmalloc(sizeof(InputFile)); memset(ipf,0,sizeof(*ipf)); - + ipf->readable.on_readable= tailing_on_readable; ipf->readable.on_cancel= tailing_on_cancel; ipf->readable.try_read= tailing_try_read; ipf->fd= fd; ipf->path= path; - + return ipf; } @@ -1015,6 +1043,9 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, ipf->offset += recsz + 1; if (sms==sm_NORMAL && ipf->offset >= flush_threshold) { + notice("starting flush (%lu >= %lu)", + (unsigned long)ipf->offset, (unsigned long)flush_threshold); + int r= link(feedfile, duct_path); if (r) sysdie("link feedfile %s to ductfile %s", feedfile, dut_path); /* => Hardlinked */ @@ -1083,8 +1114,14 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, InputFile *ipf= (void*)rable; for (;;) { ssize_t r= read(ipf->fd, buffer, length); - if (!r && ipf==main_input_file) { errno=EAGAIN; return -1; } - if (r==-1 && errno==EINTR) continue; + if (r==-1) { + if (errno==EINTR) continue; + return r; + } + if (!r) { + if (ipf==main_input_file) { errno=EAGAIN; return -1; } + assert(sms==sm_SEPARATED1 || sms==sm_DROPPING1); + } return r; } } @@ -1214,7 +1251,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { /* See official state diagram at top of file. We implement * this as follows: - * + ================ WAITING [Nothing/Noduct] @@ -1230,7 +1267,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { | ======== (ESRCH) | NORMAL [Dropped] | [Normal] ========= - | read F + | main F tail | ======== | | | | F IS SO BIG WE SHOULD FLUSH @@ -1247,50 +1284,50 @@ static void inputfile_tailing_stop(InputFile *ipf) { | ========== | | FLUSHING | | [Flushing] | - | read D | + | main D tail | | ========== | | | | | | INNDCOMM FLUSH FAILS ^ - | |`----------------------->--------. | - | | | | - | | NO SUCH SITE V | - ^ |`----------------. ========= | - | | | FLUSHFAIL | - | | V [Moved] | - | | ========== read D | - | | DROPPING ========= | - | | [Dropping] | | - | | read D | TIME TO RETRY | - | | ========== `------------------' - | | FLUSH OK | - | | open F | AT EOF OF D AND ALL PROCESSED - | V | install defer as backlog - | =========== | unlink D - | SEPARATED | exit - | [Separated] V - | read D ========== - | =========== (ESRCH) - | | [Droppped] - | | ========== - | V - | | AT EOF OF D - ^ | - | =========== - | FINISHING - | [Finishing] - | read F - | write D - | =========== - | | - | | ALL D PROCESSED - | | install defer as backlog - | | start new defer - ^ V unlink D - | | close D - | | - `----------' - - * + | |`----------------------->----------. | + | | | | + | | NO SUCH SITE V | + ^ |`--------------->----. =========== | + | | \ FLUSHFAIL | + | | \ [Moved] | + | | \ main D tail | + | | \ =========== | + | | \ | | + | | \ | TIME TO RETRY | + | | \ `----------------' + | | FLUSH OK \ + | | open F \ + | V V + | ============= ============ + | SEPARATED1 DROPPING1 + | [Separated] [Dropping] + | main F idle main none + | old D tail old D tail + | ============= ============ + | | | + ^ | EOF ON D | EOF ON D + | V V + | ============= ============ + | SEPARATED2 DROPPING2 + | [Finishing] [Dropping] + | main F tail main none + | old D idle old D idle + | ============= ============ + | | | + | | ALL D PROCESSED | ALL D PROCESSED + | V install defer as backlog V install defer as backlog + ^ | close D | close D + | | unlink D | unlink D + | | start new defer | exit + | | V + `----------' ========== + (ESRCH) + [Droppped] + ========== */ static void open_defer(void) { @@ -1425,8 +1462,7 @@ static void statemc_init(void) { } static void statemc_poll(void) { - if (sms == sm_WAITING) statemc_waiting_poll(); - if (sms == sm_FINISHING && !old_input_file->inprogress) statemc_finishdone(); + if (sms==sm_WAITING) statemc_waiting_poll(); } static void statemc_waiting_poll(void) { @@ -1446,11 +1482,13 @@ static void startup_set_input_file(InputFile *f) { inputfile_tailing_start(f); } -static void statmc_finishdone(void) { - time_t now; +static void *statemc_check_oldinput_done(oop_source *lp, + struct timeval now, void *u) { struct stat stab; - assert(sms == sm_FINISHING); + int done= (sms==sm_SEPARATED2 || sms==sm_DROPPING2) + && old_input_file->inprogress; + if (!done) return; r= fstat(fileno(defer), &stab); if (r) sysdie("check defer file %s", path_defer); @@ -1458,11 +1496,8 @@ static void statmc_finishdone(void) { if (fclose(defer)) sysdie("could not close defer file %s", path_defer); defer= 0; - now= time(0); - if (now==-1) sysdie("could not get current time for backlog filename"); - char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile, - (unsigned long)now, + (unsigned long)now.tv_sec, (unsigned long)stab.st_ino); if (link(path_defer, path_backlog)) sysdie("could not install defer file %s as backlog file %s", @@ -1470,20 +1505,109 @@ static void statmc_finishdone(void) { if (unlink(path_defer)) sysdie("could not unlink old defer link %s to backlog file %s", path_defer, backlog); + + if (unlink(path_duct)) + sysdie("could not unlink old duct file %s", path_duct); + + if (sms==sm_DROPPING2) { + notice("feed dropped and our work is complete" + " (but check for backlog files)"); + exit(0); + } + open_defer(); close_input_file(old_input_file); old_input_file= 0; - if (unlink(path_duct)) - sysdie("could not unlink old duct file %s", path_duct); - sms= sm_NORMAL; } /*========== flushing the feed ==========*/ - +static pid_t inndcomm_child; + +static void inndcommfail(const char *what) { + syswarn("error communicating with innd: %s failed: %s", what, ICCfailure); + exit(INNDCOMMCHILD_ESTATUS_FAIL); +} + +static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { + assert(inndcomm_child); + int status= xwaitpid(&inndcomm_child, "inndcomm"); + loop->cancel_fd(fd); + close(fd); + + assert(!old_input_file); + + if (WIFEXITED(status)) { + switch (WEXITSTATUS(status)) { + + case INNDCOMMCHILD_ESTATUS_FAIL: + goto failed; + + case INNDCOMMCHILD_ESTATUS_NONESUCH: + warn("feed has been dropped by innd, finishing up"); + old_input_file= main_input_file; + main_input_file= 0; + sms= sm_DROPPING1; + return OOP_CONTINUE; + + case 0: + old_input_file= main_input_file; + main_input_file= open_input_file(feedfile); + if (!main_input_file) + die("flush succeeded but feedfile %s does not exist!", feedfile); + sms= sm_SEPARATED1; + return OOP_CONTINUE; + + default: + goto unexpected_exitstatus; + + } + } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { + warn("flush of %s timed out trying to talk to innd", feedname); + goto failed; + } else { + unexpected_exitstatus: + report_child_status("inndcomm child", status); + } + + failed: + + +void spawn_inndcomm_flush(void) { + int pipefds[2]; + + assert(sms == sm_NORMAL); + assert(!inndcomm_child); + + if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel"); + + inndcomm_child= xfork(); + + if (!inndcomm_child) { + static char flushargv[2]= { feedname, 0 }; + char *reply; + + close(pipefds[0]); + + alarm(inndcomm_flush_timeout); + r= ICCopen(); if (r) inndcommfail("connect"); + r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit"); + if (!r) exit(0); /* yay! */ + + if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH); + syswarn("innd ctlinnd flush %s failed: innd said %s", feedname, reply); + exit(INNDCOMMCHILD_ESTATUS_FAIL); + } + + close(pipefds[1]); + int sentinel_fd= pipefds[0]; + on_fd_read_except(sentinel_fd, inndcomm_event); + + sms= sm_FLUSHING; +} /*========== main program ==========*/ -- 2.30.2