3 * tailing reliable realtime streaming feeder for inn
4 * statemc.c - state machine core (see README.states).
6 * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
8 * This program is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 * (I believe that when you compile and link this as part of the inn2
22 * build, with the Makefile runes I have provided, all the libraries
23 * and files which end up included in innduct are licence-compatible
24 * with GPLv3. If not then please let me know. -Ian Jackson.)
/* State-machine globals.  sms and the input-file pointers are set up by
 * statemc_init (as the comment below says); the second group relies on
 * zero-initialisation of file-scope objects. */
30 /* statemc_init initialises */
31 StateMachineState sms;
/* main_input_file: the current feed file.  flushing_input_file: the old
 * feed file still being drained after a flush (or after innd dropped the
 * feed).  backlog_input_file: the backlog file being processed, or 0. */
33 InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
/* Counts folded in from finished/inspected backlog files by
 * backlog_accumulate_counts, and a flag telling showstats there is
 * something new to report for them. */
34 Counts backlog_counts;
35 int backlog_counts_report;
38 /* initialisation to 0 is good */
39 int until_connect, until_backlog_nextscan;
40 double accept_proportion;
41 int nocheck, nocheck_reported, in_child;
/* First terminating signal caught by sigarrived_handler (0 if none);
 * acted on by sigarrived_event.
 * NOTE(review): an object written from a signal handler should be
 * "volatile sig_atomic_t" (C11 7.14.1.1); update the extern declaration
 * in the shared header at the same time if this is changed. */
42 sig_atomic_t terminate_sig_flag;
/* Start reading f as the initial input file during startup.
 * Asserts that no main input file has been installed yet.
 * NOTE(review): the assignment of f to main_input_file is not shown, but
 * statemc_init relies on it (it does "flushing_input_file= main_input_file"
 * straight after calling this); confirm. */
45 static void startup_set_input_file(InputFile *f) {
46 assert(!main_input_file);
48 inputfile_reading_start(f);
/* Acquire the exclusive lock on path_lock and write our details into it.
 *
 * The lockfile is opened (created with mode 0600 if absent) and locked
 * with a non-blocking fcntl(F_SETLK).  If the lock is held elsewhere
 * (EACCES or would-block) we exit(0) when quiet_multiple is set and die
 * otherwise.  Once locked, the open fd and the path are compared: if the
 * path still names the same file the lock is valid; otherwise the fd is
 * closed as a stale lockfile (presumably followed by a retry -- the loop
 * construct is not shown).  Finally the file is truncated and our pid,
 * site, feed file and remote host are written to it.
 */
51 void statemc_lock(void) {
53 struct stat stab, stabf;
56 lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
57 if (lockfd<0) sysdie("open lockfile %s", path_lock);
60 memset(&fl,0,sizeof(fl));
62 fl.l_whence= SEEK_SET;
63 int r= fcntl(lockfd, F_SETLK, &fl);
/* F_SETLK reports a conflicting lock as EACCES or EAGAIN. */
65 if (errno==EACCES || isewouldblock(errno)) {
66 if (quiet_multiple) exit(0);
67 die("another duct holds the lockfile");
69 sysdie("fcntl F_SETLK lockfile %s", path_lock);
/* A lock only counts if path_lock still names the file we locked; if it
 * was removed or replaced meanwhile, the fd is a stale lockfile. */
72 xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
74 xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
76 if (!lock_noent && samefile(&stab, &stabf))
79 xclose(lockfd, "stale lockfile ", path_lock);
82 FILE *lockfile= fdopen(lockfd, "w");
83 if (!lockfile) syscrash("fdopen lockfile");
85 int r= ftruncate(lockfd, 0);
86 if (r) syscrash("truncate lockfile to write new info");
/* NOTE(review): two defects in the next statement.
 * (1) "%ld" is paired with an (unsigned long) argument; cast self_pid to
 *     (long) instead so the conversion and argument type agree.
 * (2) fprintf signals failure with any negative value, not necessarily
 *     EOF, so the test should be "< 0" rather than "== EOF". */
88 if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
89 (unsigned long)self_pid,
90 sitename, feedfile, remote_host) == EOF ||
92 sysdie("write info to lockfile %s", path_lock);
94 dbg("startup: locked");
/* Establish the initial state-machine state from the files on disk.
 *
 * After searching for a backlog file, the defer file (path_defer) is
 * examined: with one link it is opened via open_defer() so that it will
 * later be closed and renamed; an extra link is taken to be a stale
 * hardlink to a backlog file and is unlinked; anything else crashes.
 *
 * Then the feed file F (feedfile) is compared with the flushing file D
 * (path_flushing):
 *   F and D the same file  -> "Hardlinked": unlink F, giving "Moved";
 *   F absent               -> "Moved": read D if present, spawn a flush;
 *   F and D both, distinct -> "Separated": D is drained while F is read;
 *   F present, D absent    -> "Normal".
 */
97 void statemc_init(void) {
98 struct stat stabdefer;
100 search_backlog_file();
103 xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
105 dbg("startup: ductdefer ENOENT");
107 dbg("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
/* NOTE(review): "st_nlink==1" can only evaluate to 0 or 1, so every link
 * count other than 1 reaches the same branch: the stale-hardlink case and
 * the "unexpected link count" crash below cannot be told apart.  The
 * branch bodies call for "switch (stabdefer.st_nlink)". */
108 switch (stabdefer.st_nlink==1) {
110 open_defer(); /* so that we will later close it and rename it */
113 xunlink(path_defer, "stale defer file link"
114 " (presumably hardlink to backlog file)");
/* NOTE(review): st_nlink is an nlink_t, not an int; format it as the dbg
 * call above does, with "%ld" and a (long) cast. */
117 crash("defer file %s has unexpected link count %d",
118 path_defer, stabdefer.st_nlink);
122 struct stat stab_f, stab_d;
/* D: open the flushing file if it exists (0 otherwise). */
125 InputFile *file_d= open_input_file(path_flushing);
126 if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
128 xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
130 if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
131 dbg("startup: F==D => Hardlinked");
132 xunlink(feedfile, "feed file (during startup)"); /* => Moved */
137 dbg("startup: F ENOENT => Moved");
138 if (file_d) startup_set_input_file(file_d);
139 spawn_inndcomm_flush("feedfile missing at startup");
140 /* => Flushing, sms:=FLUSHING */
143 dbg("startup: F!=D => Separated");
/* D becomes the flushing file; F is reopened as the main input. */
144 startup_set_input_file(file_d);
145 flushing_input_file= main_input_file;
146 main_input_file= open_input_file(feedfile);
147 if (!main_input_file) crash("feedfile vanished during startup");
148 SMS(SEPARATED, max_separated_periods,
149 "found both old and current feed files");
151 dbg("startup: F exists, D ENOENT => Normal");
152 InputFile *file_f= open_input_file(feedfile);
153 if (!file_f) crash("feed file vanished during startup");
154 startup_set_input_file(file_f);
155 SMS(NORMAL, spontaneous_flush_periods, "normal startup");
/* Normal => Flushing: begin a flush of the feed for reason `why'.
 * The feed file is hard-linked to path_flushing and its original name is
 * unlinked, then an inndcomm child is spawned to ask innd to flush.
 * Only valid in state NORMAL (asserted). */
160 void statemc_start_flush(const char *why) { /* Normal => Flushing */
161 assert(sms == sm_NORMAL);
163 dbg("starting flush (%s) (%lu >?= %lu) (%d)",
165 (unsigned long)(main_input_file ? main_input_file->offset : 0),
166 (unsigned long)target_max_feedfile_size,
169 int r= link(feedfile, path_flushing);
170 if (r) sysdie("link feedfile %s to flushing file %s",
171 feedfile, path_flushing);
174 xunlink(feedfile, "old feedfile link");
177 spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
/* Act on a due flush.  why==0 means the periodic timer fired (see
 * statemc_period_poll); the logged reason then defaults per branch.
 * Depending on the current state (the dispatching case labels are not
 * shown) it starts a flush from Normal, re-issues the flush request
 * (Moved => Flushing), or gives up waiting on the old feed file and
 * autodefers what remains of it.  The visible flush-starting paths return
 * 1; the other return values are not shown. */
180 int trigger_flush_ok(const char *why) {
184 statemc_start_flush(why ? why : "periodic");
185 return 1; /* Normal => Flushing; => FLUSHING */
188 spawn_inndcomm_flush(why ? why : "retry");
189 return 1; /* Moved => Flushing; => FLUSHING */
193 warn("abandoning old feedfile after flush (%s), autodeferring",
194 why ? why : "took too long to complete");
195 assert(flushing_input_file);
196 autodefer_input_file(flushing_input_file);
/* Periodic tick for the flush countdown.  until_flush==0 means no flush
 * is scheduled (states entered with periods 0, e.g. FLUSHING), so nothing
 * happens.  Otherwise, once until_flush has counted down to 0 (the
 * decrement itself is not shown), trigger_flush_ok(0) is called. */
204 void statemc_period_poll(void) {
205 if (!until_flush) return;
207 assert(until_flush>=0);
209 if (until_flush) return;
210 int ok= trigger_flush_ok(0);
/* Has ipf been completely processed?  Returns 0 while articles from it are
 * still in flight or while its reader has not yet reached EOF (ipf->rd
 * still set); callers treat any other result as "done".
 * NOTE(review): callers pass flushing_input_file and backlog_input_file,
 * both of which may be 0; confirm a NULL check precedes the dereferences
 * below. */
214 static int inputfile_is_done(InputFile *ipf) {
216 if (ipf->inprogress) return 0; /* new article in the meantime */
217 if (ipf->rd) return 0; /* not had EOF */
/* Log a one-line summary of `counts' for the input named `what', then
 * reset the counts.  completed selects "completed" vs "processed" wording;
 * ipf supplies the optional inprogress/autodeferred details.
 *
 * NOTE(review): the body reads and clears ipf->counts instead of the
 * `counts' argument (in RCI_TRIPLE_VALS, CNT, the events[] reads and the
 * final memset).  For showstats' "backlogs" report, which passes
 * &backlog_counts, this has three effects:
 *   - it logs the backlog file's own counts, which backlog_accumulate_counts
 *     has just zeroed, instead of backlog_counts;
 *   - it never resets backlog_counts;
 *   - it dereferences backlog_input_file even when that is 0.
 * Fixes: replace every ipf->counts with counts-> and use
 * memset(counts, 0, sizeof(*counts)).  The inprogress text also needs the
 * same "ipf &&" guard that autodefer has.  The xasprintf'd inprog and
 * autodefer strings must be freed afterwards (not shown).
 */
221 static void notice_processed_counts(Counts *counts, int completed,
222 InputFile *ipf, const char *what) {
224 #define RCI_NOTHING(x) /* nothing */
225 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
226 #define RCI_TRIPLE_VALS(x) ,RCI_TRIPLE_VALS_BASE(ipf->counts.results, [RC_##x])
228 #define CNT(art,rc) (ipf->counts.results[art_##art][RC_##rc])
230 char *inprog= completed
231 ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
232 : xasprintf(" inprogress=%ld", ipf->inprogress);
233 char *autodefer= ipf && ipf->autodefer >= 0
234 ? xasprintf(" autodeferred=%ld", ipf->autodefer)
235 : xasprintf("%s","");
237 info("%s %s read=%d (+bl=%d,+err=%d)%s%s"
238 " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
239 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
241 completed?"completed":"processed", what,
242 ipf->counts.events[read_ok], ipf->counts.events[read_blank],
243 ipf->counts.events[read_err],
244 inprog, autodefer, ipf->counts.events[nooffer_missing],
245 CNT(Unchecked,sent) + CNT(Unsolicited,sent)
246 , CNT(Unchecked,sent), CNT(Unsolicited,sent),
247 CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
248 , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
249 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
252 memset(&ipf->counts, 0, sizeof(ipf->counts));
/* Report (and reset) the counts for one input file.  ipf may be 0, in
 * which case nothing is logged, so showstats can pass files that are not
 * currently open. */
260 static void notice_processed_inputfile(InputFile *ipf, int completed,
262 if (!ipf) return; /* allows showstats to be lazy */
263 notice_processed_counts(&ipf->counts, completed, ipf, what);
/* Fold ipf's per-article result counts and event counts into the global
 * backlog_counts, zero ipf's own counts, and flag that showstats has
 * backlog counts to report.
 * NOTE(review): showstats calls this with backlog_input_file, which may be
 * 0 (statemc_check_backlog_done clears it); confirm the function returns
 * early for a NULL ipf (not shown). */
266 static void backlog_accumulate_counts(InputFile *ipf) {
270 for (i=0; i<art_MaxState; i++)
271 for (j=0; j<RCI_max; j++)
272 backlog_counts.results[i][j] += ipf->counts.results[i][j];
274 for (i=0; i<ECI_max; i++)
275 backlog_counts.events[i] += ipf->counts.events[i];
277 memset(&ipf->counts, 0, sizeof(ipf->counts));
278 backlog_counts_report= 1;
/* If the current backlog file has been fully processed: accumulate its
 * counts, close it, unlink it, and look for the next backlog file.  If the
 * file has already vanished this only warns; the errno test that selects
 * between the crash and the warning is not shown.
 * NOTE(review): statemc_check_flushing_done frees its InputFile after
 * close_input_file, but no free(ipf) is visible here; confirm the backlog
 * InputFile is not leaked. */
281 void statemc_check_backlog_done(void) {
282 InputFile *ipf= backlog_input_file;
283 if (!inputfile_is_done(ipf)) return;
285 backlog_accumulate_counts(ipf);
286 close_input_file(ipf);
287 if (unlink(ipf->path)) {
289 syscrash("could not unlink processed backlog file %s", ipf->path);
290 warn("backlog file %s vanished while we were reading it"
291 " so we couldn't remove it (but it's done now, anyway)",
295 backlog_input_file= 0;
296 search_backlog_file();
/* If the flushing (old) feed file has been fully processed:
 *   - report its counts;
 *   - remove path_flushing;
 *   - close and free the InputFile;
 *   - change state.
 * SEPARATED becomes NORMAL.  DROPPING becomes DROPPED, and a backlog
 * search is started because the feed itself has now finished.  Only
 * valid in those two states (asserted). */
300 void statemc_check_flushing_done(void) {
301 InputFile *ipf= flushing_input_file;
302 if (!inputfile_is_done(ipf)) return;
304 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
306 notice_processed_inputfile(ipf,1,"feedfile");
310 xunlink(path_flushing, "old flushing file");
312 close_input_file(flushing_input_file);
313 free(flushing_input_file);
314 flushing_input_file= 0;
316 if (sms==sm_SEPARATED) {
317 notice("flush complete");
318 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
319 } else if (sms==sm_DROPPING) {
320 SMS(DROPPED, max_separated_periods, "old flush complete");
321 search_backlog_file();
322 notice("feed dropped, but will continue until backlog is finished");
/* Event-loop timer callback (scheduled by queue_check_input_done): check
 * whether the flushing and backlog files have finished.  The main input
 * file is deliberately not checked, as explained below. */
326 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
328 /* main input file may be idle but if so that's because
329 * we haven't got to it yet, but that doesn't mean it's really done */
330 statemc_check_flushing_done();
331 statemc_check_backlog_done();
/* Schedule statemc_check_input_done to run from the event loop as soon as
 * possible (OOP_TIME_NOW), rather than calling it synchronously from the
 * caller's context. */
335 void queue_check_input_done(void) {
336 loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
/* Enter state newsms (presumably invoked through the SMS() macro, which
 * supplies forlog), arm the flush countdown with `periods', and log the
 * transition.  The logged state name may get a suffix: "-ABSENT" when
 * there is no main input file, or "-1"/"-2" according to whether the
 * flushing file's reader is still active.  The switch that picks which
 * suffix applies to which state is not shown, nor is the condition that
 * chooses between the two log formats ("[periods]" or not). */
339 void statemc_setstate(StateMachineState newsms, int periods,
340 const char *forlog, const char *why) {
342 until_flush= periods;
344 const char *xtra= "";
348 if (!main_input_file) xtra= "-ABSENT";
352 xtra= flushing_input_file->rd ? "-1" : "-2";
358 info("state %s%s[%d] %s",forlog,xtra,periods,why);
360 info("state %s%s %s",forlog,xtra,why);
364 /*========== flushing the feed ==========*/
/* pid of the running inndcomm (flush) child; 0 when no child is running
 * (spawn_inndcomm_flush asserts this before forking). */
366 pid_t inndcomm_child;
/* Parent's read end of the child's sentinel pipe.  The child holds the
 * write end, so the pipe becomes readable (EOF) when the child exits.
 * 0 means no sentinel is active. */
367 static int inndcomm_sentinel_fd;
/* Event-loop callback fired when the inndcomm child's sentinel pipe
 * becomes readable, i.e. the child has exited.  It reaps the child, tears
 * down the sentinel, and then acts on the exit status.
 *
 *   INNDCOMMCHILD_ESTATUS_NONESUCH: innd no longer knows the site.  The
 *       main file (if any) becomes the flushing file and the state goes to
 *       DROPPING.  With no main file it goes straight to DROPPED and a
 *       backlog search is started.
 *   success (case label not shown): the main file becomes the flushing
 *       file and the freshly created feed file is opened as the new main
 *       file.  The state becomes SEPARATED, or NORMAL if there was no old
 *       main file (a recovery flush).
 *   INNDCOMMCHILD_ESTATUS_FAIL, SIGALRM (flush timeout) or any other
 *       status: the state becomes FLUSHFAILED and the flush is retried
 *       later.  Part of the control flow joining these paths is not shown.
 */
369 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
370 assert(inndcomm_child);
371 assert(fd == inndcomm_sentinel_fd);
372 int status= xwaitpid(&inndcomm_child, "inndcomm");
/* Stop watching the sentinel and close it; 0 marks "no sentinel". */
375 cancel_fd_read_except(fd);
376 xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
377 inndcomm_sentinel_fd= 0;
379 assert(!flushing_input_file);
381 if (WIFEXITED(status)) {
382 switch (WEXITSTATUS(status)) {
384 case INNDCOMMCHILD_ESTATUS_FAIL:
387 case INNDCOMMCHILD_ESTATUS_NONESUCH:
388 notice("feed has been dropped by innd, finishing up");
389 flushing_input_file= main_input_file;
390 tailing_make_readable(flushing_input_file);
391 /* we probably previously returned EAGAIN from our fake read method
392 * when in fact we were at EOF, so signal another readable event
393 * so we actually see the EOF */
397 if (flushing_input_file) {
398 SMS(DROPPING, max_separated_periods,
399 "feed dropped by innd, but must finish last flush");
402 SMS(DROPPED, 0, "feed dropped by innd");
403 search_backlog_file();
/* Flush succeeded: innd has started a new feed file, so the old main file
 * is drained as the flushing file and the new one is opened. */
409 flushing_input_file= main_input_file;
410 tailing_make_readable(flushing_input_file);
412 main_input_file= open_input_file(feedfile);
413 if (!main_input_file)
414 crash("flush succeeded but feedfile %s does not exist!"
415 " (this probably means feedfile does not correspond"
416 " to site %s in newsfeeds)", feedfile, sitename);
418 if (flushing_input_file) {
419 SMS(SEPARATED, max_separated_periods, "flush complete");
422 SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete");
427 goto unexpected_exitstatus;
/* The child arms alarm(inndcomm_flush_timeout); SIGALRM means it timed out. */
430 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
431 warn("flush timed out trying to talk to innd");
434 unexpected_exitstatus:
435 report_child_status("inndcomm child", status);
439 SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
/* In the inndcomm child only: report that ICC operation `what' failed
 * (including ICCfailure's description), then exit with
 * INNDCOMMCHILD_ESTATUS_FAIL so that inndcomm_event treats it as a failed
 * flush. */
443 static void inndcommfail(const char *what) {
444 syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
445 exit(INNDCOMMCHILD_ESTATUS_FAIL);
/* Moved => Flushing: fork a child that asks innd to flush the feed for
 * sitename, then enter FLUSHING.  The request is sent with ICCcommand 'f',
 * the same operation as "ctlinnd flush".
 *
 * The parent keeps the read end of a pipe as inndcomm_sentinel_fd.  The
 * child's write end is closed automatically when it exits, which makes
 * inndcomm_event fire.  The child reports its outcome only through its
 * exit status:
 *   - 0 on success;
 *   - INNDCOMMCHILD_ESTATUS_NONESUCH if innd replies "1 No such site";
 *   - INNDCOMMCHILD_ESTATUS_FAIL otherwise.
 * alarm() limits the whole exchange to inndcomm_flush_timeout seconds.
 * For testing, simulate_flush>=0 replaces the real flush with that exit
 * status; values above 128 raise signal simulate_flush-128 instead.
 */
448 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
451 notice("flushing %s",why);
453 assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
454 assert(!inndcomm_child);
455 assert(!inndcomm_sentinel_fd);
457 if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel");
459 inndcomm_child= xfork("inndcomm child");
461 if (!inndcomm_child) {
/* Child: argv-style site list for the ICC flush command. */
462 const char *flushargv[2]= { sitename, 0 };
466 xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
467 /* parent spots the autoclose of pipefds[1] when we die or exit */
469 if (simulate_flush>=0) {
470 warn("SIMULATING flush child status %d", simulate_flush);
471 if (simulate_flush>128) raise(simulate_flush-128);
472 else exit(simulate_flush);
475 alarm(inndcomm_flush_timeout);
476 r= ICCopen(); if (r) inndcommfail("connect");
477 r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit");
478 if (!r) exit(0); /* yay! */
480 if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
/* NOTE(review): this failure is an innd reply, not a failed system call,
 * so errno is unrelated here.  If syswarn appends the errno text (as its
 * use in inndcommfail suggests), warn() is the appropriate reporter. */
481 syswarn("innd ctlinnd flush failed: innd said %s", reply);
482 exit(INNDCOMMCHILD_ESTATUS_FAIL);
/* Parent: keep only the read end; 0 is reserved to mean "no sentinel",
 * hence the assert. */
487 xclose(pipefds[1], "inndcomm sentinel child's end",0);
488 inndcomm_sentinel_fd= pipefds[0];
489 assert(inndcomm_sentinel_fd);
490 on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
492 SMS(FLUSHING, 0, why);
495 /*---------- shutdown and signal handling ----------*/
/* Termination-time hook.  Does nothing in a forked child (in_child set);
 * the parent's remaining work is not shown. */
497 void preterminate(void) {
498 if (in_child) return;
/* Log (and reset) the counts for the main and flushing feed files.  Then
 * fold the current backlog file's counts into backlog_counts and report
 * those if anything new has accumulated.  Finally restart the
 * stats-logging countdown.
 * NOTE(review): backlog_input_file may be 0 here; both calls that receive
 * it must tolerate a NULL pointer. */
502 void showstats(void) {
503 notice_processed_inputfile(main_input_file, 0, "feedfile");
504 notice_processed_inputfile(flushing_input_file, 0, "flushing");
506 backlog_accumulate_counts(backlog_input_file);
507 if (backlog_counts_report) {
508 notice_processed_counts(&backlog_counts, 0,
509 backlog_input_file, "backlogs");
510 backlog_counts_report= 0;
512 until_stats_log= stats_log_periods;
/* Self-pipe that carries signals from sigarrived_handler to the event
 * loop.  [1] is written by the handler; [0] is watched by sigarrived_event.
 * init_signals makes both ends non-blocking. */
515 static int signal_self_pipe[2];
/* Event-loop callback for the signal self-pipe.  It reads pending wakeup
 * bytes (EAGAIN is tolerated; EOF is fatal).  If a terminating signal has
 * been recorded, it logs the signal and hands it to raise_default
 * (presumably re-raising it with the default disposition).
 * NOTE(review): the assert below uses "=", so it assigns to fd instead of
 * checking it.  It should read "assert(fd==signal_self_pipe[0]);".
 */
517 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
518 assert(fd=signal_self_pipe[0]);
520 int r= read(signal_self_pipe[0], buf, sizeof(buf));
521 if (r<0 && !isewouldblock(errno))
522 syscrash("failed to read signal self pipe");
523 if (r==0) crash("eof on signal self pipe");
524 if (terminate_sig_flag) {
526 notice("terminating (%s)", strsignal(terminate_sig_flag));
527 raise_default(terminate_sig_flag);
/* Handler for SIGTERM and SIGINT (installed by init_signals).  It records
 * the first such signal in terminate_sig_flag and writes one byte to the
 * self-pipe to wake the event loop.  The write's result is ignored: the
 * pipe is non-blocking, and a full pipe already has a wakeup pending.
 * NOTE(review): write() may change errno inside the handler; confirm that
 * the interrupted code's errno is saved and restored (not shown). */
532 static void sigarrived_handler(int signum) {
537 if (!terminate_sig_flag) terminate_sig_flag= signum;
542 write(signal_self_pipe[1],&x,1);
/* Set up signal handling.  This function:
 *   - ignores SIGPIPE;
 *   - creates the non-blocking signal self-pipe;
 *   - installs sigarrived_handler for SIGTERM and SIGINT, with SA_RESTART
 *     so that interrupted system calls are restarted;
 *   - registers the pipe's read end with the event loop.
 */
545 void init_signals(void) {
546 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
547 syscrash("could not ignore SIGPIPE");
549 if (pipe(signal_self_pipe)) sysdie("create self-pipe for signals");
551 xsetnonblock(signal_self_pipe[0],1);
552 xsetnonblock(signal_self_pipe[1],1);
555 memset(&sa,0,sizeof(sa));
556 sa.sa_handler= sigarrived_handler;
557 sa.sa_flags= SA_RESTART;
558 xsigaction(SIGTERM,&sa);
559 xsigaction(SIGINT,&sa);
561 on_fd_read_except(signal_self_pipe[0], sigarrived_event);