chiark / gitweb /
Declare 1.0 (native) source format
[innduct.git] / statemc.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  statemc.c - state machine core (see README.states).
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29
30 /* statemc_init initialises */
31 StateMachineState sms;
32 int until_flush;
33 InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
34 Counts backlog_counts;
35 int backlog_counts_report;
36 FILE *defer;
37
38 /* initialisation to 0 is good */
39 int until_connect, until_backlog_nextscan;
40 double accept_proportion;
41 int nocheck, nocheck_reported, in_child;
42 sig_atomic_t terminate_sig_flag;
43
44
45 static void startup_set_input_file(InputFile *f) {
46   assert(!main_input_file);
47   main_input_file= f;
48   inputfile_reading_start(f);
49 }
50
51 void statemc_lock(void) {
52   int lockfd;
53   struct stat stab, stabf;
54   
55   for (;;) {
56     lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
57     if (lockfd<0) sysdie("open lockfile %s", path_lock);
58
59     struct flock fl;
60     memset(&fl,0,sizeof(fl));
61     fl.l_type= F_WRLCK;
62     fl.l_whence= SEEK_SET;
63     int r= fcntl(lockfd, F_SETLK, &fl);
64     if (r==-1) {
65       if (errno==EACCES || isewouldblock(errno)) {
66         if (quiet_multiple) exit(0);
67         die("another duct holds the lockfile");
68       }
69       sysdie("fcntl F_SETLK lockfile %s", path_lock);
70     }
71
72     xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
73     int lock_noent;
74     xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
75
76     if (!lock_noent && samefile(&stab, &stabf))
77       break;
78
79     xclose(lockfd, "stale lockfile ", path_lock);
80   }
81
82   FILE *lockfile= fdopen(lockfd, "w");
83   if (!lockfile) syscrash("fdopen lockfile");
84
85   int r= ftruncate(lockfd, 0);
86   if (r) syscrash("truncate lockfile to write new info");
87
88   if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
89               (unsigned long)self_pid,
90               sitename, feedfile, remote_host) == EOF ||
91       fflush(lockfile))
92     sysdie("write info to lockfile %s", path_lock);
93
94   dbg("startup: locked");
95 }
96
97 void statemc_init(void) {
98   struct stat stabdefer;
99
100   search_backlog_file();
101
102   int defer_noent;
103   xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
104   if (defer_noent) {
105     dbg("startup: ductdefer ENOENT");
106   } else {
107     dbg("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
108     switch (stabdefer.st_nlink) {
109     case 1:
110       open_defer(); /* so that we will later close it and rename it */
111       break;
112     case 2:
113       xunlink(path_defer, "stale defer file link"
114               " (presumably hardlink to backlog file)");
115       break;
116     default:
117       crash("defer file %s has unexpected link count %ld",
118             path_defer, (long)stabdefer.st_nlink);
119     }
120   }
121
122   struct stat stab_f, stab_d;
123   int noent_f;
124
125   InputFile *file_d= open_input_file(path_flushing);
126   if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
127
128   xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
129
130   if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
131     dbg("startup: F==D => Hardlinked");
132     xunlink(feedfile, "feed file (during startup)"); /* => Moved */
133     noent_f= 1;
134   }
135
136   if (noent_f) {
137     dbg("startup: F ENOENT => Moved");
138     if (file_d) startup_set_input_file(file_d);
139     spawn_inndcomm_flush("feedfile missing at startup");
140     /* => Flushing, sms:=FLUSHING */
141   } else {
142     if (file_d) {
143       dbg("startup: F!=D => Separated");
144       startup_set_input_file(file_d);
145       flushing_input_file= main_input_file;
146       main_input_file= open_input_file(feedfile);
147       if (!main_input_file) crash("feedfile vanished during startup");
148       SMS(SEPARATED, max_separated_periods,
149           "found both old and current feed files");
150     } else {
151       dbg("startup: F exists, D ENOENT => Normal");
152       InputFile *file_f= open_input_file(feedfile);
153       if (!file_f) crash("feed file vanished during startup");
154       startup_set_input_file(file_f);
155       SMS(NORMAL, spontaneous_flush_periods, "normal startup");
156     }
157   }
158 }
159
160 void statemc_start_flush(const char *why) { /* Normal => Flushing */
161   assert(sms == sm_NORMAL);
162
163   dbg("starting flush (%s) (%lu >?= %lu) (%d)",
164         why,
165         (unsigned long)(main_input_file ? main_input_file->offset : 0),
166         (unsigned long)target_max_feedfile_size,
167         until_flush);
168
169   int r= link(feedfile, path_flushing);
170   if (r) sysdie("link feedfile %s to flushing file %s",
171                 feedfile, path_flushing);
172   /* => Hardlinked */
173
174   xunlink(feedfile, "old feedfile link");
175   /* => Moved */
176
177   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
178 }
179
180 int trigger_flush_ok(const char *why) {
181   switch (sms) {
182
183   case sm_NORMAL:
184     statemc_start_flush(why ? why : "periodic");
185     return 1;                           /* Normal => Flushing; => FLUSHING */
186
187   case sm_FLUSHFAILED:
188     spawn_inndcomm_flush(why ? why : "retry");
189     return 1;                            /* Moved => Flushing; => FLUSHING */
190
191   case sm_SEPARATED:
192   case sm_DROPPING:
193     if (conns.count)
194       warn("abandoning old feedfile after flush (%s), autodeferring",
195            why ? why : "took too long to complete");
196     else
197       info("autodeferring after flush (%s)",
198            why ? why : "no connections");
199     assert(flushing_input_file);
200     autodefer_input_file(flushing_input_file);
201     return 1;
202
203   default:
204     return 0;
205   }
206 }
207
208 void statemc_period_poll(void) {
209   if (!until_flush) return;
210   until_flush--;
211   assert(until_flush>=0);
212
213   if (until_flush) return;
214   int ok= trigger_flush_ok(0);
215   assert(ok);
216 }
217
218 static int inputfile_is_done(InputFile *ipf) {
219   if (!ipf) return 0;
220   if (ipf->inprogress) return 0; /* new article in the meantime */
221   if (ipf->rd) return 0; /* not had EOF */
222   return 1;
223 }
224
225 static void notice_processed_counts(Counts *counts, int completed,
226                                     InputFile *ipf_xtra, const char *what) {
227
228 #define RCI_NOTHING(x) /* nothing */
229 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
230 #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(counts->results, [RC_##x])
231
232 #define CNT(art,rc) (counts->results[art_##art][RC_##rc])
233
234   char *inprog= ipf_xtra && !completed
235     ? masprintf(" inprogress=%ld", ipf_xtra->inprogress)
236     : masprintf("%s",""); /* GCC produces a stupid warning for printf("") ! */
237   char *autodefer= ipf_xtra && ipf_xtra->autodefer >= 0
238     ? masprintf(" autodeferred=%ld", ipf_xtra->autodefer)
239     : masprintf("%s","");
240
241   notice("%s %s read=%d (+bl=%d,+err=%d)%s%s missing=%d"
242          " offered=%d (ch=%d,nc=%d)"
243          " accepted=%d (ch=%d,nc=%d)"
244        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
245        ,
246        completed?"completed":"processed", what,
247        counts->events[read_ok], counts->events[read_blank],
248          counts->events[read_err],
249        inprog, autodefer, counts->events[nooffer_missing],
250        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
251        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
252        CNT(Wanted,accepted) + CNT(Wanted,accepted)
253        , CNT(Wanted,accepted), CNT(Wanted,accepted)
254        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
255        );
256
257   memset(counts, 0, sizeof(*counts));
258
259   free(inprog);
260   free(autodefer);
261
262 #undef CNT
263 }
264
265 static void notice_processed_inputfile(InputFile *ipf, int completed,
266                                        const char *what) {
267   if (!ipf) return; /* allows showstats to be lazy */
268   notice_processed_counts(&ipf->counts, completed, ipf, what);
269 }
270
271 static void backlog_accumulate_counts(InputFile *ipf) {
272   int i,j;
273   if (!ipf) return;
274
275   for (i=0; i<art_MaxState; i++)
276     for (j=0; j<RCI_max; j++)
277       backlog_counts.results[i][j] += ipf->counts.results[i][j];
278
279   for (i=0; i<ECI_max; i++)
280     backlog_counts.events[i] += ipf->counts.events[i];
281
282   memset(&ipf->counts, 0, sizeof(ipf->counts));
283   backlog_counts_report= 1;
284 }
285
286 void statemc_check_backlog_done(void) {
287   InputFile *ipf= backlog_input_file;
288   if (!inputfile_is_done(ipf)) return;
289
290   dbg("backlog file %p %s complete", ipf, ipf->path);
291   backlog_accumulate_counts(ipf);
292   close_input_file(ipf);
293   if (unlink(ipf->path)) {
294     if (errno != ENOENT)
295       syscrash("could not unlink processed backlog file %s", ipf->path);
296     warn("backlog file %s vanished while we were reading it"
297          " so we couldn't remove it (but it's done now, anyway)",
298          ipf->path);
299   }
300   free(ipf);
301   backlog_input_file= 0;
302   search_backlog_file();
303   return;
304 }
305
306 void statemc_check_flushing_done(void) {
307   InputFile *ipf= flushing_input_file;
308   if (!inputfile_is_done(ipf)) return;
309
310   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
311
312   notice_processed_inputfile(ipf,1,"batch");
313
314   close_defer();
315
316   xunlink(path_flushing, "old flushing file");
317
318   close_input_file(flushing_input_file);
319   free(flushing_input_file);
320   flushing_input_file= 0;
321
322   if (sms==sm_SEPARATED) {
323     notice("flush complete");
324     SMS(NORMAL, spontaneous_flush_periods, "flush complete");
325   } else if (sms==sm_DROPPING) {
326     SMS(DROPPED, max_separated_periods, "old flush complete");
327     search_backlog_file();
328     notice("feed dropped, but will continue until backlog is finished");
329   }
330 }
331
332 static void *statemc_check_input_done(oop_source *lp, struct timeval now,
333                                       void *u) {
334   /* main input file may be idle but if so that's because
335    * we haven't got to it yet, but that doesn't mean it's really done */
336   statemc_check_flushing_done();
337   statemc_check_backlog_done();
338   return OOP_CONTINUE;
339 }
340
341 void queue_check_input_done(void) {
342   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
343 }
344
345 void statemc_setstate(StateMachineState newsms, int periods,
346                       const char *forlog, const char *why) {
347   sms= newsms;
348   until_flush= periods;
349
350   const char *xtra= "";
351   switch (sms) {
352   case sm_FLUSHING:
353   case sm_FLUSHFAILED:
354     if (!main_input_file) xtra= "-ABSENT";
355     break;
356   case sm_SEPARATED:
357   case sm_DROPPING:
358     xtra= flushing_input_file->rd ? "-1" : "-2";
359     break;
360   default:;
361   }
362
363   if (periods) {
364     info("state %s%s[%d] %s",forlog,xtra,periods,why);
365   } else {
366     info("state %s%s %s",forlog,xtra,why);
367   }
368 }
369
370 /*========== flushing the feed ==========*/
371
372 pid_t inndcomm_child;
373 static int inndcomm_sentinel_fd;
374
375 static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
376   assert(inndcomm_child);
377   assert(fd == inndcomm_sentinel_fd);
378   int status= xwaitpid(&inndcomm_child, "inndcomm");
379   inndcomm_child= 0;
380   
381   cancel_fd_read_except(fd);
382   xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
383   inndcomm_sentinel_fd= 0;
384
385   assert(!flushing_input_file);
386
387   if (WIFEXITED(status)) {
388     switch (WEXITSTATUS(status)) {
389
390     case INNDCOMMCHILD_ESTATUS_FAIL:
391       goto failed;
392
393     case INNDCOMMCHILD_ESTATUS_NONESUCH:
394       notice("feed has been dropped by innd, finishing up");
395       flushing_input_file= main_input_file;
396       tailing_make_readable(flushing_input_file);
397         /* we probably previously returned EAGAIN from our fake read method
398          * when in fact we were at EOF, so signal another readable event
399          * so we actually see the EOF */
400
401       main_input_file= 0;
402
403       if (flushing_input_file) {
404         SMS(DROPPING, max_separated_periods,
405             "feed dropped by innd, but must finish last flush");
406       } else {
407         close_defer();
408         SMS(DROPPED, 0, "feed dropped by innd");
409         search_backlog_file();
410       }
411       return OOP_CONTINUE;
412
413     case 0:
414       /* as above */
415       flushing_input_file= main_input_file;
416       tailing_make_readable(flushing_input_file);
417
418       main_input_file= open_input_file(feedfile);
419       if (!main_input_file)
420         crash("flush succeeded but feedfile %s does not exist!"
421               " (this probably means feedfile does not correspond"
422               " to site %s in newsfeeds)", feedfile, sitename);
423
424       if (flushing_input_file) {
425         SMS(SEPARATED, max_separated_periods, "flush complete");
426       } else {
427         close_defer();
428         SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete");
429       }
430       return OOP_CONTINUE;
431
432     default:
433       goto unexpected_exitstatus;
434
435     }
436   } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
437     warn("flush timed out trying to talk to innd");
438     goto failed;
439   } else {
440   unexpected_exitstatus:
441     report_child_status("inndcomm child", status);
442   }
443
444  failed:
445   SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
446   return OOP_CONTINUE;
447 }
448
449 static void inndcommfail(const char *what) {
450   syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
451   exit(INNDCOMMCHILD_ESTATUS_FAIL);
452 }
453
454 void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
455   int pipefds[2];
456
457   notice("flushing %s",why);
458
459   assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
460   assert(!inndcomm_child);
461   assert(!inndcomm_sentinel_fd);
462
463   if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel");
464
465   inndcomm_child= xfork("inndcomm child");
466
467   if (!inndcomm_child) {
468     const char *flushargv[2]= { sitename, 0 };
469     char *reply;
470     int r;
471
472     xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
473     /* parent spots the autoclose of pipefds[1] when we die or exit */
474
475     if (simulate_flush>=0) {
476       warn("SIMULATING flush child status %d", simulate_flush);
477       if (simulate_flush>128) raise(simulate_flush-128);
478       else exit(simulate_flush);
479     }
480
481     alarm(inndcomm_flush_timeout);
482     r= ICCopen();                         if (r)   inndcommfail("connect");
483     r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
484     if (!r) exit(0); /* yay! */
485
486     if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
487     syswarn("innd ctlinnd flush failed: innd said %s", reply);
488     exit(INNDCOMMCHILD_ESTATUS_FAIL);
489   }
490
491   simulate_flush= -1;
492
493   xclose(pipefds[1], "inndcomm sentinel child's end",0);
494   inndcomm_sentinel_fd= pipefds[0];
495   assert(inndcomm_sentinel_fd);
496   on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
497
498   SMS(FLUSHING, 0, why);
499 }
500
501 /*---------- shutdown and signal handling ----------*/
502
503 void preterminate(void) {
504   if (in_child) return;
505   showstats();
506 }
507
508 void showstats(void) {
509   notice_conns_stats();
510   notice_processed_inputfile(main_input_file,     0, "feedfile");
511   notice_processed_inputfile(flushing_input_file, 0, "flushing");
512
513   backlog_accumulate_counts(backlog_input_file);
514   if (backlog_counts_report) {
515     notice_processed_counts(&backlog_counts, 0,
516                             backlog_input_file, "backlogs");
517     backlog_counts_report= 0;
518   }
519   until_stats_log= stats_log_periods;
520 }
521
522 static int signal_self_pipe[2];
523
524 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
525   assert(fd==signal_self_pipe[0]);
526   char buf[PIPE_BUF];
527   int r= read(signal_self_pipe[0], buf, sizeof(buf));
528   if (r<0 && !isewouldblock(errno))
529     syscrash("failed to read signal self pipe");
530   if (r==0) crash("eof on signal self pipe");
531   if (terminate_sig_flag) {
532     preterminate();
533     notice("terminating (%s)", strsignal(terminate_sig_flag));
534     raise_default(terminate_sig_flag);
535   }
536   return OOP_CONTINUE;
537 }
538
539 static void sigarrived_handler(int signum) {
540   int esave = errno;
541   static char x;
542   switch (signum) {
543   case SIGTERM:
544   case SIGINT:
545     if (!terminate_sig_flag) terminate_sig_flag= signum;
546     break;
547   default:
548     abort();
549   }
550   int r = write(signal_self_pipe[1],&x,1);
551   if (!(r==1 || isewouldblock(errno))) abort();
552   errno = esave;
553 }
554
555 void init_signals(void) {
556   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
557     syscrash("could not ignore SIGPIPE");
558
559   if (pipe(signal_self_pipe)) sysdie("create self-pipe for signals");
560
561   xsetnonblock(signal_self_pipe[0],1);
562   xsetnonblock(signal_self_pipe[1],1);
563
564   struct sigaction sa;
565   memset(&sa,0,sizeof(sa));
566   sa.sa_handler= sigarrived_handler;
567   sa.sa_flags= SA_RESTART;
568   xsigaction(SIGTERM,&sa);
569   xsigaction(SIGINT,&sa);
570
571   on_fd_read_except(signal_self_pipe[0], sigarrived_event);
572 }
573