X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=bc954f41c785666e9c3878ec68294fbb19ace6ba;hb=31d0604e13e16afb3f0da8f8cfeb7282a702f154;hp=8e599f56026835595780ad3a28725754699fd06f;hpb=fc66b511debe93980fd49331d96939fb876024da;p=inn-innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index 8e599f5..bc954f4 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,10 +1,13 @@ /* - * TODO - * - actually implement badusage - * - options for all options - * - manpage - * - pid, sitename, hostname in lockfile - * - -k kill mode ? + * todo + * - actually do something with readable on control master + * - option for realsockdir + * - option for filepoll + * - option for no inotify + * - manpage: document control master stuff + * - manpage: innconf is used for communicating with innd + * - debug this: + * build-lfs/backends/innduct --no-daemon -f `pwd`/fee sit dom */ /* @@ -172,6 +175,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #include #include #include +#include #include #include #include @@ -182,22 +186,25 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #include #include #include +#include #include #include #include +#include #include #include /*----- general definitions, probably best not changed -----*/ -#define CONNCHILD_ESTATUS_STREAM 4 -#define CONNCHILD_ESTATUS_NOSTREAM 5 +#define CONNCHILD_ESTATUS_STREAM 24 +#define CONNCHILD_ESTATUS_NOSTREAM 25 -#define INNDCOMMCHILD_ESTATUS_FAIL 6 -#define INNDCOMMCHILD_ESTATUS_NONESUCH 7 +#define INNDCOMMCHILD_ESTATUS_FAIL 26 +#define INNDCOMMCHILD_ESTATUS_NONESUCH 27 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) +#define MAX_CONTROL_COMMAND 1000 #define VA va_list al; va_start(al,fmt) #define PRINTF(f,a) __attribute__((__format__(printf,f,a))) @@ -212,7 +219,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. int count; \ } T##List -#define NODE(n) (assert((void*)&(n)->list_node == &(n)), &(n)->list_node) +#define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node) #define LIST_CHECKCANHAVENODE(l,n) \ ((void)((n) == ((l).u.for_type))) /* just for the type check */ @@ -238,7 +245,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) -#define LIST_INIT(l) (list_new(&(l).u.li)) +#define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li)) #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l)))) #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) @@ -291,6 +298,7 @@ static void statemc_check_flushing_done(void); static void statemc_check_backlog_done(void); static void postfork(void); +static void period(void); static void open_defer(void); static void close_defer(void); @@ -310,9 +318,10 @@ static const oop_rd_style peer_rd_style; static oop_rd_call peer_rd_err, peer_rd_ok; /*----- configuration options -----*/ +/* when changing defaults, remember to update the manpage */ static const char *sitename, *remote_host; -static const char *feedfile; +static const char *feedfile, *realsockdir="/tmp/innduct.control"; static int quiet_multiple=0; static int become_daemon=1; static int try_stream=1; @@ -466,7 +475,9 @@ struct Conn { static oop_source *loop; static ConnList conns; static ArticleList queue; -static char *path_lock, *path_flushing, *path_defer, *globpat_backlog; +static char *path_lock, *path_flushing, *path_defer, *path_control; +static char *globpat_backlog; +static pid_t self_pid; /* statemc_init initialises */ static StateMachineState sms; @@ -479,6 +490,9 @@ static int until_connect, until_backlog_nextscan; static double accept_proportion; static int nocheck, nocheck_reported; +/* for simulation, debugging, etc. */ +int simulate_flush= -1; + /*========== logging ==========*/ static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); @@ -487,6 +501,7 @@ static void logcore(int sysloglevel, const char *fmt, ...) { if (become_daemon) { vsyslog(sysloglevel,fmt,al); } else { + if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid); vfprintf(stderr,fmt,al); putc('\n',stderr); } @@ -535,7 +550,7 @@ diewrap(fatal, " fatal", LOG_ERR, -1, 12); logwrap(syswarn, " warning", LOG_WARNING, errno); logwrap(warn, " warning", LOG_WARNING, -1); -logwrap(notice, "", LOG_NOTICE, -1); +logwrap(notice, " notice", LOG_NOTICE, -1); logwrap(info, " info", LOG_INFO, -1); logwrap(debug, " debug", LOG_DEBUG, -1); @@ -558,7 +573,7 @@ static char *xasprintf(const char *fmt, ...) { } static int close_perhaps(int *fd) { - if (!*fd) return 0; + if (*fd <= 0) return 0; int r= close(*fd); *fd=0; return r; @@ -568,7 +583,7 @@ static void xclose(int fd, const char *what, const char *what2) { if (r) sysdie("close %s%s",what,what2?what2:""); } static void xclose_perhaps(int *fd, const char *what, const char *what2) { - if (!*fd) return; + if (*fd <= 0) return; xclose(*fd,what,what2); *fd=0; } @@ -577,7 +592,7 @@ static pid_t xfork(const char *what) { pid_t child; child= fork(); - if (child==-1) sysdie("cannot fork for %s",what); + if (child==-1) sysfatal("cannot fork for %s",what); debug("forked %s %ld", what, (unsigned long)child); if (!child) postfork(); return child; @@ -617,8 +632,9 @@ static int xwaitpid(pid_t *pid, const char *what) { int r= kill(*pid, SIGKILL); if (r) sysdie("cannot kill %s child", what); - pid_t got= waitpid(*pid, &status, WNOHANG); + pid_t got= waitpid(*pid, &status, 0); if (got==-1) sysdie("cannot reap %s child", what); + if (got==0) die("cannot reap %s child", what); *pid= 0; @@ -636,6 +652,16 @@ static time_t xtime(void) { return now; } +static void xgettimeofday(struct timeval *tv_r) { + int r= gettimeofday(tv_r,0); + if (r) sysdie("gettimeofday(2) failed"); +} + +static void xsetnonblock(int fd, int nonblocking) { + int errnoval= oop_fd_nonblock(fd, nonblocking); + if (errnoval) { errno= errnoval; sysdie("setnonblocking"); } +} + static void check_isreg(const struct stat *stab, const char *path, const char *what) { if (!S_ISREG(stab->st_mode)) @@ -694,6 +720,286 @@ static int isewouldblock(int errnoval) { return errnoval==EWOULDBLOCK || errnoval==EAGAIN; } + +/*========== command and control connections ==========*/ + +static int control_master; + +typedef struct ControlConn ControlConn; +struct ControlConn { + void (*destroy)(ControlConn*); + int fd; + oop_read *rd; + FILE *out; + union { + struct sockaddr sa; + struct sockaddr_un un; + } sa; + socklen_t salen; +}; + +static const oop_rd_style control_rd_style= { + OOP_RD_DELIM_STRIP, '\n', + OOP_RD_NUL_FORBID, + OOP_RD_SHORTREC_FORBID +}; + +static void control_destroy(ControlConn *cc) { + cc->destroy(cc); +} + +static void control_checkouterr(ControlConn *cc /* may destroy*/) { + if (ferror(cc->out) | fflush(cc->out)) { + info("CTRL%d write error %s", cc->fd, strerror(errno)); + control_destroy(cc); + } +} + +static void control_prompt(ControlConn *cc /* may destroy*/) { + fprintf(cc->out, "%s| ", sitename); + control_checkouterr(cc); +} + +typedef struct ControlCommand ControlCommand; +struct ControlCommand { + const char *cmd; + void (*f)(ControlConn *cc, const ControlCommand *ccmd, + const char *arg, size_t argsz); + void *xdata; + int xval; +}; + +static const ControlCommand control_commands[]; + +#define CCMD(wh) \ + static void ccmd_##wh(ControlConn *cc, const ControlCommand *c, \ + const char *arg, size_t argsz) + +CCMD(help) { + fputs("commands:\n", cc->out); + const ControlCommand *ccmd; + for (ccmd=control_commands; ccmd->cmd; ccmd++) + fprintf(cc->out, " %s\n", ccmd->cmd); +} + +CCMD(period) { period(); } +CCMD(setintarg) { *(int*)c->xdata= atoi(arg); } +CCMD(setint) { *(int*)c->xdata= c->xval; } + +static const ControlCommand control_commands[]= { + { "h", ccmd_help }, + { "p", ccmd_period }, + { "pretend flush", ccmd_setintarg, &simulate_flush }, + { "poke sm", ccmd_setint, &sm_period_counter, 1 }, + { "poke conn", ccmd_setint, &until_connect, 0 }, + { "poke blscan", ccmd_setint, &until_backlog_nextscan, 0 }, + { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 }, + { 0 } +}; + +static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *cc_v) { + ControlConn *cc= cc_v; + + if (!data) { + info("CTRL%d closed", cc->fd); + cc->destroy(cc); + return OOP_CONTINUE; + } + + if (recsz == 0) goto prompt; + + const ControlCommand *ccmd; + for (ccmd=control_commands; ccmd->cmd; ccmd++) { + int l= strlen(ccmd->cmd); + if (recsz < l) continue; + if (recsz > l && data[l] != ' ') continue; + if (memcmp(data, ccmd->cmd, l)) continue; + + int argl= (int)recsz - (l+1); + ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl); + goto prompt; + } + + fputs("unknown command; h for help\n", cc->out); + + prompt: + control_prompt(cc); + return OOP_CONTINUE; +} + +static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *cc_v) { + ControlConn *cc= cc_v; + + info("CTRL%d read error %s", cc->fd, errmsg); + cc->destroy(cc); + return OOP_CONTINUE; +} + +static int control_conn_startup(ControlConn *cc /* may destroy*/, + const char *how) { + cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0); + if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; } + + int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND, + control_rd_ok, cc, + control_rd_err, cc); + if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; } + + info("CTRL%d %s ready", cc->fd, how); + control_prompt(cc); + return 0; +} + +static void control_stdio_destroy(ControlConn *cc) { + if (cc->rd) { + oop_rd_cancel(cc->rd); + errno= oop_rd_delete_tidy(cc->rd); + if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)"); + } + free(cc); +} + +static void control_stdio(void) { + ControlConn *cc= xmalloc(sizeof(*cc)); + memset(cc,0,sizeof(*cc)); + cc->destroy= control_stdio_destroy; + + cc->fd= 0; + cc->out= stdout; + int r= control_conn_startup(cc,"stdio"); + if (r) cc->destroy(cc); +} + +static void control_accepted_destroy(ControlConn *cc) { + if (cc->rd) { + oop_rd_cancel(cc->rd); + oop_rd_delete_kill(cc->rd); + } + if (cc->out) { fclose(cc->out); cc->fd=0; } + close_perhaps(&cc->fd); + free(cc); +} + +static void *control_master_readable(oop_source *lp, int master, + oop_event ev, void *u) { + ControlConn *cc= xmalloc(sizeof(*cc)); + memset(cc,0,sizeof(*cc)); + cc->destroy= control_accepted_destroy; + + cc->salen= sizeof(cc->sa); + cc->fd= accept(master, &cc->sa.sa, &cc->salen); + if (cc->fd<0) { syswarn("error accepting control connection"); goto x; } + + cc->out= fdopen(cc->fd, "w"); + if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; } + + int r= control_conn_startup(cc, "accepted"); + if (r) goto x; + + return OOP_CONTINUE; + + x: + cc->destroy(cc); + return OOP_CONTINUE; +} + +#define NOCONTROL(...) do{ \ + syswarn("no control socket, because failed to " __VA_ARGS__); \ + goto nocontrol; \ + }while(0) + +static void control_init(void) { + char *real=0; + + union { + struct sockaddr sa; + struct sockaddr_un un; + } sa; + + memset(&sa,0,sizeof(sa)); + int maxlen= sizeof(sa.un.sun_path); + + int reallen= readlink(path_control, sa.un.sun_path, maxlen); + if (reallen<0) { + if (errno != ENOENT) + NOCONTROL("readlink control socket symlink path %s", path_control); + } + if (reallen >= maxlen) { + debug("control socket symlink path too long (r=%d)",reallen); + xunlink(path_control, "old (overlong) control socket symlink"); + reallen= -1; + } + + if (reallen<0) { + struct stat stab; + int r= lstat(realsockdir,&stab); + if (r) { + if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir); + + r= mkdir(realsockdir, 0700); + if (r) NOCONTROL("mkdir real socket dir %s", realsockdir); + + } else { + uid_t self= geteuid(); + if (!S_ISDIR(stab.st_mode) || + stab.st_uid != self || + stab.st_mode & 0077) { + warn("no control socket, because real socket directory" + " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)", + !!S_ISDIR(stab.st_mode), + (unsigned long)stab.st_uid, (unsigned long)self, + (unsigned long)stab.st_mode & 0777UL); + goto nocontrol; + } + } + + real= xasprintf("%s/s%lx.%lx", realsockdir, + (unsigned long)xtime(), (unsigned long)self_pid); + int reallen= strlen(real); + + if (reallen >= maxlen) { + warn("no control socket, because tmpnam gave overly-long path" + " %s", real); + goto nocontrol; + } + r= symlink(real, path_control); + if (r) NOCONTROL("make control socket path %s a symlink to real" + " socket path %s", path_control, real); + memcpy(sa.un.sun_path, real, reallen); + } + + int r= unlink(sa.un.sun_path); + if (r && errno!=ENOENT) + NOCONTROL("remove old real socket %s", sa.un.sun_path); + + control_master= socket(PF_UNIX, SOCK_STREAM, 0); + if (control_master<0) NOCONTROL("create new control socket"); + + sa.un.sun_family= AF_UNIX; + int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path); + r= bind(control_master, &sa.sa, sl); + if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path); + + r= listen(control_master, 5); + if (r) NOCONTROL("listen"); + + xsetnonblock(control_master, 1); + + loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0); + info("control socket ok, real path %s", sa.un.sun_path); + + return; + + nocontrol: + free(real); + xclose_perhaps(&control_master, "control master",0); + return; +} + /*========== management of connections ==========*/ static void conn_closefd(Conn *conn, const char *msgprefix) { @@ -818,10 +1124,7 @@ static int connecting_fdpass_sock; static void connect_attempt_discard(void) { if (connecting_child) { - int r= kill(connecting_child, SIGTERM); - if (r) syswarn("failed to kill connecting child"); int status= xwaitpid(&connecting_child, "connect"); - if (!(WIFEXITED(status) || (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) report_child_status("connect", status); @@ -833,9 +1136,15 @@ static void connect_attempt_discard(void) { } #define PREP_DECL_MSG_CMSG(msg) \ + char msgbyte= 0; \ + struct iovec msgiov; \ + msgiov.iov_base= &msgbyte; \ + msgiov.iov_len= 1; \ struct msghdr msg; \ memset(&msg,0,sizeof(msg)); \ - char msg##cbuf[CMSG_SPACE(sizeof(fd))]; \ + char msg##cbuf[CMSG_SPACE(sizeof(int))]; \ + msg.msg_iov= &msgiov; \ + msg.msg_iovlen= 1; \ msg.msg_control= msg##cbuf; \ msg.msg_controllen= sizeof(msg##cbuf); @@ -844,42 +1153,45 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { assert(fd == connecting_fdpass_sock); + PREP_DECL_MSG_CMSG(msg); + + ssize_t rs= recvmsg(fd, &msg, 0); + if (rs<0) { + if (isewouldblock(errno)) return OOP_CONTINUE; + syswarn("failed to read socket from connecting child"); + goto x; + } + conn= xmalloc(sizeof(*conn)); memset(conn,0,sizeof(*conn)); + LIST_INIT(conn->waiting); + LIST_INIT(conn->priority); + LIST_INIT(conn->sent); - PREP_DECL_MSG_CMSG(msg); struct cmsghdr *h= 0; - ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT); if (rs >= 0) h= CMSG_FIRSTHDR(&msg); if (!h) { - int status; - pid_t got= waitpid(connecting_child, &status, WNOHANG); - if (got != -1) { - assert(got==connecting_child); - connecting_child= 0; - if (WIFEXITED(status)) { - if (WEXITSTATUS(status) != 0 && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) - /* child already reported the problem */; - else - warn("connect: connection child exited code %d but no cmsg", + int status= xwaitpid(&connecting_child, "connect child (broken)"); + + if (WIFEXITED(status)) { + if (WEXITSTATUS(status) != 0 && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) + /* child already reported the problem */; + else { + if (e == OOP_EXCEPTION) + warn("connect: connection child exited code %d but" + " unexpected exception on fdpass socket", WEXITSTATUS(status)); - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { - warn("connect: connection attempt timed out"); - } else { - report_child_status("connect", status); + else + warn("connect: connection child exited code %d but" + " no cmsg (rs=%d)", + WEXITSTATUS(status), (int)rs); } + } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { + warn("connect: connection attempt timed out"); } else { - /* child is still running apparently, report the socket problem */ - if (rs < 0) - syswarn("connect: read from fdpass socket failed"); - else if (e == OOP_EXCEPTION) - warn("connect: unexpected exception on fdpass socket"); - else if (!rs) - warn("connect: unexpected EOF on fdpass socket"); - else - fatal("connect: unexpected lack of cmsg from child"); + report_child_status("connect", status); } goto x; } @@ -915,14 +1227,11 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { } /* Phew! */ - LIST_INIT(conn->waiting); - LIST_INIT(conn->priority); - LIST_INIT(conn->sent); conn->max_queue= conn->stream ? max_queue_per_conn : 1; loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */ - if (!conn->fd) sysdie("oop_rd_new_fd (fd=%d)",conn->fd); + if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd); int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN, &peer_rd_ok, conn, &peer_rd_err, conn); @@ -951,7 +1260,7 @@ static void connect_start(void) { assert(!connecting_child); assert(!connecting_fdpass_sock); - notice("starting connection attempt"); + info("starting connection attempt"); int socks[2]; int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks); @@ -968,13 +1277,26 @@ static void connect_start(void) { alarm(connection_setup_timeout); if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) { - if (buf[0]) fatal("connect: rejected: %s", sanitise(buf)); - else sysfatal("connect: connection attempt failed"); + int l= strlen(buf); + int stripped=0; + while (l>0) { + unsigned char c= buf[l-1]; + if (!isspace(c)) break; + if (c=='\n' || c=='\r') stripped=1; + --l; + } + if (!buf[0]) { + sysfatal("connect: connection attempt failed"); + } else { + buf[l]= 0; + fatal("connect: %s: %s", stripped ? "rejected" : "failed", + sanitise(buf)); + } } if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0) sysfatal("connect: authentication failed"); if (try_stream) { - if (fputs("MODE STREAM\r\n", cn_to) || + if (fputs("MODE STREAM\r\n", cn_to)==EOF || fflush(cn_to)) sysfatal("connect: could not send MODE STREAM"); buf[sizeof(buf)-1]= 0; @@ -986,7 +1308,7 @@ static void connect_start(void) { } int l= strlen(buf); assert(l>=1); - if (buf[-1]!='\n') + if (buf[l-1]!='\n') fatal("connect: response to MODE STREAM is too long: %.100s...", sanitise(buf)); l--; if (l>0 && buf[l-1]=='\r') l--; @@ -1021,13 +1343,15 @@ static void connect_start(void) { msg.msg_controllen= cmsg->cmsg_len; r= sendmsg(socks[1], &msg, 0); - if (r) sysdie("sendmsg failed for new connection"); + if (r<0) sysdie("sendmsg failed for new connection"); + if (r!=1) die("sendmsg for new connection gave wrong result %d",r); _exit(exitstatus); } xclose(socks[1], "connecting fdpass child's socket",0); connecting_fdpass_sock= socks[0]; + xsetnonblock(connecting_fdpass_sock, 1); on_fd_read_except(connecting_fdpass_sock, connchild_event); } @@ -1061,6 +1385,7 @@ static void check_assign_articles(void) { if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { Article *art= LIST_REMHEAD(queue); + if (!art) break; LIST_ADDTAIL(use->waiting, art); spare--; } @@ -1491,7 +1816,7 @@ static void close_input_file(InputFile *ipf) { /* does not free */ static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, const char *data, const char *how) { - warn("corrupted file: %s, offset %lu: %s: %s", + warn("corrupted file: %s, offset %lu: %s: in %s", ipf->path, (unsigned long)offset, how, sanitise(data)); ipf->readcount_err++; if (ipf->readcount_err > max_bad_data_initial + @@ -1638,6 +1963,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, abort(); } } + tailing_queue_readable(ipf); return r; } } @@ -1659,7 +1985,7 @@ struct Filemon_Perfile { static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY); - if (wd < 0) sysdie("inotify_add_watch %s", ipf->path); + if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path); if (wd >= filemon_inotify_wdmax) { int newmax= wd+2; @@ -1752,7 +2078,8 @@ static void filemon_stop(InputFile *ipf) { } static void filemon_callback(InputFile *ipf) { - ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); + if (ipf && ipf->readable_callback) /* so filepoll() can be naive */ + ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); } /*---------- interface to start and stop an input file ----------*/ @@ -1898,16 +2225,12 @@ static void startup_set_input_file(InputFile *f) { inputfile_reading_start(f); } -static void statemc_init(void) { +static void statemc_lock(void) { + int lockfd; struct stat stab, stabf; - - path_lock= xasprintf("%s_lock", feedfile); - path_flushing= xasprintf("%s_flushing", feedfile); - path_defer= xasprintf("%s_defer", feedfile); - globpat_backlog= xasprintf("%s_backlog*", feedfile); - + for (;;) { - int lockfd= open(path_lock, O_CREAT|O_RDWR, 0600); + lockfd= open(path_lock, O_CREAT|O_RDWR, 0600); if (lockfd<0) sysfatal("open lockfile %s", path_lock); struct flock fl; @@ -1920,7 +2243,7 @@ static void statemc_init(void) { if (quiet_multiple) exit(0); fatal("another duct holds the lockfile"); } - sysdie("fcntl F_SETLK lockfile %s", path_lock); + sysfatal("fcntl F_SETLK lockfile %s", path_lock); } xfstat_isreg(lockfd, &stabf, path_lock, "lockfile"); @@ -1932,17 +2255,34 @@ static void statemc_init(void) { xclose(lockfd, "stale lockfile ", path_lock); } + + FILE *lockfile= fdopen(lockfd, "w"); + if (!lockfile) sysdie("fdopen lockfile"); + + int r= ftruncate(lockfd, 0); + if (r) sysdie("truncate lockfile to write new info"); + + if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n", + (unsigned long)self_pid, + sitename, feedfile, remote_host) == EOF || + fflush(lockfile)) + sysfatal("write info to lockfile %s", path_lock); + debug("startup: locked"); +} + +static void statemc_init(void) { + struct stat stabdefer; search_backlog_file(); int defer_noent; - xlstat_isreg(path_defer, &stab, &defer_noent, "defer file"); + xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file"); if (defer_noent) { debug("startup: ductdefer ENOENT"); } else { - debug("startup: ductdefer nlink=%ld", (long)stab.st_nlink); - switch (stab.st_nlink==1) { + debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink); + switch (stabdefer.st_nlink==1) { case 1: open_defer(); /* so that we will later close it and rename it */ break; @@ -1952,7 +2292,7 @@ static void statemc_init(void) { break; default: die("defer file %s has unexpected link count %d", - path_defer, stab.st_nlink); + path_defer, stabdefer.st_nlink); } } @@ -1985,7 +2325,7 @@ static void statemc_init(void) { InputFile *file_f= open_input_file(feedfile); if (!file_f) die("feed file vanished during startup"); startup_set_input_file(file_f); - SMS(NORMAL, flushfail_retry_periods, "normal startup"); + SMS(NORMAL, spontaneous_flush_periods, "normal startup"); } } } @@ -2000,8 +2340,8 @@ static void statemc_start_flush(const char *why) { /* Normal => Flushing */ sm_period_counter); int r= link(feedfile, path_flushing); - if (r) sysdie("link feedfile %s to flushing file %s", - feedfile, path_flushing); + if (r) sysfatal("link feedfile %s to flushing file %s", + feedfile, path_flushing); /* => Hardlinked */ xunlink(feedfile, "old feedfile link"); @@ -2141,9 +2481,9 @@ static void statemc_setstate(StateMachineState newsms, int periods, } if (periods) { - info("%s%s[%d] %s",forlog,xtra,periods,why); + info("state %s%s[%d] %s",forlog,xtra,periods,why); } else { - info("%s%s %s",forlog,xtra,why); + info("state %s%s %s",forlog,xtra,why); } } @@ -2255,9 +2595,9 @@ static void search_backlog_file(void) { switch (r) { case GLOB_ABORTED: - sysdie("failed to expand backlog pattern %s", globpat_backlog); + sysfatal("failed to expand backlog pattern %s", globpat_backlog); case GLOB_NOSPACE: - die("out of memory expanding backlog pattern %s", globpat_backlog); + fatal("out of memory expanding backlog pattern %s", globpat_backlog); case 0: for (i=0; i Flushing */ assert(!inndcomm_child); assert(!inndcomm_sentinel_fd); - if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel"); + if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel"); inndcomm_child= xfork("inndcomm child"); @@ -2431,6 +2779,12 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0); /* parent spots the autoclose of pipefds[1] when we die or exit */ + if (simulate_flush>=0) { + warn("SIMULATING flush child status %d", simulate_flush); + if (simulate_flush>128) raise(simulate_flush-128); + else exit(simulate_flush); + } + alarm(inndcomm_flush_timeout); r= ICCopen(); if (r) inndcommfail("connect"); r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit"); @@ -2441,6 +2795,8 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ exit(INNDCOMMCHILD_ESTATUS_FAIL); } + simulate_flush= -1; + xclose(pipefds[1], "inndcomm sentinel child's end",0); inndcomm_sentinel_fd= pipefds[0]; assert(inndcomm_sentinel_fd); @@ -2476,25 +2832,47 @@ static void postfork(void) { postfork_stdio(defer, "defer file ", path_defer); } -#define EVERY(what, interval_sec, interval_usec, body) \ - static struct timeval what##_timeout = { interval_sec, interval_usec }; \ - static void what##_schedule(void); \ - static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ - body; \ - what##_schedule(); \ - return OOP_CONTINUE; \ - } \ - static void what##_schedule(void) { \ - loop->on_time(loop, what##_timeout, what##_timedout, 0); \ - } +typedef struct Every Every; +struct Every { + struct timeval interval; + int fixed_rate; + void (*f)(void); +}; -EVERY(filepoll, 5,0, ({ - if (main_input_file && main_input_file->readable_callback) - filemon_callback(main_input_file); -})); +static void every_schedule(Every *e, struct timeval base); + +static void *every_happens(oop_source *lp, struct timeval base, void *e_v) { + Every *e= e_v; + e->f(); + if (!e->fixed_rate) xgettimeofday(&base); + every_schedule(e, base); + return OOP_CONTINUE; +} + +static void every_schedule(Every *e, struct timeval base) { + struct timeval when; + timeradd(&base, &e->interval, &when); + loop->on_time(loop, when, every_happens, e); +} + +static void every(int interval, int fixed_rate, void (*f)(void)) { + Every *e= xmalloc(sizeof(*e)); + e->interval.tv_sec= interval; + e->interval.tv_usec= 0; + e->fixed_rate= fixed_rate; + e->f= f; + struct timeval now; + xgettimeofday(&now); + every_schedule(e, now); +} + +static void filepoll(void) { + filemon_callback(main_input_file); + filemon_callback(flushing_input_file); +} static char *debug_report_ipf(InputFile *ipf) { - if (!ipf) return xasprintf("-"); + if (!ipf) return xasprintf("none"); const char *slash= strrchr(ipf->path,'/'); const char *path= slash ? slash+1 : ipf->path; @@ -2505,7 +2883,7 @@ static char *debug_report_ipf(InputFile *ipf) { ipf->fd, ipf->rd ? "+" : ""); } -EVERY(period, -1,0, ({ +static void period(void) { char *dipf_main= debug_report_ipf(main_input_file); char *dipf_flushing= debug_report_ipf(flushing_input_file); char *dipf_backlog= debug_report_ipf(backlog_input_file); @@ -2513,7 +2891,7 @@ EVERY(period, -1,0, ({ debug("PERIOD" " sms=%s[%d] conns=%d queue=%d until_connect=%d" " input_files main:%s old:%s flushing:%s" - " children connecting=%ld inndcomm_child=%ld" + " children connecting=%ld inndcomm=%ld" , sms_names[sms], sm_period_counter, conns.count, queue.count, until_connect, @@ -2532,7 +2910,7 @@ EVERY(period, -1,0, ({ statemc_period_poll(); check_assign_articles(); check_idle_conns(); -})); +} /*========== option parsing ==========*/ @@ -2675,10 +3053,14 @@ static void op_seconds(const Option *o, const char *val) { double v= strtod(val,&ep); if (ep==val) badusage("bad time/duration value for %s",o->lng); - if (!*ep || !strcmp(ep,"s")) unit= 1; - else if (!strcmp(ep,"m")) unit= 60; - else if (!strcmp(ep,"h")) unit= 3600; - else if (!strcmp(ep,"d")) unit= 86400; + if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1; + else if (!strcmp(ep,"m") || !strcmp(ep,"min")) unit= 60; + else if (!strcmp(ep,"h") || !strcmp(ep,"hour")) unit= 3600; + else if (!strcmp(ep,"d") || !strcmp(ep,"day")) unit= 86400; + else if (!strcmp(ep,"das")) unit= 10; + else if (!strcmp(ep,"hs")) unit= 100; + else if (!strcmp(ep,"ks")) unit= 1000; + else if (!strcmp(ep,"Ms")) unit= 1000000; else badusage("bad units %s for time/duration value for %s",ep,o->lng); v *= unit; @@ -2701,7 +3083,7 @@ static const Option innduct_options[]= { {'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 }, {0,"no-daemon", 0, &become_daemon, op_setint, 0 }, {0,"no-streaming", 0, &try_stream, op_setint, 0 }, -{0,"inndconf", "F", &inndconffile, op_string }, +{'C',"inndconf", "F", &inndconffile, op_string }, {'P',"port", "PORT", &port, op_integer }, {0,"help", 0, 0, help }, @@ -2752,7 +3134,7 @@ static void convert_to_periods_rndup(int *store) { int main(int argc, char **argv) { if (!argv[1]) { printusage(stderr); - exit(12); + exit(8); } parse_options(innduct_options, &argv); @@ -2766,6 +3148,9 @@ int main(int argc, char **argv) { /* defaults */ + int r= innconf_read(inndconffile); + if (!r) badusage("could not read inn.conf (more info on stderr)"); + if (!remote_host) remote_host= sitename; if (nocheck_thresh < 0 || nocheck_thresh > 100) @@ -2774,7 +3159,7 @@ int main(int argc, char **argv) { if (nocheck_decay < 0.1) badusage("nocheck decay articles must be at least 0.1"); - nocheck_decay= 1 - 1.0/nocheck_decay; + nocheck_decay= pow(0.5, 1.0/nocheck_decay); convert_to_periods_rndup(&reconnect_delay_periods); convert_to_periods_rndup(&flushfail_retry_periods); @@ -2788,7 +3173,6 @@ int main(int argc, char **argv) { max_bad_data_ratio *= 0.01; if (!feedfile) { - innconf_read(inndconffile); feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename); } else if (!feedfile[0]) { badusage("feed filename must be nonempty"); @@ -2804,6 +3188,12 @@ int main(int argc, char **argv) { /* set things up */ + path_lock= xasprintf("%s_lock", feedfile); + path_flushing= xasprintf("%s_flushing", feedfile); + path_defer= xasprintf("%s_defer", feedfile); + path_control= xasprintf("%s_control", feedfile); + globpat_backlog= xasprintf("%s_backlog*", feedfile); + oop_source_sys *sysloop= oop_sys_new(); if (!sysloop) sysdie("could not create liboop event loop"); loop= (oop_source*)sysloop; @@ -2822,7 +3212,7 @@ int main(int argc, char **argv) { openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS); int null= open("/dev/null",O_RDWR); - if (null<0) sysdie("failed to open /dev/null"); + if (null<0) sysfatal("failed to open /dev/null"); dup2(null,0); dup2(null,1); dup2(null,2); @@ -2832,27 +3222,36 @@ int main(int argc, char **argv) { if (child1) _exit(0); pid_t sid= setsid(); - if (sid != child1) sysdie("setsid failed"); + if (sid != child1) sysfatal("setsid failed"); pid_t child2= xfork("daemonise second fork"); if (child2) _exit(0); } + self_pid= getpid(); + if (self_pid==-1) sysdie("getpid"); + + statemc_lock(); + notice("starting"); + if (!become_daemon) + control_stdio(); + + control_init(); + if (!filemon_method_init()) { warn("no file monitoring available, polling"); - filepoll_schedule(); + every(5,0,filepoll); } - period_timeout.tv_sec= period_seconds; - period_schedule(); + every(period_seconds,1,period); statemc_init(); /* let's go */ - void *r= oop_sys_run(sysloop); - assert(r == OOP_ERROR); + void *run= oop_sys_run(sysloop); + assert(run == OOP_ERROR); sysdie("event loop failed"); }