+/*
+ * innduct
+ * tailing reliable realtime streaming feeder for inn
+ * statemc.c - state machine core (see README.states).
+ *
+ * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * (I believe that when you compile and link this as part of the inn2
+ * build, with the Makefile runes I have provided, all the libraries
+ * and files which end up included in innduct are licence-compatible
+ * with GPLv3. If not then please let me know. -Ian Jackson.)
+ */
+
+#include "innduct.h"
/* statemc_init initialises */
StateMachineState sms;
int until_flush;
InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
+Counts backlog_counts;
+int backlog_counts_report;
FILE *defer;
/* initialisation to 0 is good */
inputfile_reading_start(f);
}
-static void statemc_lock(void) {
+void statemc_lock(void) {
int lockfd;
struct stat stab, stabf;
dbg("startup: locked");
}
-static void statemc_init(void) {
+void statemc_init(void) {
struct stat stabdefer;
search_backlog_file();
}
}
-static void statemc_start_flush(const char *why) { /* Normal => Flushing */
+void statemc_start_flush(const char *why) { /* Normal => Flushing */
assert(sms == sm_NORMAL);
dbg("starting flush (%s) (%lu >?= %lu) (%d)",
spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
}
-static int trigger_flush_ok(const char *why) {
+int trigger_flush_ok(const char *why) {
switch (sms) {
case sm_NORMAL:
}
}
-static void statemc_period_poll(void) {
+void statemc_period_poll(void) {
if (!until_flush) return;
until_flush--;
assert(until_flush>=0);
return 1;
}
-static void notice_processed(InputFile *ipf, int completed,
- const char *what, const char *spec) {
- if (!ipf) return; /* allows preterminate to be lazy */
+static void notice_processed_counts(Counts *counts, int completed,
+ InputFile *ipf, const char *what) {
#define RCI_NOTHING(x) /* nothing */
#define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
-#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
+#define RCI_TRIPLE_VALS(x) ,RCI_TRIPLE_VALS_BASE(ipf->counts.results, [RC_##x])
-#define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
+#define CNT(art,rc) (ipf->counts.results[art_##art][RC_##rc])
char *inprog= completed
? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
: xasprintf(" inprogress=%ld", ipf->inprogress);
- char *autodefer= ipf->autodefer >= 0
+ char *autodefer= ipf && ipf->autodefer >= 0
? xasprintf(" autodeferred=%ld", ipf->autodefer)
: xasprintf("%s","");
- info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
+ info("%s %s read=%d (+bl=%d,+err=%d)%s%s"
" missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
,
- completed?"completed":"processed", what, spec,
- ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
- inprog, autodefer, ipf->count_nooffer_missing,
+ completed?"completed":"processed", what,
+ ipf->counts.events[read_ok], ipf->counts.events[read_blank],
+ ipf->counts.events[read_err],
+ inprog, autodefer, ipf->counts.events[nooffer_missing],
CNT(Unchecked,sent) + CNT(Unsolicited,sent)
, CNT(Unchecked,sent), CNT(Unsolicited,sent),
CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
);
+ memset(&ipf->counts, 0, sizeof(ipf->counts));
+
free(inprog);
free(autodefer);
#undef CNT
}
-static void statemc_check_backlog_done(void) {
+static void notice_processed_inputfile(InputFile *ipf, int completed,
+ const char *what) {
+ if (!ipf) return; /* allows showstats to be lazy */
+ notice_processed_counts(&ipf->counts, completed, ipf, what);
+}
+
+static void backlog_accumulate_counts(InputFile *ipf) {
+ int i,j;
+ if (!ipf) return;
+
+ for (i=0; i<art_MaxState; i++)
+ for (j=0; j<RCI_max; j++)
+ backlog_counts.results[i][j] += ipf->counts.results[i][j];
+
+ for (i=0; i<ECI_max; i++)
+ backlog_counts.events[i] += ipf->counts.events[i];
+
+ memset(&ipf->counts, 0, sizeof(ipf->counts));
+ backlog_counts_report= 1;
+}
+
+void statemc_check_backlog_done(void) {
InputFile *ipf= backlog_input_file;
if (!inputfile_is_done(ipf)) return;
- const char *slash= strrchr(ipf->path, '/');
- const char *leaf= slash ? slash+1 : ipf->path;
- const char *under= strchr(slash, '_');
- const char *rest= under ? under+1 : leaf;
- if (!strncmp(rest,"backlog",7)) rest += 7;
- notice_processed(ipf,1,"backlog ",rest);
-
+ backlog_accumulate_counts(ipf);
close_input_file(ipf);
if (unlink(ipf->path)) {
if (errno != ENOENT)
return;
}
-static void statemc_check_flushing_done(void) {
+void statemc_check_flushing_done(void) {
InputFile *ipf= flushing_input_file;
if (!inputfile_is_done(ipf)) return;
assert(sms==sm_SEPARATED || sms==sm_DROPPING);
- notice_processed(ipf,1,"feedfile","");
+ notice_processed_inputfile(ipf,1,"feedfile");
close_defer();
return OOP_CONTINUE;
}
-static void queue_check_input_done(void) {
+void queue_check_input_done(void) {
loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
}
-static void statemc_setstate(StateMachineState newsms, int periods,
- const char *forlog, const char *why) {
+void statemc_setstate(StateMachineState newsms, int periods,
+ const char *forlog, const char *why) {
sms= newsms;
until_flush= periods;
/*---------- shutdown and signal handling ----------*/
-static void preterminate(void) {
+void preterminate(void) {
if (in_child) return;
- notice_processed(main_input_file,0,"feedfile","");
- notice_processed(flushing_input_file,0,"flushing","");
- if (backlog_input_file)
- notice_processed(backlog_input_file,0, "backlog file ",
- backlog_input_file->path);
+ showstats();
}
-static int signal_self_pipe[2];
+void showstats(void) {
+ notice_processed_inputfile(main_input_file, 0, "feedfile");
+ notice_processed_inputfile(flushing_input_file, 0, "flushing");
-static void raise_default(int signo) {
- xsigsetdefault(signo);
- raise(signo);
- abort();
+ backlog_accumulate_counts(backlog_input_file);
+ if (backlog_counts_report) {
+ notice_processed_counts(&backlog_counts, 0,
+ backlog_input_file, "backlogs");
+ backlog_counts_report= 0;
+ }
+ until_stats_log= stats_log_periods;
}
+static int signal_self_pipe[2];
+
static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
assert(fd=signal_self_pipe[0]);
char buf[PIPE_BUF];
write(signal_self_pipe[1],&x,1);
}
-static void init_signals(void) {
+void init_signals(void) {
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
syscrash("could not ignore SIGPIPE");