3 * tailing reliable realtime streaming feeder for inn
4 * duct.c - main program, option parsing and startup
6 * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
8 * This program is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 * (I believe that when you compile and link this as part of the inn2
22 * build, with the Makefile runes I have provided, all the libraries
23 * and files which end up included in innduct are licence-compatible
24 * with GPLv3. If not then please let me know. -Ian Jackson.)
29 const char *sms_names[]= {
30 #define SMS_DEF_NAME(s) #s ,
31 SMS_LIST(SMS_DEF_NAME)
36 /*----- general operational variables -----*/
38 /* main initialises */
41 char *path_lock, *path_flushing, *path_defer, *path_dump;
42 char *globpat_backlog;
44 int *lowvol_perperiod;
46 int lowvol_total; /* does not include current period */
47 int until_stats_log=1;
49 /*---------- configuration option variables ----------*/
50 /* when changing defaults, remember to update the manpage */
52 const char *sitename, *remote_host;
53 const char *feedfile, *path_run, *path_cli, *path_cli_dir;
55 int interactive=0, try_filemon=1;
58 const char *inndconffile;
60 int max_connections=10;
61 int max_queue_per_conn=200;
62 int target_max_feedfile_size=100000;
63 int period_seconds=30;
64 int filepoll_seconds=5;
65 int max_queue_per_ipf=-1;
67 int connection_setup_timeout=200;
68 int inndcomm_flush_timeout=100;
70 double nocheck_thresh= 95.0; /* converted from percentage by main */
71 double nocheck_decay= 100; /* conv'd from articles to lambda by main */
73 /* all these are initialised to seconds, and converted to periods in main */
74 int reconnect_delay_periods=1000;
75 int flushfail_retry_periods=1000;
76 int backlog_retry_minperiods=100;
77 int backlog_spontrescan_periods=300;
78 int spontaneous_flush_periods=100000;
79 int max_separated_periods=2000;
80 int need_activity_periods=1000;
81 int stats_log_periods=2500;
83 int lowvol_periods=1000;
85 double max_bad_data_ratio= 1; /* conv'd from percentage by main */
86 int max_bad_data_initial= 30;
87 /* in one corrupt 4096-byte block the number of newlines has
88 * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */
90 /*========== main program ==========*/
92 static void postfork_inputfile(InputFile *ipf) {
94 xclose(ipf->fd, "(in child) input file ", ipf->path);
/*
 * Closes one stdio stream in a freshly forked child.
 *
 *  f      stream to close; a null stream is silently ignored
 *  what   first half of the description used in the crash message
 *  what2  second half of the description, or null for none
 *
 * Crashes via syscrash if fclose reports an error.
 */
static void postfork_stdio(FILE *f, const char *what, const char *what2) {
  /* we have no stdio streams that are buffered long-term */
  if (!f) return; /* fclose on a null stream is undefined */
  /* %s needs a real string: substitute "" (not integer 0) when what2 is null */
  if (fclose(f)) syscrash("(in child) close %s%s", what, what2?what2:"");
}
103 void postfork(void) {
106 xsigsetdefault(SIGTERM);
107 xsigsetdefault(SIGINT);
108 xsigsetdefault(SIGPIPE);
109 if (terminate_sig_flag) raise(terminate_sig_flag);
111 postfork_inputfile(main_input_file);
112 postfork_inputfile(flushing_input_file);
116 conn_closefd(conn,"(in child) ");
118 postfork_stdio(defer, "defer file ", path_defer);
121 typedef struct Every Every;
123 struct timeval interval;
128 static void every_schedule(Every *e, struct timeval base);
130 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
133 if (!e->fixed_rate) xgettimeofday(&base);
134 every_schedule(e, base);
138 static void every_schedule(Every *e, struct timeval base) {
140 timeradd(&base, &e->interval, &when);
141 loop->on_time(loop, when, every_happens, e);
144 static void every(int interval, int fixed_rate, void (*f)(void)) {
146 e->interval.tv_sec= interval;
147 e->interval.tv_usec= 0;
148 e->fixed_rate= fixed_rate;
152 every_schedule(e, now);
156 char *dipf_main= dbg_report_ipf(main_input_file);
157 char *dipf_flushing= dbg_report_ipf(flushing_input_file);
158 char *dipf_backlog= dbg_report_ipf(backlog_input_file);
161 " sms=%s[%d] conns=%d until_connect=%d"
162 " input_files main:%s flushing:%s backlog:%s[%d]"
163 " children connecting=%ld inndcomm=%ld lowvol_total=%d"
165 sms_names[sms], until_flush, conns.count, until_connect,
166 dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
167 (long)connecting_child, (long)inndcomm_child, lowvol_total
174 if (until_stats_log) until_stats_log--;
177 if (until_connect) until_connect--;
179 inputfile_queue_check_expired(backlog_input_file);
181 if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
182 statemc_period_poll();
183 check_assign_articles();
188 /*========== option parsing ==========*/
190 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
191 static void vbadusage(const char *fmt, va_list al) {
192 char *m= xvasprintf(fmt,al);
193 fprintf(stderr, "bad usage: %s\n"
194 "say --help for help, or read the manpage\n",
197 syslog(LOG_ERR,"innduct: invoked with bad usage: %s",m);
201 /*---------- generic option parser ----------*/
203 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
204 static void badusage(const char *fmt, ...) {
215 typedef struct Option Option;
216 typedef void OptionParser(const Option*, const char *val);
220 const char *lng, *formarg;
226 static void parse_options(const Option *options, char ***argvp) {
227 /* on return *argvp is first non-option arg; argc is not updated */
230 const char *arg= *++(*argvp);
232 if (*arg != '-') break;
233 if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
235 while ((a= *++arg)) {
239 char *equals= strchr(arg,'=');
240 unsigned len= equals ? (size_t)(equals - arg) : strlen(arg);
241 for (o=options; o->shrt || o->lng; o++)
242 if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
244 badusage("unknown long option --%s",arg);
247 if (equals) badusage("option --%s does not take a value",o->lng);
253 if (!arg) badusage("option --%s needs a value for %s",
257 break; /* eaten the whole argument now */
259 for (o=options; o->shrt || o->lng; o++)
262 badusage("unknown short option -%c",a);
269 if (!arg) badusage("option -%c needs a value for %s",
270 o->shrt, o->formarg);
273 break; /* eaten the whole argument now */
/*
 * DELIMPERHAPS(delim,str) expands to TWO comma-separated expressions,
 * meant to feed a "%s%s" pair: delim followed by str when str is
 * non-null, otherwise two empty strings.  Note that str is evaluated
 * more than once.
 */
#define DELIMPERHAPS(delim,str) \
  ((str) ? (delim) : ""), ((str) ? (str) : "")
281 static void print_options(const Option *options, FILE *f) {
283 for (o=options; o->shrt || o->lng; o++) {
284 char shrt[2] = { o->shrt, 0 };
285 char *optspec= xasprintf("%s%s%s%s%s",
286 o->shrt ? "-" : "", shrt,
287 o->shrt && o->lng ? "|" : "",
288 DELIMPERHAPS("--", o->lng));
289 fprintf(f, " %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
294 /*---------- specific option types ----------*/
296 static void op_integer(const Option *o, const char *val) {
299 unsigned long ul= strtoul(val,&ep,10);
300 if (*ep || ep==val || errno || ul>INT_MAX)
301 badusage("bad integer value for %s",o->lng);
302 int *store= o->store;
306 static void op_double(const Option *o, const char *val) {
307 int *store= o->store;
310 *store= strtod(val, &ep);
311 if (*ep || ep==val || errno)
312 badusage("bad floating point value for %s",o->lng);
315 static void op_string(const Option *o, const char *val) {
316 const char **store= o->store;
320 static void op_seconds(const Option *o, const char *val) {
321 int *store= o->store;
325 double v= strtod(val,&ep);
326 if (ep==val) badusage("bad time/duration value for %s",o->lng);
328 if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
329 else if (!strcmp(ep,"m") || !strcmp(ep,"min")) unit= 60;
330 else if (!strcmp(ep,"h") || !strcmp(ep,"hour")) unit= 3600;
331 else if (!strcmp(ep,"d") || !strcmp(ep,"day")) unit= 86400;
332 else if (!strcmp(ep,"das")) unit= 10;
333 else if (!strcmp(ep,"hs")) unit= 100;
334 else if (!strcmp(ep,"ks")) unit= 1000;
335 else if (!strcmp(ep,"Ms")) unit= 1000000;
336 else badusage("bad units %s for time/duration value for %s",ep,o->lng);
340 if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
344 static void op_setint(const Option *o, const char *val) {
345 int *store= o->store;
349 /*---------- specific options ----------*/
351 static void help(const Option *o, const char *val);
353 static const Option innduct_options[]= {
354 {'f',"feedfile", "F", &feedfile, op_string },
355 {'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 },
356 {0,"no-daemon", 0, &interactive, op_setint, 1 },
357 {0,"interactive", 0, &interactive, op_setint, 2 },
358 {0,"no-streaming", 0, &try_stream, op_setint, 0 },
359 {0,"no-filemon", 0, &try_filemon, op_setint, 0 },
360 {'C',"inndconf", "F", &inndconffile, op_string },
361 {'P',"port", "PORT", &port, op_integer },
362 {0,"chdir", "DIR", &path_run, op_string },
363 {0,"cli", "DIR/|PATH", &path_cli, op_string },
364 {0,"help", 0, 0, help },
366 {0,"max-connections", "N", &max_connections, op_integer },
367 {0,"max-queue-per-conn", "N", &max_queue_per_conn, op_integer },
368 {0,"max-queue-per-file", "N", &max_queue_per_ipf, op_integer },
369 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer },
370 {0,"period-interval", "TIME", &period_seconds, op_seconds },
372 {0,"connection-timeout", "TIME", &connection_setup_timeout, op_seconds },
373 {0,"stuck-flush-timeout", "TIME", &inndcomm_flush_timeout, op_seconds },
374 {0,"feedfile-poll", "TIME", &filepoll_seconds, op_seconds },
376 {0,"no-check-proportion", "PERCENT", &nocheck_thresh, op_double },
377 {0,"no-check-response-time","ARTICLES", &nocheck_decay, op_double },
379 {0,"reconnect-interval", "PERIOD", &reconnect_delay_periods, op_seconds },
380 {0,"flush-retry-interval", "PERIOD", &flushfail_retry_periods, op_seconds },
381 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
382 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
383 {0,"max-flush-interval", "PERIOD", &spontaneous_flush_periods,op_seconds },
384 {0,"flush-finish-timeout", "PERIOD", &max_separated_periods, op_seconds },
385 {0,"idle-timeout", "PERIOD", &need_activity_periods, op_seconds },
386 {0,"stats-log-interval", "PERIOD", &stats_log_periods, op_seconds },
387 {0,"low-volume-thresh", "PERIOD", &lowvol_thresh, op_integer },
388 {0,"low-volume-window", "PERIOD", &lowvol_periods, op_seconds },
390 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio, op_double },
391 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer },
396 static void printusage(FILE *f) {
397 fputs("usage: innduct [options] site [fqdn]\n"
398 "available options are:\n", f);
399 print_options(innduct_options, f);
402 static void printcopyright(FILE *f) {
404 "innduct is Copyright (C)2010 Ian Jackson.\n"
405 "It is free software, licenced under GPL version 3 or later.\n"
406 "It is provided WITHOUT ANY WARRANTY. See the file GPL-3 for details\n",
410 static void help(const Option *o, const char *val) {
412 if (ferror(stdout) || fflush(stdout)) {
413 perror("innduct: writing help");
419 static void convert_to_periods_rndup(int *store) {
420 *store += period_seconds-1;
421 *store /= period_seconds;
/*
 * Returns 1 if the string specified ends with '/', otherwise 0.
 * An empty string has no final character and so yields 0.
 */
static int path_ends_slash(const char *specified) {
  size_t len= strlen(specified);  /* size_t: no narrowing of the length */
  if (!len) return 0;             /* never index before the start */
  return specified[len-1] == '/';
}
/*
 * Used for libinn die: main installs this as message_fatal_cleanup.
 * It does no cleanup of its own and simply returns the fixed value 12.
 */
static int innduct_fatal_cleanup(void) {
  enum { libinn_die_status= 12 };
  return libinn_die_status;
}
432 int main(int argc, char **argv) {
433 /* set up libinn logging */
434 message_program_name= "innduct";
435 message_fatal_cleanup= innduct_fatal_cleanup;
437 #define INNLOGSET_CALL(fn, pfx, sysloglevel) \
438 message_handlers_##fn(1, duct_log_##fn);
439 INNLOGSETS(INNLOGSET_CALL)
443 printcopyright(stderr);
447 parse_options(innduct_options, &argv);
452 if (!sitename) badusage("need site name argument");
454 if (*argv) remote_host= *argv++;
455 else remote_host= sitename;
457 if (*argv) badusage("too many non-option arguments");
461 int r= innconf_read(inndconffile);
462 if (!r) badusage("could not read inn.conf");
464 if (!remote_host) remote_host= sitename;
466 if (nocheck_thresh < 0 || nocheck_thresh > 100)
467 badusage("nocheck threshold percentage must be between 0..100");
468 nocheck_thresh *= 0.01;
470 if (nocheck_decay < 0.1)
471 badusage("nocheck decay articles must be at least 0.1");
472 nocheck_decay= pow(0.5, 1.0/nocheck_decay);
474 convert_to_periods_rndup(&reconnect_delay_periods);
475 convert_to_periods_rndup(&flushfail_retry_periods);
476 convert_to_periods_rndup(&backlog_retry_minperiods);
477 convert_to_periods_rndup(&backlog_spontrescan_periods);
478 convert_to_periods_rndup(&spontaneous_flush_periods);
479 convert_to_periods_rndup(&max_separated_periods);
480 convert_to_periods_rndup(&need_activity_periods);
481 convert_to_periods_rndup(&stats_log_periods);
482 convert_to_periods_rndup(&lowvol_periods);
484 if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
485 badusage("bad input data ratio must be between 0..100");
486 max_bad_data_ratio *= 0.01;
489 path_run= innconf->pathrun;
491 if (!feedfile) feedfile= sitename;
492 if (!feedfile[0]) badusage("feed filename, if specified, must be nonempty");
493 if (path_ends_slash(feedfile))
494 feedfile= xasprintf("%s%s", feedfile, sitename);
495 if (feedfile[0] != '/')
496 feedfile= xasprintf("%s/%s", innconf->pathoutgoing, feedfile);
499 path_cli_dir= "innduct";
500 } else if (!path_cli[0] || !strcmp(path_cli,"none")) {
501 path_cli= 0; /* ok, don't then */
502 } else if (path_ends_slash(path_cli)) {
503 path_cli_dir= xasprintf("%.*s", strlen(path_cli)-1, path_cli);
506 path_cli= xasprintf("%s/%s", path_cli_dir, sitename);
508 if (max_queue_per_ipf<0)
509 max_queue_per_ipf= max_queue_per_conn * 2;
511 const char *feedfile_forbidden= "?*[~#";
513 while ((c= *feedfile_forbidden++))
514 if (strchr(feedfile, c))
515 badusage("feed filename may not contain metacharacter %c",c);
518 lowvol_perperiod= xcalloc(sizeof(*lowvol_perperiod), lowvol_periods);
519 for (i=0; i<lowvol_periods; i++) {
520 lowvol_perperiod[i]= lowvol_thresh;
521 lowvol_total += lowvol_thresh;
523 lowvol_total -= lowvol_thresh;
527 path_lock= xasprintf("%s_lock", feedfile);
528 path_flushing= xasprintf("%s_flushing", feedfile);
529 path_defer= xasprintf("%s_defer", feedfile);
530 path_dump= xasprintf("%s_dump", feedfile);
531 globpat_backlog= xasprintf("%s_backlog*", feedfile);
533 oop_source_sys *sysloop= oop_sys_new();
534 if (!sysloop) syscrash("could not create liboop event loop");
535 loop= (oop_source*)sysloop;
539 if (interactive < 1) {
540 for (i=3; i<255; i++)
541 /* do this now before we open syslog, etc. */
545 logv_prefix= xasprintf("%s| ", sitename);
546 if (interactive < 2) {
547 openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
551 if (interactive < 1) {
552 int null= open("/dev/null",O_RDWR);
553 if (null<0) sysdie("failed to open /dev/null");
557 xclose(null, "/dev/null original fd",0);
559 pid_t child1= xfork_bare("daemonise first fork");
560 if (child1) _exit(0);
563 if (sid == -1) sysdie("setsid failed");
565 pid_t child2= xfork_bare("daemonise second fork");
566 if (child2) _exit(0);
570 if (self_pid==-1) syscrash("getpid");
573 if (r) sysdie("could not chdir to pathrun %s", path_run);
582 r= SMsetup(SM_PREOPEN, &val); if (!r) warn("SMsetup SM_PREOPEN failed");
583 r= SMinit(); if (!r) die("storage manager initialisation (SMinit) failed");
585 if (interactive >= 2)
592 notice("filemon: suppressed by command line option, polling");
594 filemon_ok= filemon_method_init();
596 warn("filemon: no file monitoring available, polling");
599 every(filepoll_seconds,0,filepoll);
601 every(period_seconds,1,period);
607 void *run= oop_sys_run(sysloop);
608 assert(run == OOP_ERROR);
609 syscrash("event loop failed");