chiark / gitweb /
bf8c8087842c986400cfd1598bf3191876f1b046
[innduct.git] / duct.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  duct.c - main program, option parsing and startup
5  *
6  *  Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
7  *  and contributors; see LICENCE.txt.
8  *  SPDX-License-Identifier: GPL-3.0-or-later
9  */
10
11 #include "innduct.h"
12
13 const char *sms_names[]= {
14 #define SMS_DEF_NAME(s) #s ,
15   SMS_LIST(SMS_DEF_NAME)
16   0
17 };
18
19
20 /*----- general operational variables -----*/
21
22 /* main initialises */
23 oop_source *loop;
24 ConnList conns;
25 char *path_lock, *path_flushing, *path_defer, *path_dump;
26 char *globpat_backlog;
27 pid_t self_pid;
28 int *lowvol_perperiod;
29 int lowvol_circptr;
30 int lowvol_total; /* does not include current period */
31 int until_stats_log=1;
32
33 /*---------- configuration option variables ----------*/
34 /* when changing defaults, remember to update the manpage */
35
36 const char *sitename, *remote_host;
37 const char *feedfile, *path_run, *path_cli, *path_cli_dir;
38 int quiet_multiple=0;
39 int interactive=0, try_filemon=1;
40 int try_stream=1;
41 int port=119;
42 const char *inndconffile;
43
44 int max_connections=10;
45 int max_queue_per_conn=200;
46 int target_max_feedfile_size=100000;
47 int period_seconds=30;
48 int filepoll_seconds=5;
49 int max_queue_per_ipf=-1;
50
51 int connection_setup_timeout=200;
52 int inndcomm_flush_timeout=100;
53
54 double nocheck_thresh= 95.0; /* converted from percentage by main */
55 double nocheck_decay= 100; /* conv'd from articles to lambda by main */
56
57 /* all these are initialised to seconds, and converted to periods in main */
58 int reconnect_delay_periods=1000;
59 int flushfail_retry_periods=1000;
60 int backlog_retry_minperiods=100;
61 int backlog_spontrescan_periods=300;
62 int spontaneous_flush_periods=100000;
63 int max_separated_periods=2000;
64 int need_activity_periods=1000;
65 int stats_log_periods=2500;
66 int lowvol_thresh=3;
67 int lowvol_periods=1000;
68
69 double max_bad_data_ratio= 1; /* conv'd from percentage by main */
70 int max_bad_data_initial= 30;
71   /* in one corrupt 4096-byte block the number of newlines has
72    * mean 16 and standard deviation 3.99.  30 corresponds to z=+3.5 */
73
74 /*========== main program ==========*/
75
76 static void postfork_inputfile(InputFile *ipf) {
77   if (!ipf) return;
78   xclose(ipf->fd, "(in child) input file ", ipf->path);
79 }
80
81 static void postfork_stdio(FILE *f, const char *what, const char *what2) {
82   /* we have no stdio streams that are buffered long-term */
83   if (!f) return;
84   if (fclose(f)) syscrash("(in child) close %s%s", what, what2?what2:0);
85 }
86
87 void postfork(void) {
88   in_child= 1;
89
90   xsigsetdefault(SIGTERM);
91   xsigsetdefault(SIGINT);
92   xsigsetdefault(SIGPIPE);
93   if (terminate_sig_flag) raise(terminate_sig_flag);
94
95   postfork_inputfile(main_input_file);
96   postfork_inputfile(flushing_input_file);
97
98   Conn *conn;
99   FOR_CONN(conn)
100     conn_closefd(conn,"(in child) ");
101
102   postfork_stdio(defer, "defer file ", path_defer);
103 }
104
105 typedef struct Every Every;
106 struct Every {
107   struct timeval interval;
108   int fixed_rate;
109   void (*f)(void);
110 };
111
112 static void every_schedule(Every *e, struct timeval base);
113
114 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
115   Every *e= e_v;
116   e->f();
117   if (!e->fixed_rate) xgettimeofday(&base);
118   every_schedule(e, base);
119   return OOP_CONTINUE;
120 }
121
122 static void every_schedule(Every *e, struct timeval base) {
123   struct timeval when;
124   timeradd(&base, &e->interval, &when);
125   loop->on_time(loop, when, every_happens, e);
126 }
127
128 static void every(int interval, int fixed_rate, void (*f)(void)) {
129   NEW_DECL(Every *,e);
130   e->interval.tv_sec= interval;
131   e->interval.tv_usec= 0;
132   e->fixed_rate= fixed_rate;
133   e->f= f;
134   struct timeval now;
135   xgettimeofday(&now);
136   every_schedule(e, now);
137 }
138
139 void period(void) {
140   char *dipf_main=     dbg_report_ipf(main_input_file);
141   char *dipf_flushing= dbg_report_ipf(flushing_input_file);
142   char *dipf_backlog=  dbg_report_ipf(backlog_input_file);
143
144   dbg("PERIOD"
145       " sms=%s[%d] conns=%d until_connect=%d"
146       " input_files main:%s flushing:%s backlog:%s[%d]"
147       " children connecting=%ld inndcomm=%ld lowvol_total=%d"
148       ,
149       sms_names[sms], until_flush, conns.count, until_connect,
150       dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
151       (long)connecting_child, (long)inndcomm_child, lowvol_total
152       );
153
154   free(dipf_main);
155   free(dipf_flushing);
156   free(dipf_backlog);
157
158   if (until_stats_log) until_stats_log--;
159   else showstats();
160
161   if (until_connect) until_connect--;
162
163   inputfile_queue_check_expired(backlog_input_file);
164   poll_backlog_file();
165   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
166   statemc_period_poll();
167   check_assign_articles();
168   check_idle_conns();
169 }
170
171
172 /*========== option parsing ==========*/
173
174 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
175 static void vbadusage(const char *fmt, va_list al) {
176   char *m= mvasprintf(fmt,al);
177   fprintf(stderr, "bad usage: %s\n"
178           "say --help for help, or read the manpage\n",
179           m);
180   if (interactive < 2)
181     syslog(LOG_ERR,"innduct: invoked with bad usage: %s",m);
182   exit(8);
183 }
184
185 /*---------- generic option parser ----------*/
186
187 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
188 static void badusage(const char *fmt, ...) {
189   va_list al;
190   va_start(al,fmt);
191   vbadusage(fmt,al);
192 }
193
194 enum OptFlags {
195   of_seconds= 001000u,
196   of_boolean= 002000u,
197 };
198
199 typedef struct Option Option;
200 typedef void OptionParser(const Option*, const char *val);
201
202 struct Option {
203   int shrt;
204   const char *lng, *formarg;
205   void *store;
206   OptionParser *fn;
207   int intval;
208 };
209
210 static void parse_options(const Option *options, char ***argvp) {
211   /* on return *argvp is first non-option arg; argc is not updated */
212
213   for (;;) {
214     const char *arg= *++(*argvp);
215     if (!arg) break;
216     if (*arg != '-') break;
217     if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
218     int a;
219     while ((a= *++arg)) {
220       const Option *o;
221       if (a=='-') {
222         arg++;
223         char *equals= strchr(arg,'=');
224         unsigned len= equals ? (size_t)(equals - arg) : strlen(arg);
225         for (o=options; o->shrt || o->lng; o++)
226           if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
227             goto found_long;
228         badusage("unknown long option --%s",arg);
229       found_long:
230         if (!o->formarg) {
231           if (equals) badusage("option --%s does not take a value",o->lng);
232           arg= 0;
233         } else if (equals) {
234           arg= equals+1;
235         } else {
236           arg= *++(*argvp);
237           if (!arg) badusage("option --%s needs a value for %s",
238                              o->lng, o->formarg);
239         }
240         o->fn(o, arg);
241         break; /* eaten the whole argument now */
242       }
243       for (o=options; o->shrt || o->lng; o++)
244         if (a == o->shrt)
245           goto found_short;
246       badusage("unknown short option -%c",a);
247     found_short:
248       if (!o->formarg) {
249         o->fn(o,0);
250       } else {
251         if (!*++arg) {
252           arg= *++(*argvp);
253           if (!arg) badusage("option -%c needs a value for %s",
254                              o->shrt, o->formarg);
255         }
256         o->fn(o,arg);
257         break; /* eaten the whole argument now */
258       }
259     }
260   }
261 }
262
263 #define DELIMPERHAPS(delim,str)  (str) ? (delim) : "", (str) ? (str) : ""
264
265 static void print_options(const Option *options, FILE *f) {
266   const Option *o;
267   for (o=options; o->shrt || o->lng; o++) {
268     char shrt[2] = { o->shrt, 0 };
269     char *optspec= masprintf("%s%s%s%s%s",
270                              o->shrt ? "-" : "", shrt,
271                              o->shrt && o->lng ? "|" : "",
272                              DELIMPERHAPS("--", o->lng));
273     fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
274     free(optspec);
275   }
276 }
277
278 /*---------- specific option types ----------*/
279
280 static void op_integer(const Option *o, const char *val) {
281   char *ep;
282   errno= 0;
283   unsigned long ul= strtoul(val,&ep,10);
284   if (*ep || ep==val || errno || ul>INT_MAX)
285     badusage("bad integer value for %s",o->lng);
286   int *store= o->store;
287   *store= ul;
288 }
289
290 static void op_double(const Option *o, const char *val) {
291   int *store= o->store;
292   char *ep;
293   errno= 0;
294   *store= strtod(val, &ep);
295   if (*ep || ep==val || errno)
296     badusage("bad floating point value for %s",o->lng);
297 }
298
299 static void op_string(const Option *o, const char *val) {
300   const char **store= o->store;
301   *store= val;
302 }
303
304 static void op_seconds(const Option *o, const char *val) {
305   int *store= o->store;
306   char *ep;
307   int unit;
308
309   double v= strtod(val,&ep);
310   if (ep==val) badusage("bad time/duration value for %s",o->lng);
311
312   if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
313   else if (!strcmp(ep,"m") || !strcmp(ep,"min"))    unit= 60;
314   else if (!strcmp(ep,"h") || !strcmp(ep,"hour"))   unit= 3600;
315   else if (!strcmp(ep,"d") || !strcmp(ep,"day"))    unit= 86400;
316   else if (!strcmp(ep,"das")) unit= 10;
317   else if (!strcmp(ep,"hs"))  unit= 100;
318   else if (!strcmp(ep,"ks"))  unit= 1000;
319   else if (!strcmp(ep,"Ms"))  unit= 1000000;
320   else badusage("bad units %s for time/duration value for %s",ep,o->lng);
321
322   v *= unit;
323   v= ceil(v);
324   if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
325   *store= v;
326 }
327
328 static void op_setint(const Option *o, const char *val) {
329   int *store= o->store;
330   *store= o->intval;
331 }
332
333 /*---------- specific options ----------*/
334
335 static void help(const Option *o, const char *val);
336
337 static const Option innduct_options[]= {
338 {'f',"feedfile",         "F",     &feedfile,                 op_string      },
339 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
340 {0,"no-daemon",          0,       &interactive,              op_setint, 1   },
341 {0,"interactive",        0,       &interactive,              op_setint, 2   },
342 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
343 {0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
344 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
345 {'P',"port",             "PORT",  &port,                     op_integer     },
346 {0,"chdir",              "DIR",   &path_run,                 op_string      },
347 {0,"cli",            "DIR/|PATH", &path_cli,                 op_string      },
348 {0,"help",               0,       0,                         help           },
349
350 {0,"max-connections",    "N",     &max_connections,          op_integer     },
351 {0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
352 {0,"max-queue-per-file", "N",     &max_queue_per_ipf,        op_integer     },
353 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
354 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
355
356 {0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
357 {0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
358 {0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
359
360 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
361 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
362
363 {0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
364 {0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
365 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
366 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
367 {0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
368 {0,"flush-finish-timeout",   "PERIOD", &max_separated_periods,    op_seconds },
369 {0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
370 {0,"stats-log-interval",     "PERIOD", &stats_log_periods,        op_seconds },
371 {0,"low-volume-thresh",      "PERIOD", &lowvol_thresh,            op_integer },
372 {0,"low-volume-window",      "PERIOD", &lowvol_periods,           op_seconds },
373
374 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
375 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
376
377 {0,0}
378 };
379
380 static void printusage(FILE *f) {
381   fputs("usage: innduct [options] site [fqdn]\n"
382         "available options are:\n", f);
383   print_options(innduct_options, f);
384 }
385
386 static void printcopyright(FILE *f) {
387   fputs(
388  "innduct is Copyright Ian Jackson and contributors.\n"
389  "It is free software, licenced under GPL version 3 or later.\n"
390  "There is NO WARRANTY.  See the file LICENCE.txt for details.\n",
391         stderr);
392 }
393
394 static void help(const Option *o, const char *val) {
395   printusage(stdout);
396   if (ferror(stdout) || fflush(stdout)) {
397     perror("innduct: writing help");
398     exit(12);
399   }
400   exit(0);
401 }
402
403 static void convert_to_periods_rndup(int *store) {
404   *store += period_seconds-1;
405   *store /= period_seconds;
406 }
407
408 static int path_ends_slash(const char *specified) {
409   int l= strlen(specified);
410   assert(l);
411   return specified[l-1] == '/';
412 }
413
414 static int innduct_fatal_cleanup(void) { return 12; } /* used for libinn die */
415
416 int main(int argc, char **argv) {
417   /* set up libinn logging */
418   message_program_name= "innduct";
419   message_fatal_cleanup= innduct_fatal_cleanup;
420
421 #define INNLOGSET_CALL(fn, pfx, sysloglevel)    \
422   message_handlers_##fn(1, duct_log_##fn);
423   INNLOGSETS(INNLOGSET_CALL)
424
425   if (!argv[1]) {
426     printusage(stderr);
427     printcopyright(stderr);
428     exit(8);
429   }
430
431   parse_options(innduct_options, &argv);
432
433   /* arguments */
434
435   sitename= *argv++;
436   if (!sitename) badusage("need site name argument");
437
438   if (*argv) remote_host= *argv++;
439   else remote_host= sitename;
440   
441   if (*argv) badusage("too many non-option arguments");
442
443   /* defaults */
444
445   int r= innconf_read(inndconffile);
446   if (!r) badusage("could not read inn.conf");
447
448   if (!remote_host) remote_host= sitename;
449
450   if (nocheck_thresh < 0 || nocheck_thresh > 100)
451     badusage("nocheck threshold percentage must be between 0..100");
452   nocheck_thresh *= 0.01;
453
454   if (nocheck_decay < 0.1)
455     badusage("nocheck decay articles must be at least 0.1");
456   nocheck_decay= pow(0.5, 1.0/nocheck_decay);
457
458   convert_to_periods_rndup(&reconnect_delay_periods);
459   convert_to_periods_rndup(&flushfail_retry_periods);
460   convert_to_periods_rndup(&backlog_retry_minperiods);
461   convert_to_periods_rndup(&backlog_spontrescan_periods);
462   convert_to_periods_rndup(&spontaneous_flush_periods);
463   convert_to_periods_rndup(&max_separated_periods);
464   convert_to_periods_rndup(&need_activity_periods);
465   convert_to_periods_rndup(&stats_log_periods);
466   convert_to_periods_rndup(&lowvol_periods);
467
468   if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
469     badusage("bad input data ratio must be between 0..100");
470   max_bad_data_ratio *= 0.01;
471
472   if (!path_run)
473     path_run= innconf->pathrun;
474
475   if (!feedfile) feedfile= sitename;
476   if (!feedfile[0]) badusage("feed filename, if specified, must be nonempty");
477   if (path_ends_slash(feedfile))
478     feedfile= masprintf("%s%s", feedfile, sitename);
479   if (feedfile[0] != '/')
480     feedfile= masprintf("%s/%s", innconf->pathoutgoing, feedfile);
481
482   if (!path_cli) {
483     path_cli_dir= "innduct";
484   } else if (!path_cli[0] || !strcmp(path_cli,"none")) {
485     path_cli= 0; /* ok, don't then */
486   } else if (path_ends_slash(path_cli)) {
487     path_cli_dir= masprintf("%.*s", (int)(strlen(path_cli)-1), path_cli);
488   }
489   if (path_cli_dir)
490     path_cli= masprintf("%s/%s", path_cli_dir, sitename);
491
492   if (max_queue_per_ipf<0)
493     max_queue_per_ipf= max_queue_per_conn * 2;
494
495   const char *feedfile_forbidden= "?*[~#";
496   int c;
497   while ((c= *feedfile_forbidden++))
498     if (strchr(feedfile, c))
499       badusage("feed filename may not contain metacharacter %c",c);
500
501   int i;
502   lowvol_perperiod= xcalloc(sizeof(*lowvol_perperiod), lowvol_periods);
503   for (i=0; i<lowvol_periods; i++) {
504     lowvol_perperiod[i]= lowvol_thresh;
505     lowvol_total += lowvol_thresh;
506   }
507   lowvol_total -= lowvol_thresh;
508
509   /* set things up */
510
511   path_lock=        masprintf("%s_lock",      feedfile);
512   path_flushing=    masprintf("%s_flushing",  feedfile);
513   path_defer=       masprintf("%s_defer",     feedfile);
514   path_dump=        masprintf("%s_dump",      feedfile);
515   globpat_backlog=  masprintf("%s_backlog*",  feedfile);
516
517   oop_source_sys *sysloop= oop_sys_new();
518   if (!sysloop) syscrash("could not create liboop event loop");
519   loop= (oop_source*)sysloop;
520
521   LIST_INIT(conns);
522
523   if (interactive < 1) {
524     for (i=3; i<255; i++)
525       /* do this now before we open syslog, etc. */
526       close(i);
527   }
528
529   logv_prefix= masprintf("%s| ", sitename);
530   if (interactive < 2) {
531     openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
532     logv_use_syslog= 1;
533   }
534
535   if (interactive < 1) {
536     int null= open("/dev/null",O_RDWR);
537     if (null<0) sysdie("failed to open /dev/null");
538     dup2(null,0);
539     dup2(null,1);
540     dup2(null,2);
541     xclose(null, "/dev/null original fd",0);
542
543     pid_t child1= xfork_bare("daemonise first fork");
544     if (child1) _exit(0);
545
546     pid_t sid= setsid();
547     if (sid == -1) sysdie("setsid failed");
548
549     pid_t child2= xfork_bare("daemonise second fork");
550     if (child2) _exit(0);
551   }
552
553   self_pid= getpid();
554   if (self_pid==-1) syscrash("getpid");
555
556   r= chdir(path_run);
557   if (r) sysdie("could not chdir to pathrun %s", path_run);
558
559   statemc_lock();
560
561   init_signals();
562
563   notice("starting");
564
565   int val= 1;
566   r= SMsetup(SM_PREOPEN, &val); if (!r) warn("SMsetup SM_PREOPEN failed");
567   r= SMinit(); if (!r) die("storage manager initialisation (SMinit) failed");
568
569   if (interactive >= 2)
570     cli_stdio();
571
572   cli_init();
573
574   int filemon_ok= 0;
575   if (!try_filemon) {
576     notice("filemon: suppressed by command line option, polling");
577   } else {
578     filemon_ok= filemon_method_init();
579     if (!filemon_ok)
580       warn("filemon: no file monitoring available, polling");
581   }
582   if (!filemon_ok)
583     every(filepoll_seconds,0,filepoll);
584
585   every(period_seconds,1,period);
586
587   statemc_init();
588
589   /* let's go */
590
591   void *run= oop_sys_run(sysloop);
592   assert(run == OOP_ERROR);
593   syscrash("event loop failed");
594 }