chiark / gitweb /
changelog: Finalise 2.2
[innduct.git] / statemc.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  statemc.c - state machine core (see README.states).
5  *
6  *  Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
7  *  and contributors; see LICENCE.txt.
8  *  SPDX-License-Identifier: GPL-3.0-or-later
9  */
10
11 #include "innduct.h"
12
13
14 /* statemc_init initialises */
15 StateMachineState sms;
16 int until_flush;
17 InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
18 Counts backlog_counts;
19 int backlog_counts_report;
20 FILE *defer;
21
22 /* initialisation to 0 is good */
23 int until_connect, until_backlog_nextscan;
24 double accept_proportion;
25 int nocheck, nocheck_reported, in_child;
26 sig_atomic_t terminate_sig_flag;
27
28
29 static void startup_set_input_file(InputFile *f) {
30   assert(!main_input_file);
31   main_input_file= f;
32   inputfile_reading_start(f);
33 }
34
35 void statemc_lock(void) {
36   int lockfd;
37   struct stat stab, stabf;
38   
39   for (;;) {
40     lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
41     if (lockfd<0) sysdie("open lockfile %s", path_lock);
42
43     struct flock fl;
44     memset(&fl,0,sizeof(fl));
45     fl.l_type= F_WRLCK;
46     fl.l_whence= SEEK_SET;
47     int r= fcntl(lockfd, F_SETLK, &fl);
48     if (r==-1) {
49       if (errno==EACCES || isewouldblock(errno)) {
50         if (quiet_multiple) exit(0);
51         die("another duct holds the lockfile");
52       }
53       sysdie("fcntl F_SETLK lockfile %s", path_lock);
54     }
55
56     xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
57     int lock_noent;
58     xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
59
60     if (!lock_noent && samefile(&stab, &stabf))
61       break;
62
63     xclose(lockfd, "stale lockfile ", path_lock);
64   }
65
66   FILE *lockfile= fdopen(lockfd, "w");
67   if (!lockfile) syscrash("fdopen lockfile");
68
69   int r= ftruncate(lockfd, 0);
70   if (r) syscrash("truncate lockfile to write new info");
71
72   if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
73               (unsigned long)self_pid,
74               sitename, feedfile, remote_host) == EOF ||
75       fflush(lockfile))
76     sysdie("write info to lockfile %s", path_lock);
77
78   dbg("startup: locked");
79 }
80
81 void statemc_init(void) {
82   struct stat stabdefer;
83
84   search_backlog_file();
85
86   int defer_noent;
87   xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
88   if (defer_noent) {
89     dbg("startup: ductdefer ENOENT");
90   } else {
91     dbg("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
92     switch (stabdefer.st_nlink) {
93     case 1:
94       open_defer(); /* so that we will later close it and rename it */
95       break;
96     case 2:
97       xunlink(path_defer, "stale defer file link"
98               " (presumably hardlink to backlog file)");
99       break;
100     default:
101       crash("defer file %s has unexpected link count %ld",
102             path_defer, (long)stabdefer.st_nlink);
103     }
104   }
105
106   struct stat stab_f, stab_d;
107   int noent_f;
108
109   InputFile *file_d= open_input_file(path_flushing);
110   if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
111
112   xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
113
114   if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
115     dbg("startup: F==D => Hardlinked");
116     xunlink(feedfile, "feed file (during startup)"); /* => Moved */
117     noent_f= 1;
118   }
119
120   if (noent_f) {
121     dbg("startup: F ENOENT => Moved");
122     if (file_d) startup_set_input_file(file_d);
123     spawn_inndcomm_flush("feedfile missing at startup");
124     /* => Flushing, sms:=FLUSHING */
125   } else {
126     if (file_d) {
127       dbg("startup: F!=D => Separated");
128       startup_set_input_file(file_d);
129       flushing_input_file= main_input_file;
130       main_input_file= open_input_file(feedfile);
131       if (!main_input_file) crash("feedfile vanished during startup");
132       SMS(SEPARATED, max_separated_periods,
133           "found both old and current feed files");
134     } else {
135       dbg("startup: F exists, D ENOENT => Normal");
136       InputFile *file_f= open_input_file(feedfile);
137       if (!file_f) crash("feed file vanished during startup");
138       startup_set_input_file(file_f);
139       SMS(NORMAL, spontaneous_flush_periods, "normal startup");
140     }
141   }
142 }
143
144 void statemc_start_flush(const char *why) { /* Normal => Flushing */
145   assert(sms == sm_NORMAL);
146
147   dbg("starting flush (%s) (%lu >?= %lu) (%d)",
148         why,
149         (unsigned long)(main_input_file ? main_input_file->offset : 0),
150         (unsigned long)target_max_feedfile_size,
151         until_flush);
152
153   int r= link(feedfile, path_flushing);
154   if (r) sysdie("link feedfile %s to flushing file %s",
155                 feedfile, path_flushing);
156   /* => Hardlinked */
157
158   xunlink(feedfile, "old feedfile link");
159   /* => Moved */
160
161   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
162 }
163
164 int trigger_flush_ok(const char *why) {
165   switch (sms) {
166
167   case sm_NORMAL:
168     statemc_start_flush(why ? why : "periodic");
169     return 1;                           /* Normal => Flushing; => FLUSHING */
170
171   case sm_FLUSHFAILED:
172     spawn_inndcomm_flush(why ? why : "retry");
173     return 1;                            /* Moved => Flushing; => FLUSHING */
174
175   case sm_SEPARATED:
176   case sm_DROPPING:
177     if (conns.count)
178       warn("abandoning old feedfile after flush (%s), autodeferring",
179            why ? why : "took too long to complete");
180     else
181       info("autodeferring after flush (%s)",
182            why ? why : "no connections");
183     assert(flushing_input_file);
184     autodefer_input_file(flushing_input_file);
185     return 1;
186
187   default:
188     return 0;
189   }
190 }
191
192 void statemc_period_poll(void) {
193   if (!until_flush) return;
194   until_flush--;
195   assert(until_flush>=0);
196
197   if (until_flush) return;
198   int ok= trigger_flush_ok(0);
199   assert(ok);
200 }
201
202 static int inputfile_is_done(InputFile *ipf) {
203   if (!ipf) return 0;
204   if (ipf->inprogress) return 0; /* new article in the meantime */
205   if (ipf->rd) return 0; /* not had EOF */
206   return 1;
207 }
208
209 static void notice_processed_counts(Counts *counts, int completed,
210                                     InputFile *ipf_xtra, const char *what) {
211
212 #define RCI_NOTHING(x) /* nothing */
213 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
214 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(counts->results, [RC_##x])
215
216 #define CNT(art,rc) (counts->results[art_##art][RC_##rc])
217
218   char *inprog= ipf_xtra && !completed
219     ? masprintf(" inprogress=%ld", ipf_xtra->inprogress)
220     : masprintf("%s",""); /* GCC produces a stupid warning for printf("") ! */
221   char *autodefer= ipf_xtra && ipf_xtra->autodefer >= 0
222     ? masprintf(" autodeferred=%ld", ipf_xtra->autodefer)
223     : masprintf("%s","");
224
225   notice("%s %s read=%d (+bl=%d,+err=%d)%s%s missing=%d"
226          " offered=%d (ch=%d,nc=%d)"
227          " accepted=%d (ch=%d,nc=%d)"
228        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
229        ,
230        completed?"completed":"processed", what,
231        counts->events[read_ok], counts->events[read_blank],
232          counts->events[read_err],
233        inprog, autodefer, counts->events[nooffer_missing],
234        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
235        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
236        CNT(Wanted,accepted) + CNT(Wanted,accepted)
237        , CNT(Wanted,accepted), CNT(Wanted,accepted)
238        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
239        );
240
241   memset(counts, 0, sizeof(*counts));
242
243   free(inprog);
244   free(autodefer);
245
246 #undef CNT
247 }
248
249 static void notice_processed_inputfile(InputFile *ipf, int completed,
250                                        const char *what) {
251   if (!ipf) return; /* allows showstats to be lazy */
252   notice_processed_counts(&ipf->counts, completed, ipf, what);
253 }
254
255 static void backlog_accumulate_counts(InputFile *ipf) {
256   int i,j;
257   if (!ipf) return;
258
259   for (i=0; i<art_MaxState; i++)
260     for (j=0; j<RCI_max; j++)
261       backlog_counts.results[i][j] += ipf->counts.results[i][j];
262
263   for (i=0; i<ECI_max; i++)
264     backlog_counts.events[i] += ipf->counts.events[i];
265
266   memset(&ipf->counts, 0, sizeof(ipf->counts));
267   backlog_counts_report= 1;
268 }
269
270 void statemc_check_backlog_done(void) {
271   InputFile *ipf= backlog_input_file;
272   if (!inputfile_is_done(ipf)) return;
273
274   dbg("backlog file %p %s complete", ipf, ipf->path);
275   backlog_accumulate_counts(ipf);
276   close_input_file(ipf);
277   if (unlink(ipf->path)) {
278     if (errno != ENOENT)
279       syscrash("could not unlink processed backlog file %s", ipf->path);
280     warn("backlog file %s vanished while we were reading it"
281          " so we couldn't remove it (but it's done now, anyway)",
282          ipf->path);
283   }
284   free(ipf);
285   backlog_input_file= 0;
286   search_backlog_file();
287   return;
288 }
289
290 void statemc_check_flushing_done(void) {
291   InputFile *ipf= flushing_input_file;
292   if (!inputfile_is_done(ipf)) return;
293
294   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
295
296   notice_processed_inputfile(ipf,1,"batch");
297
298   close_defer();
299
300   xunlink(path_flushing, "old flushing file");
301
302   close_input_file(flushing_input_file);
303   free(flushing_input_file);
304   flushing_input_file= 0;
305
306   if (sms==sm_SEPARATED) {
307     notice("flush complete");
308     SMS(NORMAL, spontaneous_flush_periods, "flush complete");
309   } else if (sms==sm_DROPPING) {
310     SMS(DROPPED, max_separated_periods, "old flush complete");
311     search_backlog_file();
312     notice("feed dropped, but will continue until backlog is finished");
313   }
314 }
315
316 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
317                                       void *u) {
318   /* main input file may be idle but if so that's because
319    * we haven't got to it yet, but that doesn't mean it's really done */
320   statemc_check_flushing_done();
321   statemc_check_backlog_done();
322   return OOP_CONTINUE;
323 }
324
325 void queue_check_input_done(void) {
326   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
327 }
328
329 void statemc_setstate(StateMachineState newsms, int periods,
330                       const char *forlog, const char *why) {
331   sms= newsms;
332   until_flush= periods;
333
334   const char *xtra= "";
335   switch (sms) {
336   case sm_FLUSHING:
337   case sm_FLUSHFAILED:
338     if (!main_input_file) xtra= "-ABSENT";
339     break;
340   case sm_SEPARATED:
341   case sm_DROPPING:
342     xtra= flushing_input_file->rd ? "-1" : "-2";
343     break;
344   default:;
345   }
346
347   if (periods) {
348     info("state %s%s[%d] %s",forlog,xtra,periods,why);
349   } else {
350     info("state %s%s %s",forlog,xtra,why);
351   }
352 }
353
354 /*========== flushing the feed ==========*/
355
356 pid_t inndcomm_child;
357 static int inndcomm_sentinel_fd;
358
359 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
360   assert(inndcomm_child);
361   assert(fd == inndcomm_sentinel_fd);
362   int status= xwaitpid(&inndcomm_child, "inndcomm");
363   inndcomm_child= 0;
364   
365   cancel_fd_read_except(fd);
366   xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
367   inndcomm_sentinel_fd= 0;
368
369   assert(!flushing_input_file);
370
371   if (WIFEXITED(status)) {
372     switch (WEXITSTATUS(status)) {
373
374     case INNDCOMMCHILD_ESTATUS_FAIL:
375       goto failed;
376
377     case INNDCOMMCHILD_ESTATUS_NONESUCH:
378       notice("feed has been dropped by innd, finishing up");
379       flushing_input_file= main_input_file;
380       tailing_make_readable(flushing_input_file);
381         /* we probably previously returned EAGAIN from our fake read method
382          * when in fact we were at EOF, so signal another readable event
383          * so we actually see the EOF */
384
385       main_input_file= 0;
386
387       if (flushing_input_file) {
388         SMS(DROPPING, max_separated_periods,
389             "feed dropped by innd, but must finish last flush");
390       } else {
391         close_defer();
392         SMS(DROPPED, 0, "feed dropped by innd");
393         search_backlog_file();
394       }
395       return OOP_CONTINUE;
396
397     case 0:
398       /* as above */
399       flushing_input_file= main_input_file;
400       tailing_make_readable(flushing_input_file);
401
402       main_input_file= open_input_file(feedfile);
403       if (!main_input_file)
404         crash("flush succeeded but feedfile %s does not exist!"
405               " (this probably means feedfile does not correspond"
406               " to site %s in newsfeeds)", feedfile, sitename);
407
408       if (flushing_input_file) {
409         SMS(SEPARATED, max_separated_periods, "flush complete");
410       } else {
411         close_defer();
412         SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete");
413       }
414       return OOP_CONTINUE;
415
416     default:
417       goto unexpected_exitstatus;
418
419     }
420   } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
421     warn("flush timed out trying to talk to innd");
422     goto failed;
423   } else {
424   unexpected_exitstatus:
425     report_child_status("inndcomm child", status);
426   }
427
428  failed:
429   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
430   return OOP_CONTINUE;
431 }
432
433 static void inndcommfail(const char *what) {
434   syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
435   exit(INNDCOMMCHILD_ESTATUS_FAIL);
436 }
437
438 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
439   int pipefds[2];
440
441   notice("flushing %s",why);
442
443   assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
444   assert(!inndcomm_child);
445   assert(!inndcomm_sentinel_fd);
446
447   if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel");
448
449   inndcomm_child= xfork("inndcomm child");
450
451   if (!inndcomm_child) {
452     const char *flushargv[2]= { sitename, 0 };
453     char *reply;
454     int r;
455
456     xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
457     /* parent spots the autoclose of pipefds[1] when we die or exit */
458
459     if (simulate_flush>=0) {
460       warn("SIMULATING flush child status %d", simulate_flush);
461       if (simulate_flush>128) raise(simulate_flush-128);
462       else exit(simulate_flush);
463     }
464
465     alarm(inndcomm_flush_timeout);
466     r= ICCopen();                         if (r)   inndcommfail("connect");
467     r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
468     if (!r) exit(0); /* yay! */
469
470     if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
471     syswarn("innd ctlinnd flush failed: innd said %s", reply);
472     exit(INNDCOMMCHILD_ESTATUS_FAIL);
473   }
474
475   simulate_flush= -1;
476
477   xclose(pipefds[1], "inndcomm sentinel child's end",0);
478   inndcomm_sentinel_fd= pipefds[0];
479   assert(inndcomm_sentinel_fd);
480   on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
481
482   SMS(FLUSHING, 0, why);
483 }
484
485 /*---------- shutdown and signal handling ----------*/
486
487 void preterminate(void) {
488   if (in_child) return;
489   showstats();
490 }
491
492 void showstats(void) {
493   notice_conns_stats();
494   notice_processed_inputfile(main_input_file,     0, "feedfile");
495   notice_processed_inputfile(flushing_input_file, 0, "flushing");
496
497   backlog_accumulate_counts(backlog_input_file);
498   if (backlog_counts_report) {
499     notice_processed_counts(&backlog_counts, 0,
500                             backlog_input_file, "backlogs");
501     backlog_counts_report= 0;
502   }
503   until_stats_log= stats_log_periods;
504 }
505
506 static int signal_self_pipe[2];
507
508 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
509   assert(fd==signal_self_pipe[0]);
510   char buf[PIPE_BUF];
511   int r= read(signal_self_pipe[0], buf, sizeof(buf));
512   if (r<0 && !isewouldblock(errno))
513     syscrash("failed to read signal self pipe");
514   if (r==0) crash("eof on signal self pipe");
515   if (terminate_sig_flag) {
516     preterminate();
517     notice("terminating (%s)", strsignal(terminate_sig_flag));
518     raise_default(terminate_sig_flag);
519   }
520   return OOP_CONTINUE;
521 }
522
523 static void sigarrived_handler(int signum) {
524   int esave = errno;
525   static char x;
526   switch (signum) {
527   case SIGTERM:
528   case SIGINT:
529     if (!terminate_sig_flag) terminate_sig_flag= signum;
530     break;
531   default:
532     abort();
533   }
534   int r = write(signal_self_pipe[1],&x,1);
535   if (!(r==1 || isewouldblock(errno))) abort();
536   errno = esave;
537 }
538
539 void init_signals(void) {
540   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
541     syscrash("could not ignore SIGPIPE");
542
543   if (pipe(signal_self_pipe)) sysdie("create self-pipe for signals");
544
545   xsetnonblock(signal_self_pipe[0],1);
546   xsetnonblock(signal_self_pipe[1],1);
547
548   struct sigaction sa;
549   memset(&sa,0,sizeof(sa));
550   sa.sa_handler= sigarrived_handler;
551   sa.sa_flags= SA_RESTART;
552   xsigaction(SIGTERM,&sa);
553   xsigaction(SIGINT,&sa);
554
555   on_fd_read_except(signal_self_pipe[0], sigarrived_event);
556 }
557