X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=statemc.c;h=b068722a15e339f81e7639341110c2ab977f433e;hb=a34c62479ae1f91aac7b30d3d5f1a5106a6635f5;hp=970bb6cb6e4e818f25be8506a627f45a70753914;hpb=f4aee95c41a0d6231d115386b8fbb23f6b8e349a;p=innduct.git diff --git a/statemc.c b/statemc.c index 970bb6c..b068722 100644 --- a/statemc.c +++ b/statemc.c @@ -1,9 +1,38 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * statemc.c - state machine core (see README.states). + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#include "innduct.h" /* statemc_init initialises */ StateMachineState sms; int until_flush; InputFile *main_input_file, *flushing_input_file, *backlog_input_file; +Counts backlog_counts; +int backlog_counts_report; FILE *defer; /* initialisation to 0 is good */ @@ -19,7 +48,7 @@ static void startup_set_input_file(InputFile *f) { inputfile_reading_start(f); } -static void statemc_lock(void) { +void statemc_lock(void) { int lockfd; struct stat stab, stabf; @@ -65,7 +94,7 @@ static void statemc_lock(void) { dbg("startup: locked"); } -static void statemc_init(void) { +void statemc_init(void) { struct stat stabdefer; search_backlog_file(); @@ -128,7 +157,7 @@ static void statemc_init(void) { } } -static void statemc_start_flush(const char *why) { /* Normal => Flushing */ +void statemc_start_flush(const char *why) { /* Normal => Flushing */ assert(sms == sm_NORMAL); dbg("starting flush (%s) (%lu >?= %lu) (%d)", @@ -148,7 +177,7 @@ static void statemc_start_flush(const char *why) { /* Normal => Flushing */ spawn_inndcomm_flush(why); /* => Flushing FLUSHING */ } -static int trigger_flush_ok(const char *why) { +int trigger_flush_ok(const char *why) { switch (sms) { case sm_NORMAL: @@ -161,8 +190,12 @@ static int trigger_flush_ok(const char *why) { case sm_SEPARATED: case sm_DROPPING: - warn("abandoning old feedfile after flush (%s), autodeferring", - why ? why : "took too long to complete"); + if (conns.count) + warn("abandoning old feedfile after flush (%s), autodeferring", + why ? why : "took too long to complete"); + else + info("autodeferring after flush (%s)", + why ? why : "no connections"); assert(flushing_input_file); autodefer_input_file(flushing_input_file); return 1; @@ -172,7 +205,7 @@ static int trigger_flush_ok(const char *why) { } } -static void statemc_period_poll(void) { +void statemc_period_poll(void) { if (!until_flush) return; until_flush--; assert(until_flush>=0); @@ -189,30 +222,30 @@ static int inputfile_is_done(InputFile *ipf) { return 1; } -static void notice_processed(InputFile *ipf, int completed, - const char *what, const char *spec) { - if (!ipf) return; /* allows preterminate to be lazy */ +static void notice_processed_counts(Counts *counts, int completed, + InputFile *ipf_xtra, const char *what) { #define RCI_NOTHING(x) /* nothing */ #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE -#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x]) +#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(counts->results, [RC_##x]) -#define CNT(art,rc) (ipf->counts[art_##art][RC_##rc]) +#define CNT(art,rc) (counts->results[art_##art][RC_##rc]) - char *inprog= completed - ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */ - : xasprintf(" inprogress=%ld", ipf->inprogress); - char *autodefer= ipf->autodefer >= 0 - ? xasprintf(" autodeferred=%ld", ipf->autodefer) - : xasprintf("%s",""); + char *inprog= ipf_xtra && !completed + ? masprintf(" inprogress=%ld", ipf_xtra->inprogress) + : masprintf("%s",""); /* GCC produces a stupid warning for printf("") ! */ + char *autodefer= ipf_xtra && ipf_xtra->autodefer >= 0 + ? masprintf(" autodeferred=%ld", ipf_xtra->autodefer) + : masprintf("%s",""); - info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s" + notice("%s %s read=%d (+bl=%d,+err=%d)%s%s" " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)" RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) , - completed?"completed":"processed", what, spec, - ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, - inprog, autodefer, ipf->count_nooffer_missing, + completed?"completed":"processed", what, + counts->events[read_ok], counts->events[read_blank], + counts->events[read_err], + inprog, autodefer, counts->events[nooffer_missing], CNT(Unchecked,sent) + CNT(Unsolicited,sent) , CNT(Unchecked,sent), CNT(Unsolicited,sent), CNT(Wanted,accepted) + CNT(Unsolicited,accepted) @@ -220,23 +253,41 @@ static void notice_processed(InputFile *ipf, int completed, RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS) ); + memset(counts, 0, sizeof(*counts)); + free(inprog); free(autodefer); #undef CNT } -static void statemc_check_backlog_done(void) { +static void notice_processed_inputfile(InputFile *ipf, int completed, + const char *what) { + if (!ipf) return; /* allows showstats to be lazy */ + notice_processed_counts(&ipf->counts, completed, ipf, what); +} + +static void backlog_accumulate_counts(InputFile *ipf) { + int i,j; + if (!ipf) return; + + for (i=0; icounts.results[i][j]; + + for (i=0; icounts.events[i]; + + memset(&ipf->counts, 0, sizeof(ipf->counts)); + backlog_counts_report= 1; +} + +void statemc_check_backlog_done(void) { InputFile *ipf= backlog_input_file; if (!inputfile_is_done(ipf)) return; - const char *slash= strrchr(ipf->path, '/'); - const char *leaf= slash ? slash+1 : ipf->path; - const char *under= strchr(slash, '_'); - const char *rest= under ? under+1 : leaf; - if (!strncmp(rest,"backlog",7)) rest += 7; - notice_processed(ipf,1,"backlog ",rest); - + dbg("backlog file %p %s complete", ipf, ipf->path); + backlog_accumulate_counts(ipf); close_input_file(ipf); if (unlink(ipf->path)) { if (errno != ENOENT) @@ -251,13 +302,13 @@ static void statemc_check_backlog_done(void) { return; } -static void statemc_check_flushing_done(void) { +void statemc_check_flushing_done(void) { InputFile *ipf= flushing_input_file; if (!inputfile_is_done(ipf)) return; assert(sms==sm_SEPARATED || sms==sm_DROPPING); - notice_processed(ipf,1,"feedfile",""); + notice_processed_inputfile(ipf,1,"batch"); close_defer(); @@ -286,12 +337,12 @@ static void *statemc_check_input_done(oop_source *lp, struct timeval now, return OOP_CONTINUE; } -static void queue_check_input_done(void) { +void queue_check_input_done(void) { loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0); } -static void statemc_setstate(StateMachineState newsms, int periods, - const char *forlog, const char *why) { +void statemc_setstate(StateMachineState newsms, int periods, + const char *forlog, const char *why) { sms= newsms; until_flush= periods; @@ -448,23 +499,27 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ /*---------- shutdown and signal handling ----------*/ -static void preterminate(void) { +void preterminate(void) { if (in_child) return; - notice_processed(main_input_file,0,"feedfile",""); - notice_processed(flushing_input_file,0,"flushing",""); - if (backlog_input_file) - notice_processed(backlog_input_file,0, "backlog file ", - backlog_input_file->path); + showstats(); } -static int signal_self_pipe[2]; +void showstats(void) { + notice_conns_stats(); + notice_processed_inputfile(main_input_file, 0, "feedfile"); + notice_processed_inputfile(flushing_input_file, 0, "flushing"); -static void raise_default(int signo) { - xsigsetdefault(signo); - raise(signo); - abort(); + backlog_accumulate_counts(backlog_input_file); + if (backlog_counts_report) { + notice_processed_counts(&backlog_counts, 0, + backlog_input_file, "backlogs"); + backlog_counts_report= 0; + } + until_stats_log= stats_log_periods; } +static int signal_self_pipe[2]; + static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) { assert(fd=signal_self_pipe[0]); char buf[PIPE_BUF]; @@ -481,6 +536,7 @@ static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) { } static void sigarrived_handler(int signum) { + int esave = errno; static char x; switch (signum) { case SIGTERM: @@ -490,10 +546,12 @@ static void sigarrived_handler(int signum) { default: abort(); } - write(signal_self_pipe[1],&x,1); + int r = write(signal_self_pipe[1],&x,1); + if (!(r==1 || isewouldblock(errno))) abort(); + errno = esave; } -static void init_signals(void) { +void init_signals(void) { if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) syscrash("could not ignore SIGPIPE");