chiark / gitweb /
970bb6cb6e4e818f25be8506a627f45a70753914
[innduct.git] / statemc.c
1
2
3 /* statemc_init initialises */
4 StateMachineState sms;
5 int until_flush;
6 InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
7 FILE *defer;
8
9 /* initialisation to 0 is good */
10 int until_connect, until_backlog_nextscan;
11 double accept_proportion;
12 int nocheck, nocheck_reported, in_child;
13 sig_atomic_t terminate_sig_flag;
14
15
16 static void startup_set_input_file(InputFile *f) {
17   assert(!main_input_file);
18   main_input_file= f;
19   inputfile_reading_start(f);
20 }
21
22 static void statemc_lock(void) {
23   int lockfd;
24   struct stat stab, stabf;
25   
26   for (;;) {
27     lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
28     if (lockfd<0) sysdie("open lockfile %s", path_lock);
29
30     struct flock fl;
31     memset(&fl,0,sizeof(fl));
32     fl.l_type= F_WRLCK;
33     fl.l_whence= SEEK_SET;
34     int r= fcntl(lockfd, F_SETLK, &fl);
35     if (r==-1) {
36       if (errno==EACCES || isewouldblock(errno)) {
37         if (quiet_multiple) exit(0);
38         die("another duct holds the lockfile");
39       }
40       sysdie("fcntl F_SETLK lockfile %s", path_lock);
41     }
42
43     xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
44     int lock_noent;
45     xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
46
47     if (!lock_noent && samefile(&stab, &stabf))
48       break;
49
50     xclose(lockfd, "stale lockfile ", path_lock);
51   }
52
53   FILE *lockfile= fdopen(lockfd, "w");
54   if (!lockfile) syscrash("fdopen lockfile");
55
56   int r= ftruncate(lockfd, 0);
57   if (r) syscrash("truncate lockfile to write new info");
58
59   if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
60               (unsigned long)self_pid,
61               sitename, feedfile, remote_host) == EOF ||
62       fflush(lockfile))
63     sysdie("write info to lockfile %s", path_lock);
64
65   dbg("startup: locked");
66 }
67
68 static void statemc_init(void) {
69   struct stat stabdefer;
70
71   search_backlog_file();
72
73   int defer_noent;
74   xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
75   if (defer_noent) {
76     dbg("startup: ductdefer ENOENT");
77   } else {
78     dbg("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
79     switch (stabdefer.st_nlink==1) {
80     case 1:
81       open_defer(); /* so that we will later close it and rename it */
82       break;
83     case 2:
84       xunlink(path_defer, "stale defer file link"
85               " (presumably hardlink to backlog file)");
86       break;
87     default:
88       crash("defer file %s has unexpected link count %d",
89             path_defer, stabdefer.st_nlink);
90     }
91   }
92
93   struct stat stab_f, stab_d;
94   int noent_f;
95
96   InputFile *file_d= open_input_file(path_flushing);
97   if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
98
99   xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
100
101   if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
102     dbg("startup: F==D => Hardlinked");
103     xunlink(feedfile, "feed file (during startup)"); /* => Moved */
104     noent_f= 1;
105   }
106
107   if (noent_f) {
108     dbg("startup: F ENOENT => Moved");
109     if (file_d) startup_set_input_file(file_d);
110     spawn_inndcomm_flush("feedfile missing at startup");
111     /* => Flushing, sms:=FLUSHING */
112   } else {
113     if (file_d) {
114       dbg("startup: F!=D => Separated");
115       startup_set_input_file(file_d);
116       flushing_input_file= main_input_file;
117       main_input_file= open_input_file(feedfile);
118       if (!main_input_file) crash("feedfile vanished during startup");
119       SMS(SEPARATED, max_separated_periods,
120           "found both old and current feed files");
121     } else {
122       dbg("startup: F exists, D ENOENT => Normal");
123       InputFile *file_f= open_input_file(feedfile);
124       if (!file_f) crash("feed file vanished during startup");
125       startup_set_input_file(file_f);
126       SMS(NORMAL, spontaneous_flush_periods, "normal startup");
127     }
128   }
129 }
130
131 static void statemc_start_flush(const char *why) { /* Normal => Flushing */
132   assert(sms == sm_NORMAL);
133
134   dbg("starting flush (%s) (%lu >?= %lu) (%d)",
135         why,
136         (unsigned long)(main_input_file ? main_input_file->offset : 0),
137         (unsigned long)target_max_feedfile_size,
138         until_flush);
139
140   int r= link(feedfile, path_flushing);
141   if (r) sysdie("link feedfile %s to flushing file %s",
142                 feedfile, path_flushing);
143   /* => Hardlinked */
144
145   xunlink(feedfile, "old feedfile link");
146   /* => Moved */
147
148   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
149 }
150
151 static int trigger_flush_ok(const char *why) {
152   switch (sms) {
153
154   case sm_NORMAL:
155     statemc_start_flush(why ? why : "periodic");
156     return 1;                           /* Normal => Flushing; => FLUSHING */
157
158   case sm_FLUSHFAILED:
159     spawn_inndcomm_flush(why ? why : "retry");
160     return 1;                            /* Moved => Flushing; => FLUSHING */
161
162   case sm_SEPARATED:
163   case sm_DROPPING:
164     warn("abandoning old feedfile after flush (%s), autodeferring",
165          why ? why : "took too long to complete");
166     assert(flushing_input_file);
167     autodefer_input_file(flushing_input_file);
168     return 1;
169
170   default:
171     return 0;
172   }
173 }
174
175 static void statemc_period_poll(void) {
176   if (!until_flush) return;
177   until_flush--;
178   assert(until_flush>=0);
179
180   if (until_flush) return;
181   int ok= trigger_flush_ok(0);
182   assert(ok);
183 }
184
185 static int inputfile_is_done(InputFile *ipf) {
186   if (!ipf) return 0;
187   if (ipf->inprogress) return 0; /* new article in the meantime */
188   if (ipf->rd) return 0; /* not had EOF */
189   return 1;
190 }
191
192 static void notice_processed(InputFile *ipf, int completed,
193                              const char *what, const char *spec) {
194   if (!ipf) return; /* allows preterminate to be lazy */
195
196 #define RCI_NOTHING(x) /* nothing */
197 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
198 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
199
200 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
201
202   char *inprog= completed
203     ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
204     : xasprintf(" inprogress=%ld", ipf->inprogress);
205   char *autodefer= ipf->autodefer >= 0
206     ? xasprintf(" autodeferred=%ld", ipf->autodefer)
207     : xasprintf("%s","");
208
209   info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
210        " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
211        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
212        ,
213        completed?"completed":"processed", what, spec,
214        ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
215        inprog, autodefer, ipf->count_nooffer_missing,
216        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
217        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
218        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
219        , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
220        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
221        );
222
223   free(inprog);
224   free(autodefer);
225
226 #undef CNT
227 }
228
229 static void statemc_check_backlog_done(void) {
230   InputFile *ipf= backlog_input_file;
231   if (!inputfile_is_done(ipf)) return;
232
233   const char *slash= strrchr(ipf->path, '/');
234   const char *leaf= slash ? slash+1 : ipf->path;
235   const char *under= strchr(slash, '_');
236   const char *rest= under ? under+1 : leaf;
237   if (!strncmp(rest,"backlog",7)) rest += 7;
238   notice_processed(ipf,1,"backlog ",rest);
239
240   close_input_file(ipf);
241   if (unlink(ipf->path)) {
242     if (errno != ENOENT)
243       syscrash("could not unlink processed backlog file %s", ipf->path);
244     warn("backlog file %s vanished while we were reading it"
245          " so we couldn't remove it (but it's done now, anyway)",
246          ipf->path);
247   }
248   free(ipf);
249   backlog_input_file= 0;
250   search_backlog_file();
251   return;
252 }
253
254 static void statemc_check_flushing_done(void) {
255   InputFile *ipf= flushing_input_file;
256   if (!inputfile_is_done(ipf)) return;
257
258   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
259
260   notice_processed(ipf,1,"feedfile","");
261
262   close_defer();
263
264   xunlink(path_flushing, "old flushing file");
265
266   close_input_file(flushing_input_file);
267   free(flushing_input_file);
268   flushing_input_file= 0;
269
270   if (sms==sm_SEPARATED) {
271     notice("flush complete");
272     SMS(NORMAL, spontaneous_flush_periods, "flush complete");
273   } else if (sms==sm_DROPPING) {
274     SMS(DROPPED, max_separated_periods, "old flush complete");
275     search_backlog_file();
276     notice("feed dropped, but will continue until backlog is finished");
277   }
278 }
279
280 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
281                                       void *u) {
282   /* main input file may be idle but if so that's because
283    * we haven't got to it yet, but that doesn't mean it's really done */
284   statemc_check_flushing_done();
285   statemc_check_backlog_done();
286   return OOP_CONTINUE;
287 }
288
289 static void queue_check_input_done(void) {
290   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
291 }
292
293 static void statemc_setstate(StateMachineState newsms, int periods,
294                              const char *forlog, const char *why) {
295   sms= newsms;
296   until_flush= periods;
297
298   const char *xtra= "";
299   switch (sms) {
300   case sm_FLUSHING:
301   case sm_FLUSHFAILED:
302     if (!main_input_file) xtra= "-ABSENT";
303     break;
304   case sm_SEPARATED:
305   case sm_DROPPING:
306     xtra= flushing_input_file->rd ? "-1" : "-2";
307     break;
308   default:;
309   }
310
311   if (periods) {
312     info("state %s%s[%d] %s",forlog,xtra,periods,why);
313   } else {
314     info("state %s%s %s",forlog,xtra,why);
315   }
316 }
317
318 /*========== flushing the feed ==========*/
319
320 pid_t inndcomm_child;
321 static int inndcomm_sentinel_fd;
322
323 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
324   assert(inndcomm_child);
325   assert(fd == inndcomm_sentinel_fd);
326   int status= xwaitpid(&inndcomm_child, "inndcomm");
327   inndcomm_child= 0;
328   
329   cancel_fd_read_except(fd);
330   xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
331   inndcomm_sentinel_fd= 0;
332
333   assert(!flushing_input_file);
334
335   if (WIFEXITED(status)) {
336     switch (WEXITSTATUS(status)) {
337
338     case INNDCOMMCHILD_ESTATUS_FAIL:
339       goto failed;
340
341     case INNDCOMMCHILD_ESTATUS_NONESUCH:
342       notice("feed has been dropped by innd, finishing up");
343       flushing_input_file= main_input_file;
344       tailing_make_readable(flushing_input_file);
345         /* we probably previously returned EAGAIN from our fake read method
346          * when in fact we were at EOF, so signal another readable event
347          * so we actually see the EOF */
348
349       main_input_file= 0;
350
351       if (flushing_input_file) {
352         SMS(DROPPING, max_separated_periods,
353             "feed dropped by innd, but must finish last flush");
354       } else {
355         close_defer();
356         SMS(DROPPED, 0, "feed dropped by innd");
357         search_backlog_file();
358       }
359       return OOP_CONTINUE;
360
361     case 0:
362       /* as above */
363       flushing_input_file= main_input_file;
364       tailing_make_readable(flushing_input_file);
365
366       main_input_file= open_input_file(feedfile);
367       if (!main_input_file)
368         crash("flush succeeded but feedfile %s does not exist!"
369               " (this probably means feedfile does not correspond"
370               " to site %s in newsfeeds)", feedfile, sitename);
371
372       if (flushing_input_file) {
373         SMS(SEPARATED, max_separated_periods, "flush complete");
374       } else {
375         close_defer();
376         SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete");
377       }
378       return OOP_CONTINUE;
379
380     default:
381       goto unexpected_exitstatus;
382
383     }
384   } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
385     warn("flush timed out trying to talk to innd");
386     goto failed;
387   } else {
388   unexpected_exitstatus:
389     report_child_status("inndcomm child", status);
390   }
391
392  failed:
393   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
394   return OOP_CONTINUE;
395 }
396
397 static void inndcommfail(const char *what) {
398   syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
399   exit(INNDCOMMCHILD_ESTATUS_FAIL);
400 }
401
402 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
403   int pipefds[2];
404
405   notice("flushing %s",why);
406
407   assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
408   assert(!inndcomm_child);
409   assert(!inndcomm_sentinel_fd);
410
411   if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel");
412
413   inndcomm_child= xfork("inndcomm child");
414
415   if (!inndcomm_child) {
416     const char *flushargv[2]= { sitename, 0 };
417     char *reply;
418     int r;
419
420     xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
421     /* parent spots the autoclose of pipefds[1] when we die or exit */
422
423     if (simulate_flush>=0) {
424       warn("SIMULATING flush child status %d", simulate_flush);
425       if (simulate_flush>128) raise(simulate_flush-128);
426       else exit(simulate_flush);
427     }
428
429     alarm(inndcomm_flush_timeout);
430     r= ICCopen();                         if (r)   inndcommfail("connect");
431     r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
432     if (!r) exit(0); /* yay! */
433
434     if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
435     syswarn("innd ctlinnd flush failed: innd said %s", reply);
436     exit(INNDCOMMCHILD_ESTATUS_FAIL);
437   }
438
439   simulate_flush= -1;
440
441   xclose(pipefds[1], "inndcomm sentinel child's end",0);
442   inndcomm_sentinel_fd= pipefds[0];
443   assert(inndcomm_sentinel_fd);
444   on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
445
446   SMS(FLUSHING, 0, why);
447 }
448
449 /*---------- shutdown and signal handling ----------*/
450
451 static void preterminate(void) {
452   if (in_child) return;
453   notice_processed(main_input_file,0,"feedfile","");
454   notice_processed(flushing_input_file,0,"flushing","");
455   if (backlog_input_file)
456     notice_processed(backlog_input_file,0, "backlog file ",
457                      backlog_input_file->path);
458 }
459
460 static int signal_self_pipe[2];
461
462 static void raise_default(int signo) {
463   xsigsetdefault(signo);
464   raise(signo);
465   abort();
466 }
467
468 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
469   assert(fd=signal_self_pipe[0]);
470   char buf[PIPE_BUF];
471   int r= read(signal_self_pipe[0], buf, sizeof(buf));
472   if (r<0 && !isewouldblock(errno))
473     syscrash("failed to read signal self pipe");
474   if (r==0) crash("eof on signal self pipe");
475   if (terminate_sig_flag) {
476     preterminate();
477     notice("terminating (%s)", strsignal(terminate_sig_flag));
478     raise_default(terminate_sig_flag);
479   }
480   return OOP_CONTINUE;
481 }
482
483 static void sigarrived_handler(int signum) {
484   static char x;
485   switch (signum) {
486   case SIGTERM:
487   case SIGINT:
488     if (!terminate_sig_flag) terminate_sig_flag= signum;
489     break;
490   default:
491     abort();
492   }
493   write(signal_self_pipe[1],&x,1);
494 }
495
496 static void init_signals(void) {
497   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
498     syscrash("could not ignore SIGPIPE");
499
500   if (pipe(signal_self_pipe)) sysdie("create self-pipe for signals");
501
502   xsetnonblock(signal_self_pipe[0],1);
503   xsetnonblock(signal_self_pipe[1],1);
504
505   struct sigaction sa;
506   memset(&sa,0,sizeof(sa));
507   sa.sa_handler= sigarrived_handler;
508   sa.sa_flags= SA_RESTART;
509   xsigaction(SIGTERM,&sa);
510   xsigaction(SIGINT,&sa);
511
512   on_fd_read_except(signal_self_pipe[0], sigarrived_event);
513 }
514