3 * tailing reliable realtime streaming feeder for inn
4 * statemc.c - state machine core (see README.states).
6 * Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
7 * and contributors; see LICENCE.txt.
8 * SPDX-License-Identifier: GPL-3.0-or-later
/* Global state of the state machine core. */
14 /* statemc_init initialises */
/* current state of the state machine */
15 StateMachineState sms;
/* the input files currently known; each pointer is 0 when there is no such file */
17 InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
/* counts accumulated from backlog input files by backlog_accumulate_counts */
18 Counts backlog_counts;
/* set to 1 when backlog_counts has been added to; cleared by showstats
 * once the accumulated counts have been reported */
19 int backlog_counts_report;
22 /* initialisation to 0 is good */
23 int until_connect, until_backlog_nextscan;
24 double accept_proportion;
25 int nocheck, nocheck_reported, in_child;
/* number of the first terminating signal seen by sigarrived_handler, or 0 */
26 sig_atomic_t terminate_sig_flag;
/* Startup helper: begins reading input file f.
 * Asserts that no main input file has been established yet. */
29 static void startup_set_input_file(InputFile *f) {
30 assert(!main_input_file);
32 inputfile_reading_start(f);
/* Acquires the duct's lockfile (path_lock) and records our details in it.
 *
 * The lockfile is opened (and created if necessary) and locked with
 * fcntl F_SETLK.  If the lock is refused with EACCES or a would-block
 * errno, we exit(0) when quiet_multiple is set and otherwise die.
 * The locked descriptor is then compared (samefile) with whatever
 * path_lock currently names; a descriptor that does not match is
 * closed as a stale lockfile.
 *
 * Finally the lockfile is truncated and our pid, site name, feedfile
 * and remote host are written into it.  Any failure is fatal. */
35 void statemc_lock(void) {
37 struct stat stab, stabf;
40 lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
41 if (lockfd<0) sysdie("open lockfile %s", path_lock);
44 memset(&fl,0,sizeof(fl));
46 fl.l_whence= SEEK_SET;
47 int r= fcntl(lockfd, F_SETLK, &fl);
49 if (errno==EACCES || isewouldblock(errno)) {
50 if (quiet_multiple) exit(0);
51 die("another duct holds the lockfile");
53 sysdie("fcntl F_SETLK lockfile %s", path_lock);
56 xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
58 xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
60 if (!lock_noent && samefile(&stab, &stabf))
63 xclose(lockfd, "stale lockfile ", path_lock);
66 FILE *lockfile= fdopen(lockfd, "w");
67 if (!lockfile) syscrash("fdopen lockfile");
69 int r= ftruncate(lockfd, 0);
70 if (r) syscrash("truncate lockfile to write new info");
/* %ld needs a (signed) long argument, and fprintf reports failure
 * with any negative value, not necessarily EOF */
72 if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
73 (long)self_pid,
74 sitename, feedfile, remote_host) < 0 ||
76 sysdie("write info to lockfile %s", path_lock);
78 dbg("startup: locked");
/* Works out the initial state from the files found on disk at startup.
 *
 * First searches for a backlog file and deals with any leftover defer
 * file according to its link count: it is opened (so that it will later
 * be closed and renamed), unlinked as a stale link, or treated as a
 * fatal inconsistency.
 *
 * Then the feedfile (F) is compared with the flushing file (D):
 *   F and D are the same file   => Hardlinked; unlink F  (=> Moved)
 *   F does not exist            => Moved; start an innd flush
 *   F and D exist and differ    => Separated
 *   F exists, D does not        => Normal */
81 void statemc_init(void) {
82 struct stat stabdefer;
84 search_backlog_file();
87 xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
89 dbg("startup: ductdefer ENOENT");
91 dbg("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
92 switch (stabdefer.st_nlink) {
94 open_defer(); /* so that we will later close it and rename it */
97 xunlink(path_defer, "stale defer file link"
98 " (presumably hardlink to backlog file)");
101 crash("defer file %s has unexpected link count %ld",
102 path_defer, (long)stabdefer.st_nlink);
106 struct stat stab_f, stab_d;
/* file_d is 0 if there is no flushing file */
109 InputFile *file_d= open_input_file(path_flushing);
110 if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
112 xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
114 if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
115 dbg("startup: F==D => Hardlinked");
116 xunlink(feedfile, "feed file (during startup)"); /* => Moved */
121 dbg("startup: F ENOENT => Moved");
122 if (file_d) startup_set_input_file(file_d);
123 spawn_inndcomm_flush("feedfile missing at startup");
124 /* => Flushing, sms:=FLUSHING */
127 dbg("startup: F!=D => Separated");
/* the old (flushing) file is read first; it then becomes
 * flushing_input_file and the current feedfile becomes the main input */
128 startup_set_input_file(file_d);
129 flushing_input_file= main_input_file;
130 main_input_file= open_input_file(feedfile);
131 if (!main_input_file) crash("feedfile vanished during startup");
132 SMS(SEPARATED, max_separated_periods,
133 "found both old and current feed files");
135 dbg("startup: F exists, D ENOENT => Normal");
136 InputFile *file_f= open_input_file(feedfile);
137 if (!file_f) crash("feed file vanished during startup");
138 startup_set_input_file(file_f);
139 SMS(NORMAL, spontaneous_flush_periods, "normal startup");
/* Normal => Flushing.
 * Hardlinks the feedfile to path_flushing, removes the old feedfile
 * name, and then asks innd to flush by calling spawn_inndcomm_flush.
 * Must only be called in state NORMAL (asserted); failure to make the
 * link is fatal.
 *   why   reason for the flush, passed on to spawn_inndcomm_flush */
144 void statemc_start_flush(const char *why) { /* Normal => Flushing */
145 assert(sms == sm_NORMAL);
147 dbg("starting flush (%s) (%lu >?= %lu) (%d)",
149 (unsigned long)(main_input_file ? main_input_file->offset : 0),
150 (unsigned long)target_max_feedfile_size,
153 int r= link(feedfile, path_flushing);
154 if (r) sysdie("link feedfile %s to flushing file %s",
155 feedfile, path_flushing);
158 xunlink(feedfile, "old feedfile link");
161 spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
/* Takes whichever flush-related step suits the current situation:
 * starts a flush (Normal), retries the innd flush (Moved), or gives up
 * on the flushing input file and autodefers it.
 *   why   reason text; may be 0, in which case a default reason
 *         appropriate to the step is used
 * Returns 1 after starting or retrying a flush. */
164 int trigger_flush_ok(const char *why) {
168 statemc_start_flush(why ? why : "periodic");
169 return 1; /* Normal => Flushing; => FLUSHING */
172 spawn_inndcomm_flush(why ? why : "retry");
173 return 1; /* Moved => Flushing; => FLUSHING */
178 warn("abandoning old feedfile after flush (%s), autodeferring",
179 why ? why : "took too long to complete");
181 info("autodeferring after flush (%s)",
182 why ? why : "no connections");
183 assert(flushing_input_file);
184 autodefer_input_file(flushing_input_file);
/* Periodic poll driving the until_flush countdown.
 * Returns at once if until_flush is 0 on entry (no countdown armed) or
 * is still nonzero at the second check; otherwise calls
 * trigger_flush_ok with no specific reason. */
192 void statemc_period_poll(void) {
193 if (!until_flush) return;
195 assert(until_flush>=0);
197 if (until_flush) return;
198 int ok= trigger_flush_ok(0);
/* Tests whether input file ipf has been completely dealt with.
 * Returns 0 while ipf still has articles in progress, or while it is
 * still being read (ipf->rd set, ie EOF not yet seen). */
202 static int inputfile_is_done(InputFile *ipf) {
204 if (ipf->inprogress) return 0; /* new article in the meantime */
205 if (ipf->rd) return 0; /* not had EOF */
/* Logs one summary line of article statistics, then zeroes the counters.
 *
 *   counts     the counters to report; cleared with memset afterwards
 *   completed  nonzero: the line says "completed" and gives no
 *              inprogress figure; zero: the line says "processed"
 *   ipf_xtra   may be 0; otherwise supplies the inprogress figure (only
 *              when !completed) and the autodeferred figure (only when
 *              its autodefer is >= 0)
 *   what       label for whatever is being reported on
 *
 * In the offered= and accepted= fields the total is followed by its two
 * components, (ch=...,nc=...).  For offered these are the Unchecked and
 * Unsolicited articles sent; for accepted, the Wanted and Unsolicited
 * articles accepted. */
209 static void notice_processed_counts(Counts *counts, int completed,
210 InputFile *ipf_xtra, const char *what) {
212 #define RCI_NOTHING(x) /* nothing */
213 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
214 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(counts->results, [RC_##x])
216 #define CNT(art,rc) (counts->results[art_##art][RC_##rc])
218 char *inprog= ipf_xtra && !completed
219 ? masprintf(" inprogress=%ld", ipf_xtra->inprogress)
220 : masprintf("%s",""); /* GCC produces a stupid warning for printf("") ! */
221 char *autodefer= ipf_xtra && ipf_xtra->autodefer >= 0
222 ? masprintf(" autodeferred=%ld", ipf_xtra->autodefer)
223 : masprintf("%s","");
225 notice("%s %s read=%d (+bl=%d,+err=%d)%s%s missing=%d"
226 " offered=%d (ch=%d,nc=%d)"
227 " accepted=%d (ch=%d,nc=%d)"
228 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
230 completed?"completed":"processed", what,
231 counts->events[read_ok], counts->events[read_blank],
232 counts->events[read_err],
233 inprog, autodefer, counts->events[nooffer_missing],
234 CNT(Unchecked,sent) + CNT(Unsolicited,sent)
235 , CNT(Unchecked,sent), CNT(Unsolicited,sent),
/* accepted total and its nc= component must use the Unsolicited
 * count, mirroring the offered fields above */
236 CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
237 , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
238 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
241 memset(counts, 0, sizeof(*counts));
/* Reports (and thereby resets) the counts belonging to input file ipf,
 * via notice_processed_counts.  Does nothing if ipf is 0. */
249 static void notice_processed_inputfile(InputFile *ipf, int completed,
251 if (!ipf) return; /* allows showstats to be lazy */
252 notice_processed_counts(&ipf->counts, completed, ipf, what);
/* Adds every result and event counter of ipf into the global
 * backlog_counts, zeroes ipf's own counters, and flags that there are
 * accumulated backlog counts waiting to be reported. */
255 static void backlog_accumulate_counts(InputFile *ipf) {
259 for (i=0; i<art_MaxState; i++)
260 for (j=0; j<RCI_max; j++)
261 backlog_counts.results[i][j] += ipf->counts.results[i][j];
263 for (i=0; i<ECI_max; i++)
264 backlog_counts.events[i] += ipf->counts.events[i];
/* zero the source so that the same counts are never added twice */
266 memset(&ipf->counts, 0, sizeof(ipf->counts));
267 backlog_counts_report= 1;
/* If the current backlog input file is done (see inputfile_is_done):
 * folds its counts into backlog_counts, closes it, removes the file,
 * forgets it, and searches for another backlog file.
 * Failure to unlink is either fatal (syscrash) or, when the file has
 * vanished in the meantime, merely warned about. */
270 void statemc_check_backlog_done(void) {
271 InputFile *ipf= backlog_input_file;
272 if (!inputfile_is_done(ipf)) return;
274 dbg("backlog file %p %s complete", ipf, ipf->path);
275 backlog_accumulate_counts(ipf);
276 close_input_file(ipf);
277 if (unlink(ipf->path)) {
279 syscrash("could not unlink processed backlog file %s", ipf->path);
280 warn("backlog file %s vanished while we were reading it"
281 " so we couldn't remove it (but it's done now, anyway)",
285 backlog_input_file= 0;
286 search_backlog_file();
/* If the flushing input file is done (see inputfile_is_done): reports
 * its counts as a completed batch, removes path_flushing, closes and
 * frees the input file, and advances the state machine:
 *   SEPARATED => NORMAL   (flush timer: spontaneous_flush_periods)
 *   DROPPING  => DROPPED  (no flush timer), then look for backlog
 * Must only be reached in state SEPARATED or DROPPING (asserted). */
290 void statemc_check_flushing_done(void) {
291 InputFile *ipf= flushing_input_file;
292 if (!inputfile_is_done(ipf)) return;
294 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
296 notice_processed_inputfile(ipf,1,"batch");
300 xunlink(path_flushing, "old flushing file");
302 close_input_file(flushing_input_file);
303 free(flushing_input_file);
304 flushing_input_file= 0;
306 if (sms==sm_SEPARATED) {
307 notice("flush complete");
308 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
309 } else if (sms==sm_DROPPING) {
/* periods must be 0 here, as in the other transition to DROPPED
 * (in inndcomm_event): once the feed has been dropped there is
 * nothing left to flush, so the until_flush countdown that
 * statemc_period_poll acts on must not be armed */
310 SMS(DROPPED, 0, "old flush complete");
311 search_backlog_file();
312 notice("feed dropped, but will continue until backlog is finished");
/* Timer callback (registered by queue_check_input_done): checks whether
 * the flushing and backlog input files have been finished with.
 * The main input file is deliberately not checked here. */
316 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
318 /* main input file may be idle but if so that's because
319 * we haven't got to it yet, but that doesn't mean it's really done */
320 statemc_check_flushing_done();
321 statemc_check_backlog_done();
/* Arranges for statemc_check_input_done to be called from the event
 * loop, by registering it as a timer event for OOP_TIME_NOW. */
325 void queue_check_input_done(void) {
326 loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
/* Enters state newsms, sets the until_flush countdown, and logs the change.
 *   newsms   the state being entered
 *   periods  new value for until_flush
 *   forlog   name of the state, for the log message
 *   why      reason for the transition, for the log message
 * The logged state name may carry a suffix: "-ABSENT" when there is no
 * main input file, or "-1" / "-2" according to whether the flushing
 * input file is still being read (its rd is set) or not.  The message
 * comes in two forms, with and without the "[periods]" figure. */
329 void statemc_setstate(StateMachineState newsms, int periods,
330 const char *forlog, const char *why) {
332 until_flush= periods;
334 const char *xtra= "";
338 if (!main_input_file) xtra= "-ABSENT";
342 xtra= flushing_input_file->rd ? "-1" : "-2";
348 info("state %s%s[%d] %s",forlog,xtra,periods,why);
350 info("state %s%s %s",forlog,xtra,why);
354 /*========== flushing the feed ==========*/
/* pid of the child currently talking to innd, or 0 if there is none */
356 pid_t inndcomm_child;
/* read end of the pipe whose closure signals that the child has died
 * or exited; 0 when no child is running */
357 static int inndcomm_sentinel_fd;
/* Event callback for the inndcomm child's sentinel pipe.
 *
 * Reaps the child, stops watching and closes the sentinel fd, and then
 * acts on the child's wait status:
 *  - exit status INNDCOMMCHILD_ESTATUS_NONESUCH: innd has dropped the
 *    feed.  The main input file becomes the flushing input file and the
 *    state becomes DROPPING if there is such a file to finish, or
 *    DROPPED (followed by a backlog search) if not.
 *  - flush succeeded: the main input file becomes the flushing input
 *    file and the feedfile is reopened as the new main input file (it
 *    is fatal if the feedfile does not exist); the state becomes
 *    SEPARATED if there is a flushing file, otherwise NORMAL.
 *  - killed by SIGALRM: the flush timed out; a warning is logged.
 *  - anything else is reported as an unexpected child status.
 * After a failure the state becomes FLUSHFAILED, to be retried after
 * flushfail_retry_periods. */
359 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
360 assert(inndcomm_child);
361 assert(fd == inndcomm_sentinel_fd);
362 int status= xwaitpid(&inndcomm_child, "inndcomm");
365 cancel_fd_read_except(fd);
366 xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
367 inndcomm_sentinel_fd= 0;
369 assert(!flushing_input_file);
371 if (WIFEXITED(status)) {
372 switch (WEXITSTATUS(status)) {
374 case INNDCOMMCHILD_ESTATUS_FAIL:
377 case INNDCOMMCHILD_ESTATUS_NONESUCH:
378 notice("feed has been dropped by innd, finishing up");
379 flushing_input_file= main_input_file;
380 tailing_make_readable(flushing_input_file);
381 /* we probably previously returned EAGAIN from our fake read method
382 * when in fact we were at EOF, so signal another readable event
383 * so we actually see the EOF */
387 if (flushing_input_file) {
388 SMS(DROPPING, max_separated_periods,
389 "feed dropped by innd, but must finish last flush");
392 SMS(DROPPED, 0, "feed dropped by innd");
393 search_backlog_file();
399 flushing_input_file= main_input_file;
400 tailing_make_readable(flushing_input_file);
402 main_input_file= open_input_file(feedfile);
403 if (!main_input_file)
404 crash("flush succeeded but feedfile %s does not exist!"
405 " (this probably means feedfile does not correspond"
406 " to site %s in newsfeeds)", feedfile, sitename);
408 if (flushing_input_file) {
409 SMS(SEPARATED, max_separated_periods, "flush complete");
412 SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete");
417 goto unexpected_exitstatus;
420 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
421 warn("flush timed out trying to talk to innd");
424 unexpected_exitstatus:
425 report_child_status("inndcomm child", status);
429 SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
/* Used in the inndcomm child when talking to innd fails: logs which
 * step (what) failed, together with ICCfailure, and exits with
 * INNDCOMMCHILD_ESTATUS_FAIL.  Does not return. */
433 static void inndcommfail(const char *what) {
434 syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
435 exit(INNDCOMMCHILD_ESTATUS_FAIL);
/* Moved => Flushing: forks a child which asks innd to flush our site.
 *   why   reason for the flush; logged, and recorded with the new state
 *
 * Must be called in state NORMAL or FLUSHFAILED with no child already
 * running (asserted).
 *
 * Child: closes the parent's end of the sentinel pipe.  If
 * simulate_flush>=0 it merely simulates a child status (raising signal
 * simulate_flush-128 when simulate_flush>128, otherwise exiting with
 * that status).  Otherwise it sets an alarm of inndcomm_flush_timeout,
 * connects to innd and sends the 'f' command for sitename, exiting
 * with 0 on success, INNDCOMMCHILD_ESTATUS_NONESUCH if innd replies
 * "1 No such site", and INNDCOMMCHILD_ESTATUS_FAIL otherwise.
 *
 * Parent: closes the child's end of the pipe, watches the read end
 * with inndcomm_event, and enters state FLUSHING. */
438 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
441 notice("flushing %s",why);
443 assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
444 assert(!inndcomm_child);
445 assert(!inndcomm_sentinel_fd);
447 if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel");
449 inndcomm_child= xfork("inndcomm child");
451 if (!inndcomm_child) {
452 const char *flushargv[2]= { sitename, 0 };
456 xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
457 /* parent spots the autoclose of pipefds[1] when we die or exit */
459 if (simulate_flush>=0) {
460 warn("SIMULATING flush child status %d", simulate_flush);
461 if (simulate_flush>128) raise(simulate_flush-128);
462 else exit(simulate_flush);
465 alarm(inndcomm_flush_timeout);
466 r= ICCopen(); if (r) inndcommfail("connect");
467 r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit");
468 if (!r) exit(0); /* yay! */
470 if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
471 syswarn("innd ctlinnd flush failed: innd said %s", reply);
472 exit(INNDCOMMCHILD_ESTATUS_FAIL);
477 xclose(pipefds[1], "inndcomm sentinel child's end",0);
478 inndcomm_sentinel_fd= pipefds[0];
/* 0 is used to mean "no sentinel fd", so the real fd must not be 0 */
479 assert(inndcomm_sentinel_fd);
480 on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
482 SMS(FLUSHING, 0, why);
485 /*---------- shutdown and signal handling ----------*/
/* Hook run before termination.  Does nothing when running in a child
 * process (in_child set). */
487 void preterminate(void) {
488 if (in_child) return;
/* Logs the current statistics: the connection statistics, the counts
 * for the feedfile and the flushing file (each skipped if there is no
 * such input file), and the accumulated backlog counts if there are
 * any to report.  Each set of counts is reset by being reported.
 * Finally re-arms the until_stats_log countdown with stats_log_periods. */
492 void showstats(void) {
493 notice_conns_stats();
494 notice_processed_inputfile(main_input_file, 0, "feedfile");
495 notice_processed_inputfile(flushing_input_file, 0, "flushing");
497 backlog_accumulate_counts(backlog_input_file);
498 if (backlog_counts_report) {
499 notice_processed_counts(&backlog_counts, 0,
500 backlog_input_file, "backlogs");
501 backlog_counts_report= 0;
503 until_stats_log= stats_log_periods;
/* self-pipe for signals: sigarrived_handler writes a byte to [1],
 * and sigarrived_event reads from [0] in the event loop */
506 static int signal_self_pipe[2];
/* Event callback for the read end of the signal self-pipe.
 * Drains the pipe (a read error other than would-block, or EOF, is
 * fatal) and, if a terminating signal has been recorded in
 * terminate_sig_flag, logs it and re-raises it with raise_default. */
508 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
509 assert(fd==signal_self_pipe[0]);
511 int r= read(signal_self_pipe[0], buf, sizeof(buf));
512 if (r<0 && !isewouldblock(errno))
513 syscrash("failed to read signal self pipe");
514 if (r==0) crash("eof on signal self pipe");
515 if (terminate_sig_flag) {
517 notice("terminating (%s)", strsignal(terminate_sig_flag));
518 raise_default(terminate_sig_flag);
/* Signal handler: records the signal number in terminate_sig_flag
 * (only the first signal is kept) and writes one byte to the self-pipe
 * to wake the event loop.  A failed write is tolerated only if it
 * would have blocked; any other outcome aborts. */
523 static void sigarrived_handler(int signum) {
529 if (!terminate_sig_flag) terminate_sig_flag= signum;
534 int r = write(signal_self_pipe[1],&x,1);
535 if (!(r==1 || isewouldblock(errno))) abort();
/* Sets up signal handling: ignores SIGPIPE, creates the self-pipe with
 * both ends nonblocking, installs sigarrived_handler (with SA_RESTART)
 * for SIGTERM and SIGINT, and registers sigarrived_event on the
 * pipe's read end.  Failures are fatal. */
539 void init_signals(void) {
540 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
541 syscrash("could not ignore SIGPIPE");
543 if (pipe(signal_self_pipe)) sysdie("create self-pipe for signals");
545 xsetnonblock(signal_self_pipe[0],1);
546 xsetnonblock(signal_self_pipe[1],1);
549 memset(&sa,0,sizeof(sa));
550 sa.sa_handler= sigarrived_handler;
551 sa.sa_flags= SA_RESTART;
552 xsigaction(SIGTERM,&sa);
553 xsigaction(SIGINT,&sa);
555 on_fd_read_except(signal_self_pipe[0], sigarrived_event);