+++ /dev/null
-/*
- * innduct
- * tailing reliable realtime streaming feeder for inn
- * innduct.c - main program, option parsing and startup
- *
- * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * (I believe that when you compile and link this as part of the inn2
- * build, with the Makefile runes I have provided, all the libraries
- * and files which end up included in innduct are licence-compatible
- * with GPLv3. If not then please let me know. -Ian Jackson.)
- */
-
-#include "innduct.h"
-
-const char *sms_names[]= {
-#define SMS_DEF_NAME(s) #s ,
- SMS_LIST(SMS_DEF_NAME)
- 0
-};
-
-
-/*----- general operational variables -----*/
-
-/* main initialises */
-oop_source *loop;
-ConnList conns;
-char *path_lock, *path_flushing, *path_defer, *path_dump;
-char *globpat_backlog;
-pid_t self_pid;
-int *lowvol_perperiod;
-int lowvol_circptr;
-int lowvol_total; /* does not include current period */
-
-/*---------- configuration option variables ----------*/
-/* when changing defaults, remember to update the manpage */
-
-const char *sitename, *remote_host;
-const char *feedfile, *path_run, *path_cli, *path_cli_dir;
-int quiet_multiple=0;
-int interactive=0, try_filemon=1;
-int try_stream=1;
-int port=119;
-const char *inndconffile;
-
-int max_connections=10;
-int max_queue_per_conn=200;
-int target_max_feedfile_size=100000;
-int period_seconds=30;
-int filepoll_seconds=5;
-int max_queue_per_ipf=-1;
-
-int connection_setup_timeout=200;
-int inndcomm_flush_timeout=100;
-
-double nocheck_thresh= 95.0; /* converted from percentage by main */
-double nocheck_decay= 100; /* conv'd from articles to lambda by main */
-
-/* all these are initialised to seconds, and converted to periods in main */
-int reconnect_delay_periods=1000;
-int flushfail_retry_periods=1000;
-int backlog_retry_minperiods=100;
-int backlog_spontrescan_periods=300;
-int spontaneous_flush_periods=100000;
-int max_separated_periods=2000;
-int need_activity_periods=1000;
-int lowvol_thresh=3;
-int lowvol_periods=1000;
-
-double max_bad_data_ratio= 1; /* conv'd from percentage by main */
-int max_bad_data_initial= 30;
- /* in one corrupt 4096-byte block the number of newlines has
- * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */
-
-/*========== main program ==========*/
-
-static void postfork_inputfile(InputFile *ipf) {
- if (!ipf) return;
- xclose(ipf->fd, "(in child) input file ", ipf->path);
-}
-
-static void postfork_stdio(FILE *f, const char *what, const char *what2) {
- /* we have no stdio streams that are buffered long-term */
- if (!f) return;
- if (fclose(f)) syscrash("(in child) close %s%s", what, what2?what2:0);
-}
-
-void postfork(void) {
- in_child= 1;
-
- xsigsetdefault(SIGTERM);
- xsigsetdefault(SIGINT);
- xsigsetdefault(SIGPIPE);
- if (terminate_sig_flag) raise(terminate_sig_flag);
-
- postfork_inputfile(main_input_file);
- postfork_inputfile(flushing_input_file);
-
- Conn *conn;
- FOR_CONN(conn)
- conn_closefd(conn,"(in child) ");
-
- postfork_stdio(defer, "defer file ", path_defer);
-}
-
-typedef struct Every Every;
-struct Every {
- struct timeval interval;
- int fixed_rate;
- void (*f)(void);
-};
-
-static void every_schedule(Every *e, struct timeval base);
-
-static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
- Every *e= e_v;
- e->f();
- if (!e->fixed_rate) xgettimeofday(&base);
- every_schedule(e, base);
- return OOP_CONTINUE;
-}
-
-static void every_schedule(Every *e, struct timeval base) {
- struct timeval when;
- timeradd(&base, &e->interval, &when);
- loop->on_time(loop, when, every_happens, e);
-}
-
-static void every(int interval, int fixed_rate, void (*f)(void)) {
- NEW_DECL(Every *,e);
- e->interval.tv_sec= interval;
- e->interval.tv_usec= 0;
- e->fixed_rate= fixed_rate;
- e->f= f;
- struct timeval now;
- xgettimeofday(&now);
- every_schedule(e, now);
-}
-
-void period(void) {
- char *dipf_main= dbg_report_ipf(main_input_file);
- char *dipf_flushing= dbg_report_ipf(flushing_input_file);
- char *dipf_backlog= dbg_report_ipf(backlog_input_file);
-
- dbg("PERIOD"
- " sms=%s[%d] conns=%d until_connect=%d"
- " input_files main:%s flushing:%s backlog:%s[%d]"
- " children connecting=%ld inndcomm=%ld lowvol_total=%d"
- ,
- sms_names[sms], until_flush, conns.count, until_connect,
- dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
- (long)connecting_child, (long)inndcomm_child, lowvol_total
- );
-
- free(dipf_main);
- free(dipf_flushing);
- free(dipf_backlog);
-
- if (until_connect) until_connect--;
-
- inputfile_queue_check_expired(backlog_input_file);
- poll_backlog_file();
- if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
- statemc_period_poll();
- check_assign_articles();
- check_idle_conns();
-}
-
-
-/*========== option parsing ==========*/
-
-static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
-static void vbadusage(const char *fmt, va_list al) {
- char *m= xvasprintf(fmt,al);
- fprintf(stderr, "bad usage: %s\n"
- "say --help for help, or read the manpage\n",
- m);
- if (interactive < 2)
- syslog(LOG_ERR,"innduct: invoked with bad usage: %s",m);
- exit(8);
-}
-
-/*---------- generic option parser ----------*/
-
-static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
-static void badusage(const char *fmt, ...) {
- va_list al;
- va_start(al,fmt);
- vbadusage(fmt,al);
-}
-
-enum OptFlags {
- of_seconds= 001000u,
- of_boolean= 002000u,
-};
-
-typedef struct Option Option;
-typedef void OptionParser(const Option*, const char *val);
-
-struct Option {
- int shrt;
- const char *lng, *formarg;
- void *store;
- OptionParser *fn;
- int intval;
-};
-
-static void parse_options(const Option *options, char ***argvp) {
- /* on return *argvp is first non-option arg; argc is not updated */
-
- for (;;) {
- const char *arg= *++(*argvp);
- if (!arg) break;
- if (*arg != '-') break;
- if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
- int a;
- while ((a= *++arg)) {
- const Option *o;
- if (a=='-') {
- arg++;
- char *equals= strchr(arg,'=');
- unsigned len= equals ? (size_t)(equals - arg) : strlen(arg);
- for (o=options; o->shrt || o->lng; o++)
- if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
- goto found_long;
- badusage("unknown long option --%s",arg);
- found_long:
- if (!o->formarg) {
- if (equals) badusage("option --%s does not take a value",o->lng);
- arg= 0;
- } else if (equals) {
- arg= equals+1;
- } else {
- arg= *++(*argvp);
- if (!arg) badusage("option --%s needs a value for %s",
- o->lng, o->formarg);
- }
- o->fn(o, arg);
- break; /* eaten the whole argument now */
- }
- for (o=options; o->shrt || o->lng; o++)
- if (a == o->shrt)
- goto found_short;
- badusage("unknown short option -%c",a);
- found_short:
- if (!o->formarg) {
- o->fn(o,0);
- } else {
- if (!*++arg) {
- arg= *++(*argvp);
- if (!arg) badusage("option -%c needs a value for %s",
- o->shrt, o->formarg);
- }
- o->fn(o,arg);
- break; /* eaten the whole argument now */
- }
- }
- }
-}
-
-#define DELIMPERHAPS(delim,str) (str) ? (delim) : "", (str) ? (str) : ""
-
-static void print_options(const Option *options, FILE *f) {
- const Option *o;
- for (o=options; o->shrt || o->lng; o++) {
- char shrt[2] = { o->shrt, 0 };
- char *optspec= xasprintf("%s%s%s%s%s",
- o->shrt ? "-" : "", shrt,
- o->shrt && o->lng ? "|" : "",
- DELIMPERHAPS("--", o->lng));
- fprintf(f, " %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
- free(optspec);
- }
-}
-
-/*---------- specific option types ----------*/
-
-static void op_integer(const Option *o, const char *val) {
- char *ep;
- errno= 0;
- unsigned long ul= strtoul(val,&ep,10);
- if (*ep || ep==val || errno || ul>INT_MAX)
- badusage("bad integer value for %s",o->lng);
- int *store= o->store;
- *store= ul;
-}
-
-static void op_double(const Option *o, const char *val) {
- int *store= o->store;
- char *ep;
- errno= 0;
- *store= strtod(val, &ep);
- if (*ep || ep==val || errno)
- badusage("bad floating point value for %s",o->lng);
-}
-
-static void op_string(const Option *o, const char *val) {
- const char **store= o->store;
- *store= val;
-}
-
-static void op_seconds(const Option *o, const char *val) {
- int *store= o->store;
- char *ep;
- int unit;
-
- double v= strtod(val,&ep);
- if (ep==val) badusage("bad time/duration value for %s",o->lng);
-
- if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
- else if (!strcmp(ep,"m") || !strcmp(ep,"min")) unit= 60;
- else if (!strcmp(ep,"h") || !strcmp(ep,"hour")) unit= 3600;
- else if (!strcmp(ep,"d") || !strcmp(ep,"day")) unit= 86400;
- else if (!strcmp(ep,"das")) unit= 10;
- else if (!strcmp(ep,"hs")) unit= 100;
- else if (!strcmp(ep,"ks")) unit= 1000;
- else if (!strcmp(ep,"Ms")) unit= 1000000;
- else badusage("bad units %s for time/duration value for %s",ep,o->lng);
-
- v *= unit;
- v= ceil(v);
- if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
- *store= v;
-}
-
-static void op_setint(const Option *o, const char *val) {
- int *store= o->store;
- *store= o->intval;
-}
-
-/*---------- specific options ----------*/
-
-static void help(const Option *o, const char *val);
-
-static const Option innduct_options[]= {
-{'f',"feedfile", "F", &feedfile, op_string },
-{'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 },
-{0,"no-daemon", 0, &interactive, op_setint, 1 },
-{0,"interactive", 0, &interactive, op_setint, 2 },
-{0,"no-streaming", 0, &try_stream, op_setint, 0 },
-{0,"no-filemon", 0, &try_filemon, op_setint, 0 },
-{'C',"inndconf", "F", &inndconffile, op_string },
-{'P',"port", "PORT", &port, op_integer },
-{0,"chdir", "DIR", &path_run, op_string },
-{0,"cli", "DIR/|PATH", &path_cli, op_string },
-{0,"help", 0, 0, help },
-
-{0,"max-connections", "N", &max_connections, op_integer },
-{0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer },
-{0,"max-queue-per-file", "N", &max_queue_per_ipf, op_integer },
-{0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer },
-{0,"period-interval", "TIME", &period_seconds, op_seconds },
-
-{0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
-{0,"stuck-flush-timeout", "TIME", &inndcomm_flush_timeout, op_seconds },
-{0,"feedfile-poll", "TIME", &filepoll_seconds, op_seconds },
-
-{0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double },
-{0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double },
-
-{0,"reconnect-interval", "PERIOD", &reconnect_delay_periods, op_seconds },
-{0,"flush-retry-interval", "PERIOD", &flushfail_retry_periods, op_seconds },
-{0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
-{0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
-{0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds },
-{0,"flush-finish-timeout", "PERIOD", &max_separated_periods, op_seconds },
-{0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds },
-{0,"low-volume-thresh", "PERIOD", &lowvol_thresh, op_integer },
-{0,"low-volume-window", "PERIOD", &lowvol_periods, op_seconds },
-
-{0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double },
-{0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer },
-
-{0,0}
-};
-
-static void printusage(FILE *f) {
- fputs("usage: innduct [options] site [fqdn]\n"
- "available options are:\n", f);
- print_options(innduct_options, f);
-}
-
-static void help(const Option *o, const char *val) {
- printusage(stdout);
- if (ferror(stdout) || fflush(stdout)) {
- perror("innduct: writing help");
- exit(12);
- }
- exit(0);
-}
-
-static void convert_to_periods_rndup(int *store) {
- *store += period_seconds-1;
- *store /= period_seconds;
-}
-
-static int path_ends_slash(const char *specified) {
- int l= strlen(specified);
- assert(l);
- return specified[l-1] == '/';
-}
-
-static int innduct_fatal_cleanup(void) { return 12; } /* used for libinn die */
-
-int main(int argc, char **argv) {
- /* set up libinn logging */
- message_program_name= "innduct";
- message_fatal_cleanup= innduct_fatal_cleanup;
-
-#define INNLOGSET_CALL(fn, pfx, sysloglevel) \
- message_handlers_##fn(1, duct_log_##fn);
- INNLOGSETS(INNLOGSET_CALL)
-
- if (!argv[1]) {
- printusage(stderr);
- exit(8);
- }
-
- parse_options(innduct_options, &argv);
-
- /* arguments */
-
- sitename= *argv++;
- if (!sitename) badusage("need site name argument");
-
- if (*argv) remote_host= *argv++;
- else remote_host= sitename;
-
- if (*argv) badusage("too many non-option arguments");
-
- /* defaults */
-
- int r= innconf_read(inndconffile);
- if (!r) badusage("could not read inn.conf");
-
- if (!remote_host) remote_host= sitename;
-
- if (nocheck_thresh < 0 || nocheck_thresh > 100)
- badusage("nocheck threshold percentage must be between 0..100");
- nocheck_thresh *= 0.01;
-
- if (nocheck_decay < 0.1)
- badusage("nocheck decay articles must be at least 0.1");
- nocheck_decay= pow(0.5, 1.0/nocheck_decay);
-
- convert_to_periods_rndup(&reconnect_delay_periods);
- convert_to_periods_rndup(&flushfail_retry_periods);
- convert_to_periods_rndup(&backlog_retry_minperiods);
- convert_to_periods_rndup(&backlog_spontrescan_periods);
- convert_to_periods_rndup(&spontaneous_flush_periods);
- convert_to_periods_rndup(&max_separated_periods);
- convert_to_periods_rndup(&need_activity_periods);
- convert_to_periods_rndup(&lowvol_periods);
-
- if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
- badusage("bad input data ratio must be between 0..100");
- max_bad_data_ratio *= 0.01;
-
- if (!path_run)
- path_run= innconf->pathrun;
-
- if (!feedfile) feedfile= sitename;
- if (!feedfile[0]) badusage("feed filename, if specified, must be nonempty");
- if (path_ends_slash(feedfile))
- feedfile= xasprintf("%s%s", feedfile, sitename);
- if (feedfile[0] != '/')
- feedfile= xasprintf("%s/%s", innconf->pathoutgoing, feedfile);
-
- if (!path_cli) {
- path_cli_dir= "innduct";
- } else if (!path_cli[0] || !strcmp(path_cli,"none")) {
- path_cli= 0; /* ok, don't then */
- } else if (path_ends_slash(path_cli)) {
- path_cli_dir= xasprintf("%.*s", strlen(path_cli)-1, path_cli);
- }
- if (path_cli_dir)
- path_cli= xasprintf("%s/%s", path_cli_dir, sitename);
-
- if (max_queue_per_ipf<0)
- max_queue_per_ipf= max_queue_per_conn * 2;
-
- const char *feedfile_forbidden= "?*[~#";
- int c;
- while ((c= *feedfile_forbidden++))
- if (strchr(feedfile, c))
- badusage("feed filename may not contain metacharacter %c",c);
-
- int i;
- lowvol_perperiod= xcalloc(sizeof(*lowvol_perperiod), lowvol_periods);
- for (i=0; i<lowvol_periods; i++) {
- lowvol_perperiod[i]= lowvol_thresh;
- lowvol_total += lowvol_thresh;
- }
- lowvol_total -= lowvol_thresh;
-
- /* set things up */
-
- path_lock= xasprintf("%s_lock", feedfile);
- path_flushing= xasprintf("%s_flushing", feedfile);
- path_defer= xasprintf("%s_defer", feedfile);
- path_dump= xasprintf("%s_dump", feedfile);
- globpat_backlog= xasprintf("%s_backlog*", feedfile);
-
- oop_source_sys *sysloop= oop_sys_new();
- if (!sysloop) syscrash("could not create liboop event loop");
- loop= (oop_source*)sysloop;
-
- LIST_INIT(conns);
-
- if (interactive < 1) {
- for (i=3; i<255; i++)
- /* do this now before we open syslog, etc. */
- close(i);
- }
-
- logv_prefix= xasprintf("%s| ", sitename);
- if (interactive < 2) {
- openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
- logv_use_syslog= 1;
- }
-
- if (interactive < 1) {
- int null= open("/dev/null",O_RDWR);
- if (null<0) sysdie("failed to open /dev/null");
- dup2(null,0);
- dup2(null,1);
- dup2(null,2);
- xclose(null, "/dev/null original fd",0);
-
- pid_t child1= xfork("daemonise first fork");
- if (child1) _exit(0);
-
- pid_t sid= setsid();
- if (sid == -1) sysdie("setsid failed");
-
- pid_t child2= xfork("daemonise second fork");
- if (child2) _exit(0);
- }
-
- self_pid= getpid();
- if (self_pid==-1) syscrash("getpid");
-
- r= chdir(path_run);
- if (r) sysdie("could not chdir to pathrun %s", path_run);
-
- statemc_lock();
-
- init_signals();
-
- notice("starting");
-
- int val= 1;
- r= SMsetup(SM_PREOPEN, &val); if (!r) warn("SMsetup SM_PREOPEN failed");
- r= SMinit(); if (!r) die("storage manager initialisation (SMinit) failed");
-
- if (interactive >= 2)
- cli_stdio();
-
- cli_init();
-
- int filemon_ok= 0;
- if (!try_filemon) {
- notice("filemon: suppressed by command line option, polling");
- } else {
- filemon_ok= filemon_method_init();
- if (!filemon_ok)
- warn("filemon: no file monitoring available, polling");
- }
- if (!filemon_ok)
- every(filepoll_seconds,0,filepoll);
-
- every(period_seconds,1,period);
-
- statemc_init();
-
- /* let's go */
-
- void *run= oop_sys_run(sysloop);
- assert(run == OOP_ERROR);
- syscrash("event loop failed");
-}