/*
- * warning if no inotify
- * inotify not working ?
- * some per-conn info thing for control
-
* todo
-
- * - actually do something with readable on control master
- * - option for realsockdir
- * - option for filepoll
- * - option for no inotify
+ * - some per-conn info thing for control
* - manpage: document control master stuff
- * - manpage: innconf is used for communicating with innd
- * - debug this:
- * build-lfs/backends/innduct --no-daemon -f `pwd`/fee sit dom
+ * - admin-initiated flush
+ *
+ * debugging rune:
+ * build-lfs/backends/innduct --no-daemon -C ../inn.conf -f `pwd`/fee sit localhost
*/
/*
static void open_defer(void);
static void close_defer(void);
static void search_backlog_file(void);
+static void preterminate(void);
+static void defraise(int signo);
static void inputfile_reading_start(InputFile *ipf);
static void inputfile_reading_stop(InputFile *ipf);
static const char *sitename, *remote_host;
static const char *feedfile, *realsockdir="/tmp/innduct.control";
static int quiet_multiple=0;
-static int become_daemon=1;
+static int become_daemon=1, try_filemon=1;
static int try_stream=1;
static int port=119;
static const char *inndconffile;
static int max_queue_per_conn=200;
static int target_max_feedfile_size=100000;
static int period_seconds=60;
+static int filepoll_seconds=5;
static int connection_setup_timeout=200;
static int inndcomm_flush_timeout=100;
/* initialisation to 0 is good */
static int until_connect, until_backlog_nextscan;
static double accept_proportion;
-static int nocheck, nocheck_reported;
+static int nocheck, nocheck_reported, in_child;
/* for simulation, debugging, etc. */
int simulate_flush= -1;
#define diewrap(fn, pfx, sysloglevel, err, estatus) \
static void fn(const char *fmt, ...) NORET_PRINTF(1,2); \
static void fn(const char *fmt, ...) { \
+ preterminate(); \
VA; \
logv(sysloglevel, pfx, err, fmt, al); \
exit(estatus); \
return now;
}
+static void xsigaction(int s, const struct sigaction *sa) {
+ int r= sigaction(s,sa,0);
+ if (r) sysdie("sigaction failed for \"%s\"", strsignal(s));
+}
+
static void xgettimeofday(struct timeval *tv_r) {
int r= gettimeofday(tv_r,0);
if (r) sysdie("gettimeofday(2) failed");
CCMD(period) { period(); }
CCMD(setintarg) { *(int*)c->xdata= atoi(arg); }
CCMD(setint) { *(int*)c->xdata= c->xval; }
+CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); }
+
+CCMD(stop) {
+ preterminate();
+ notice("terminating (CTRL%d)",cc->fd);
+ defraise(SIGTERM);
+ abort();
+}
static const ControlCommand control_commands[]= {
{ "h", ccmd_help },
{ "p", ccmd_period },
+ { "stop", ccmd_stop },
+
+#define POKES(cmd,func) \
+ { cmd "sm", func, &sm_period_counter, 1 }, \
+ { cmd "conn", func, &until_connect, 0 }, \
+ { cmd "blscan", func, &until_backlog_nextscan, 0 },
+POKES("prod ", ccmd_setint_period)
+POKES("next ", ccmd_setint)
+
{ "pretend flush", ccmd_setintarg, &simulate_flush },
- { "poke sm", ccmd_setint, &sm_period_counter, 1 },
- { "poke conn", ccmd_setint, &until_connect, 0 },
- { "poke blscan", ccmd_setint, &until_backlog_nextscan, 0 },
{ "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 },
{ 0 }
};
uid_t self= geteuid();
if (!S_ISDIR(stab.st_mode) ||
stab.st_uid != self ||
- stab.st_mode & 0077) {
+ stab.st_mode & 0007) {
warn("no control socket, because real socket directory"
" is somehow wrong (ISDIR=%d, uid=%lu (exp.%lu), mode %lo)",
!!S_ISDIR(stab.st_mode),
return 1;
}
-static void notice_processed(InputFile *ipf, const char *what,
- const char *spec) {
+static void notice_processed(InputFile *ipf, int completed,
+ const char *what, const char *spec) {
+ if (!ipf) return; /* allows preterminate to be lazy */
+
#define RCI_NOTHING(x) /* nothing */
#define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
#define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
- info("processed %s%s read=%d (+bl=%d,+err=%d)"
+ char *inprog= completed
+ ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
+ : xasprintf(" inprogress=%ld", ipf->inprogress);
+
+ info("%s %s%s read=%d (+bl=%d,+err=%d)%s"
" offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
,
- what, spec,
- ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
+ completed?"completed":"processed", what, spec,
+ ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, inprog,
CNT(Unchecked,sent) + CNT(Unsolicited,sent)
, CNT(Unchecked,sent), CNT(Unsolicited,sent),
CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
);
+ free(inprog);
+
#undef CNT
}
const char *under= strchr(slash, '_');
const char *rest= under ? under+1 : leaf;
if (!strncmp(rest,"backlog",7)) rest += 7;
- notice_processed(ipf,"backlog ",rest);
+ notice_processed(ipf,1,"backlog ",rest);
close_input_file(ipf);
if (unlink(ipf->path)) {
assert(sms==sm_SEPARATED || sms==sm_DROPPING);
- notice_processed(ipf,"feedfile","");
+ notice_processed(ipf,1,"feedfile","");
close_defer();
debug("backlog scan: none");
if (sms==sm_DROPPED) {
+ preterminate();
notice("feed dropped and our work is complete");
int r= unlink(path_control);
return;
}
+/*---------- shutdown and signal handling ----------*/
+
+static void preterminate(void) {
+ if (in_child) return;
+ notice_processed(main_input_file,0,"feedfile","");
+ notice_processed(flushing_input_file,0,"flushing file","");
+ if (backlog_input_file)
+ notice_processed(backlog_input_file,0, "backlog file ",
+ backlog_input_file->path);
+}
+
+static int signal_self_pipe[2];
+static sig_atomic_t terminate_sig_flag;
+
+static void defraise(int signo) {
+ struct sigaction sa;
+ memset(&sa,0,sizeof(sa));
+ sa.sa_handler= SIG_DFL;
+ xsigaction(signo,&sa);
+ raise(signo);
+}
+
+static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
+ assert(fd=signal_self_pipe[0]);
+ char buf[PIPE_BUF];
+ int r= read(signal_self_pipe[0], buf, sizeof(buf));
+ if (r<0 && !isewouldblock(errno)) sysdie("failed to read signal self pipe");
+ if (r==0) die("eof on signal self pipe");
+ if (terminate_sig_flag) {
+ preterminate();
+ notice("terminating (%s)", strsignal(terminate_sig_flag));
+ defraise(terminate_sig_flag);
+ abort();
+ }
+ return OOP_CONTINUE;
+}
+
+static void sigarrived_handler(int signum) {
+ static char x;
+ switch (signum) {
+ case SIGINT: case SIGTERM:
+ if (!terminate_sig_flag) terminate_sig_flag= signum;
+ break;
+ default:
+ abort();
+ }
+ write(signal_self_pipe[1],&x,1);
+}
+
+static void init_signals(void) {
+ if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+ sysdie("could not ignore SIGPIPE");
+
+ if (pipe(signal_self_pipe)) sysfatal("create self-pipe for signals");
+
+ xsetnonblock(signal_self_pipe[0],1);
+ xsetnonblock(signal_self_pipe[1],1);
+
+ struct sigaction sa;
+ memset(&sa,0,sizeof(sa));
+ sa.sa_handler= sigarrived_handler;
+ sa.sa_flags= SA_RESTART;
+ xsigaction(SIGTERM,&sa);
+ xsigaction(SIGINT,&sa);
+
+ on_fd_read_except(signal_self_pipe[0], sigarrived_event);
+}
+
/*========== flushing the feed ==========*/
static pid_t inndcomm_child;
}
static void postfork(void) {
+ in_child= 1;
+
if (signal(SIGPIPE, SIG_DFL) == SIG_ERR)
sysdie("(in child) failed to reset SIGPIPE");
{'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 },
{0,"no-daemon", 0, &become_daemon, op_setint, 0 },
{0,"no-streaming", 0, &try_stream, op_setint, 0 },
+{0,"no-filemon", 0, &try_filemon, op_setint, 0 },
{'C',"inndconf", "F", &inndconffile, op_string },
{'P',"port", "PORT", &port, op_integer },
+{0,"ctrl-sock-dir", 0, &realsockdir, op_string },
{0,"help", 0, 0, help },
{0,"max-connections", "N", &max_connections, op_integer },
{0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer },
{0,"period-interval", "TIME", &period_seconds, op_seconds },
-{0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
-{0,"stuck-flush-timeout","TIME", &inndcomm_flush_timeout, op_seconds },
+{0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
+{0,"stuck-flush-timeout", "TIME", &inndcomm_flush_timeout, op_seconds },
+{0,"feedfile-poll", "TIME", &filepoll_seconds, op_seconds },
{0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double },
{0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double },
if (!sysloop) sysdie("could not create liboop event loop");
loop= (oop_source*)sysloop;
- if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
- sysdie("could not ignore SIGPIPE");
-
LIST_INIT(conns);
LIST_INIT(queue);
statemc_lock();
+ init_signals();
+
notice("starting");
if (!become_daemon)
control_init();
- if (!filemon_method_init()) {
- warn("filemon: no file monitoring available, polling");
- every(5,0,filepoll);
+ int filemon_ok= 0;
+ if (!try_filemon) {
+ notice("filemon: suppressed by command line option, polling");
+ } else {
+ filemon_ok= filemon_method_init();
+ if (!filemon_ok)
+ warn("filemon: no file monitoring available, polling");
}
+ if (!filemon_ok)
+ every(filepoll_seconds,0,filepoll);
every(period_seconds,1,period);