chiark / gitweb /
install innduct-stats-report
[innduct.git] / statemc.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  statemc.c - state machine core (see README.states).
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29
30 /* statemc_init initialises */
31 StateMachineState sms;
32 int until_flush;
33 InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
34 FILE *defer;
35
36 /* initialisation to 0 is good */
37 int until_connect, until_backlog_nextscan;
38 double accept_proportion;
39 int nocheck, nocheck_reported, in_child;
40 sig_atomic_t terminate_sig_flag;
41
42
43 static void startup_set_input_file(InputFile *f) {
44   assert(!main_input_file);
45   main_input_file= f;
46   inputfile_reading_start(f);
47 }
48
49 void statemc_lock(void) {
50   int lockfd;
51   struct stat stab, stabf;
52   
53   for (;;) {
54     lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
55     if (lockfd<0) sysdie("open lockfile %s", path_lock);
56
57     struct flock fl;
58     memset(&fl,0,sizeof(fl));
59     fl.l_type= F_WRLCK;
60     fl.l_whence= SEEK_SET;
61     int r= fcntl(lockfd, F_SETLK, &fl);
62     if (r==-1) {
63       if (errno==EACCES || isewouldblock(errno)) {
64         if (quiet_multiple) exit(0);
65         die("another duct holds the lockfile");
66       }
67       sysdie("fcntl F_SETLK lockfile %s", path_lock);
68     }
69
70     xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
71     int lock_noent;
72     xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
73
74     if (!lock_noent && samefile(&stab, &stabf))
75       break;
76
77     xclose(lockfd, "stale lockfile ", path_lock);
78   }
79
80   FILE *lockfile= fdopen(lockfd, "w");
81   if (!lockfile) syscrash("fdopen lockfile");
82
83   int r= ftruncate(lockfd, 0);
84   if (r) syscrash("truncate lockfile to write new info");
85
86   if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
87               (unsigned long)self_pid,
88               sitename, feedfile, remote_host) == EOF ||
89       fflush(lockfile))
90     sysdie("write info to lockfile %s", path_lock);
91
92   dbg("startup: locked");
93 }
94
95 void statemc_init(void) {
96   struct stat stabdefer;
97
98   search_backlog_file();
99
100   int defer_noent;
101   xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
102   if (defer_noent) {
103     dbg("startup: ductdefer ENOENT");
104   } else {
105     dbg("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
106     switch (stabdefer.st_nlink==1) {
107     case 1:
108       open_defer(); /* so that we will later close it and rename it */
109       break;
110     case 2:
111       xunlink(path_defer, "stale defer file link"
112               " (presumably hardlink to backlog file)");
113       break;
114     default:
115       crash("defer file %s has unexpected link count %d",
116             path_defer, stabdefer.st_nlink);
117     }
118   }
119
120   struct stat stab_f, stab_d;
121   int noent_f;
122
123   InputFile *file_d= open_input_file(path_flushing);
124   if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
125
126   xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
127
128   if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
129     dbg("startup: F==D => Hardlinked");
130     xunlink(feedfile, "feed file (during startup)"); /* => Moved */
131     noent_f= 1;
132   }
133
134   if (noent_f) {
135     dbg("startup: F ENOENT => Moved");
136     if (file_d) startup_set_input_file(file_d);
137     spawn_inndcomm_flush("feedfile missing at startup");
138     /* => Flushing, sms:=FLUSHING */
139   } else {
140     if (file_d) {
141       dbg("startup: F!=D => Separated");
142       startup_set_input_file(file_d);
143       flushing_input_file= main_input_file;
144       main_input_file= open_input_file(feedfile);
145       if (!main_input_file) crash("feedfile vanished during startup");
146       SMS(SEPARATED, max_separated_periods,
147           "found both old and current feed files");
148     } else {
149       dbg("startup: F exists, D ENOENT => Normal");
150       InputFile *file_f= open_input_file(feedfile);
151       if (!file_f) crash("feed file vanished during startup");
152       startup_set_input_file(file_f);
153       SMS(NORMAL, spontaneous_flush_periods, "normal startup");
154     }
155   }
156 }
157
158 void statemc_start_flush(const char *why) { /* Normal => Flushing */
159   assert(sms == sm_NORMAL);
160
161   dbg("starting flush (%s) (%lu >?= %lu) (%d)",
162         why,
163         (unsigned long)(main_input_file ? main_input_file->offset : 0),
164         (unsigned long)target_max_feedfile_size,
165         until_flush);
166
167   int r= link(feedfile, path_flushing);
168   if (r) sysdie("link feedfile %s to flushing file %s",
169                 feedfile, path_flushing);
170   /* => Hardlinked */
171
172   xunlink(feedfile, "old feedfile link");
173   /* => Moved */
174
175   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
176 }
177
178 int trigger_flush_ok(const char *why) {
179   switch (sms) {
180
181   case sm_NORMAL:
182     statemc_start_flush(why ? why : "periodic");
183     return 1;                           /* Normal => Flushing; => FLUSHING */
184
185   case sm_FLUSHFAILED:
186     spawn_inndcomm_flush(why ? why : "retry");
187     return 1;                            /* Moved => Flushing; => FLUSHING */
188
189   case sm_SEPARATED:
190   case sm_DROPPING:
191     warn("abandoning old feedfile after flush (%s), autodeferring",
192          why ? why : "took too long to complete");
193     assert(flushing_input_file);
194     autodefer_input_file(flushing_input_file);
195     return 1;
196
197   default:
198     return 0;
199   }
200 }
201
202 void statemc_period_poll(void) {
203   if (!until_flush) return;
204   until_flush--;
205   assert(until_flush>=0);
206
207   if (until_flush) return;
208   int ok= trigger_flush_ok(0);
209   assert(ok);
210 }
211
212 static int inputfile_is_done(InputFile *ipf) {
213   if (!ipf) return 0;
214   if (ipf->inprogress) return 0; /* new article in the meantime */
215   if (ipf->rd) return 0; /* not had EOF */
216   return 1;
217 }
218
219 static void notice_processed(InputFile *ipf, int completed,
220                              const char *what, const char *spec) {
221   if (!ipf) return; /* allows preterminate to be lazy */
222
223 #define RCI_NOTHING(x) /* nothing */
224 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
225 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
226
227 #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
228
229   char *inprog= completed
230     ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
231     : xasprintf(" inprogress=%ld", ipf->inprogress);
232   char *autodefer= ipf->autodefer >= 0
233     ? xasprintf(" autodeferred=%ld", ipf->autodefer)
234     : xasprintf("%s","");
235
236   info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
237        " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
238        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
239        ,
240        completed?"completed":"processed", what, spec,
241        ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
242        inprog, autodefer, ipf->count_nooffer_missing,
243        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
244        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
245        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
246        , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
247        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
248        );
249
250   memset(ipf->counts, 0, sizeof(ipf->counts));
251   ipf->readcount_ok= ipf->readcount_blank=
252     ipf->readcount_err= ipf->count_nooffer_missing= 0;
253
254   free(inprog);
255   free(autodefer);
256
257 #undef CNT
258 }
259
260 void statemc_check_backlog_done(void) {
261   InputFile *ipf= backlog_input_file;
262   if (!inputfile_is_done(ipf)) return;
263
264   const char *slash= strrchr(ipf->path, '/');
265   const char *leaf= slash ? slash+1 : ipf->path;
266   const char *under= strchr(slash, '_');
267   const char *rest= under ? under+1 : leaf;
268   if (!strncmp(rest,"backlog",7)) rest += 7;
269   notice_processed(ipf,1,"backlog ",rest);
270
271   close_input_file(ipf);
272   if (unlink(ipf->path)) {
273     if (errno != ENOENT)
274       syscrash("could not unlink processed backlog file %s", ipf->path);
275     warn("backlog file %s vanished while we were reading it"
276          " so we couldn't remove it (but it's done now, anyway)",
277          ipf->path);
278   }
279   free(ipf);
280   backlog_input_file= 0;
281   search_backlog_file();
282   return;
283 }
284
285 void statemc_check_flushing_done(void) {
286   InputFile *ipf= flushing_input_file;
287   if (!inputfile_is_done(ipf)) return;
288
289   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
290
291   notice_processed(ipf,1,"feedfile","");
292
293   close_defer();
294
295   xunlink(path_flushing, "old flushing file");
296
297   close_input_file(flushing_input_file);
298   free(flushing_input_file);
299   flushing_input_file= 0;
300
301   if (sms==sm_SEPARATED) {
302     notice("flush complete");
303     SMS(NORMAL, spontaneous_flush_periods, "flush complete");
304   } else if (sms==sm_DROPPING) {
305     SMS(DROPPED, max_separated_periods, "old flush complete");
306     search_backlog_file();
307     notice("feed dropped, but will continue until backlog is finished");
308   }
309 }
310
311 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
312                                       void *u) {
313   /* main input file may be idle but if so that's because
314    * we haven't got to it yet, but that doesn't mean it's really done */
315   statemc_check_flushing_done();
316   statemc_check_backlog_done();
317   return OOP_CONTINUE;
318 }
319
320 void queue_check_input_done(void) {
321   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
322 }
323
324 void statemc_setstate(StateMachineState newsms, int periods,
325                       const char *forlog, const char *why) {
326   sms= newsms;
327   until_flush= periods;
328
329   const char *xtra= "";
330   switch (sms) {
331   case sm_FLUSHING:
332   case sm_FLUSHFAILED:
333     if (!main_input_file) xtra= "-ABSENT";
334     break;
335   case sm_SEPARATED:
336   case sm_DROPPING:
337     xtra= flushing_input_file->rd ? "-1" : "-2";
338     break;
339   default:;
340   }
341
342   if (periods) {
343     info("state %s%s[%d] %s",forlog,xtra,periods,why);
344   } else {
345     info("state %s%s %s",forlog,xtra,why);
346   }
347 }
348
349 /*========== flushing the feed ==========*/
350
351 pid_t inndcomm_child;
352 static int inndcomm_sentinel_fd;
353
354 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
355   assert(inndcomm_child);
356   assert(fd == inndcomm_sentinel_fd);
357   int status= xwaitpid(&inndcomm_child, "inndcomm");
358   inndcomm_child= 0;
359   
360   cancel_fd_read_except(fd);
361   xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
362   inndcomm_sentinel_fd= 0;
363
364   assert(!flushing_input_file);
365
366   if (WIFEXITED(status)) {
367     switch (WEXITSTATUS(status)) {
368
369     case INNDCOMMCHILD_ESTATUS_FAIL:
370       goto failed;
371
372     case INNDCOMMCHILD_ESTATUS_NONESUCH:
373       notice("feed has been dropped by innd, finishing up");
374       flushing_input_file= main_input_file;
375       tailing_make_readable(flushing_input_file);
376         /* we probably previously returned EAGAIN from our fake read method
377          * when in fact we were at EOF, so signal another readable event
378          * so we actually see the EOF */
379
380       main_input_file= 0;
381
382       if (flushing_input_file) {
383         SMS(DROPPING, max_separated_periods,
384             "feed dropped by innd, but must finish last flush");
385       } else {
386         close_defer();
387         SMS(DROPPED, 0, "feed dropped by innd");
388         search_backlog_file();
389       }
390       return OOP_CONTINUE;
391
392     case 0:
393       /* as above */
394       flushing_input_file= main_input_file;
395       tailing_make_readable(flushing_input_file);
396
397       main_input_file= open_input_file(feedfile);
398       if (!main_input_file)
399         crash("flush succeeded but feedfile %s does not exist!"
400               " (this probably means feedfile does not correspond"
401               " to site %s in newsfeeds)", feedfile, sitename);
402
403       if (flushing_input_file) {
404         SMS(SEPARATED, max_separated_periods, "flush complete");
405       } else {
406         close_defer();
407         SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete");
408       }
409       return OOP_CONTINUE;
410
411     default:
412       goto unexpected_exitstatus;
413
414     }
415   } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
416     warn("flush timed out trying to talk to innd");
417     goto failed;
418   } else {
419   unexpected_exitstatus:
420     report_child_status("inndcomm child", status);
421   }
422
423  failed:
424   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
425   return OOP_CONTINUE;
426 }
427
428 static void inndcommfail(const char *what) {
429   syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
430   exit(INNDCOMMCHILD_ESTATUS_FAIL);
431 }
432
433 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
434   int pipefds[2];
435
436   notice("flushing %s",why);
437
438   assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
439   assert(!inndcomm_child);
440   assert(!inndcomm_sentinel_fd);
441
442   if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel");
443
444   inndcomm_child= xfork("inndcomm child");
445
446   if (!inndcomm_child) {
447     const char *flushargv[2]= { sitename, 0 };
448     char *reply;
449     int r;
450
451     xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
452     /* parent spots the autoclose of pipefds[1] when we die or exit */
453
454     if (simulate_flush>=0) {
455       warn("SIMULATING flush child status %d", simulate_flush);
456       if (simulate_flush>128) raise(simulate_flush-128);
457       else exit(simulate_flush);
458     }
459
460     alarm(inndcomm_flush_timeout);
461     r= ICCopen();                         if (r)   inndcommfail("connect");
462     r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
463     if (!r) exit(0); /* yay! */
464
465     if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
466     syswarn("innd ctlinnd flush failed: innd said %s", reply);
467     exit(INNDCOMMCHILD_ESTATUS_FAIL);
468   }
469
470   simulate_flush= -1;
471
472   xclose(pipefds[1], "inndcomm sentinel child's end",0);
473   inndcomm_sentinel_fd= pipefds[0];
474   assert(inndcomm_sentinel_fd);
475   on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
476
477   SMS(FLUSHING, 0, why);
478 }
479
480 /*---------- shutdown and signal handling ----------*/
481
482 void preterminate(void) {
483   if (in_child) return;
484   showstats();
485 }
486
487 void showstats(void) {
488   notice_processed(main_input_file,0,"feedfile","");
489   notice_processed(flushing_input_file,0,"flushing","");
490   if (backlog_input_file)
491     notice_processed(backlog_input_file,0, "backlog file ",
492                      backlog_input_file->path);
493   until_stats_log= stats_log_periods;
494 }
495
496 static int signal_self_pipe[2];
497
498 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
499   assert(fd=signal_self_pipe[0]);
500   char buf[PIPE_BUF];
501   int r= read(signal_self_pipe[0], buf, sizeof(buf));
502   if (r<0 && !isewouldblock(errno))
503     syscrash("failed to read signal self pipe");
504   if (r==0) crash("eof on signal self pipe");
505   if (terminate_sig_flag) {
506     preterminate();
507     notice("terminating (%s)", strsignal(terminate_sig_flag));
508     raise_default(terminate_sig_flag);
509   }
510   return OOP_CONTINUE;
511 }
512
513 static void sigarrived_handler(int signum) {
514   static char x;
515   switch (signum) {
516   case SIGTERM:
517   case SIGINT:
518     if (!terminate_sig_flag) terminate_sig_flag= signum;
519     break;
520   default:
521     abort();
522   }
523   write(signal_self_pipe[1],&x,1);
524 }
525
526 void init_signals(void) {
527   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
528     syscrash("could not ignore SIGPIPE");
529
530   if (pipe(signal_self_pipe)) sysdie("create self-pipe for signals");
531
532   xsetnonblock(signal_self_pipe[0],1);
533   xsetnonblock(signal_self_pipe[1],1);
534
535   struct sigaction sa;
536   memset(&sa,0,sizeof(sa));
537   sa.sa_handler= sigarrived_handler;
538   sa.sa_flags= SA_RESTART;
539   xsigaction(SIGTERM,&sa);
540   xsigaction(SIGINT,&sa);
541
542   on_fd_read_except(signal_self_pipe[0], sigarrived_event);
543 }
544