X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=blobdiff_plain;f=backends%2Finnduct.c;h=288c9dc654c06fa4b60d0b67295e038137731a5d;hp=8e5181a84e9fc797684d20b7446f7163ee7f68bb;hb=6a7307f47a05fbef08b3f7a362ccfc742cebea50;hpb=65ba173dea53ef47288f95aecf8642ac9869181f diff --git a/backends/innduct.c b/backends/innduct.c index 8e5181a..288c9dc 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,10 +1,13 @@ /* - * TODO - * - actually implement badusage - * - options for all options - * - manpage - * - pid, sitename, hostname in lockfile - * - -k kill mode ? + * todo + * - actually do something with readable on control master + * - option for realsockdir + * - option for filepoll + * - option for no inotify + * - manpage: document control master stuff + * - manpage: innconf is used for communicating with innd + * - debug this: + * build-lfs/backends/innduct --no-daemon -f `pwd`/fee sit dom */ /* @@ -172,6 +175,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #include #include #include +#include #include #include #include @@ -182,6 +186,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #include #include #include +#include #include #include #include @@ -191,13 +196,14 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. /*----- general definitions, probably best not changed -----*/ -#define CONNCHILD_ESTATUS_STREAM 4 -#define CONNCHILD_ESTATUS_NOSTREAM 5 +#define CONNCHILD_ESTATUS_STREAM 24 +#define CONNCHILD_ESTATUS_NOSTREAM 25 -#define INNDCOMMCHILD_ESTATUS_FAIL 6 -#define INNDCOMMCHILD_ESTATUS_NONESUCH 7 +#define INNDCOMMCHILD_ESTATUS_FAIL 26 +#define INNDCOMMCHILD_ESTATUS_NONESUCH 27 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) +#define MAX_CONTROL_COMMAND 1000 #define VA va_list al; va_start(al,fmt) #define PRINTF(f,a) __attribute__((__format__(printf,f,a))) @@ -212,7 +218,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. int count; \ } T##List -#define NODE(n) (assert((void*)&(n)->list_node == &(n)), &(n)->list_node) +#define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node) #define LIST_CHECKCANHAVENODE(l,n) \ ((void)((n) == ((l).u.for_type))) /* just for the type check */ @@ -291,6 +297,7 @@ static void statemc_check_flushing_done(void); static void statemc_check_backlog_done(void); static void postfork(void); +static void period(void); static void open_defer(void); static void close_defer(void); @@ -306,34 +313,44 @@ static void filemon_callback(InputFile *ipf); static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); -/*----- configuration options -----*/ +static const oop_rd_style peer_rd_style; +static oop_rd_call peer_rd_err, peer_rd_ok; -static const char *sitename, *feedfile, *pathoutgoing; -static const char *remote_host; -static int quiet_multiple=0, become_daemon=1; +/*----- configuration options -----*/ +/* when changing defaults, remember to update the manpage */ + +static const char *sitename, *remote_host; +static const char *feedfile, *realsockdir="/tmp/innduct.control"; +static int quiet_multiple=0; +static int become_daemon=1; +static int try_stream=1; +static int port=119; +static const char *inndconffile; -static int max_connections=10, max_queue_per_conn=200; +static int max_connections=10; +static int max_queue_per_conn=200; static int target_max_feedfile_size=100000; - static int period_seconds=60; -static double max_bad_data_ratio= 0.01; +static int connection_setup_timeout=200; +static int inndcomm_flush_timeout=100; + +static double nocheck_thresh= 95.0; /* converted from percentage by main */ +static double nocheck_decay= 100; /* conv'd from articles to lambda by main */ + +/* all these are initialised to seconds, and converted to periods in main */ +static int reconnect_delay_periods=1000; +static int flushfail_retry_periods=1000; +static int backlog_retry_minperiods=50; +static int backlog_spontrescan_periods=300; +static int spontaneous_flush_periods=100000; +static int need_activity_periods=1000; + +static double max_bad_data_ratio= 1; /* conv'd from percentage by main */ static int max_bad_data_initial= 30; /* in one corrupt 4096-byte block the number of newlines has * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */ -static int connection_setup_timeout=200, port=119, try_stream=1; -static int inndcomm_flush_timeout=100; -static int reconnect_delay_periods, flushfail_retry_periods, open_wait_periods; -static int backlog_retry_minperiods, backlog_spontaneous_rescan_periods; -static int spontaneous_flush_periods, need_activity_periods; -static const char *inndconffile; - -static double nocheck_thresh_pct= 95.0; -static double nocheck_thresh; /* computed in main from _pct */ -static double nocheck_decay_articles= 100; /* converted to _decay */ -static double nocheck_decay; /* computed in main from _articles */ - /*----- statistics -----*/ @@ -439,6 +456,7 @@ static const char *sms_names[]= { struct Conn { ISNODE(Conn); int fd; /* may be 0, meaning closed (during construction/destruction) */ + oop_read *rd; /* likewise */ int max_queue, stream, quitting; int since_activity; /* periods */ ArticleList waiting; /* not yet told peer */ @@ -456,7 +474,9 @@ struct Conn { static oop_source *loop; static ConnList conns; static ArticleList queue; -static char *path_lock, *path_flushing, *path_defer, *globpat_backlog; +static char *path_lock, *path_flushing, *path_defer, *path_control; +static char *globpat_backlog; +static pid_t self_pid; /* statemc_init initialises */ static StateMachineState sms; @@ -469,6 +489,9 @@ static int until_connect, until_backlog_nextscan; static double accept_proportion; static int nocheck, nocheck_reported; +/* for simulation, debugging, etc. */ +int simulate_flush= -1; + /*========== logging ==========*/ static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); @@ -477,6 +500,7 @@ static void logcore(int sysloglevel, const char *fmt, ...) { if (become_daemon) { vsyslog(sysloglevel,fmt,al); } else { + if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid); vfprintf(stderr,fmt,al); putc('\n',stderr); } @@ -525,7 +549,7 @@ diewrap(fatal, " fatal", LOG_ERR, -1, 12); logwrap(syswarn, " warning", LOG_WARNING, errno); logwrap(warn, " warning", LOG_WARNING, -1); -logwrap(notice, "", LOG_NOTICE, -1); +logwrap(notice, " notice", LOG_NOTICE, -1); logwrap(info, " info", LOG_INFO, -1); logwrap(debug, " debug", LOG_DEBUG, -1); @@ -548,7 +572,7 @@ static char *xasprintf(const char *fmt, ...) { } static int close_perhaps(int *fd) { - if (!*fd) return 0; + if (*fd <= 0) return 0; int r= close(*fd); *fd=0; return r; @@ -558,7 +582,7 @@ static void xclose(int fd, const char *what, const char *what2) { if (r) sysdie("close %s%s",what,what2?what2:""); } static void xclose_perhaps(int *fd, const char *what, const char *what2) { - if (!*fd) return; + if (*fd <= 0) return; xclose(*fd,what,what2); *fd=0; } @@ -567,7 +591,7 @@ static pid_t xfork(const char *what) { pid_t child; child= fork(); - if (child==-1) sysdie("cannot fork for %s",what); + if (child==-1) sysfatal("cannot fork for %s",what); debug("forked %s %ld", what, (unsigned long)child); if (!child) postfork(); return child; @@ -607,8 +631,9 @@ static int xwaitpid(pid_t *pid, const char *what) { int r= kill(*pid, SIGKILL); if (r) sysdie("cannot kill %s child", what); - pid_t got= waitpid(*pid, &status, WNOHANG); + pid_t got= waitpid(*pid, &status, 0); if (got==-1) sysdie("cannot reap %s child", what); + if (got==0) die("cannot reap %s child", what); *pid= 0; @@ -623,6 +648,17 @@ static void xunlink(const char *path, const char *what) { static time_t xtime(void) { time_t now= time(0); if (now==-1) sysdie("time(2) failed"); + return now; +} + +static void xgettimeofday(struct timeval *tv_r) { + int r= gettimeofday(tv_r,0); + if (r) sysdie("gettimeofday(2) failed"); +} + +static void xsetnonblock(int fd, int nonblocking) { + int errnoval= oop_fd_nonblock(fd, nonblocking); + if (errnoval) { errno= errnoval; sysdie("setnonblocking"); } } static void check_isreg(const struct stat *stab, const char *path, @@ -655,13 +691,6 @@ static void xlstat_isreg(const char *path, struct stat *stab, check_isreg(stab, path, what); } -static void setnonblock(int fd, int nonblocking) { - int r= fcntl(fd, F_GETFL); if (r<0) sysdie("setnonblocking fcntl F_GETFL"); - if (nonblocking) r |= O_NONBLOCK; - else r &= ~O_NONBLOCK; - r= fcntl(fd, F_SETFL, r); if (r<0) sysdie("setnonblocking fcntl F_SETFL"); -} - static int samefile(const struct stat *a, const struct stat *b) { assert(S_ISREG(a->st_mode)); assert(S_ISREG(b->st_mode)); @@ -690,6 +719,286 @@ static int isewouldblock(int errnoval) { return errnoval==EWOULDBLOCK || errnoval==EAGAIN; } + +/*========== command and control connections ==========*/ + +static int control_master; + +typedef struct ControlConn ControlConn; +struct ControlConn { + void (*destroy)(ControlConn*); + int fd; + oop_read *rd; + FILE *out; + union { + struct sockaddr sa; + struct sockaddr_un un; + } sa; + socklen_t salen; +}; + +static const oop_rd_style control_rd_style= { + OOP_RD_DELIM_STRIP, '\n', + OOP_RD_NUL_FORBID, + OOP_RD_SHORTREC_FORBID +}; + +static void control_destroy(ControlConn *cc) { + cc->destroy(cc); +} + +static void control_checkouterr(ControlConn *cc /* may destroy*/) { + if (ferror(cc->out) | fflush(cc->out)) { + info("CTRL%d write error %s", cc->fd, strerror(errno)); + control_destroy(cc); + } +} + +static void control_prompt(ControlConn *cc /* may destroy*/) { + fprintf(cc->out, "%s| ", sitename); + control_checkouterr(cc); +} + +typedef struct ControlCommand ControlCommand; +struct ControlCommand { + const char *cmd; + void (*f)(ControlConn *cc, const ControlCommand *ccmd, + const char *arg, size_t argsz); + void *xdata; + int xval; +}; + +static const ControlCommand control_commands[]; + +#define CCMD(wh) \ + static void ccmd_##wh(ControlConn *cc, const ControlCommand *c, \ + const char *arg, size_t argsz) + +CCMD(help) { + fputs("commands:\n", cc->out); + const ControlCommand *ccmd; + for (ccmd=control_commands; ccmd->cmd; ccmd++) + fprintf(cc->out, " %s\n", ccmd->cmd); +} + +CCMD(period) { period(); } +CCMD(setintarg) { *(int*)c->xdata= atoi(arg); } +CCMD(setint) { *(int*)c->xdata= c->xval; } + +static const ControlCommand control_commands[]= { + { "h", ccmd_help }, + { "p", ccmd_period }, + { "pretend flush", ccmd_setintarg, &simulate_flush }, + { "poke sm", ccmd_setint, &sm_period_counter, 1 }, + { "poke conn", ccmd_setint, &until_connect, 0 }, + { "poke blscan", ccmd_setint, &until_backlog_nextscan, 0 }, + { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 }, + { 0 } +}; + +static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *cc_v) { + ControlConn *cc= cc_v; + + if (!data) { + info("CTRL%d closed", cc->fd); + cc->destroy(cc); + return OOP_CONTINUE; + } + + if (recsz == 0) goto prompt; + + const ControlCommand *ccmd; + for (ccmd=control_commands; ccmd->cmd; ccmd++) { + int l= strlen(ccmd->cmd); + if (recsz < l) continue; + if (recsz > l && data[l] != ' ') continue; + if (memcmp(data, ccmd->cmd, l)) continue; + + int argl= (int)recsz - (l+1); + ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl); + goto prompt; + } + + fputs("unknown command; h for help\n", cc->out); + + prompt: + control_prompt(cc); + return OOP_CONTINUE; +} + +static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *cc_v) { + ControlConn *cc= cc_v; + + info("CTRL%d read error %s", cc->fd, errmsg); + cc->destroy(cc); + return OOP_CONTINUE; +} + +static int control_conn_startup(ControlConn *cc /* may destroy*/, + const char *how) { + cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0); + if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; } + + int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND, + control_rd_ok, cc, + control_rd_err, cc); + if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; } + + info("CTRL%d %s ready", cc->fd, how); + control_prompt(cc); + return 0; +} + +static void control_stdio_destroy(ControlConn *cc) { + if (cc->rd) { + oop_rd_cancel(cc->rd); + errno= oop_rd_delete_tidy(cc->rd); + if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)"); + } + free(cc); +} + +static void control_stdio(void) { + ControlConn *cc= xmalloc(sizeof(*cc)); + memset(cc,0,sizeof(*cc)); + cc->destroy= control_stdio_destroy; + + cc->fd= 0; + cc->out= stdout; + int r= control_conn_startup(cc,"stdio"); + if (r) cc->destroy(cc); +} + +static void control_accepted_destroy(ControlConn *cc) { + if (cc->rd) { + oop_rd_cancel(cc->rd); + oop_rd_delete_kill(cc->rd); + } + if (cc->out) { fclose(cc->out); cc->fd=0; } + close_perhaps(&cc->fd); + free(cc); +} + +static void *control_master_readable(oop_source *lp, int master, + oop_event ev, void *u) { + ControlConn *cc= xmalloc(sizeof(*cc)); + memset(cc,0,sizeof(*cc)); + cc->destroy= control_accepted_destroy; + + cc->salen= sizeof(cc->sa); + cc->fd= accept(master, &cc->sa.sa, &cc->salen); + if (cc->fd<0) { syswarn("error accepting control connection"); goto x; } + + cc->out= fdopen(cc->fd, "w"); + if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; } + + int r= control_conn_startup(cc, "accepted"); + if (r) goto x; + + return OOP_CONTINUE; + + x: + cc->destroy(cc); + return OOP_CONTINUE; +} + +#define NOCONTROL(...) do{ \ + syswarn("no control socket, because failed to " __VA_ARGS__); \ + goto nocontrol; \ + }while(0) + +static void control_init(void) { + char *real=0; + + union { + struct sockaddr sa; + struct sockaddr_un un; + } sa; + + memset(&sa,0,sizeof(sa)); + int maxlen= sizeof(sa.un.sun_path); + + int reallen= readlink(path_control, sa.un.sun_path, maxlen); + if (reallen<0) { + if (errno != ENOENT) + NOCONTROL("readlink control socket symlink path %s", path_control); + } + if (reallen >= maxlen) { + debug("control socket symlink path too long (r=%d)",reallen); + xunlink(path_control, "old (overlong) control socket symlink"); + reallen= -1; + } + + if (reallen<0) { + struct stat stab; + int r= lstat(realsockdir,&stab); + if (r) { + if (errno != ENOENT) NOCONTROL("lstat real socket dir %s", realsockdir); + + r= mkdir(realsockdir, 0700); + if (r) NOCONTROL("mkdir real socket dir %s", realsockdir); + + } else { + uid_t self= geteuid(); + if (!S_ISDIR(stab.st_mode) || + stab.st_uid != self || + stab.st_mode & 0077) { + warn("no control socket, because real socket directory" + " is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)", + !!S_ISDIR(stab.st_mode), + (unsigned long)stab.st_uid, (unsigned long)self, + (unsigned long)stab.st_mode & 0777UL); + goto nocontrol; + } + } + + real= xasprintf("%s/s%lx.%lx", realsockdir, + (unsigned long)xtime(), (unsigned long)self_pid); + int reallen= strlen(real); + + if (reallen >= maxlen) { + warn("no control socket, because tmpnam gave overly-long path" + " %s", real); + goto nocontrol; + } + r= symlink(real, path_control); + if (r) NOCONTROL("make control socket path %s a symlink to real" + " socket path %s", path_control, real); + memcpy(sa.un.sun_path, real, reallen); + } + + int r= unlink(sa.un.sun_path); + if (r && errno!=ENOENT) + NOCONTROL("remove old real socket %s", sa.un.sun_path); + + control_master= socket(PF_UNIX, SOCK_STREAM, 0); + if (control_master<0) NOCONTROL("create new control socket"); + + sa.un.sun_family= AF_UNIX; + int sl= strlen(sa.un.sun_path) + offsetof(struct sockaddr_un, sun_path); + r= bind(control_master, &sa.sa, sl); + if (r) NOCONTROL("bind to real socket path %s", sa.un.sun_path); + + r= listen(control_master, 5); + if (r) NOCONTROL("listen"); + + xsetnonblock(control_master, 1); + + loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0); + info("control socket ok, real path %s", sa.un.sun_path); + + return; + + nocontrol: + free(real); + xclose_perhaps(&control_master, "control master",0); + return; +} + /*========== management of connections ==========*/ static void conn_closefd(Conn *conn, const char *msgprefix) { @@ -700,6 +1009,11 @@ static void conn_closefd(Conn *conn, const char *msgprefix) { static void conn_dispose(Conn *conn) { if (!conn) return; + if (conn->rd) { + oop_rd_cancel(conn->rd); + oop_rd_delete_kill(conn->rd); + conn->rd= 0; + } if (conn->fd) { loop->cancel_fd(loop, conn->fd, OOP_WRITE); loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION); @@ -809,10 +1123,7 @@ static int connecting_fdpass_sock; static void connect_attempt_discard(void) { if (connecting_child) { - int r= kill(connecting_child, SIGTERM); - if (r) syswarn("failed to kill connecting child"); int status= xwaitpid(&connecting_child, "connect"); - if (!(WIFEXITED(status) || (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) report_child_status("connect", status); @@ -834,51 +1145,50 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { Conn *conn= 0; assert(fd == connecting_fdpass_sock); + PREP_DECL_MSG_CMSG(msg); + ssize_t rs= recvmsg(fd, &msg, 0); + if (rs<0) { + if (isewouldblock(errno)) return OOP_CONTINUE; + syswarn("failed to read socket from connecting child"); + goto x; + } conn= xmalloc(sizeof(*conn)); memset(conn,0,sizeof(*conn)); - PREP_DECL_MSG_CMSG(msg); struct cmsghdr *h= 0; - ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT); if (rs >= 0) h= CMSG_FIRSTHDR(&msg); if (!h) { - int status; - pid_t got= waitpid(connecting_child, &status, WNOHANG); - if (got != -1) { - assert(got==connecting_child); - connecting_child= 0; - if (WIFEXITED(status)) { - if (WEXITSTATUS(status) != 0 && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) - /* child already reported the problem */; - else - warn("connect: connection child exited code %d but no cmsg", + int status= xwaitpid(&connecting_child, "connect child (broken)"); + + if (WIFEXITED(status)) { + if (WEXITSTATUS(status) != 0 && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) + /* child already reported the problem */; + else { + if (e == OOP_EXCEPTION) + warn("connect: connection child exited code %d but" + " unexpected exception on fdpass socket", WEXITSTATUS(status)); - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { - warn("connect: connection attempt timed out"); - } else { - report_child_status("connect", status); + else + warn("connect: connection child exited code %d but" + " no cmsg (rs=%d)", + WEXITSTATUS(status), (int)rs); } + } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { + warn("connect: connection attempt timed out"); } else { - /* child is still running apparently, report the socket problem */ - if (rs < 0) - syswarn("connect: read from fdpass socket failed"); - else if (e == OOP_EXCEPTION) - warn("connect: unexpected exception on fdpass socket"); - else if (!rs) - warn("connect: unexpected EOF on fdpass socket"); - else - fatal("connect: unexpected lack of cmsg from child"); + report_child_status("connect", status); } goto x; } -#define CHK(field, val) \ - if (h->cmsg_##field != val) { \ - die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \ - goto x; \ +#define CHK(field, val) \ + if (h->cmsg_##field != val) { \ + die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \ + h->cmsg_##field, val); \ + goto x; \ } CHK(level, SOL_SOCKET); CHK(type, SCM_RIGHTS); @@ -888,7 +1198,6 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { if (CMSG_NXTHDR(&msg,h)) die("connect: child sent many cmsgs"); memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd)); - loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); int status; pid_t got= waitpid(connecting_child, &status, 0); @@ -909,8 +1218,16 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { LIST_INIT(conn->waiting); LIST_INIT(conn->priority); LIST_INIT(conn->sent); - setnonblock(conn->fd, 1); conn->max_queue= conn->stream ? max_queue_per_conn : 1; + + loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); + conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */ + if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd); + int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN, + &peer_rd_ok, conn, + &peer_rd_err, conn); + if (r) sysdie("oop_rd_read for peer (fd=%d)",conn->fd); + notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); LIST_ADDHEAD(conns, conn); @@ -921,6 +1238,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { x: conn_dispose(conn); connect_attempt_discard(); + return OOP_CONTINUE; } static int allow_connect_start(void) { @@ -950,7 +1268,7 @@ static void connect_start(void) { alarm(connection_setup_timeout); if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) { - if (buf[0]) fatal("connect: rejected: %s", sanitise(buf)); + if (buf[0]) fatal("connect: failed: %s", sanitise(buf)); else sysfatal("connect: connection attempt failed"); } if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0) @@ -970,7 +1288,7 @@ static void connect_start(void) { assert(l>=1); if (buf[-1]!='\n') fatal("connect: response to MODE STREAM is too long: %.100s...", - remote_host, sanitise(buf)); + sanitise(buf)); l--; if (l>0 && buf[l-1]=='\r') l--; buf[l]= 0; char *ep; @@ -1010,6 +1328,7 @@ static void connect_start(void) { xclose(socks[1], "connecting fdpass child's socket",0); connecting_fdpass_sock= socks[0]; + xsetnonblock(connecting_fdpass_sock, 1); on_fd_read_except(connecting_fdpass_sock, connchild_event); } @@ -1021,7 +1340,7 @@ static void check_assign_articles(void) { break; Conn *walk, *use=0; - int spare, inqueue; + int spare=0, inqueue=0; /* Find a connection to offer this article. We prefer a busy * connection to an idle one, provided it's not full. We take the @@ -1063,7 +1382,6 @@ static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) { } static void conn_maybe_write(Conn *conn) { - void *rp= 0; for (;;) { conn_make_some_xmits(conn); if (!conn->xmitu) { @@ -1143,7 +1461,7 @@ static void *conn_write_some_xmits(Conn *conn) { rs -= vp->iov_len; xmit_free(dp); } else { - vp->iov_base += rs; + vp->iov_base= (char*)vp->iov_base + rs; vp->iov_len -= rs; } } @@ -1217,7 +1535,7 @@ static const oop_rd_style peer_rd_style= { OOP_RD_SHORTREC_FORBID }; -static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev, +static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, const char *errmsg, int errnoval, const char *data, size_t recsz, void *conn_v) { Conn *conn= conn_v; @@ -1316,7 +1634,8 @@ static void article_done(Conn *conn, Article *art, int whichcount) { if (r==-1) { if (errno==EINTR) continue; sysdie("failed to blank entry for %s (length %d at offset %lu) in %s", - art->messageid, art->blanklen, art->offset, ipf->path); + art->messageid, art->blanklen, + (unsigned long)art->offset, ipf->path); } assert(r>=0 && r<=w); art->blanklen -= w; @@ -1331,7 +1650,7 @@ static void article_done(Conn *conn, Article *art, int whichcount) { queue_check_input_done(); } -static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, +static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, const char *errmsg, int errnoval, const char *data, size_t recsz, void *conn_v) { Conn *conn= conn_v; @@ -1473,7 +1792,7 @@ static void close_input_file(InputFile *ipf) { /* does not free */ static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, const char *data, const char *how) { - warn("corrupted file: %s, offset %lu: %s: %s", + warn("corrupted file: %s, offset %lu: %s: in %s", ipf->path, (unsigned long)offset, how, sanitise(data)); ipf->readcount_err++; if (ipf->readcount_err > max_bad_data_initial + @@ -1515,7 +1834,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, if (ipf->skippinglong) { if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */ - return; + return OOP_CONTINUE; } if (ev==OOP_RD_LONG) { ipf->skippinglong= 1; @@ -1620,6 +1939,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, abort(); } } + tailing_queue_readable(ipf); return r; } } @@ -1641,7 +1961,7 @@ struct Filemon_Perfile { static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY); - if (wd < 0) sysdie("inotify_add_watch %s", ipf->path); + if (wd < 0) sysfatal("inotify_add_watch %s", ipf->path); if (wd >= filemon_inotify_wdmax) { int newmax= wd+2; @@ -1734,7 +2054,8 @@ static void filemon_stop(InputFile *ipf) { } static void filemon_callback(InputFile *ipf) { - ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); + if (ipf && ipf->readable_callback) /* so filepoll() can be naive */ + ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); } /*---------- interface to start and stop an input file ----------*/ @@ -1880,16 +2201,12 @@ static void startup_set_input_file(InputFile *f) { inputfile_reading_start(f); } -static void statemc_init(void) { +static void statemc_lock(void) { + int lockfd; struct stat stab, stabf; - - path_lock= xasprintf("%s_lock", feedfile); - path_flushing= xasprintf("%s_flushing", feedfile); - path_defer= xasprintf("%s_defer", feedfile); - globpat_backlog= xasprintf("%s_backlog*", feedfile); - + for (;;) { - int lockfd= open(path_lock, O_CREAT|O_RDWR, 0600); + lockfd= open(path_lock, O_CREAT|O_RDWR, 0600); if (lockfd<0) sysfatal("open lockfile %s", path_lock); struct flock fl; @@ -1902,7 +2219,7 @@ static void statemc_init(void) { if (quiet_multiple) exit(0); fatal("another duct holds the lockfile"); } - sysdie("fcntl F_SETLK lockfile %s", path_lock); + sysfatal("fcntl F_SETLK lockfile %s", path_lock); } xfstat_isreg(lockfd, &stabf, path_lock, "lockfile"); @@ -1914,17 +2231,34 @@ static void statemc_init(void) { xclose(lockfd, "stale lockfile ", path_lock); } + + FILE *lockfile= fdopen(lockfd, "w"); + if (!lockfile) sysdie("fdopen lockfile"); + + int r= ftruncate(lockfd, 0); + if (r) sysdie("truncate lockfile to write new info"); + + if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n", + (unsigned long)self_pid, + sitename, feedfile, remote_host) == EOF || + fflush(lockfile)) + sysfatal("write info to lockfile %s", path_lock); + debug("startup: locked"); +} + +static void statemc_init(void) { + struct stat stabdefer; search_backlog_file(); int defer_noent; - xlstat_isreg(path_defer, &stab, &defer_noent, "defer file"); + xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file"); if (defer_noent) { debug("startup: ductdefer ENOENT"); } else { - debug("startup: ductdefer nlink=%ld", (long)stab.st_nlink); - switch (stab.st_nlink==1) { + debug("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink); + switch (stabdefer.st_nlink==1) { case 1: open_defer(); /* so that we will later close it and rename it */ break; @@ -1934,7 +2268,7 @@ static void statemc_init(void) { break; default: die("defer file %s has unexpected link count %d", - path_defer, stab.st_nlink); + path_defer, stabdefer.st_nlink); } } @@ -1967,7 +2301,7 @@ static void statemc_init(void) { InputFile *file_f= open_input_file(feedfile); if (!file_f) die("feed file vanished during startup"); startup_set_input_file(file_f); - SMS(NORMAL, flushfail_retry_periods, "normal startup"); + SMS(NORMAL, spontaneous_flush_periods, "normal startup"); } } } @@ -1982,8 +2316,8 @@ static void statemc_start_flush(const char *why) { /* Normal => Flushing */ sm_period_counter); int r= link(feedfile, path_flushing); - if (r) sysdie("link feedfile %s to flushing file %s", - feedfile, path_flushing); + if (r) sysfatal("link feedfile %s to flushing file %s", + feedfile, path_flushing); /* => Hardlinked */ xunlink(feedfile, "old feedfile link"); @@ -2111,19 +2445,21 @@ static void statemc_setstate(StateMachineState newsms, int periods, const char *xtra= ""; switch (sms) { - case sm_FLUSHING: sm_FLUSHFAILED: + case sm_FLUSHING: + case sm_FLUSHFAILED: if (!main_input_file) xtra= "-ABSENT"; break; - case sm_SEPARATED: case sm_DROPPING: + case sm_SEPARATED: + case sm_DROPPING: xtra= flushing_input_file->rd ? "-1" : "-2"; break; default:; } if (periods) { - info("%s%s[%d] %s",forlog,xtra,periods,why); + info("state %s%s[%d] %s",forlog,xtra,periods,why); } else { - info("%s%s %s",forlog,xtra,why); + info("state %s%s %s",forlog,xtra,why); } } @@ -2225,7 +2561,7 @@ static void search_backlog_file(void) { int r, i; struct stat stab; const char *oldest_path=0; - time_t oldest_mtime, now; + time_t oldest_mtime=0, now; if (backlog_input_file) return; @@ -2235,9 +2571,9 @@ static void search_backlog_file(void) { switch (r) { case GLOB_ABORTED: - sysdie("failed to expand backlog pattern %s", globpat_backlog); + sysfatal("failed to expand backlog pattern %s", globpat_backlog); case GLOB_NOSPACE: - die("out of memory expanding backlog pattern %s", globpat_backlog); + fatal("out of memory expanding backlog pattern %s", globpat_backlog); case 0: for (i=0; i= 0 && - until_backlog_nextscan > backlog_spontaneous_rescan_periods) - until_backlog_nextscan= backlog_spontaneous_rescan_periods; + if (backlog_spontrescan_periods >= 0 && + until_backlog_nextscan > backlog_spontrescan_periods) + until_backlog_nextscan= backlog_spontrescan_periods; debug("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s", age, age_deficiency, until_backlog_nextscan, oldest_path); + + xfree: + globfree(&gl); return; } @@ -2319,8 +2662,11 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { assert(inndcomm_child); assert(fd == inndcomm_sentinel_fd); int status= xwaitpid(&inndcomm_child, "inndcomm"); + inndcomm_child= 0; + cancel_fd_read_except(fd); xclose_perhaps(&fd, "inndcomm sentinel pipe",0); + inndcomm_sentinel_fd= 0; assert(!flushing_input_file); @@ -2331,7 +2677,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { goto failed; case INNDCOMMCHILD_ESTATUS_NONESUCH: - warn("feed has been dropped by innd, finishing up"); + notice("feed has been dropped by innd, finishing up"); flushing_input_file= main_input_file; tailing_queue_readable(flushing_input_file); /* we probably previously returned EAGAIN from our fake read method @@ -2380,6 +2726,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { failed: SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry"); + return OOP_CONTINUE; } static void inndcommfail(const char *what) { @@ -2396,7 +2743,7 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ assert(!inndcomm_child); assert(!inndcomm_sentinel_fd); - if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel"); + if (pipe(pipefds)) sysfatal("create pipe for inndcomm child sentinel"); inndcomm_child= xfork("inndcomm child"); @@ -2408,6 +2755,12 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0); /* parent spots the autoclose of pipefds[1] when we die or exit */ + if (simulate_flush>=0) { + warn("SIMULATING flush child status %d", simulate_flush); + if (simulate_flush>128) raise(simulate_flush-128); + else exit(simulate_flush); + } + alarm(inndcomm_flush_timeout); r= ICCopen(); if (r) inndcommfail("connect"); r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit"); @@ -2418,6 +2771,8 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ exit(INNDCOMMCHILD_ESTATUS_FAIL); } + simulate_flush= -1; + xclose(pipefds[1], "inndcomm sentinel child's end",0); inndcomm_sentinel_fd= pipefds[0]; assert(inndcomm_sentinel_fd); @@ -2453,44 +2808,77 @@ static void postfork(void) { postfork_stdio(defer, "defer file ", path_defer); } -#define EVERY(what, interval_sec, interval_usec, body) \ - static struct timeval what##_timeout = { interval_sec, interval_usec }; \ - static void what##_schedule(void); \ - static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ - { body } \ - what##_schedule(); \ - } \ - static void what##_schedule(void) { \ - loop->on_time(loop, what##_timeout, what##_timedout, 0); \ - } +typedef struct Every Every; +struct Every { + struct timeval interval; + int fixed_rate; + void (*f)(void); +}; -EVERY(filepoll, 5,0, { - if (main_input_file && main_input_file->readable_callback) - filemon_callback(main_input_file); -}); +static void every_schedule(Every *e, struct timeval base); + +static void *every_happens(oop_source *lp, struct timeval base, void *e_v) { + Every *e= e_v; + e->f(); + if (!e->fixed_rate) xgettimeofday(&base); + every_schedule(e, base); + return OOP_CONTINUE; +} -#define DEBUGF_IPF(wh) " " #wh "=%p/%s:ip=%ld,off=%ld,fd=%d%s" -#define DEBUG_IPF(wh) \ - wh##_input_file, debug_ipf_path(wh##_input_file), \ - wh##_input_file->inprogress, (long)wh##_input_file->offset, \ - wh##_input_file->fd, wh##_input_file->rd ? "+" : "" -static const char *debug_ipf_path(InputFile *ipf) { - char *slash= strrchr(ipf->path,'/'); - return slash ? slash+1 : ipf->path; +static void every_schedule(Every *e, struct timeval base) { + struct timeval when; + timeradd(&base, &e->interval, &when); + loop->on_time(loop, when, every_happens, e); } -EVERY(period, -1,0, { +static void every(int interval, int fixed_rate, void (*f)(void)) { + Every *e= xmalloc(sizeof(*e)); + e->interval.tv_sec= interval; + e->interval.tv_usec= 0; + e->fixed_rate= fixed_rate; + e->f= f; + struct timeval now; + xgettimeofday(&now); + every_schedule(e, now); +} + +static void filepoll(void) { + filemon_callback(main_input_file); + filemon_callback(flushing_input_file); +} + +static char *debug_report_ipf(InputFile *ipf) { + if (!ipf) return xasprintf("none"); + + const char *slash= strrchr(ipf->path,'/'); + const char *path= slash ? slash+1 : ipf->path; + + return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s", + ipf, path, + ipf->inprogress, (long)ipf->offset, + ipf->fd, ipf->rd ? "+" : ""); +} + +static void period(void) { + char *dipf_main= debug_report_ipf(main_input_file); + char *dipf_flushing= debug_report_ipf(flushing_input_file); + char *dipf_backlog= debug_report_ipf(backlog_input_file); + debug("PERIOD" " sms=%s[%d] conns=%d queue=%d until_connect=%d" - " input_files" DEBUGF_IPF(main) DEBUGF_IPF(old) DEBUGF_IPF(flushing) - " children connecting=%ld inndcomm_child" + " input_files main:%s old:%s flushing:%s" + " children connecting=%ld inndcomm=%ld" , sms_names[sms], sm_period_counter, - queue.count, conns.count, until_connect, - DEBUG_IPF(main), DEBUG_IPF(flushing), DEBUG_IPF(flushing), + conns.count, queue.count, until_connect, + dipf_main, dipf_flushing, dipf_backlog, (long)connecting_child, (long)inndcomm_child ); + free(dipf_main); + free(dipf_flushing); + free(dipf_backlog); + if (until_connect) until_connect--; poll_backlog_file(); @@ -2498,7 +2886,7 @@ EVERY(period, -1,0, { statemc_period_poll(); check_assign_articles(); check_idle_conns(); -}); +} /*========== option parsing ==========*/ @@ -2506,7 +2894,7 @@ EVERY(period, -1,0, { static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0); static void vbadusage(const char *fmt, va_list al) { char *m= xvasprintf(fmt,al); - fprintf(stderr, "bad usage: %s\n%s" + fprintf(stderr, "bad usage: %s\n" "say --help for help, or read the manpage\n", m); if (become_daemon) @@ -2599,7 +2987,7 @@ static void print_options(const Option *options, FILE *f) { for (o=options; o->shrt || o->lng; o++) { char shrt[2] = { o->shrt, 0 }; char *optspec= xasprintf("%s%s%s%s%s", - o->shrt ? "-" : "", o->shrt, + o->shrt ? "-" : "", shrt, o->shrt && o->lng ? "|" : "", DELIMPERHAPS("--", o->lng)); fprintf(f, " %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg)); @@ -2641,10 +3029,14 @@ static void op_seconds(const Option *o, const char *val) { double v= strtod(val,&ep); if (ep==val) badusage("bad time/duration value for %s",o->lng); - if (!*ep || !strcmp(ep,"s")) unit= 1; - else if (!strcmp(ep,"m")) unit= 60; - else if (!strcmp(ep,"h")) unit= 3600; - else if (!strcmp(ep,"d")) unit= 86400; + if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1; + else if (!strcmp(ep,"m") || !strcmp(ep,"min")) unit= 60; + else if (!strcmp(ep,"h") || !strcmp(ep,"hour")) unit= 3600; + else if (!strcmp(ep,"d") || !strcmp(ep,"day")) unit= 86400; + else if (!strcmp(ep,"das")) unit= 10; + else if (!strcmp(ep,"hs")) unit= 100; + else if (!strcmp(ep,"ks")) unit= 1000; + else if (!strcmp(ep,"Ms")) unit= 1000000; else badusage("bad units %s for time/duration value for %s",ep,o->lng); v *= unit; @@ -2653,13 +3045,6 @@ static void op_seconds(const Option *o, const char *val) { *store= v; } -static void op_periods_rndup(const Option *o, const char *val) { - int *store= o->store; - op_seconds(o,val); - *store += period_seconds-1; - *store /= period_seconds; -} - static void op_setint(const Option *o, const char *val) { int *store= o->store; *store= o->intval; @@ -2670,26 +3055,35 @@ static void op_setint(const Option *o, const char *val) { static void help(const Option *o, const char *val); static const Option innduct_options[]= { -{'f',"feedfile", "F", &feedfile, op_string }, -{'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 }, -{0,"help", 0, help }, - -{0,"max-connections", "N", &max_connections, op_integer }, -{0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer }, - - -{0,"streaming", 0, &try_stream, op_setint, 1 }, -{0,"no-streaming", 0, &try_stream, op_setint, 0 }, -{'P',"port", "PORT",&port, op_integer }, -{0,"inndconf", "F", &inndconffile, op_string }, -{0,"no-daemon", 0, &become_daemon, op_setint, 0 }, +{'f',"feedfile", "F", &feedfile, op_string }, +{'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 }, +{0,"no-daemon", 0, &become_daemon, op_setint, 0 }, +{0,"no-streaming", 0, &try_stream, op_setint, 0 }, +{0,"inndconf", "F", &inndconffile, op_string }, +{'P',"port", "PORT", &port, op_integer }, +{0,"help", 0, 0, help }, + +{0,"max-connections", "N", &max_connections, op_integer }, +{0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer }, +{0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer }, +{0,"period-interval", "TIME", &period_seconds, op_seconds }, + +{0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds }, +{0,"stuck-flush-timeout","TIME", &inndcomm_flush_timeout, op_seconds }, + +{0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double }, +{0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double }, + +{0,"reconnect-interval", "PERIOD", &reconnect_delay_periods, op_seconds }, +{0,"flush-retry-interval", "PERIOD", &flushfail_retry_periods, op_seconds }, +{0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds }, +{0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds}, +{0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds }, +{0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds }, + +{0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double }, +{0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer }, -{0,"no-check-proportion","PERCENT", &nocheck_thresh_pct, op_double }, -{0,"no-check-filter", "ARTICLES", &nocheck_decay_articles, op_double }, - -{0,"reconnect-interval", "TIME", &reconnect_delay_periods, op_periods_rndup }, -{0,"flush-retry-interval","TIME", &flushfail_retry_periods, op_periods_rndup }, -{0,"inndcomm-timeout", "TIME", &inndcomm_flush_timeout, op_seconds }, {0,0} }; @@ -2708,10 +3102,15 @@ static void help(const Option *o, const char *val) { exit(0); } +static void convert_to_periods_rndup(int *store) { + *store += period_seconds-1; + *store /= period_seconds; +} + int main(int argc, char **argv) { if (!argv[1]) { printusage(stderr); - exit(12); + exit(8); } parse_options(innduct_options, &argv); @@ -2727,24 +3126,33 @@ int main(int argc, char **argv) { if (!remote_host) remote_host= sitename; - if (nocheck_thresh_pct < 0 || nocheck_thresh_pct > 100) + if (nocheck_thresh < 0 || nocheck_thresh > 100) badusage("nocheck threshold percentage must be between 0..100"); - nocheck_thresh= nocheck_thresh_pct * 0.01; + nocheck_thresh *= 0.01; - if (nocheck_decay_articles < 0.1) + if (nocheck_decay < 0.1) badusage("nocheck decay articles must be at least 0.1"); - nocheck_decay= 1 - 1/nocheck_decay_articles; - - if (!pathoutgoing) - pathoutgoing= innconf->pathoutgoing; - innconf_read(inndconffile); - - if (!feedfile) - feedfile= xasprintf("%s/%s",pathoutgoing,sitename); - else if (!feedfile[0]) + nocheck_decay= pow(0.5, 1.0/nocheck_decay); + + convert_to_periods_rndup(&reconnect_delay_periods); + convert_to_periods_rndup(&flushfail_retry_periods); + convert_to_periods_rndup(&backlog_retry_minperiods); + convert_to_periods_rndup(&backlog_spontrescan_periods); + convert_to_periods_rndup(&spontaneous_flush_periods); + convert_to_periods_rndup(&need_activity_periods); + + if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100) + badusage("bad input data ratio must be between 0..100"); + max_bad_data_ratio *= 0.01; + + if (!feedfile) { + innconf_read(inndconffile); + feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename); + } else if (!feedfile[0]) { badusage("feed filename must be nonempty"); - else if (feedfile[strlen(feedfile)-1]=='/') + } else if (feedfile[strlen(feedfile)-1]=='/') { feedfile= xasprintf("%s%s",feedfile,sitename); + } const char *feedfile_forbidden= "?*[~#"; int c; @@ -2754,6 +3162,12 @@ int main(int argc, char **argv) { /* set things up */ + path_lock= xasprintf("%s_lock", feedfile); + path_flushing= xasprintf("%s_flushing", feedfile); + path_defer= xasprintf("%s_defer", feedfile); + path_control= xasprintf("%s_control", feedfile); + globpat_backlog= xasprintf("%s_backlog*", feedfile); + oop_source_sys *sysloop= oop_sys_new(); if (!sysloop) sysdie("could not create liboop event loop"); loop= (oop_source*)sysloop; @@ -2772,7 +3186,7 @@ int main(int argc, char **argv) { openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS); int null= open("/dev/null",O_RDWR); - if (null<0) sysdie("failed to open /dev/null"); + if (null<0) sysfatal("failed to open /dev/null"); dup2(null,0); dup2(null,1); dup2(null,2); @@ -2782,21 +3196,30 @@ int main(int argc, char **argv) { if (child1) _exit(0); pid_t sid= setsid(); - if (sid != child1) sysdie("setsid failed"); + if (sid != child1) sysfatal("setsid failed"); pid_t child2= xfork("daemonise second fork"); if (child2) _exit(0); } + self_pid= getpid(); + if (self_pid==-1) sysdie("getpid"); + + statemc_lock(); + notice("starting"); + if (!become_daemon) + control_stdio(); + + control_init(); + if (!filemon_method_init()) { warn("no file monitoring available, polling"); - filepoll_schedule(); + every(5,0,filepoll); } - period_timeout.tv_sec= period_seconds; - period_schedule(); + every(period_seconds,1,period); statemc_init();