chiark / gitweb /
Cope with NNTP_STRLEN abolishment (replaced with NNTP_MAXLEN_COMMAND)
[innduct.git] / statemc.c
index 970bb6cb6e4e818f25be8506a627f45a70753914..04f0a1d82628a2ab291d8cfe498a54eb1ad40e2d 100644 (file)
--- a/statemc.c
+++ b/statemc.c
@@ -1,9 +1,38 @@
+/*
+ *  innduct
+ *  tailing reliable realtime streaming feeder for inn
+ *  statemc.c - state machine core (see README.states).
+ *
+ *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ * 
+ *  This program is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ * 
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ * 
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ *  (I believe that when you compile and link this as part of the inn2
+ *  build, with the Makefile runes I have provided, all the libraries
+ *  and files which end up included in innduct are licence-compatible
+ *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
+ */
+
+#include "innduct.h"
 
 
 /* statemc_init initialises */
 StateMachineState sms;
 int until_flush;
 InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
+Counts backlog_counts;
+int backlog_counts_report;
 FILE *defer;
 
 /* initialisation to 0 is good */
@@ -19,7 +48,7 @@ static void startup_set_input_file(InputFile *f) {
   inputfile_reading_start(f);
 }
 
-static void statemc_lock(void) {
+void statemc_lock(void) {
   int lockfd;
   struct stat stab, stabf;
   
@@ -65,7 +94,7 @@ static void statemc_lock(void) {
   dbg("startup: locked");
 }
 
-static void statemc_init(void) {
+void statemc_init(void) {
   struct stat stabdefer;
 
   search_backlog_file();
@@ -128,7 +157,7 @@ static void statemc_init(void) {
   }
 }
 
-static void statemc_start_flush(const char *why) { /* Normal => Flushing */
+void statemc_start_flush(const char *why) { /* Normal => Flushing */
   assert(sms == sm_NORMAL);
 
   dbg("starting flush (%s) (%lu >?= %lu) (%d)",
@@ -148,7 +177,7 @@ static void statemc_start_flush(const char *why) { /* Normal => Flushing */
   spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
 }
 
-static int trigger_flush_ok(const char *why) {
+int trigger_flush_ok(const char *why) {
   switch (sms) {
 
   case sm_NORMAL:
@@ -161,8 +190,12 @@ static int trigger_flush_ok(const char *why) {
 
   case sm_SEPARATED:
   case sm_DROPPING:
-    warn("abandoning old feedfile after flush (%s), autodeferring",
-        why ? why : "took too long to complete");
+    if (conns.count)
+      warn("abandoning old feedfile after flush (%s), autodeferring",
+          why ? why : "took too long to complete");
+    else
+      info("autodeferring after flush (%s)",
+          why ? why : "no connections");
     assert(flushing_input_file);
     autodefer_input_file(flushing_input_file);
     return 1;
@@ -172,7 +205,7 @@ static int trigger_flush_ok(const char *why) {
   }
 }
 
-static void statemc_period_poll(void) {
+void statemc_period_poll(void) {
   if (!until_flush) return;
   until_flush--;
   assert(until_flush>=0);
@@ -189,30 +222,30 @@ static int inputfile_is_done(InputFile *ipf) {
   return 1;
 }
 
-static void notice_processed(InputFile *ipf, int completed,
-                            const char *what, const char *spec) {
-  if (!ipf) return; /* allows preterminate to be lazy */
+static void notice_processed_counts(Counts *counts, int completed,
+                                   InputFile *ipf_xtra, const char *what) {
 
 #define RCI_NOTHING(x) /* nothing */
 #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
-#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
+#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(counts->results, [RC_##x])
 
-#define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
+#define CNT(art,rc) (counts->results[art_##art][RC_##rc])
 
-  char *inprog= completed
-    ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
-    : xasprintf(" inprogress=%ld", ipf->inprogress);
-  char *autodefer= ipf->autodefer >= 0
-    ? xasprintf(" autodeferred=%ld", ipf->autodefer)
-    : xasprintf("%s","");
+  char *inprog= ipf_xtra && !completed
+    ? masprintf(" inprogress=%ld", ipf_xtra->inprogress)
+    : masprintf("%s",""); /* GCC produces a stupid warning for printf("") ! */
+  char *autodefer= ipf_xtra && ipf_xtra->autodefer >= 0
+    ? masprintf(" autodeferred=%ld", ipf_xtra->autodefer)
+    : masprintf("%s","");
 
-  info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
+  notice("%s %s read=%d (+bl=%d,+err=%d)%s%s"
        " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
        RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
        ,
-       completed?"completed":"processed", what, spec,
-       ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
-       inprog, autodefer, ipf->count_nooffer_missing,
+       completed?"completed":"processed", what,
+       counts->events[read_ok], counts->events[read_blank],
+         counts->events[read_err],
+       inprog, autodefer, counts->events[nooffer_missing],
        CNT(Unchecked,sent) + CNT(Unsolicited,sent)
        , CNT(Unchecked,sent), CNT(Unsolicited,sent),
        CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
@@ -220,23 +253,41 @@ static void notice_processed(InputFile *ipf, int completed,
        RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
        );
 
+  memset(counts, 0, sizeof(*counts));
+
   free(inprog);
   free(autodefer);
 
 #undef CNT
 }
 
-static void statemc_check_backlog_done(void) {
+static void notice_processed_inputfile(InputFile *ipf, int completed,
+                                      const char *what) {
+  if (!ipf) return; /* allows showstats to be lazy */
+  notice_processed_counts(&ipf->counts, completed, ipf, what);
+}
+
+static void backlog_accumulate_counts(InputFile *ipf) {
+  int i,j;
+  if (!ipf) return;
+
+  for (i=0; i<art_MaxState; i++)
+    for (j=0; j<RCI_max; j++)
+      backlog_counts.results[i][j] += ipf->counts.results[i][j];
+
+  for (i=0; i<ECI_max; i++)
+    backlog_counts.events[i] += ipf->counts.events[i];
+
+  memset(&ipf->counts, 0, sizeof(ipf->counts));
+  backlog_counts_report= 1;
+}
+
+void statemc_check_backlog_done(void) {
   InputFile *ipf= backlog_input_file;
   if (!inputfile_is_done(ipf)) return;
 
-  const char *slash= strrchr(ipf->path, '/');
-  const char *leaf= slash ? slash+1 : ipf->path;
-  const char *under= strchr(slash, '_');
-  const char *rest= under ? under+1 : leaf;
-  if (!strncmp(rest,"backlog",7)) rest += 7;
-  notice_processed(ipf,1,"backlog ",rest);
-
+  dbg("backlog file %p %s complete", ipf, ipf->path);
+  backlog_accumulate_counts(ipf);
   close_input_file(ipf);
   if (unlink(ipf->path)) {
     if (errno != ENOENT)
@@ -251,13 +302,13 @@ static void statemc_check_backlog_done(void) {
   return;
 }
 
-static void statemc_check_flushing_done(void) {
+void statemc_check_flushing_done(void) {
   InputFile *ipf= flushing_input_file;
   if (!inputfile_is_done(ipf)) return;
 
   assert(sms==sm_SEPARATED || sms==sm_DROPPING);
 
-  notice_processed(ipf,1,"feedfile","");
+  notice_processed_inputfile(ipf,1,"batch");
 
   close_defer();
 
@@ -286,12 +337,12 @@ static void *statemc_check_input_done(oop_source *lp, struct timeval now,
   return OOP_CONTINUE;
 }
 
-static void queue_check_input_done(void) {
+void queue_check_input_done(void) {
   loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
 }
 
-static void statemc_setstate(StateMachineState newsms, int periods,
-                            const char *forlog, const char *why) {
+void statemc_setstate(StateMachineState newsms, int periods,
+                     const char *forlog, const char *why) {
   sms= newsms;
   until_flush= periods;
 
@@ -448,23 +499,27 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
 
 /*---------- shutdown and signal handling ----------*/
 
-static void preterminate(void) {
+void preterminate(void) {
   if (in_child) return;
-  notice_processed(main_input_file,0,"feedfile","");
-  notice_processed(flushing_input_file,0,"flushing","");
-  if (backlog_input_file)
-    notice_processed(backlog_input_file,0, "backlog file ",
-                    backlog_input_file->path);
+  showstats();
 }
 
-static int signal_self_pipe[2];
+void showstats(void) {
+  notice_conns_stats();
+  notice_processed_inputfile(main_input_file,     0, "feedfile");
+  notice_processed_inputfile(flushing_input_file, 0, "flushing");
 
-static void raise_default(int signo) {
-  xsigsetdefault(signo);
-  raise(signo);
-  abort();
+  backlog_accumulate_counts(backlog_input_file);
+  if (backlog_counts_report) {
+    notice_processed_counts(&backlog_counts, 0,
+                           backlog_input_file, "backlogs");
+    backlog_counts_report= 0;
+  }
+  until_stats_log= stats_log_periods;
 }
 
+static int signal_self_pipe[2];
+
 static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
   assert(fd=signal_self_pipe[0]);
   char buf[PIPE_BUF];
@@ -493,7 +548,7 @@ static void sigarrived_handler(int signum) {
   write(signal_self_pipe[1],&x,1);
 }
 
-static void init_signals(void) {
+void init_signals(void) {
   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
     syscrash("could not ignore SIGPIPE");