3 /* statemc_init initialises */
6 InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
9 /* initialisation to 0 is good */
10 int until_connect, until_backlog_nextscan;
11 double accept_proportion;
12 int nocheck, nocheck_reported, in_child;
13 sig_atomic_t terminate_sig_flag;
16 static void startup_set_input_file(InputFile *f) {
17 assert(!main_input_file);
19 inputfile_reading_start(f);
22 static void statemc_lock(void) {
24 struct stat stab, stabf;
27 lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
28 if (lockfd<0) sysdie("open lockfile %s", path_lock);
31 memset(&fl,0,sizeof(fl));
33 fl.l_whence= SEEK_SET;
34 int r= fcntl(lockfd, F_SETLK, &fl);
36 if (errno==EACCES || isewouldblock(errno)) {
37 if (quiet_multiple) exit(0);
38 die("another duct holds the lockfile");
40 sysdie("fcntl F_SETLK lockfile %s", path_lock);
43 xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
45 xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
47 if (!lock_noent && samefile(&stab, &stabf))
50 xclose(lockfd, "stale lockfile ", path_lock);
53 FILE *lockfile= fdopen(lockfd, "w");
54 if (!lockfile) syscrash("fdopen lockfile");
56 int r= ftruncate(lockfd, 0);
57 if (r) syscrash("truncate lockfile to write new info");
59 if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
60 (unsigned long)self_pid,
61 sitename, feedfile, remote_host) == EOF ||
63 sysdie("write info to lockfile %s", path_lock);
65 dbg("startup: locked");
68 static void statemc_init(void) {
69 struct stat stabdefer;
71 search_backlog_file();
74 xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
76 dbg("startup: ductdefer ENOENT");
78 dbg("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
79 switch (stabdefer.st_nlink==1) {
81 open_defer(); /* so that we will later close it and rename it */
84 xunlink(path_defer, "stale defer file link"
85 " (presumably hardlink to backlog file)");
88 crash("defer file %s has unexpected link count %d",
89 path_defer, stabdefer.st_nlink);
93 struct stat stab_f, stab_d;
96 InputFile *file_d= open_input_file(path_flushing);
97 if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
99 xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
101 if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
102 dbg("startup: F==D => Hardlinked");
103 xunlink(feedfile, "feed file (during startup)"); /* => Moved */
108 dbg("startup: F ENOENT => Moved");
109 if (file_d) startup_set_input_file(file_d);
110 spawn_inndcomm_flush("feedfile missing at startup");
111 /* => Flushing, sms:=FLUSHING */
114 dbg("startup: F!=D => Separated");
115 startup_set_input_file(file_d);
116 flushing_input_file= main_input_file;
117 main_input_file= open_input_file(feedfile);
118 if (!main_input_file) crash("feedfile vanished during startup");
119 SMS(SEPARATED, max_separated_periods,
120 "found both old and current feed files");
122 dbg("startup: F exists, D ENOENT => Normal");
123 InputFile *file_f= open_input_file(feedfile);
124 if (!file_f) crash("feed file vanished during startup");
125 startup_set_input_file(file_f);
126 SMS(NORMAL, spontaneous_flush_periods, "normal startup");
131 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
132 assert(sms == sm_NORMAL);
134 dbg("starting flush (%s) (%lu >?= %lu) (%d)",
136 (unsigned long)(main_input_file ? main_input_file->offset : 0),
137 (unsigned long)target_max_feedfile_size,
140 int r= link(feedfile, path_flushing);
141 if (r) sysdie("link feedfile %s to flushing file %s",
142 feedfile, path_flushing);
145 xunlink(feedfile, "old feedfile link");
148 spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
151 static int trigger_flush_ok(const char *why) {
155 statemc_start_flush(why ? why : "periodic");
156 return 1; /* Normal => Flushing; => FLUSHING */
159 spawn_inndcomm_flush(why ? why : "retry");
160 return 1; /* Moved => Flushing; => FLUSHING */
164 warn("abandoning old feedfile after flush (%s), autodeferring",
165 why ? why : "took too long to complete");
166 assert(flushing_input_file);
167 autodefer_input_file(flushing_input_file);
175 static void statemc_period_poll(void) {
176 if (!until_flush) return;
178 assert(until_flush>=0);
180 if (until_flush) return;
181 int ok= trigger_flush_ok(0);
185 static int inputfile_is_done(InputFile *ipf) {
187 if (ipf->inprogress) return 0; /* new article in the meantime */
188 if (ipf->rd) return 0; /* not had EOF */
192 static void notice_processed(InputFile *ipf, int completed,
193 const char *what, const char *spec) {
194 if (!ipf) return; /* allows preterminate to be lazy */
196 #define RCI_NOTHING(x) /* nothing */
197 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
198 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
200 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
202 char *inprog= completed
203 ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
204 : xasprintf(" inprogress=%ld", ipf->inprogress);
205 char *autodefer= ipf->autodefer >= 0
206 ? xasprintf(" autodeferred=%ld", ipf->autodefer)
207 : xasprintf("%s","");
209 info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
210 " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
211 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
213 completed?"completed":"processed", what, spec,
214 ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
215 inprog, autodefer, ipf->count_nooffer_missing,
216 CNT(Unchecked,sent) + CNT(Unsolicited,sent)
217 , CNT(Unchecked,sent), CNT(Unsolicited,sent),
218 CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
219 , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
220 RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
229 static void statemc_check_backlog_done(void) {
230 InputFile *ipf= backlog_input_file;
231 if (!inputfile_is_done(ipf)) return;
233 const char *slash= strrchr(ipf->path, '/');
234 const char *leaf= slash ? slash+1 : ipf->path;
235 const char *under= strchr(slash, '_');
236 const char *rest= under ? under+1 : leaf;
237 if (!strncmp(rest,"backlog",7)) rest += 7;
238 notice_processed(ipf,1,"backlog ",rest);
240 close_input_file(ipf);
241 if (unlink(ipf->path)) {
243 syscrash("could not unlink processed backlog file %s", ipf->path);
244 warn("backlog file %s vanished while we were reading it"
245 " so we couldn't remove it (but it's done now, anyway)",
249 backlog_input_file= 0;
250 search_backlog_file();
254 static void statemc_check_flushing_done(void) {
255 InputFile *ipf= flushing_input_file;
256 if (!inputfile_is_done(ipf)) return;
258 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
260 notice_processed(ipf,1,"feedfile","");
264 xunlink(path_flushing, "old flushing file");
266 close_input_file(flushing_input_file);
267 free(flushing_input_file);
268 flushing_input_file= 0;
270 if (sms==sm_SEPARATED) {
271 notice("flush complete");
272 SMS(NORMAL, spontaneous_flush_periods, "flush complete");
273 } else if (sms==sm_DROPPING) {
274 SMS(DROPPED, max_separated_periods, "old flush complete");
275 search_backlog_file();
276 notice("feed dropped, but will continue until backlog is finished");
280 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
282 /* main input file may be idle but if so that's because
283 * we haven't got to it yet, but that doesn't mean it's really done */
284 statemc_check_flushing_done();
285 statemc_check_backlog_done();
289 static void queue_check_input_done(void) {
290 loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
293 static void statemc_setstate(StateMachineState newsms, int periods,
294 const char *forlog, const char *why) {
296 until_flush= periods;
298 const char *xtra= "";
302 if (!main_input_file) xtra= "-ABSENT";
306 xtra= flushing_input_file->rd ? "-1" : "-2";
312 info("state %s%s[%d] %s",forlog,xtra,periods,why);
314 info("state %s%s %s",forlog,xtra,why);
318 /*========== flushing the feed ==========*/
320 pid_t inndcomm_child;
321 static int inndcomm_sentinel_fd;
323 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
324 assert(inndcomm_child);
325 assert(fd == inndcomm_sentinel_fd);
326 int status= xwaitpid(&inndcomm_child, "inndcomm");
329 cancel_fd_read_except(fd);
330 xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
331 inndcomm_sentinel_fd= 0;
333 assert(!flushing_input_file);
335 if (WIFEXITED(status)) {
336 switch (WEXITSTATUS(status)) {
338 case INNDCOMMCHILD_ESTATUS_FAIL:
341 case INNDCOMMCHILD_ESTATUS_NONESUCH:
342 notice("feed has been dropped by innd, finishing up");
343 flushing_input_file= main_input_file;
344 tailing_make_readable(flushing_input_file);
345 /* we probably previously returned EAGAIN from our fake read method
346 * when in fact we were at EOF, so signal another readable event
347 * so we actually see the EOF */
351 if (flushing_input_file) {
352 SMS(DROPPING, max_separated_periods,
353 "feed dropped by innd, but must finish last flush");
356 SMS(DROPPED, 0, "feed dropped by innd");
357 search_backlog_file();
363 flushing_input_file= main_input_file;
364 tailing_make_readable(flushing_input_file);
366 main_input_file= open_input_file(feedfile);
367 if (!main_input_file)
368 crash("flush succeeded but feedfile %s does not exist!"
369 " (this probably means feedfile does not correspond"
370 " to site %s in newsfeeds)", feedfile, sitename);
372 if (flushing_input_file) {
373 SMS(SEPARATED, max_separated_periods, "flush complete");
376 SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete");
381 goto unexpected_exitstatus;
384 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
385 warn("flush timed out trying to talk to innd");
388 unexpected_exitstatus:
389 report_child_status("inndcomm child", status);
393 SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
397 static void inndcommfail(const char *what) {
398 syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
399 exit(INNDCOMMCHILD_ESTATUS_FAIL);
402 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
405 notice("flushing %s",why);
407 assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
408 assert(!inndcomm_child);
409 assert(!inndcomm_sentinel_fd);
411 if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel");
413 inndcomm_child= xfork("inndcomm child");
415 if (!inndcomm_child) {
416 const char *flushargv[2]= { sitename, 0 };
420 xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
421 /* parent spots the autoclose of pipefds[1] when we die or exit */
423 if (simulate_flush>=0) {
424 warn("SIMULATING flush child status %d", simulate_flush);
425 if (simulate_flush>128) raise(simulate_flush-128);
426 else exit(simulate_flush);
429 alarm(inndcomm_flush_timeout);
430 r= ICCopen(); if (r) inndcommfail("connect");
431 r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit");
432 if (!r) exit(0); /* yay! */
434 if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
435 syswarn("innd ctlinnd flush failed: innd said %s", reply);
436 exit(INNDCOMMCHILD_ESTATUS_FAIL);
441 xclose(pipefds[1], "inndcomm sentinel child's end",0);
442 inndcomm_sentinel_fd= pipefds[0];
443 assert(inndcomm_sentinel_fd);
444 on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
446 SMS(FLUSHING, 0, why);
449 /*---------- shutdown and signal handling ----------*/
451 static void preterminate(void) {
452 if (in_child) return;
453 notice_processed(main_input_file,0,"feedfile","");
454 notice_processed(flushing_input_file,0,"flushing","");
455 if (backlog_input_file)
456 notice_processed(backlog_input_file,0, "backlog file ",
457 backlog_input_file->path);
460 static int signal_self_pipe[2];
462 static void raise_default(int signo) {
463 xsigsetdefault(signo);
468 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
469 assert(fd=signal_self_pipe[0]);
471 int r= read(signal_self_pipe[0], buf, sizeof(buf));
472 if (r<0 && !isewouldblock(errno))
473 syscrash("failed to read signal self pipe");
474 if (r==0) crash("eof on signal self pipe");
475 if (terminate_sig_flag) {
477 notice("terminating (%s)", strsignal(terminate_sig_flag));
478 raise_default(terminate_sig_flag);
483 static void sigarrived_handler(int signum) {
488 if (!terminate_sig_flag) terminate_sig_flag= signum;
493 write(signal_self_pipe[1],&x,1);
496 static void init_signals(void) {
497 if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
498 syscrash("could not ignore SIGPIPE");
500 if (pipe(signal_self_pipe)) sysdie("create self-pipe for signals");
502 xsetnonblock(signal_self_pipe[0],1);
503 xsetnonblock(signal_self_pipe[1],1);
506 memset(&sa,0,sizeof(sa));
507 sa.sa_handler= sigarrived_handler;
508 sa.sa_flags= SA_RESTART;
509 xsigaction(SIGTERM,&sa);
510 xsigaction(SIGINT,&sa);
512 on_fd_read_except(signal_self_pipe[0], sigarrived_event);