chiark / gitweb /
Install the READMEs
[innduct.git] / duct.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  duct.c - main program, option parsing and startup
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29 const char *sms_names[]= {
30 #define SMS_DEF_NAME(s) #s ,
31   SMS_LIST(SMS_DEF_NAME)
32   0
33 };
34
35
36 /*----- general operational variables -----*/
37
38 /* main initialises */
39 oop_source *loop;
40 ConnList conns;
41 char *path_lock, *path_flushing, *path_defer, *path_dump;
42 char *globpat_backlog;
43 pid_t self_pid;
44 int *lowvol_perperiod;
45 int lowvol_circptr;
46 int lowvol_total; /* does not include current period */
47 int until_stats_log=1;
48
49 /*---------- configuration option variables ----------*/
50 /* when changing defaults, remember to update the manpage */
51
52 const char *sitename, *remote_host;
53 const char *feedfile, *path_run, *path_cli, *path_cli_dir;
54 int quiet_multiple=0;
55 int interactive=0, try_filemon=1;
56 int try_stream=1;
57 int port=119;
58 const char *inndconffile;
59
60 int max_connections=10;
61 int max_queue_per_conn=200;
62 int target_max_feedfile_size=100000;
63 int period_seconds=30;
64 int filepoll_seconds=5;
65 int max_queue_per_ipf=-1;
66
67 int connection_setup_timeout=200;
68 int inndcomm_flush_timeout=100;
69
70 double nocheck_thresh= 95.0; /* converted from percentage by main */
71 double nocheck_decay= 100; /* conv'd from articles to lambda by main */
72
73 /* all these are initialised to seconds, and converted to periods in main */
74 int reconnect_delay_periods=1000;
75 int flushfail_retry_periods=1000;
76 int backlog_retry_minperiods=100;
77 int backlog_spontrescan_periods=300;
78 int spontaneous_flush_periods=100000;
79 int max_separated_periods=2000;
80 int need_activity_periods=1000;
81 int stats_log_periods=2500;
82 int lowvol_thresh=3;
83 int lowvol_periods=1000;
84
85 double max_bad_data_ratio= 1; /* conv'd from percentage by main */
86 int max_bad_data_initial= 30;
87   /* in one corrupt 4096-byte block the number of newlines has
88    * mean 16 and standard deviation 3.99.  30 corresponds to z=+3.5 */
89
90 /*========== main program ==========*/
91
92 static void postfork_inputfile(InputFile *ipf) {
93   if (!ipf) return;
94   xclose(ipf->fd, "(in child) input file ", ipf->path);
95 }
96
97 static void postfork_stdio(FILE *f, const char *what, const char *what2) {
98   /* we have no stdio streams that are buffered long-term */
99   if (!f) return;
100   if (fclose(f)) syscrash("(in child) close %s%s", what, what2?what2:0);
101 }
102
103 void postfork(void) {
104   in_child= 1;
105
106   xsigsetdefault(SIGTERM);
107   xsigsetdefault(SIGINT);
108   xsigsetdefault(SIGPIPE);
109   if (terminate_sig_flag) raise(terminate_sig_flag);
110
111   postfork_inputfile(main_input_file);
112   postfork_inputfile(flushing_input_file);
113
114   Conn *conn;
115   FOR_CONN(conn)
116     conn_closefd(conn,"(in child) ");
117
118   postfork_stdio(defer, "defer file ", path_defer);
119 }
120
121 typedef struct Every Every;
122 struct Every {
123   struct timeval interval;
124   int fixed_rate;
125   void (*f)(void);
126 };
127
128 static void every_schedule(Every *e, struct timeval base);
129
130 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
131   Every *e= e_v;
132   e->f();
133   if (!e->fixed_rate) xgettimeofday(&base);
134   every_schedule(e, base);
135   return OOP_CONTINUE;
136 }
137
138 static void every_schedule(Every *e, struct timeval base) {
139   struct timeval when;
140   timeradd(&base, &e->interval, &when);
141   loop->on_time(loop, when, every_happens, e);
142 }
143
144 static void every(int interval, int fixed_rate, void (*f)(void)) {
145   NEW_DECL(Every *,e);
146   e->interval.tv_sec= interval;
147   e->interval.tv_usec= 0;
148   e->fixed_rate= fixed_rate;
149   e->f= f;
150   struct timeval now;
151   xgettimeofday(&now);
152   every_schedule(e, now);
153 }
154
155 void period(void) {
156   char *dipf_main=     dbg_report_ipf(main_input_file);
157   char *dipf_flushing= dbg_report_ipf(flushing_input_file);
158   char *dipf_backlog=  dbg_report_ipf(backlog_input_file);
159
160   dbg("PERIOD"
161       " sms=%s[%d] conns=%d until_connect=%d"
162       " input_files main:%s flushing:%s backlog:%s[%d]"
163       " children connecting=%ld inndcomm=%ld lowvol_total=%d"
164       ,
165       sms_names[sms], until_flush, conns.count, until_connect,
166       dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
167       (long)connecting_child, (long)inndcomm_child, lowvol_total
168       );
169
170   free(dipf_main);
171   free(dipf_flushing);
172   free(dipf_backlog);
173
174   if (until_stats_log) until_stats_log--;
175   else showstats();
176
177   if (until_connect) until_connect--;
178
179   inputfile_queue_check_expired(backlog_input_file);
180   poll_backlog_file();
181   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
182   statemc_period_poll();
183   check_assign_articles();
184   check_idle_conns();
185 }
186
187
188 /*========== option parsing ==========*/
189
190 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
191 static void vbadusage(const char *fmt, va_list al) {
192   char *m= mvasprintf(fmt,al);
193   fprintf(stderr, "bad usage: %s\n"
194           "say --help for help, or read the manpage\n",
195           m);
196   if (interactive < 2)
197     syslog(LOG_ERR,"innduct: invoked with bad usage: %s",m);
198   exit(8);
199 }
200
201 /*---------- generic option parser ----------*/
202
203 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
204 static void badusage(const char *fmt, ...) {
205   va_list al;
206   va_start(al,fmt);
207   vbadusage(fmt,al);
208 }
209
210 enum OptFlags {
211   of_seconds= 001000u,
212   of_boolean= 002000u,
213 };
214
215 typedef struct Option Option;
216 typedef void OptionParser(const Option*, const char *val);
217
218 struct Option {
219   int shrt;
220   const char *lng, *formarg;
221   void *store;
222   OptionParser *fn;
223   int intval;
224 };
225
226 static void parse_options(const Option *options, char ***argvp) {
227   /* on return *argvp is first non-option arg; argc is not updated */
228
229   for (;;) {
230     const char *arg= *++(*argvp);
231     if (!arg) break;
232     if (*arg != '-') break;
233     if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
234     int a;
235     while ((a= *++arg)) {
236       const Option *o;
237       if (a=='-') {
238         arg++;
239         char *equals= strchr(arg,'=');
240         unsigned len= equals ? (size_t)(equals - arg) : strlen(arg);
241         for (o=options; o->shrt || o->lng; o++)
242           if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
243             goto found_long;
244         badusage("unknown long option --%s",arg);
245       found_long:
246         if (!o->formarg) {
247           if (equals) badusage("option --%s does not take a value",o->lng);
248           arg= 0;
249         } else if (equals) {
250           arg= equals+1;
251         } else {
252           arg= *++(*argvp);
253           if (!arg) badusage("option --%s needs a value for %s",
254                              o->lng, o->formarg);
255         }
256         o->fn(o, arg);
257         break; /* eaten the whole argument now */
258       }
259       for (o=options; o->shrt || o->lng; o++)
260         if (a == o->shrt)
261           goto found_short;
262       badusage("unknown short option -%c",a);
263     found_short:
264       if (!o->formarg) {
265         o->fn(o,0);
266       } else {
267         if (!*++arg) {
268           arg= *++(*argvp);
269           if (!arg) badusage("option -%c needs a value for %s",
270                              o->shrt, o->formarg);
271         }
272         o->fn(o,arg);
273         break; /* eaten the whole argument now */
274       }
275     }
276   }
277 }
278
279 #define DELIMPERHAPS(delim,str)  (str) ? (delim) : "", (str) ? (str) : ""
280
281 static void print_options(const Option *options, FILE *f) {
282   const Option *o;
283   for (o=options; o->shrt || o->lng; o++) {
284     char shrt[2] = { o->shrt, 0 };
285     char *optspec= masprintf("%s%s%s%s%s",
286                              o->shrt ? "-" : "", shrt,
287                              o->shrt && o->lng ? "|" : "",
288                              DELIMPERHAPS("--", o->lng));
289     fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
290     free(optspec);
291   }
292 }
293
294 /*---------- specific option types ----------*/
295
296 static void op_integer(const Option *o, const char *val) {
297   char *ep;
298   errno= 0;
299   unsigned long ul= strtoul(val,&ep,10);
300   if (*ep || ep==val || errno || ul>INT_MAX)
301     badusage("bad integer value for %s",o->lng);
302   int *store= o->store;
303   *store= ul;
304 }
305
306 static void op_double(const Option *o, const char *val) {
307   int *store= o->store;
308   char *ep;
309   errno= 0;
310   *store= strtod(val, &ep);
311   if (*ep || ep==val || errno)
312     badusage("bad floating point value for %s",o->lng);
313 }
314
315 static void op_string(const Option *o, const char *val) {
316   const char **store= o->store;
317   *store= val;
318 }
319
320 static void op_seconds(const Option *o, const char *val) {
321   int *store= o->store;
322   char *ep;
323   int unit;
324
325   double v= strtod(val,&ep);
326   if (ep==val) badusage("bad time/duration value for %s",o->lng);
327
328   if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
329   else if (!strcmp(ep,"m") || !strcmp(ep,"min"))    unit= 60;
330   else if (!strcmp(ep,"h") || !strcmp(ep,"hour"))   unit= 3600;
331   else if (!strcmp(ep,"d") || !strcmp(ep,"day"))    unit= 86400;
332   else if (!strcmp(ep,"das")) unit= 10;
333   else if (!strcmp(ep,"hs"))  unit= 100;
334   else if (!strcmp(ep,"ks"))  unit= 1000;
335   else if (!strcmp(ep,"Ms"))  unit= 1000000;
336   else badusage("bad units %s for time/duration value for %s",ep,o->lng);
337
338   v *= unit;
339   v= ceil(v);
340   if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
341   *store= v;
342 }
343
344 static void op_setint(const Option *o, const char *val) {
345   int *store= o->store;
346   *store= o->intval;
347 }
348
349 /*---------- specific options ----------*/
350
351 static void help(const Option *o, const char *val);
352
353 static const Option innduct_options[]= {
354 {'f',"feedfile",         "F",     &feedfile,                 op_string      },
355 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
356 {0,"no-daemon",          0,       &interactive,              op_setint, 1   },
357 {0,"interactive",        0,       &interactive,              op_setint, 2   },
358 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
359 {0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
360 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
361 {'P',"port",             "PORT",  &port,                     op_integer     },
362 {0,"chdir",              "DIR",   &path_run,                 op_string      },
363 {0,"cli",            "DIR/|PATH", &path_cli,                 op_string      },
364 {0,"help",               0,       0,                         help           },
365
366 {0,"max-connections",    "N",     &max_connections,          op_integer     },
367 {0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
368 {0,"max-queue-per-file", "N",     &max_queue_per_ipf,        op_integer     },
369 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
370 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
371
372 {0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
373 {0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
374 {0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
375
376 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
377 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
378
379 {0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
380 {0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
381 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
382 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
383 {0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
384 {0,"flush-finish-timeout",   "PERIOD", &max_separated_periods,    op_seconds },
385 {0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
386 {0,"stats-log-interval",     "PERIOD", &stats_log_periods,        op_seconds },
387 {0,"low-volume-thresh",      "PERIOD", &lowvol_thresh,            op_integer },
388 {0,"low-volume-window",      "PERIOD", &lowvol_periods,           op_seconds },
389
390 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
391 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
392
393 {0,0}
394 };
395
396 static void printusage(FILE *f) {
397   fputs("usage: innduct [options] site [fqdn]\n"
398         "available options are:\n", f);
399   print_options(innduct_options, f);
400 }
401
402 static void printcopyright(FILE *f) {
403   fputs(
404  "innduct is Copyright (C)2010 Ian Jackson.\n"
405  "It is free software, licenced under GPL version 3 or later.\n"
406  "It is provided WITHOUT ANY WARRANTY.  See the file GPL-3 for details\n",
407         stderr);
408 }
409
410 static void help(const Option *o, const char *val) {
411   printusage(stdout);
412   if (ferror(stdout) || fflush(stdout)) {
413     perror("innduct: writing help");
414     exit(12);
415   }
416   exit(0);
417 }
418
419 static void convert_to_periods_rndup(int *store) {
420   *store += period_seconds-1;
421   *store /= period_seconds;
422 }
423
424 static int path_ends_slash(const char *specified) {
425   int l= strlen(specified);
426   assert(l);
427   return specified[l-1] == '/';
428 }
429
430 static int innduct_fatal_cleanup(void) { return 12; } /* used for libinn die */
431
432 int main(int argc, char **argv) {
433   /* set up libinn logging */
434   message_program_name= "innduct";
435   message_fatal_cleanup= innduct_fatal_cleanup;
436
437 #define INNLOGSET_CALL(fn, pfx, sysloglevel)    \
438   message_handlers_##fn(1, duct_log_##fn);
439   INNLOGSETS(INNLOGSET_CALL)
440
441   if (!argv[1]) {
442     printusage(stderr);
443     printcopyright(stderr);
444     exit(8);
445   }
446
447   parse_options(innduct_options, &argv);
448
449   /* arguments */
450
451   sitename= *argv++;
452   if (!sitename) badusage("need site name argument");
453
454   if (*argv) remote_host= *argv++;
455   else remote_host= sitename;
456   
457   if (*argv) badusage("too many non-option arguments");
458
459   /* defaults */
460
461   int r= innconf_read(inndconffile);
462   if (!r) badusage("could not read inn.conf");
463
464   if (!remote_host) remote_host= sitename;
465
466   if (nocheck_thresh < 0 || nocheck_thresh > 100)
467     badusage("nocheck threshold percentage must be between 0..100");
468   nocheck_thresh *= 0.01;
469
470   if (nocheck_decay < 0.1)
471     badusage("nocheck decay articles must be at least 0.1");
472   nocheck_decay= pow(0.5, 1.0/nocheck_decay);
473
474   convert_to_periods_rndup(&reconnect_delay_periods);
475   convert_to_periods_rndup(&flushfail_retry_periods);
476   convert_to_periods_rndup(&backlog_retry_minperiods);
477   convert_to_periods_rndup(&backlog_spontrescan_periods);
478   convert_to_periods_rndup(&spontaneous_flush_periods);
479   convert_to_periods_rndup(&max_separated_periods);
480   convert_to_periods_rndup(&need_activity_periods);
481   convert_to_periods_rndup(&stats_log_periods);
482   convert_to_periods_rndup(&lowvol_periods);
483
484   if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
485     badusage("bad input data ratio must be between 0..100");
486   max_bad_data_ratio *= 0.01;
487
488   if (!path_run)
489     path_run= innconf->pathrun;
490
491   if (!feedfile) feedfile= sitename;
492   if (!feedfile[0]) badusage("feed filename, if specified, must be nonempty");
493   if (path_ends_slash(feedfile))
494     feedfile= masprintf("%s%s", feedfile, sitename);
495   if (feedfile[0] != '/')
496     feedfile= masprintf("%s/%s", innconf->pathoutgoing, feedfile);
497
498   if (!path_cli) {
499     path_cli_dir= "innduct";
500   } else if (!path_cli[0] || !strcmp(path_cli,"none")) {
501     path_cli= 0; /* ok, don't then */
502   } else if (path_ends_slash(path_cli)) {
503     path_cli_dir= masprintf("%.*s", strlen(path_cli)-1, path_cli);
504   }
505   if (path_cli_dir)
506     path_cli= masprintf("%s/%s", path_cli_dir, sitename);
507
508   if (max_queue_per_ipf<0)
509     max_queue_per_ipf= max_queue_per_conn * 2;
510
511   const char *feedfile_forbidden= "?*[~#";
512   int c;
513   while ((c= *feedfile_forbidden++))
514     if (strchr(feedfile, c))
515       badusage("feed filename may not contain metacharacter %c",c);
516
517   int i;
518   lowvol_perperiod= xcalloc(sizeof(*lowvol_perperiod), lowvol_periods);
519   for (i=0; i<lowvol_periods; i++) {
520     lowvol_perperiod[i]= lowvol_thresh;
521     lowvol_total += lowvol_thresh;
522   }
523   lowvol_total -= lowvol_thresh;
524
525   /* set things up */
526
527   path_lock=        masprintf("%s_lock",      feedfile);
528   path_flushing=    masprintf("%s_flushing",  feedfile);
529   path_defer=       masprintf("%s_defer",     feedfile);
530   path_dump=        masprintf("%s_dump",      feedfile);
531   globpat_backlog=  masprintf("%s_backlog*",  feedfile);
532
533   oop_source_sys *sysloop= oop_sys_new();
534   if (!sysloop) syscrash("could not create liboop event loop");
535   loop= (oop_source*)sysloop;
536
537   LIST_INIT(conns);
538
539   if (interactive < 1) {
540     for (i=3; i<255; i++)
541       /* do this now before we open syslog, etc. */
542       close(i);
543   }
544
545   logv_prefix= masprintf("%s| ", sitename);
546   if (interactive < 2) {
547     openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
548     logv_use_syslog= 1;
549   }
550
551   if (interactive < 1) {
552     int null= open("/dev/null",O_RDWR);
553     if (null<0) sysdie("failed to open /dev/null");
554     dup2(null,0);
555     dup2(null,1);
556     dup2(null,2);
557     xclose(null, "/dev/null original fd",0);
558
559     pid_t child1= xfork_bare("daemonise first fork");
560     if (child1) _exit(0);
561
562     pid_t sid= setsid();
563     if (sid == -1) sysdie("setsid failed");
564
565     pid_t child2= xfork_bare("daemonise second fork");
566     if (child2) _exit(0);
567   }
568
569   self_pid= getpid();
570   if (self_pid==-1) syscrash("getpid");
571
572   r= chdir(path_run);
573   if (r) sysdie("could not chdir to pathrun %s", path_run);
574
575   statemc_lock();
576
577   init_signals();
578
579   notice("starting");
580
581   int val= 1;
582   r= SMsetup(SM_PREOPEN, &val); if (!r) warn("SMsetup SM_PREOPEN failed");
583   r= SMinit(); if (!r) die("storage manager initialisation (SMinit) failed");
584
585   if (interactive >= 2)
586     cli_stdio();
587
588   cli_init();
589
590   int filemon_ok= 0;
591   if (!try_filemon) {
592     notice("filemon: suppressed by command line option, polling");
593   } else {
594     filemon_ok= filemon_method_init();
595     if (!filemon_ok)
596       warn("filemon: no file monitoring available, polling");
597   }
598   if (!filemon_ok)
599     every(filepoll_seconds,0,filepoll);
600
601   every(period_seconds,1,period);
602
603   statemc_init();
604
605   /* let's go */
606
607   void *run= oop_sys_run(sysloop);
608   assert(run == OOP_ERROR);
609   syscrash("event loop failed");
610 }