chiark / gitweb /
7ade8b6ac3291c35f8b6195f77fdd06eb094e20e
[innduct.git] / innduct.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  main program - option parsing and startup
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29 const char *sms_names[]= {
30 #define SMS_DEF_NAME(s) #s ,
31   SMS_LIST(SMS_DEF_NAME)
32   0
33 };
34
35 struct Conn {
36   ISNODE(Conn);
37   int fd; /* may be 0, meaning closed (during construction/destruction) */
38   oop_read *rd; /* likewise */
39   int oopwriting; /* since on_fd is not idempotent */
40   int max_queue, stream;
41   const char *quitting;
42   int since_activity; /* periods */
43   ArticleList waiting; /* not yet told peer */
44   ArticleList priority; /* peer says send it now */
45   ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
46   struct iovec xmit[CONNIOVS];
47   XmitDetails xmitd[CONNIOVS];
48   int xmitu;
49 };
50
51
52 /*----- general operational variables -----*/
53
54 /* main initialises */
55 oop_source *loop;
56 ConnList conns;
57 char *path_lock, *path_flushing, *path_defer, *path_dump;
58 char *globpat_backlog;
59 pid_t self_pid;
60 int *lowvol_perperiod;
61 int lowvol_circptr;
62 int lowvol_total; /* does not include current period */
63
64 /*========== main program ==========*/
65
66 static void postfork_inputfile(InputFile *ipf) {
67   if (!ipf) return;
68   xclose(ipf->fd, "(in child) input file ", ipf->path);
69 }
70
71 static void postfork_stdio(FILE *f, const char *what, const char *what2) {
72   /* we have no stdio streams that are buffered long-term */
73   if (!f) return;
74   if (fclose(f)) syscrash("(in child) close %s%s", what, what2?what2:0);
75 }
76
77 void postfork(void) {
78   in_child= 1;
79
80   xsigsetdefault(SIGTERM);
81   xsigsetdefault(SIGINT);
82   xsigsetdefault(SIGPIPE);
83   if (terminate_sig_flag) raise(terminate_sig_flag);
84
85   postfork_inputfile(main_input_file);
86   postfork_inputfile(flushing_input_file);
87
88   Conn *conn;
89   FOR_CONN(conn)
90     conn_closefd(conn,"(in child) ");
91
92   postfork_stdio(defer, "defer file ", path_defer);
93 }
94
95 typedef struct Every Every;
96 struct Every {
97   struct timeval interval;
98   int fixed_rate;
99   void (*f)(void);
100 };
101
102 static void every_schedule(Every *e, struct timeval base);
103
104 static void *every_happens(oop_source *lp, struct timeval base, void *e_v) {
105   Every *e= e_v;
106   e->f();
107   if (!e->fixed_rate) xgettimeofday(&base);
108   every_schedule(e, base);
109   return OOP_CONTINUE;
110 }
111
112 static void every_schedule(Every *e, struct timeval base) {
113   struct timeval when;
114   timeradd(&base, &e->interval, &when);
115   loop->on_time(loop, when, every_happens, e);
116 }
117
118 static void every(int interval, int fixed_rate, void (*f)(void)) {
119   NEW_DECL(Every *,e);
120   e->interval.tv_sec= interval;
121   e->interval.tv_usec= 0;
122   e->fixed_rate= fixed_rate;
123   e->f= f;
124   struct timeval now;
125   xgettimeofday(&now);
126   every_schedule(e, now);
127 }
128
129 static char *dbg_report_ipf(InputFile *ipf) {
130   if (!ipf) return xasprintf("none");
131
132   const char *slash= strrchr(ipf->path,'/');
133   const char *path= slash ? slash+1 : ipf->path;
134
135   return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
136                    ipf, path,
137                    ipf->queue.count, ipf->inprogress, ipf->autodefer,
138                    (long)ipf->offset, ipf->fd,
139                    ipf->rd ? "" : ",!rd",
140                    ipf->skippinglong ? "*skiplong" : "",
141                    ipf->rd && ipf->paused ? "*paused" : "");
142 }
143
144 static void period(void) {
145   char *dipf_main=     dbg_report_ipf(main_input_file);
146   char *dipf_flushing= dbg_report_ipf(flushing_input_file);
147   char *dipf_backlog=  dbg_report_ipf(backlog_input_file);
148
149   dbg("PERIOD"
150       " sms=%s[%d] conns=%d until_connect=%d"
151       " input_files main:%s flushing:%s backlog:%s[%d]"
152       " children connecting=%ld inndcomm=%ld lowvol_total=%d"
153       ,
154       sms_names[sms], until_flush, conns.count, until_connect,
155       dipf_main, dipf_flushing, dipf_backlog, until_backlog_nextscan,
156       (long)connecting_child, (long)inndcomm_child, lowvol_total
157       );
158
159   free(dipf_main);
160   free(dipf_flushing);
161   free(dipf_backlog);
162
163   if (until_connect) until_connect--;
164
165   inputfile_queue_check_expired(backlog_input_file);
166   poll_backlog_file();
167   if (!backlog_input_file) close_defer(); /* want to start on a new backlog */
168   statemc_period_poll();
169   check_assign_articles();
170   check_idle_conns();
171 }
172
173
174 /*========== option parsing ==========*/
175
176 static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0);
177 static void vbadusage(const char *fmt, va_list al) {
178   char *m= xvasprintf(fmt,al);
179   fprintf(stderr, "bad usage: %s\n"
180           "say --help for help, or read the manpage\n",
181           m);
182   if (interactive < 2)
183     syslog(LOG_ERR,"innduct: invoked with bad usage: %s",m);
184   exit(8);
185 }
186
187 /*---------- generic option parser ----------*/
188
189 static void badusage(const char *fmt, ...) NORET_PRINTF(1,2);
190 static void badusage(const char *fmt, ...) {
191   va_list al;
192   va_start(al,fmt);
193   vbadusage(fmt,al);
194 }
195
196 enum OptFlags {
197   of_seconds= 001000u,
198   of_boolean= 002000u,
199 };
200
201 typedef struct Option Option;
202 typedef void OptionParser(const Option*, const char *val);
203
204 struct Option {
205   int shrt;
206   const char *lng, *formarg;
207   void *store;
208   OptionParser *fn;
209   int intval;
210 };
211
212 static void parse_options(const Option *options, char ***argvp) {
213   /* on return *argvp is first non-option arg; argc is not updated */
214
215   for (;;) {
216     const char *arg= *++(*argvp);
217     if (!arg) break;
218     if (*arg != '-') break;
219     if (!strcmp(arg,"--")) { arg= *++(*argvp); break; }
220     int a;
221     while ((a= *++arg)) {
222       const Option *o;
223       if (a=='-') {
224         arg++;
225         char *equals= strchr(arg,'=');
226         unsigned len= equals ? (size_t)(equals - arg) : strlen(arg);
227         for (o=options; o->shrt || o->lng; o++)
228           if (strlen(o->lng) == len && !memcmp(o->lng,arg,len))
229             goto found_long;
230         badusage("unknown long option --%s",arg);
231       found_long:
232         if (!o->formarg) {
233           if (equals) badusage("option --%s does not take a value",o->lng);
234           arg= 0;
235         } else if (equals) {
236           arg= equals+1;
237         } else {
238           arg= *++(*argvp);
239           if (!arg) badusage("option --%s needs a value for %s",
240                              o->lng, o->formarg);
241         }
242         o->fn(o, arg);
243         break; /* eaten the whole argument now */
244       }
245       for (o=options; o->shrt || o->lng; o++)
246         if (a == o->shrt)
247           goto found_short;
248       badusage("unknown short option -%c",a);
249     found_short:
250       if (!o->formarg) {
251         o->fn(o,0);
252       } else {
253         if (!*++arg) {
254           arg= *++(*argvp);
255           if (!arg) badusage("option -%c needs a value for %s",
256                              o->shrt, o->formarg);
257         }
258         o->fn(o,arg);
259         break; /* eaten the whole argument now */
260       }
261     }
262   }
263 }
264
265 #define DELIMPERHAPS(delim,str)  (str) ? (delim) : "", (str) ? (str) : ""
266
267 static void print_options(const Option *options, FILE *f) {
268   const Option *o;
269   for (o=options; o->shrt || o->lng; o++) {
270     char shrt[2] = { o->shrt, 0 };
271     char *optspec= xasprintf("%s%s%s%s%s",
272                              o->shrt ? "-" : "", shrt,
273                              o->shrt && o->lng ? "|" : "",
274                              DELIMPERHAPS("--", o->lng));
275     fprintf(f, "  %s%s%s\n", optspec, DELIMPERHAPS(" ", o->formarg));
276     free(optspec);
277   }
278 }
279
280 /*---------- specific option types ----------*/
281
282 static void op_integer(const Option *o, const char *val) {
283   char *ep;
284   errno= 0;
285   unsigned long ul= strtoul(val,&ep,10);
286   if (*ep || ep==val || errno || ul>INT_MAX)
287     badusage("bad integer value for %s",o->lng);
288   int *store= o->store;
289   *store= ul;
290 }
291
292 static void op_double(const Option *o, const char *val) {
293   int *store= o->store;
294   char *ep;
295   errno= 0;
296   *store= strtod(val, &ep);
297   if (*ep || ep==val || errno)
298     badusage("bad floating point value for %s",o->lng);
299 }
300
301 static void op_string(const Option *o, const char *val) {
302   const char **store= o->store;
303   *store= val;
304 }
305
306 static void op_seconds(const Option *o, const char *val) {
307   int *store= o->store;
308   char *ep;
309   int unit;
310
311   double v= strtod(val,&ep);
312   if (ep==val) badusage("bad time/duration value for %s",o->lng);
313
314   if (!*ep || !strcmp(ep,"s") || !strcmp(ep,"sec")) unit= 1;
315   else if (!strcmp(ep,"m") || !strcmp(ep,"min"))    unit= 60;
316   else if (!strcmp(ep,"h") || !strcmp(ep,"hour"))   unit= 3600;
317   else if (!strcmp(ep,"d") || !strcmp(ep,"day"))    unit= 86400;
318   else if (!strcmp(ep,"das")) unit= 10;
319   else if (!strcmp(ep,"hs"))  unit= 100;
320   else if (!strcmp(ep,"ks"))  unit= 1000;
321   else if (!strcmp(ep,"Ms"))  unit= 1000000;
322   else badusage("bad units %s for time/duration value for %s",ep,o->lng);
323
324   v *= unit;
325   v= ceil(v);
326   if (v > INT_MAX) badusage("time/duration value for %s out of range",o->lng);
327   *store= v;
328 }
329
330 static void op_setint(const Option *o, const char *val) {
331   int *store= o->store;
332   *store= o->intval;
333 }
334
335 /*---------- specific options ----------*/
336
337 static void help(const Option *o, const char *val);
338
339 static const Option innduct_options[]= {
340 {'f',"feedfile",         "F",     &feedfile,                 op_string      },
341 {'q',"quiet-multiple",   0,       &quiet_multiple,           op_setint, 1   },
342 {0,"no-daemon",          0,       &interactive,              op_setint, 1   },
343 {0,"interactive",        0,       &interactive,              op_setint, 2   },
344 {0,"no-streaming",       0,       &try_stream,               op_setint, 0   },
345 {0,"no-filemon",         0,       &try_filemon,              op_setint, 0   },
346 {'C',"inndconf",         "F",     &inndconffile,             op_string      },
347 {'P',"port",             "PORT",  &port,                     op_integer     },
348 {0,"chdir",              "DIR",   &path_run,                 op_string      },
349 {0,"cli",            "DIR/|PATH", &path_cli,                 op_string      },
350 {0,"help",               0,       0,                         help           },
351
352 {0,"max-connections",    "N",     &max_connections,          op_integer     },
353 {0,"max-queue-per-conn", "N",     &max_queue_per_conn,       op_integer     },
354 {0,"max-queue-per-file", "N",     &max_queue_per_ipf,        op_integer     },
355 {0,"feedfile-flush-size","BYTES", &target_max_feedfile_size, op_integer     },
356 {0,"period-interval",    "TIME",  &period_seconds,           op_seconds     },
357
358 {0,"connection-timeout",   "TIME",  &connection_setup_timeout, op_seconds   },
359 {0,"stuck-flush-timeout",  "TIME",  &inndcomm_flush_timeout,   op_seconds   },
360 {0,"feedfile-poll",        "TIME",  &filepoll_seconds,         op_seconds   },
361
362 {0,"no-check-proportion",   "PERCENT",   &nocheck_thresh,       op_double   },
363 {0,"no-check-response-time","ARTICLES",  &nocheck_decay,        op_double   },
364
365 {0,"reconnect-interval",     "PERIOD", &reconnect_delay_periods,  op_seconds },
366 {0,"flush-retry-interval",   "PERIOD", &flushfail_retry_periods,  op_seconds },
367 {0,"earliest-deferred-retry","PERIOD", &backlog_retry_minperiods, op_seconds },
368 {0,"backlog-rescan-interval","PERIOD",&backlog_spontrescan_periods,op_seconds},
369 {0,"max-flush-interval",     "PERIOD", &spontaneous_flush_periods,op_seconds },
370 {0,"flush-finish-timeout",   "PERIOD", &max_separated_periods,    op_seconds },
371 {0,"idle-timeout",           "PERIOD", &need_activity_periods,    op_seconds },
372 {0,"low-volume-thresh",      "PERIOD", &lowvol_thresh,            op_integer },
373 {0,"low-volume-window",      "PERIOD", &lowvol_periods,           op_seconds },
374
375 {0,"max-bad-input-data-ratio","PERCENT", &max_bad_data_ratio,   op_double    },
376 {0,"max-bad-input-data-init", "PERCENT", &max_bad_data_initial, op_integer   },
377
378 {0,0}
379 };
380
381 static void printusage(FILE *f) {
382   fputs("usage: innduct [options] site [fqdn]\n"
383         "available options are:\n", f);
384   print_options(innduct_options, f);
385 }
386
387 static void help(const Option *o, const char *val) {
388   printusage(stdout);
389   if (ferror(stdout) || fflush(stdout)) {
390     perror("innduct: writing help");
391     exit(12);
392   }
393   exit(0);
394 }
395
396 static void convert_to_periods_rndup(int *store) {
397   *store += period_seconds-1;
398   *store /= period_seconds;
399 }
400
401 static int path_ends_slash(const char *specified) {
402   int l= strlen(specified);
403   assert(l);
404   return specified[l-1] == '/';
405 }
406
407 int main(int argc, char **argv) {
408   /* set up libinn logging */
409   message_program_name= "innduct";
410   message_fatal_cleanup= innduct_fatal_cleanup;
411   INNLOGSETS(INNLOGSET_CALL)
412
413   if (!argv[1]) {
414     printusage(stderr);
415     exit(8);
416   }
417
418   parse_options(innduct_options, &argv);
419
420   /* arguments */
421
422   sitename= *argv++;
423   if (!sitename) badusage("need site name argument");
424
425   if (*argv) remote_host= *argv++;
426   else remote_host= sitename;
427   
428   if (*argv) badusage("too many non-option arguments");
429
430   /* defaults */
431
432   int r= innconf_read(inndconffile);
433   if (!r) badusage("could not read inn.conf");
434
435   if (!remote_host) remote_host= sitename;
436
437   if (nocheck_thresh < 0 || nocheck_thresh > 100)
438     badusage("nocheck threshold percentage must be between 0..100");
439   nocheck_thresh *= 0.01;
440
441   if (nocheck_decay < 0.1)
442     badusage("nocheck decay articles must be at least 0.1");
443   nocheck_decay= pow(0.5, 1.0/nocheck_decay);
444
445   convert_to_periods_rndup(&reconnect_delay_periods);
446   convert_to_periods_rndup(&flushfail_retry_periods);
447   convert_to_periods_rndup(&backlog_retry_minperiods);
448   convert_to_periods_rndup(&backlog_spontrescan_periods);
449   convert_to_periods_rndup(&spontaneous_flush_periods);
450   convert_to_periods_rndup(&max_separated_periods);
451   convert_to_periods_rndup(&need_activity_periods);
452   convert_to_periods_rndup(&lowvol_periods);
453
454   if (max_bad_data_ratio < 0 || max_bad_data_ratio > 100)
455     badusage("bad input data ratio must be between 0..100");
456   max_bad_data_ratio *= 0.01;
457
458   if (!path_run)
459     path_run= innconf->pathrun;
460
461   if (!feedfile) feedfile= sitename;
462   if (!feedfile[0]) badusage("feed filename, if specified, must be nonempty");
463   if (path_ends_slash(feedfile))
464     feedfile= xasprintf("%s%s", feedfile, sitename);
465   if (feedfile[0] != '/')
466     feedfile= xasprintf("%s/%s", innconf->pathoutgoing, feedfile);
467
468   if (!path_cli) {
469     path_cli_dir= "innduct";
470   } else if (!path_cli[0] || !strcmp(path_cli,"none")) {
471     path_cli= 0; /* ok, don't then */
472   } else if (path_ends_slash(path_cli)) {
473     path_cli_dir= xasprintf("%.*s", strlen(path_cli)-1, path_cli);
474   }
475   if (path_cli_dir)
476     path_cli= xasprintf("%s/%s", path_cli_dir, sitename);
477
478   if (max_queue_per_ipf<0)
479     max_queue_per_ipf= max_queue_per_conn * 2;
480
481   const char *feedfile_forbidden= "?*[~#";
482   int c;
483   while ((c= *feedfile_forbidden++))
484     if (strchr(feedfile, c))
485       badusage("feed filename may not contain metacharacter %c",c);
486
487   int i;
488   lowvol_perperiod= xcalloc(sizeof(*lowvol_perperiod), lowvol_periods);
489   for (i=0; i<lowvol_periods; i++) {
490     lowvol_perperiod[i]= lowvol_thresh;
491     lowvol_total += lowvol_thresh;
492   }
493   lowvol_total -= lowvol_thresh;
494
495   /* set things up */
496
497   path_lock=        xasprintf("%s_lock",      feedfile);
498   path_flushing=    xasprintf("%s_flushing",  feedfile);
499   path_defer=       xasprintf("%s_defer",     feedfile);
500   path_dump=        xasprintf("%s_dump",      feedfile);
501   globpat_backlog=  xasprintf("%s_backlog*",  feedfile);
502
503   oop_source_sys *sysloop= oop_sys_new();
504   if (!sysloop) syscrash("could not create liboop event loop");
505   loop= (oop_source*)sysloop;
506
507   LIST_INIT(conns);
508
509   if (interactive < 1) {
510     for (i=3; i<255; i++)
511       /* do this now before we open syslog, etc. */
512       close(i);
513   }
514
515   logv_prefix= xasprintf("%s| ", sitename);
516   if (interactive < 2) {
517     openlog("innduct",LOG_NDELAY|LOG_PID,LOG_NEWS);
518     logv_use_syslog= 1;
519   }
520
521   if (interactive < 1) {
522     int null= open("/dev/null",O_RDWR);
523     if (null<0) sysdie("failed to open /dev/null");
524     dup2(null,0);
525     dup2(null,1);
526     dup2(null,2);
527     xclose(null, "/dev/null original fd",0);
528
529     pid_t child1= xfork("daemonise first fork");
530     if (child1) _exit(0);
531
532     pid_t sid= setsid();
533     if (sid == -1) sysdie("setsid failed");
534
535     pid_t child2= xfork("daemonise second fork");
536     if (child2) _exit(0);
537   }
538
539   self_pid= getpid();
540   if (self_pid==-1) syscrash("getpid");
541
542   r= chdir(path_run);
543   if (r) sysdie("could not chdir to pathrun %s", path_run);
544
545   statemc_lock();
546
547   init_signals();
548
549   notice("starting");
550
551   int val= 1;
552   r= SMsetup(SM_PREOPEN, &val); if (!r) warn("SMsetup SM_PREOPEN failed");
553   r= SMinit(); if (!r) die("storage manager initialisation (SMinit) failed");
554
555   if (interactive >= 2)
556     cli_stdio();
557
558   cli_init();
559
560   int filemon_ok= 0;
561   if (!try_filemon) {
562     notice("filemon: suppressed by command line option, polling");
563   } else {
564     filemon_ok= filemon_method_init();
565     if (!filemon_ok)
566       warn("filemon: no file monitoring available, polling");
567   }
568   if (!filemon_ok)
569     every(filepoll_seconds,0,filepoll);
570
571   every(period_seconds,1,period);
572
573   statemc_init();
574
575   /* let's go */
576
577   void *run= oop_sys_run(sysloop);
578   assert(run == OOP_ERROR);
579   syscrash("event loop failed");
580 }