X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=288c9dc654c06fa4b60d0b67295e038137731a5d;hp=1a80ff31a4507c35787e744b2b74f638af0e95a8;hb=6a7307f47a05fbef08b3f7a362ccfc742cebea50;hpb=3b0226a35e7c466b36374ccb75360d7e88fa7472 diff --git a/backends/innduct.c b/backends/innduct.c index 1a80ff3..288c9dc 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -2,6 +2,8 @@ * todo * - actually do something with readable on control master * - option for realsockdir + * - option for filepoll + * - option for no inotify * - manpage: document control master stuff * - manpage: innconf is used for communicating with innd * - debug this: @@ -216,7 +218,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. int count; \ } T##List -#define NODE(n) (assert((void*)&(n)->list_node == &(n)), &(n)->list_node) +#define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node) #define LIST_CHECKCANHAVENODE(l,n) \ ((void)((n) == ((l).u.for_type))) /* just for the type check */ @@ -295,6 +297,7 @@ static void statemc_check_flushing_done(void); static void statemc_check_backlog_done(void); static void postfork(void); +static void period(void); static void open_defer(void); static void close_defer(void); @@ -486,6 +489,9 @@ static int until_connect, until_backlog_nextscan; static double accept_proportion; static int nocheck, nocheck_reported; +/* for simulation, debugging, etc. */ +int simulate_flush= -1; + /*========== logging ==========*/ static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); @@ -625,8 +631,9 @@ static int xwaitpid(pid_t *pid, const char *what) { int r= kill(*pid, SIGKILL); if (r) sysdie("cannot kill %s child", what); - pid_t got= waitpid(*pid, &status, WNOHANG); + pid_t got= waitpid(*pid, &status, 0); if (got==-1) sysdie("cannot reap %s child", what); + if (got==0) die("cannot reap %s child", what); *pid= 0; @@ -644,6 +651,11 @@ static time_t xtime(void) { return now; } +static void xgettimeofday(struct timeval *tv_r) { + int r= gettimeofday(tv_r,0); + if (r) sysdie("gettimeofday(2) failed"); +} + static void xsetnonblock(int fd, int nonblocking) { int errnoval= oop_fd_nonblock(fd, nonblocking); if (errnoval) { errno= errnoval; sysdie("setnonblocking"); } @@ -743,7 +755,7 @@ static void control_checkouterr(ControlConn *cc /* may destroy*/) { } static void control_prompt(ControlConn *cc /* may destroy*/) { - fprintf(cc->out, "%s|", sitename); + fprintf(cc->out, "%s| ", sitename); control_checkouterr(cc); } @@ -752,20 +764,35 @@ struct ControlCommand { const char *cmd; void (*f)(ControlConn *cc, const ControlCommand *ccmd, const char *arg, size_t argsz); + void *xdata; + int xval; }; static const ControlCommand control_commands[]; -static void ccmd_help(ControlConn *cc, const ControlCommand *thisccmd, - const char *arg, size_t argsz) { +#define CCMD(wh) \ + static void ccmd_##wh(ControlConn *cc, const ControlCommand *c, \ + const char *arg, size_t argsz) + +CCMD(help) { fputs("commands:\n", cc->out); const ControlCommand *ccmd; for (ccmd=control_commands; ccmd->cmd; ccmd++) fprintf(cc->out, " %s\n", ccmd->cmd); } +CCMD(period) { period(); } +CCMD(setintarg) { *(int*)c->xdata= atoi(arg); } +CCMD(setint) { *(int*)c->xdata= c->xval; } + static const ControlCommand control_commands[]= { - { "h", ccmd_help }, + { "h", ccmd_help }, + { "p", ccmd_period }, + { "pretend flush", ccmd_setintarg, &simulate_flush }, + { "poke sm", ccmd_setint, &sm_period_counter, 1 }, + { "poke conn", ccmd_setint, &until_connect, 0 }, + { "poke blscan", ccmd_setint, &until_backlog_nextscan, 0 }, + { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 }, { 0 } }; @@ -790,7 +817,7 @@ static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, if (memcmp(data, ccmd->cmd, l)) continue; int argl= (int)recsz - (l+1); - ccmd->f(cc, ccmd, argl>=0 ? data : 0, argl); + ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl); goto prompt; } @@ -1096,10 +1123,7 @@ static int connecting_fdpass_sock; static void connect_attempt_discard(void) { if (connecting_child) { - int r= kill(connecting_child, SIGTERM); - if (r) syswarn("failed to kill connecting child"); int status= xwaitpid(&connecting_child, "connect"); - if (!(WIFEXITED(status) || (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) report_child_status("connect", status); @@ -1121,43 +1145,41 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { Conn *conn= 0; assert(fd == connecting_fdpass_sock); + PREP_DECL_MSG_CMSG(msg); + ssize_t rs= recvmsg(fd, &msg, 0); + if (rs<0) { + if (isewouldblock(errno)) return OOP_CONTINUE; + syswarn("failed to read socket from connecting child"); + goto x; + } conn= xmalloc(sizeof(*conn)); memset(conn,0,sizeof(*conn)); - PREP_DECL_MSG_CMSG(msg); struct cmsghdr *h= 0; - ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT); if (rs >= 0) h= CMSG_FIRSTHDR(&msg); if (!h) { - int status; - pid_t got= waitpid(connecting_child, &status, WNOHANG); - if (got != -1) { - assert(got==connecting_child); - connecting_child= 0; - if (WIFEXITED(status)) { - if (WEXITSTATUS(status) != 0 && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) - /* child already reported the problem */; - else - warn("connect: connection child exited code %d but no cmsg", + int status= xwaitpid(&connecting_child, "connect child (broken)"); + + if (WIFEXITED(status)) { + if (WEXITSTATUS(status) != 0 && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) + /* child already reported the problem */; + else { + if (e == OOP_EXCEPTION) + warn("connect: connection child exited code %d but" + " unexpected exception on fdpass socket", WEXITSTATUS(status)); - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { - warn("connect: connection attempt timed out"); - } else { - report_child_status("connect", status); + else + warn("connect: connection child exited code %d but" + " no cmsg (rs=%d)", + WEXITSTATUS(status), (int)rs); } + } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { + warn("connect: connection attempt timed out"); } else { - /* child is still running apparently, report the socket problem */ - if (rs < 0) - syswarn("connect: read from fdpass socket failed"); - else if (e == OOP_EXCEPTION) - warn("connect: unexpected exception on fdpass socket"); - else if (!rs) - warn("connect: unexpected EOF on fdpass socket"); - else - fatal("connect: unexpected lack of cmsg from child"); + report_child_status("connect", status); } goto x; } @@ -1246,7 +1268,7 @@ static void connect_start(void) { alarm(connection_setup_timeout); if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) { - if (buf[0]) fatal("connect: rejected: %s", sanitise(buf)); + if (buf[0]) fatal("connect: failed: %s", sanitise(buf)); else sysfatal("connect: connection attempt failed"); } if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0) @@ -1306,6 +1328,7 @@ static void connect_start(void) { xclose(socks[1], "connecting fdpass child's socket",0); connecting_fdpass_sock= socks[0]; + xsetnonblock(connecting_fdpass_sock, 1); on_fd_read_except(connecting_fdpass_sock, connchild_event); } @@ -1769,7 +1792,7 @@ static void close_input_file(InputFile *ipf) { /* does not free */ static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, const char *data, const char *how) { - warn("corrupted file: %s, offset %lu: %s: %s", + warn("corrupted file: %s, offset %lu: %s: in %s", ipf->path, (unsigned long)offset, how, sanitise(data)); ipf->readcount_err++; if (ipf->readcount_err > max_bad_data_initial + @@ -1916,6 +1939,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, abort(); } } + tailing_queue_readable(ipf); return r; } } @@ -2030,7 +2054,8 @@ static void filemon_stop(InputFile *ipf) { } static void filemon_callback(InputFile *ipf) { - ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); + if (ipf && ipf->readable_callback) /* so filepoll() can be naive */ + ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); } /*---------- interface to start and stop an input file ----------*/ @@ -2276,7 +2301,7 @@ static void statemc_init(void) { InputFile *file_f= open_input_file(feedfile); if (!file_f) die("feed file vanished during startup"); startup_set_input_file(file_f); - SMS(NORMAL, flushfail_retry_periods, "normal startup"); + SMS(NORMAL, spontaneous_flush_periods, "normal startup"); } } } @@ -2432,9 +2457,9 @@ static void statemc_setstate(StateMachineState newsms, int periods, } if (periods) { - info("%s%s[%d] %s",forlog,xtra,periods,why); + info("state %s%s[%d] %s",forlog,xtra,periods,why); } else { - info("%s%s %s",forlog,xtra,why); + info("state %s%s %s",forlog,xtra,why); } } @@ -2583,7 +2608,12 @@ static void search_backlog_file(void) { if (sms==sm_DROPPED) { notice("feed dropped and our work is complete"); - xunlink(path_lock, "lockfile for old feed"); + + int r= unlink(path_control); + if (r && errno!=ENOENT) + syswarn("failed to remove control symlink for old feed"); + + xunlink(path_lock, "lockfile for old feed"); exit(4); } until_backlog_nextscan= backlog_spontrescan_periods; @@ -2647,7 +2677,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { goto failed; case INNDCOMMCHILD_ESTATUS_NONESUCH: - warn("feed has been dropped by innd, finishing up"); + notice("feed has been dropped by innd, finishing up"); flushing_input_file= main_input_file; tailing_queue_readable(flushing_input_file); /* we probably previously returned EAGAIN from our fake read method @@ -2725,6 +2755,12 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0); /* parent spots the autoclose of pipefds[1] when we die or exit */ + if (simulate_flush>=0) { + warn("SIMULATING flush child status %d", simulate_flush); + if (simulate_flush>128) raise(simulate_flush-128); + else exit(simulate_flush); + } + alarm(inndcomm_flush_timeout); r= ICCopen(); if (r) inndcommfail("connect"); r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit"); @@ -2735,6 +2771,8 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ exit(INNDCOMMCHILD_ESTATUS_FAIL); } + simulate_flush= -1; + xclose(pipefds[1], "inndcomm sentinel child's end",0); inndcomm_sentinel_fd= pipefds[0]; assert(inndcomm_sentinel_fd); @@ -2770,22 +2808,44 @@ static void postfork(void) { postfork_stdio(defer, "defer file ", path_defer); } -#define EVERY(what, interval_sec, interval_usec, body) \ - static struct timeval what##_timeout = { interval_sec, interval_usec }; \ - static void what##_schedule(void); \ - static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ - body; \ - what##_schedule(); \ - return OOP_CONTINUE; \ - } \ - static void what##_schedule(void) { \ - loop->on_time(loop, what##_timeout, what##_timedout, 0); \ - } +typedef struct Every Every; +struct Every { + struct timeval interval; + int fixed_rate; + void (*f)(void); +}; + +static void every_schedule(Every *e, struct timeval base); + +static void *every_happens(oop_source *lp, struct timeval base, void *e_v) { + Every *e= e_v; + e->f(); + if (!e->fixed_rate) xgettimeofday(&base); + every_schedule(e, base); + return OOP_CONTINUE; +} + +static void every_schedule(Every *e, struct timeval base) { + struct timeval when; + timeradd(&base, &e->interval, &when); + loop->on_time(loop, when, every_happens, e); +} -EVERY(filepoll, 5,0, ({ - if (main_input_file && main_input_file->readable_callback) - filemon_callback(main_input_file); -})); +static void every(int interval, int fixed_rate, void (*f)(void)) { + Every *e= xmalloc(sizeof(*e)); + e->interval.tv_sec= interval; + e->interval.tv_usec= 0; + e->fixed_rate= fixed_rate; + e->f= f; + struct timeval now; + xgettimeofday(&now); + every_schedule(e, now); +} + +static void filepoll(void) { + filemon_callback(main_input_file); + filemon_callback(flushing_input_file); +} static char *debug_report_ipf(InputFile *ipf) { if (!ipf) return xasprintf("none"); @@ -2799,7 +2859,7 @@ static char *debug_report_ipf(InputFile *ipf) { ipf->fd, ipf->rd ? "+" : ""); } -EVERY(period, -1,0, ({ +static void period(void) { char *dipf_main= debug_report_ipf(main_input_file); char *dipf_flushing= debug_report_ipf(flushing_input_file); char *dipf_backlog= debug_report_ipf(backlog_input_file); @@ -2826,7 +2886,7 @@ EVERY(period, -1,0, ({ statemc_period_poll(); check_assign_articles(); check_idle_conns(); -})); +} /*========== option parsing ==========*/ @@ -3156,11 +3216,10 @@ int main(int argc, char **argv) { if (!filemon_method_init()) { warn("no file monitoring available, polling"); - filepoll_schedule(); + every(5,0,filepoll); } - period_timeout.tv_sec= period_seconds; - period_schedule(); + every(period_seconds,1,period); statemc_init();