X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=bc954f41c785666e9c3878ec68294fbb19ace6ba;hp=1fd382835ec9c1355bcd70827585a2bba703e7d1;hb=31d0604e13e16afb3f0da8f8cfeb7282a702f154;hpb=6bae4265aedaee30cb0b92c757dd14d2ed39b6e1 diff --git a/backends/innduct.c b/backends/innduct.c index 1fd3828..bc954f4 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,6 +1,13 @@ /* - * TODO - * - pid, sitename, hostname in lockfile + * todo + * - actually do something with readable on control master + * - option for realsockdir + * - option for filepoll + * - option for no inotify + * - manpage: document control master stuff + * - manpage: innconf is used for communicating with innd + * - debug this: + * build-lfs/backends/innduct --no-daemon -f `pwd`/fee sit dom */ /* @@ -168,6 +175,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #include #include #include +#include #include #include #include @@ -178,9 +186,11 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #include #include #include +#include #include #include #include +#include #include #include @@ -194,6 +204,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define INNDCOMMCHILD_ESTATUS_NONESUCH 27 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) +#define MAX_CONTROL_COMMAND 1000 #define VA va_list al; va_start(al,fmt) #define PRINTF(f,a) __attribute__((__format__(printf,f,a))) @@ -208,7 +219,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. int count; \ } T##List -#define NODE(n) (assert((void*)&(n)->list_node == &(n)), &(n)->list_node) +#define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node) #define LIST_CHECKCANHAVENODE(l,n) \ ((void)((n) == ((l).u.for_type))) /* just for the type check */ @@ -234,7 +245,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) -#define LIST_INIT(l) (list_new(&(l).u.li)) +#define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li)) #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l)))) #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) @@ -287,6 +298,7 @@ static void statemc_check_flushing_done(void); static void statemc_check_backlog_done(void); static void postfork(void); +static void period(void); static void open_defer(void); static void close_defer(void); @@ -309,7 +321,7 @@ static oop_rd_call peer_rd_err, peer_rd_ok; /* when changing defaults, remember to update the manpage */ static const char *sitename, *remote_host; -static const char *feedfile; +static const char *feedfile, *realsockdir="/tmp/innduct.control"; static int quiet_multiple=0; static int become_daemon=1; static int try_stream=1; @@ -463,7 +475,9 @@ struct Conn { static oop_source *loop; static ConnList conns; static ArticleList queue; -static char *path_lock, *path_flushing, *path_defer, *globpat_backlog; +static char *path_lock, *path_flushing, *path_defer, *path_control; +static char *globpat_backlog; +static pid_t self_pid; /* statemc_init initialises */ static StateMachineState sms; @@ -476,6 +490,9 @@ static int until_connect, until_backlog_nextscan; static double accept_proportion; static int nocheck, nocheck_reported; +/* for simulation, debugging, etc. */ +int simulate_flush= -1; + /*========== logging ==========*/ static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); @@ -484,6 +501,7 @@ static void logcore(int sysloglevel, const char *fmt, ...) { if (become_daemon) { vsyslog(sysloglevel,fmt,al); } else { + if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid); vfprintf(stderr,fmt,al); putc('\n',stderr); } @@ -532,7 +550,7 @@ diewrap(fatal, " fatal", LOG_ERR, -1, 12); logwrap(syswarn, " warning", LOG_WARNING, errno); logwrap(warn, " warning", LOG_WARNING, -1); -logwrap(notice, "", LOG_NOTICE, -1); +logwrap(notice, " notice", LOG_NOTICE, -1); logwrap(info, " info", LOG_INFO, -1); logwrap(debug, " debug", LOG_DEBUG, -1); @@ -555,7 +573,7 @@ static char *xasprintf(const char *fmt, ...) { } static int close_perhaps(int *fd) { - if (!*fd) return 0; + if (*fd <= 0) return 0; int r= close(*fd); *fd=0; return r; @@ -565,7 +583,7 @@ static void xclose(int fd, const char *what, const char *what2) { if (r) sysdie("close %s%s",what,what2?what2:""); } static void xclose_perhaps(int *fd, const char *what, const char *what2) { - if (!*fd) return; + if (*fd <= 0) return; xclose(*fd,what,what2); *fd=0; } @@ -614,8 +632,9 @@ static int xwaitpid(pid_t *pid, const char *what) { int r= kill(*pid, SIGKILL); if (r) sysdie("cannot kill %s child", what); - pid_t got= waitpid(*pid, &status, WNOHANG); + pid_t got= waitpid(*pid, &status, 0); if (got==-1) sysdie("cannot reap %s child", what); + if (got==0) die("cannot reap %s child", what); *pid= 0; @@ -633,6 +652,16 @@ static time_t xtime(void) { return now; } +static void xgettimeofday(struct timeval *tv_r) { + int r= gettimeofday(tv_r,0); + if (r) sysdie("gettimeofday(2) failed"); +} + +static void xsetnonblock(int fd, int nonblocking) { + int errnoval= oop_fd_nonblock(fd, nonblocking); + if (errnoval) { errno= errnoval; sysdie("setnonblocking"); } +} + static void check_isreg(const struct stat *stab, const char *path, const char *what) { if (!S_ISREG(stab->st_mode)) @@ -691,6 +720,286 @@ static int isewouldblock(int errnoval) { return errnoval==EWOULDBLOCK || errnoval==EAGAIN; } + +/*========== command and control connections ==========*/ + +static int control_master; + +typedef struct ControlConn ControlConn; +struct ControlConn { + void (*destroy)(ControlConn*); + int fd; + oop_read *rd; + FILE *out; + union { + struct sockaddr sa; + struct sockaddr_un un; + } sa; + socklen_t salen; +}; + +static const oop_rd_style control_rd_style= { + OOP_RD_DELIM_STRIP, '\n', + OOP_RD_NUL_FORBID, + OOP_RD_SHORTREC_FORBID +}; + +static void control_destroy(ControlConn *cc) { + cc->destroy(cc); +} + +static void control_checkouterr(ControlConn *cc /* may destroy*/) { + if (ferror(cc->out) | fflush(cc->out)) { + info("CTRL%d write error %s", cc->fd, strerror(errno)); + control_destroy(cc); + } +} + +static void control_prompt(ControlConn *cc /* may destroy*/) { + fprintf(cc->out, "%s| ", sitename); + control_checkouterr(cc); +} + +typedef struct ControlCommand ControlCommand; +struct ControlCommand { + const char *cmd; + void (*f)(ControlConn *cc, const ControlCommand *ccmd, + const char *arg, size_t argsz); + void *xdata; + int xval; +}; + +static const ControlCommand control_commands[]; + +#define CCMD(wh) \ + static void ccmd_##wh(ControlConn *cc, const ControlCommand *c, \ + const char *arg, size_t argsz) + +CCMD(help) { + fputs("commands:\n", cc->out); + const ControlCommand *ccmd; + for (ccmd=control_commands; ccmd->cmd; ccmd++) + fprintf(cc->out, " %s\n", ccmd->cmd); +} + +CCMD(period) { period(); } +CCMD(setintarg) { *(int*)c->xdata= atoi(arg); } +CCMD(setint) { *(int*)c->xdata= c->xval; } + +static const ControlCommand control_commands[]= { + { "h", ccmd_help }, + { "p", ccmd_period }, + { "pretend flush", ccmd_setintarg, &simulate_flush }, + { "poke sm", ccmd_setint, &sm_period_counter, 1 }, + { "poke conn", ccmd_setint, &until_connect, 0 }, + { "poke blscan", ccmd_setint, &until_backlog_nextscan, 0 }, + { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 }, + { 0 } +}; + +static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *cc_v) { + ControlConn *cc= cc_v; + + if (!data) { + info("CTRL%d closed", cc->fd); + cc->destroy(cc); + return OOP_CONTINUE; + } + + if (recsz == 0) goto prompt; + + const ControlCommand *ccmd; + for (ccmd=control_commands; ccmd->cmd; ccmd++) { + int l= strlen(ccmd->cmd); + if (recsz < l) continue; + if (recsz > l && data[l] != ' ') continue; + if (memcmp(data, ccmd->cmd, l)) continue; + + int argl= (int)recsz - (l+1); + ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl); + goto prompt; + } + + fputs("unknown command; h for help\n", cc->out); + + prompt: + control_prompt(cc); + return OOP_CONTINUE; +} + +static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *cc_v) { + ControlConn *cc= cc_v; + + info("CTRL%d read error %s", cc->fd, errmsg); + cc->destroy(cc); + return OOP_CONTINUE; +} + +static int control_conn_startup(ControlConn *cc /* may destroy*/, + const char *how) { + cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0); + if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; } + + int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND, + control_rd_ok, cc, + control_rd_err, cc); + if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; } + + info("CTRL%d %s ready", cc->fd, how); + control_prompt(cc); + return 0; +} + +static void control_stdio_destroy(ControlConn *cc) { + if (cc->rd) { + oop_rd_cancel(cc->rd); + errno= oop_rd_delete_tidy(cc->rd); + if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)"); + } + free(cc); +} + +static void control_stdio(void) { + ControlConn *cc= xmalloc(sizeof(*cc)); + memset(cc,0,sizeof(*cc)); + cc->destroy= control_stdio_destroy; + + cc->fd= 0; + cc->out= stdout; + int r= control_conn_startup(cc,"stdio"); + if (r) cc->destroy(cc); +} + +static void control_accepted_destroy(ControlConn *cc) { + if (cc->rd) { + oop_rd_cancel(cc->rd); + oop_rd_delete_kill(cc->rd); + } + if (cc->out) { fclose(cc->out); cc->fd=0; } + close_perhaps(&cc->fd); + free(cc); +} + +static void *control_master_readable(oop_source *lp, int master, + oop_event ev, void *u) { + ControlConn *cc= xmalloc(sizeof(*cc)); + memset(cc,0,sizeof(*cc)); + cc->destroy= control_accepted_destroy; + + cc->salen= sizeof(cc->sa); + cc->fd= accept(master, &cc->sa.sa, &cc->salen); + if (cc->fd<0) { syswarn("error accepting control connection"); goto x; } + + cc->out= fdopen(cc->fd, "w"); + if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; } + + int r= control_conn_startup(cc, "accepted"); + if (r) goto x; + + return OOP_CONTINUE; + + x: + cc->destroy(cc); + return OOP_CONTINUE; +} + +#define NOCONTROL(...) do{ \ + syswarn("no control socket, because failed to " __VA_ARGS__); \ + goto nocontrol; \ + }while(0) + +static void control_init(void) { + char *real=0; + + union { + struct sockaddr sa; + struct sockaddr_un un; + } sa; + + memset(&sa,0,sizeof(sa)); + int maxlen= sizeof(sa.un.sun_path); + + int reallen= readlink(path_control, sa.un.sun_path, maxlen); + if (reallen<0) { + if (errno != ENOENT) + NOCONTROL("readlink control socket symlink path %s", path_control); + } + if (reallen >= maxlen) { + debug("control socket symlink path too long (r=%d)",reallen); + xunlink(path_control, "old (overlong) control socket symlink"); + reallen= -1; + } + + if (reallen<0) { + struct stat stab; + int r= lstat(realsockdir,&stab); + if (r) { + if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir); + + r= mkdir(realsockdir, 0700); + if (r) NOCONTROL("mkdir real socket dir %s", realsockdir); + + } else { + uid_t self= geteuid(); + if (!S_ISDIR(stab.st_mode) || + stab.st_uid != self || + stab.st_mode & 0077) { + warn("no control socket, because real socket directory" + " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)", + !!S_ISDIR(stab.st_mode), + (unsigned long)stab.st_uid, (unsigned long)self, + (unsigned long)stab.st_mode & 0777UL); + goto nocontrol; + } + } + + real= xasprintf("%s/s%lx.%lx", realsockdir, + (unsigned long)xtime(), (unsigned long)self_pid); + int reallen= strlen(real); + + if (reallen >= maxlen) { + warn("no control socket, because tmpnam gave overly-long path" + " %s", real); + goto nocontrol; + } + r= symlink(real, path_control); + if (r) NOCONTROL("make control socket path %s a symlink to real" + " socket path %s", path_control, real); + memcpy(sa.un.sun_path, real, reallen); + } + + int r= unlink(sa.un.sun_path); + if (r && errno!=ENOENT) + NOCONTROL("remove old real socket %s", sa.un.sun_path); + + control_master= socket(PF_UNIX, SOCK_STREAM, 0); + if (control_master<0) NOCONTROL("create new control socket"); + + sa.un.sun_family= AF_UNIX; + int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path); + r= bind(control_master, &sa.sa, sl); + if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path); + + r= listen(control_master, 5); + if (r) NOCONTROL("listen"); + + xsetnonblock(control_master, 1); + + loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0); + info("control socket ok, real path %s", sa.un.sun_path); + + return; + + nocontrol: + free(real); + xclose_perhaps(&control_master, "control master",0); + return; +} + /*========== management of connections ==========*/ static void conn_closefd(Conn *conn, const char *msgprefix) { @@ -815,10 +1124,7 @@ static int connecting_fdpass_sock; static void connect_attempt_discard(void) { if (connecting_child) { - int r= kill(connecting_child, SIGTERM); - if (r) syswarn("failed to kill connecting child"); int status= xwaitpid(&connecting_child, "connect"); - if (!(WIFEXITED(status) || (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) report_child_status("connect", status); @@ -830,9 +1136,15 @@ static void connect_attempt_discard(void) { } #define PREP_DECL_MSG_CMSG(msg) \ + char msgbyte= 0; \ + struct iovec msgiov; \ + msgiov.iov_base= &msgbyte; \ + msgiov.iov_len= 1; \ struct msghdr msg; \ memset(&msg,0,sizeof(msg)); \ - char msg##cbuf[CMSG_SPACE(sizeof(fd))]; \ + char msg##cbuf[CMSG_SPACE(sizeof(int))]; \ + msg.msg_iov= &msgiov; \ + msg.msg_iovlen= 1; \ msg.msg_control= msg##cbuf; \ msg.msg_controllen= sizeof(msg##cbuf); @@ -841,42 +1153,45 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { assert(fd == connecting_fdpass_sock); + PREP_DECL_MSG_CMSG(msg); + + ssize_t rs= recvmsg(fd, &msg, 0); + if (rs<0) { + if (isewouldblock(errno)) return OOP_CONTINUE; + syswarn("failed to read socket from connecting child"); + goto x; + } + conn= xmalloc(sizeof(*conn)); memset(conn,0,sizeof(*conn)); + LIST_INIT(conn->waiting); + LIST_INIT(conn->priority); + LIST_INIT(conn->sent); - PREP_DECL_MSG_CMSG(msg); struct cmsghdr *h= 0; - ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT); if (rs >= 0) h= CMSG_FIRSTHDR(&msg); if (!h) { - int status; - pid_t got= waitpid(connecting_child, &status, WNOHANG); - if (got != -1) { - assert(got==connecting_child); - connecting_child= 0; - if (WIFEXITED(status)) { - if (WEXITSTATUS(status) != 0 && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) - /* child already reported the problem */; - else - warn("connect: connection child exited code %d but no cmsg", + int status= xwaitpid(&connecting_child, "connect child (broken)"); + + if (WIFEXITED(status)) { + if (WEXITSTATUS(status) != 0 && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) + /* child already reported the problem */; + else { + if (e == OOP_EXCEPTION) + warn("connect: connection child exited code %d but" + " unexpected exception on fdpass socket", WEXITSTATUS(status)); - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { - warn("connect: connection attempt timed out"); - } else { - report_child_status("connect", status); + else + warn("connect: connection child exited code %d but" + " no cmsg (rs=%d)", + WEXITSTATUS(status), (int)rs); } + } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { + warn("connect: connection attempt timed out"); } else { - /* child is still running apparently, report the socket problem */ - if (rs < 0) - syswarn("connect: read from fdpass socket failed"); - else if (e == OOP_EXCEPTION) - warn("connect: unexpected exception on fdpass socket"); - else if (!rs) - warn("connect: unexpected EOF on fdpass socket"); - else - fatal("connect: unexpected lack of cmsg from child"); + report_child_status("connect", status); } goto x; } @@ -912,14 +1227,11 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { } /* Phew! */ - LIST_INIT(conn->waiting); - LIST_INIT(conn->priority); - LIST_INIT(conn->sent); conn->max_queue= conn->stream ? max_queue_per_conn : 1; loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */ - if (!conn->fd) sysdie("oop_rd_new_fd (fd=%d)",conn->fd); + if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd); int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN, &peer_rd_ok, conn, &peer_rd_err, conn); @@ -948,7 +1260,7 @@ static void connect_start(void) { assert(!connecting_child); assert(!connecting_fdpass_sock); - notice("starting connection attempt"); + info("starting connection attempt"); int socks[2]; int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks); @@ -965,13 +1277,26 @@ static void connect_start(void) { alarm(connection_setup_timeout); if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) { - if (buf[0]) fatal("connect: rejected: %s", sanitise(buf)); - else sysfatal("connect: connection attempt failed"); + int l= strlen(buf); + int stripped=0; + while (l>0) { + unsigned char c= buf[l-1]; + if (!isspace(c)) break; + if (c=='\n' || c=='\r') stripped=1; + --l; + } + if (!buf[0]) { + sysfatal("connect: connection attempt failed"); + } else { + buf[l]= 0; + fatal("connect: %s: %s", stripped ? "rejected" : "failed", + sanitise(buf)); + } } if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0) sysfatal("connect: authentication failed"); if (try_stream) { - if (fputs("MODE STREAM\r\n", cn_to) || + if (fputs("MODE STREAM\r\n", cn_to)==EOF || fflush(cn_to)) sysfatal("connect: could not send MODE STREAM"); buf[sizeof(buf)-1]= 0; @@ -983,7 +1308,7 @@ static void connect_start(void) { } int l= strlen(buf); assert(l>=1); - if (buf[-1]!='\n') + if (buf[l-1]!='\n') fatal("connect: response to MODE STREAM is too long: %.100s...", sanitise(buf)); l--; if (l>0 && buf[l-1]=='\r') l--; @@ -1018,13 +1343,15 @@ static void connect_start(void) { msg.msg_controllen= cmsg->cmsg_len; r= sendmsg(socks[1], &msg, 0); - if (r) sysdie("sendmsg failed for new connection"); + if (r<0) sysdie("sendmsg failed for new connection"); + if (r!=1) die("sendmsg for new connection gave wrong result %d",r); _exit(exitstatus); } xclose(socks[1], "connecting fdpass child's socket",0); connecting_fdpass_sock= socks[0]; + xsetnonblock(connecting_fdpass_sock, 1); on_fd_read_except(connecting_fdpass_sock, connchild_event); } @@ -1058,6 +1385,7 @@ static void check_assign_articles(void) { if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { Article *art= LIST_REMHEAD(queue); + if (!art) break; LIST_ADDTAIL(use->waiting, art); spare--; } @@ -1488,7 +1816,7 @@ static void close_input_file(InputFile *ipf) { /* does not free */ static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, const char *data, const char *how) { - warn("corrupted file: %s, offset %lu: %s: %s", + warn("corrupted file: %s, offset %lu: %s: in %s", ipf->path, (unsigned long)offset, how, sanitise(data)); ipf->readcount_err++; if (ipf->readcount_err > max_bad_data_initial + @@ -1635,6 +1963,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, abort(); } } + tailing_queue_readable(ipf); return r; } } @@ -1749,7 +2078,8 @@ static void filemon_stop(InputFile *ipf) { } static void filemon_callback(InputFile *ipf) { - ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); + if (ipf && ipf->readable_callback) /* so filepoll() can be naive */ + ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); } /*---------- interface to start and stop an input file ----------*/ @@ -1895,15 +2225,9 @@ static void startup_set_input_file(InputFile *f) { inputfile_reading_start(f); } -static void statemc_init(void) { - struct stat stab, stabf; - - path_lock= xasprintf("%s_lock", feedfile); - path_flushing= xasprintf("%s_flushing", feedfile); - path_defer= xasprintf("%s_defer", feedfile); - globpat_backlog= xasprintf("%s_backlog*", feedfile); - +static void statemc_lock(void) { int lockfd; + struct stat stab, stabf; for (;;) { lockfd= open(path_lock, O_CREAT|O_RDWR, 0600); @@ -1931,8 +2255,6 @@ static void statemc_init(void) { xclose(lockfd, "stale lockfile ", path_lock); } - pid_t self= getpid(); - if (self==-1) sysdie("getpid"); FILE *lockfile= fdopen(lockfd, "w"); if (!lockfile) sysdie("fdopen lockfile"); @@ -1941,21 +2263,26 @@ static void statemc_init(void) { if (r) sysdie("truncate lockfile to write new info"); if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n", - (unsigned long)self, sitename, feedfile, remote_host) == EOF || + (unsigned long)self_pid, + sitename, feedfile, remote_host) == EOF || fflush(lockfile)) sysfatal("write info to lockfile %s", path_lock); debug("startup: locked"); +} + +static void statemc_init(void) { + struct stat stabdefer; search_backlog_file(); int defer_noent; - xlstat_isreg(path_defer, &stab, &defer_noent, "defer file"); + xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file"); if (defer_noent) { debug("startup: ductdefer ENOENT"); } else { - debug("startup: ductdefer nlink=%ld", (long)stab.st_nlink); - switch (stab.st_nlink==1) { + debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink); + switch (stabdefer.st_nlink==1) { case 1: open_defer(); /* so that we will later close it and rename it */ break; @@ -1965,7 +2292,7 @@ static void statemc_init(void) { break; default: die("defer file %s has unexpected link count %d", - path_defer, stab.st_nlink); + path_defer, stabdefer.st_nlink); } } @@ -1998,7 +2325,7 @@ static void statemc_init(void) { InputFile *file_f= open_input_file(feedfile); if (!file_f) die("feed file vanished during startup"); startup_set_input_file(file_f); - SMS(NORMAL, flushfail_retry_periods, "normal startup"); + SMS(NORMAL, spontaneous_flush_periods, "normal startup"); } } } @@ -2154,9 +2481,9 @@ static void statemc_setstate(StateMachineState newsms, int periods, } if (periods) { - info("%s%s[%d] %s",forlog,xtra,periods,why); + info("state %s%s[%d] %s",forlog,xtra,periods,why); } else { - info("%s%s %s",forlog,xtra,why); + info("state %s%s %s",forlog,xtra,why); } } @@ -2305,7 +2632,12 @@ static void search_backlog_file(void) { if (sms==sm_DROPPED) { notice("feed dropped and our work is complete"); - xunlink(path_lock, "lockfile for old feed"); + + int r= unlink(path_control); + if (r && errno!=ENOENT) + syswarn("failed to remove control symlink for old feed"); + + xunlink(path_lock, "lockfile for old feed"); exit(4); } until_backlog_nextscan= backlog_spontrescan_periods; @@ -2354,8 +2686,11 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { assert(inndcomm_child); assert(fd == inndcomm_sentinel_fd); int status= xwaitpid(&inndcomm_child, "inndcomm"); + inndcomm_child= 0; + cancel_fd_read_except(fd); xclose_perhaps(&fd, "inndcomm sentinel pipe",0); + inndcomm_sentinel_fd= 0; assert(!flushing_input_file); @@ -2366,7 +2701,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { goto failed; case INNDCOMMCHILD_ESTATUS_NONESUCH: - warn("feed has been dropped by innd, finishing up"); + notice("feed has been dropped by innd, finishing up"); flushing_input_file= main_input_file; tailing_queue_readable(flushing_input_file); /* we probably previously returned EAGAIN from our fake read method @@ -2444,6 +2779,12 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0); /* parent spots the autoclose of pipefds[1] when we die or exit */ + if (simulate_flush>=0) { + warn("SIMULATING flush child status %d", simulate_flush); + if (simulate_flush>128) raise(simulate_flush-128); + else exit(simulate_flush); + } + alarm(inndcomm_flush_timeout); r= ICCopen(); if (r) inndcommfail("connect"); r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit"); @@ -2454,6 +2795,8 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ exit(INNDCOMMCHILD_ESTATUS_FAIL); } + simulate_flush= -1; + xclose(pipefds[1], "inndcomm sentinel child's end",0); inndcomm_sentinel_fd= pipefds[0]; assert(inndcomm_sentinel_fd); @@ -2489,25 +2832,47 @@ static void postfork(void) { postfork_stdio(defer, "defer file ", path_defer); } -#define EVERY(what, interval_sec, interval_usec, body) \ - static struct timeval what##_timeout = { interval_sec, interval_usec }; \ - static void what##_schedule(void); \ - static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ - body; \ - what##_schedule(); \ - return OOP_CONTINUE; \ - } \ - static void what##_schedule(void) { \ - loop->on_time(loop, what##_timeout, what##_timedout, 0); \ - } +typedef struct Every Every; +struct Every { + struct timeval interval; + int fixed_rate; + void (*f)(void); +}; -EVERY(filepoll, 5,0, ({ - if (main_input_file && main_input_file->readable_callback) - filemon_callback(main_input_file); -})); +static void every_schedule(Every *e, struct timeval base); + +static void *every_happens(oop_source *lp, struct timeval base, void *e_v) { + Every *e= e_v; + e->f(); + if (!e->fixed_rate) xgettimeofday(&base); + every_schedule(e, base); + return OOP_CONTINUE; +} + +static void every_schedule(Every *e, struct timeval base) { + struct timeval when; + timeradd(&base, &e->interval, &when); + loop->on_time(loop, when, every_happens, e); +} + +static void every(int interval, int fixed_rate, void (*f)(void)) { + Every *e= xmalloc(sizeof(*e)); + e->interval.tv_sec= interval; + e->interval.tv_usec= 0; + e->fixed_rate= fixed_rate; + e->f= f; + struct timeval now; + xgettimeofday(&now); + every_schedule(e, now); +} + +static void filepoll(void) { + filemon_callback(main_input_file); + filemon_callback(flushing_input_file); +} static char *debug_report_ipf(InputFile *ipf) { - if (!ipf) return xasprintf("-"); + if (!ipf) return xasprintf("none"); const char *slash= strrchr(ipf->path,'/'); const char *path= slash ? slash+1 : ipf->path; @@ -2518,7 +2883,7 @@ static char *debug_report_ipf(InputFile *ipf) { ipf->fd, ipf->rd ? "+" : ""); } -EVERY(period, -1,0, ({ +static void period(void) { char *dipf_main= debug_report_ipf(main_input_file); char *dipf_flushing= debug_report_ipf(flushing_input_file); char *dipf_backlog= debug_report_ipf(backlog_input_file); @@ -2526,7 +2891,7 @@ EVERY(period, -1,0, ({ debug("PERIOD" " sms=%s[%d] conns=%d queue=%d until_connect=%d" " input_files main:%s old:%s flushing:%s" - " children connecting=%ld inndcomm_child=%ld" + " children connecting=%ld inndcomm=%ld" , sms_names[sms], sm_period_counter, conns.count, queue.count, until_connect, @@ -2545,7 +2910,7 @@ EVERY(period, -1,0, ({ statemc_period_poll(); check_assign_articles(); check_idle_conns(); -})); +} /*========== option parsing ==========*/ @@ -2718,7 +3083,7 @@ static const Option innduct_options[]= { {'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 }, {0,"no-daemon", 0, &become_daemon, op_setint, 0 }, {0,"no-streaming", 0, &try_stream, op_setint, 0 }, -{0,"inndconf", "F", &inndconffile, op_string }, +{'C',"inndconf", "F", &inndconffile, op_string }, {'P',"port", "PORT", &port, op_integer }, {0,"help", 0, 0, help }, @@ -2783,6 +3148,9 @@ int main(int argc, char **argv) { /* defaults */ + int r= innconf_read(inndconffile); + if (!r) badusage("could not read inn.conf (more info on stderr)"); + if (!remote_host) remote_host= sitename; if (nocheck_thresh < 0 || nocheck_thresh > 100) @@ -2805,7 +3173,6 @@ int main(int argc, char **argv) { max_bad_data_ratio *= 0.01; if (!feedfile) { - innconf_read(inndconffile); feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename); } else if (!feedfile[0]) { badusage("feed filename must be nonempty"); @@ -2821,6 +3188,12 @@ int main(int argc, char **argv) { /* set things up */ + path_lock= xasprintf("%s_lock", feedfile); + path_flushing= xasprintf("%s_flushing", feedfile); + path_defer= xasprintf("%s_defer", feedfile); + path_control= xasprintf("%s_control", feedfile); + globpat_backlog= xasprintf("%s_backlog*", feedfile); + oop_source_sys *sysloop= oop_sys_new(); if (!sysloop) sysdie("could not create liboop event loop"); loop= (oop_source*)sysloop; @@ -2855,21 +3228,30 @@ int main(int argc, char **argv) { if (child2) _exit(0); } + self_pid= getpid(); + if (self_pid==-1) sysdie("getpid"); + + statemc_lock(); + notice("starting"); + if (!become_daemon) + control_stdio(); + + control_init(); + if (!filemon_method_init()) { warn("no file monitoring available, polling"); - filepoll_schedule(); + every(5,0,filepoll); } - period_timeout.tv_sec= period_seconds; - period_schedule(); + every(period_seconds,1,period); statemc_init(); /* let's go */ - void *r= oop_sys_run(sysloop); - assert(r == OOP_ERROR); + void *run= oop_sys_run(sysloop); + assert(run == OOP_ERROR); sysdie("event loop failed"); }