From 01c8d7ea6ab315c5ac205e8f4096dca83dfe0b41 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Thu, 29 Apr 2010 00:00:10 +0100 Subject: [PATCH] fixes --- backends/innduct.c | 126 +++++++++++++++++++++++++++++++++------------ 1 file changed, 94 insertions(+), 32 deletions(-) diff --git a/backends/innduct.c b/backends/innduct.c index 1a80ff3..091b198 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -2,6 +2,8 @@ * todo * - actually do something with readable on control master * - option for realsockdir + * - option for filepoll + * - option for no inotify * - manpage: document control master stuff * - manpage: innconf is used for communicating with innd * - debug this: @@ -295,6 +297,7 @@ static void statemc_check_flushing_done(void); static void statemc_check_backlog_done(void); static void postfork(void); +static void period(void); static void open_defer(void); static void close_defer(void); @@ -486,6 +489,9 @@ static int until_connect, until_backlog_nextscan; static double accept_proportion; static int nocheck, nocheck_reported; +/* for simulation, debugging, etc. */ +int simulate_flush= -1; + /*========== logging ==========*/ static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); @@ -644,6 +650,11 @@ static time_t xtime(void) { return now; } +static void xgettimeofday(struct timeval *tv_r) { + int r= gettimeofday(tv_r,0); + if (r) sysdie("gettimeofday(2) failed"); +} + static void xsetnonblock(int fd, int nonblocking) { int errnoval= oop_fd_nonblock(fd, nonblocking); if (errnoval) { errno= errnoval; sysdie("setnonblocking"); } @@ -743,7 +754,7 @@ static void control_checkouterr(ControlConn *cc /* may destroy*/) { } static void control_prompt(ControlConn *cc /* may destroy*/) { - fprintf(cc->out, "%s|", sitename); + fprintf(cc->out, "%s| ", sitename); control_checkouterr(cc); } @@ -752,20 +763,35 @@ struct ControlCommand { const char *cmd; void (*f)(ControlConn *cc, const ControlCommand *ccmd, const char *arg, size_t argsz); + void *xdata; + int xval; }; static const ControlCommand control_commands[]; -static void ccmd_help(ControlConn *cc, const ControlCommand *thisccmd, - const char *arg, size_t argsz) { +#define CCMD(wh) \ + static void ccmd_##wh(ControlConn *cc, const ControlCommand *c, \ + const char *arg, size_t argsz) + +CCMD(help) { fputs("commands:\n", cc->out); const ControlCommand *ccmd; for (ccmd=control_commands; ccmd->cmd; ccmd++) fprintf(cc->out, " %s\n", ccmd->cmd); } +CCMD(period) { period(); } +CCMD(setintarg) { *(int*)c->xdata= atoi(arg); } +CCMD(setint) { *(int*)c->xdata= c->xval; } + static const ControlCommand control_commands[]= { - { "h", ccmd_help }, + { "h", ccmd_help }, + { "p", ccmd_period }, + { "pretend flush", ccmd_setintarg, &simulate_flush }, + { "poke sm", ccmd_setint, &sm_period_counter, 1 }, + { "poke conn", ccmd_setint, &until_connect, 0 }, + { "poke blscan", ccmd_setint, &until_backlog_nextscan, 0 }, + { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 }, { 0 } }; @@ -790,7 +816,7 @@ static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, if (memcmp(data, ccmd->cmd, l)) continue; int argl= (int)recsz - (l+1); - ccmd->f(cc, ccmd, argl>=0 ? data : 0, argl); + ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl); goto prompt; } @@ -1769,7 +1795,7 @@ static void close_input_file(InputFile *ipf) { /* does not free */ static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, const char *data, const char *how) { - warn("corrupted file: %s, offset %lu: %s: %s", + warn("corrupted file: %s, offset %lu: %s: in %s", ipf->path, (unsigned long)offset, how, sanitise(data)); ipf->readcount_err++; if (ipf->readcount_err > max_bad_data_initial + @@ -1916,6 +1942,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, abort(); } } + tailing_queue_readable(ipf); return r; } } @@ -2030,7 +2057,8 @@ static void filemon_stop(InputFile *ipf) { } static void filemon_callback(InputFile *ipf) { - ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); + if (ipf && ipf->readable_callback) /* so filepoll() can be naive */ + ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); } /*---------- interface to start and stop an input file ----------*/ @@ -2276,7 +2304,7 @@ static void statemc_init(void) { InputFile *file_f= open_input_file(feedfile); if (!file_f) die("feed file vanished during startup"); startup_set_input_file(file_f); - SMS(NORMAL, flushfail_retry_periods, "normal startup"); + SMS(NORMAL, spontaneous_flush_periods, "normal startup"); } } } @@ -2432,9 +2460,9 @@ static void statemc_setstate(StateMachineState newsms, int periods, } if (periods) { - info("%s%s[%d] %s",forlog,xtra,periods,why); + info("state %s%s[%d] %s",forlog,xtra,periods,why); } else { - info("%s%s %s",forlog,xtra,why); + info("state %s%s %s",forlog,xtra,why); } } @@ -2583,7 +2611,12 @@ static void search_backlog_file(void) { if (sms==sm_DROPPED) { notice("feed dropped and our work is complete"); - xunlink(path_lock, "lockfile for old feed"); + + int r= unlink(path_control); + if (r && errno!=ENOENT) + syswarn("failed to remove control symlink for old feed"); + + xunlink(path_lock, "lockfile for old feed"); exit(4); } until_backlog_nextscan= backlog_spontrescan_periods; @@ -2647,7 +2680,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { goto failed; case INNDCOMMCHILD_ESTATUS_NONESUCH: - warn("feed has been dropped by innd, finishing up"); + notice("feed has been dropped by innd, finishing up"); flushing_input_file= main_input_file; tailing_queue_readable(flushing_input_file); /* we probably previously returned EAGAIN from our fake read method @@ -2725,6 +2758,12 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0); /* parent spots the autoclose of pipefds[1] when we die or exit */ + if (simulate_flush>=0) { + warn("SIMULATING flush child status %d", simulate_flush); + if (simulate_flush>128) raise(simulate_flush-128); + else exit(simulate_flush); + } + alarm(inndcomm_flush_timeout); r= ICCopen(); if (r) inndcommfail("connect"); r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit"); @@ -2735,6 +2774,8 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ exit(INNDCOMMCHILD_ESTATUS_FAIL); } + simulate_flush= -1; + xclose(pipefds[1], "inndcomm sentinel child's end",0); inndcomm_sentinel_fd= pipefds[0]; assert(inndcomm_sentinel_fd); @@ -2770,22 +2811,44 @@ static void postfork(void) { postfork_stdio(defer, "defer file ", path_defer); } -#define EVERY(what, interval_sec, interval_usec, body) \ - static struct timeval what##_timeout = { interval_sec, interval_usec }; \ - static void what##_schedule(void); \ - static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ - body; \ - what##_schedule(); \ - return OOP_CONTINUE; \ - } \ - static void what##_schedule(void) { \ - loop->on_time(loop, what##_timeout, what##_timedout, 0); \ - } +typedef struct Every Every; +struct Every { + struct timeval interval; + int fixed_rate; + void (*f)(void); +}; + +static void every_schedule(Every *e, struct timeval base); + +static void *every_happens(oop_source *lp, struct timeval base, void *e_v) { + Every *e= e_v; + e->f(); + if (!e->fixed_rate) xgettimeofday(&base); + every_schedule(e, base); + return OOP_CONTINUE; +} -EVERY(filepoll, 5,0, ({ - if (main_input_file && main_input_file->readable_callback) - filemon_callback(main_input_file); -})); +static void every_schedule(Every *e, struct timeval base) { + struct timeval when; + timeradd(&base, &e->interval, &when); + loop->on_time(loop, when, every_happens, e); +} + +static void every(int interval, int fixed_rate, void (*f)(void)) { + Every *e= xmalloc(sizeof(*e)); + e->interval.tv_sec= interval; + e->interval.tv_usec= 0; + e->fixed_rate= fixed_rate; + e->f= f; + struct timeval now; + xgettimeofday(&now); + every_schedule(e, now); +} + +static void filepoll(void) { + filemon_callback(main_input_file); + filemon_callback(flushing_input_file); +} static char *debug_report_ipf(InputFile *ipf) { if (!ipf) return xasprintf("none"); @@ -2799,7 +2862,7 @@ static char *debug_report_ipf(InputFile *ipf) { ipf->fd, ipf->rd ? "+" : ""); } -EVERY(period, -1,0, ({ +static void period(void) { char *dipf_main= debug_report_ipf(main_input_file); char *dipf_flushing= debug_report_ipf(flushing_input_file); char *dipf_backlog= debug_report_ipf(backlog_input_file); @@ -2826,7 +2889,7 @@ EVERY(period, -1,0, ({ statemc_period_poll(); check_assign_articles(); check_idle_conns(); -})); +} /*========== option parsing ==========*/ @@ -3156,11 +3219,10 @@ int main(int argc, char **argv) { if (!filemon_method_init()) { warn("no file monitoring available, polling"); - filepoll_schedule(); + every(5,0,filepoll); } - period_timeout.tv_sec= period_seconds; - period_schedule(); + every(period_seconds,1,period); statemc_init(); -- 2.30.2