chiark / gitweb /
wip split into multiple files and make compile
[innduct.git] / statemc.c
diff --git a/statemc.c b/statemc.c
new file mode 100644 (file)
index 0000000..970bb6c
--- /dev/null
+++ b/statemc.c
@@ -0,0 +1,514 @@
+
+
+/* statemc_init initialises */
+StateMachineState sms;
+int until_flush;
+InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
+FILE *defer;
+
+/* initialisation to 0 is good */
+int until_connect, until_backlog_nextscan;
+double accept_proportion;
+int nocheck, nocheck_reported, in_child;
+sig_atomic_t terminate_sig_flag;
+
+
+static void startup_set_input_file(InputFile *f) {
+  assert(!main_input_file);
+  main_input_file= f;
+  inputfile_reading_start(f);
+}
+
+static void statemc_lock(void) {
+  int lockfd;
+  struct stat stab, stabf;
+  
+  for (;;) {
+    lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
+    if (lockfd<0) sysdie("open lockfile %s", path_lock);
+
+    struct flock fl;
+    memset(&fl,0,sizeof(fl));
+    fl.l_type= F_WRLCK;
+    fl.l_whence= SEEK_SET;
+    int r= fcntl(lockfd, F_SETLK, &fl);
+    if (r==-1) {
+      if (errno==EACCES || isewouldblock(errno)) {
+       if (quiet_multiple) exit(0);
+       die("another duct holds the lockfile");
+      }
+      sysdie("fcntl F_SETLK lockfile %s", path_lock);
+    }
+
+    xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
+    int lock_noent;
+    xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
+
+    if (!lock_noent && samefile(&stab, &stabf))
+      break;
+
+    xclose(lockfd, "stale lockfile ", path_lock);
+  }
+
+  FILE *lockfile= fdopen(lockfd, "w");
+  if (!lockfile) syscrash("fdopen lockfile");
+
+  int r= ftruncate(lockfd, 0);
+  if (r) syscrash("truncate lockfile to write new info");
+
+  if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
+             (unsigned long)self_pid,
+             sitename, feedfile, remote_host) == EOF ||
+      fflush(lockfile))
+    sysdie("write info to lockfile %s", path_lock);
+
+  dbg("startup: locked");
+}
+
+static void statemc_init(void) {
+  struct stat stabdefer;
+
+  search_backlog_file();
+
+  int defer_noent;
+  xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
+  if (defer_noent) {
+    dbg("startup: ductdefer ENOENT");
+  } else {
+    dbg("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
+    switch (stabdefer.st_nlink==1) {
+    case 1:
+      open_defer(); /* so that we will later close it and rename it */
+      break;
+    case 2:
+      xunlink(path_defer, "stale defer file link"
+             " (presumably hardlink to backlog file)");
+      break;
+    default:
+      crash("defer file %s has unexpected link count %d",
+           path_defer, stabdefer.st_nlink);
+    }
+  }
+
+  struct stat stab_f, stab_d;
+  int noent_f;
+
+  InputFile *file_d= open_input_file(path_flushing);
+  if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
+
+  xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
+
+  if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
+    dbg("startup: F==D => Hardlinked");
+    xunlink(feedfile, "feed file (during startup)"); /* => Moved */
+    noent_f= 1;
+  }
+
+  if (noent_f) {
+    dbg("startup: F ENOENT => Moved");
+    if (file_d) startup_set_input_file(file_d);
+    spawn_inndcomm_flush("feedfile missing at startup");
+    /* => Flushing, sms:=FLUSHING */
+  } else {
+    if (file_d) {
+      dbg("startup: F!=D => Separated");
+      startup_set_input_file(file_d);
+      flushing_input_file= main_input_file;
+      main_input_file= open_input_file(feedfile);
+      if (!main_input_file) crash("feedfile vanished during startup");
+      SMS(SEPARATED, max_separated_periods,
+         "found both old and current feed files");
+    } else {
+      dbg("startup: F exists, D ENOENT => Normal");
+      InputFile *file_f= open_input_file(feedfile);
+      if (!file_f) crash("feed file vanished during startup");
+      startup_set_input_file(file_f);
+      SMS(NORMAL, spontaneous_flush_periods, "normal startup");
+    }
+  }
+}
+
+static void statemc_start_flush(const char *why) { /* Normal => Flushing */
+  assert(sms == sm_NORMAL);
+
+  dbg("starting flush (%s) (%lu >?= %lu) (%d)",
+       why,
+       (unsigned long)(main_input_file ? main_input_file->offset : 0),
+       (unsigned long)target_max_feedfile_size,
+       until_flush);
+
+  int r= link(feedfile, path_flushing);
+  if (r) sysdie("link feedfile %s to flushing file %s",
+               feedfile, path_flushing);
+  /* => Hardlinked */
+
+  xunlink(feedfile, "old feedfile link");
+  /* => Moved */
+
+  spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
+}
+
+static int trigger_flush_ok(const char *why) {
+  switch (sms) {
+
+  case sm_NORMAL:
+    statemc_start_flush(why ? why : "periodic");
+    return 1;                           /* Normal => Flushing; => FLUSHING */
+
+  case sm_FLUSHFAILED:
+    spawn_inndcomm_flush(why ? why : "retry");
+    return 1;                            /* Moved => Flushing; => FLUSHING */
+
+  case sm_SEPARATED:
+  case sm_DROPPING:
+    warn("abandoning old feedfile after flush (%s), autodeferring",
+        why ? why : "took too long to complete");
+    assert(flushing_input_file);
+    autodefer_input_file(flushing_input_file);
+    return 1;
+
+  default:
+    return 0;
+  }
+}
+
+static void statemc_period_poll(void) {
+  if (!until_flush) return;
+  until_flush--;
+  assert(until_flush>=0);
+
+  if (until_flush) return;
+  int ok= trigger_flush_ok(0);
+  assert(ok);
+}
+
+static int inputfile_is_done(InputFile *ipf) {
+  if (!ipf) return 0;
+  if (ipf->inprogress) return 0; /* new article in the meantime */
+  if (ipf->rd) return 0; /* not had EOF */
+  return 1;
+}
+
+static void notice_processed(InputFile *ipf, int completed,
+                            const char *what, const char *spec) {
+  if (!ipf) return; /* allows preterminate to be lazy */
+
+#define RCI_NOTHING(x) /* nothing */
+#define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
+#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
+
+#define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
+
+  char *inprog= completed
+    ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
+    : xasprintf(" inprogress=%ld", ipf->inprogress);
+  char *autodefer= ipf->autodefer >= 0
+    ? xasprintf(" autodeferred=%ld", ipf->autodefer)
+    : xasprintf("%s","");
+
+  info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
+       " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
+       RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
+       ,
+       completed?"completed":"processed", what, spec,
+       ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
+       inprog, autodefer, ipf->count_nooffer_missing,
+       CNT(Unchecked,sent) + CNT(Unsolicited,sent)
+       , CNT(Unchecked,sent), CNT(Unsolicited,sent),
+       CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
+       , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
+       RESULT_COUNTS(RCI_NOTHING,  RCI_TRIPLE_VALS)
+       );
+
+  free(inprog);
+  free(autodefer);
+
+#undef CNT
+}
+
+static void statemc_check_backlog_done(void) {
+  InputFile *ipf= backlog_input_file;
+  if (!inputfile_is_done(ipf)) return;
+
+  const char *slash= strrchr(ipf->path, '/');
+  const char *leaf= slash ? slash+1 : ipf->path;
+  const char *under= strchr(slash, '_');
+  const char *rest= under ? under+1 : leaf;
+  if (!strncmp(rest,"backlog",7)) rest += 7;
+  notice_processed(ipf,1,"backlog ",rest);
+
+  close_input_file(ipf);
+  if (unlink(ipf->path)) {
+    if (errno != ENOENT)
+      syscrash("could not unlink processed backlog file %s", ipf->path);
+    warn("backlog file %s vanished while we were reading it"
+        " so we couldn't remove it (but it's done now, anyway)",
+        ipf->path);
+  }
+  free(ipf);
+  backlog_input_file= 0;
+  search_backlog_file();
+  return;
+}
+
+static void statemc_check_flushing_done(void) {
+  InputFile *ipf= flushing_input_file;
+  if (!inputfile_is_done(ipf)) return;
+
+  assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+
+  notice_processed(ipf,1,"feedfile","");
+
+  close_defer();
+
+  xunlink(path_flushing, "old flushing file");
+
+  close_input_file(flushing_input_file);
+  free(flushing_input_file);
+  flushing_input_file= 0;
+
+  if (sms==sm_SEPARATED) {
+    notice("flush complete");
+    SMS(NORMAL, spontaneous_flush_periods, "flush complete");
+  } else if (sms==sm_DROPPING) {
+    SMS(DROPPED, max_separated_periods, "old flush complete");
+    search_backlog_file();
+    notice("feed dropped, but will continue until backlog is finished");
+  }
+}
+
+static void *statemc_check_input_done(oop_source *lp, struct timeval now,
+                                     void *u) {
+  /* main input file may be idle but if so that's because
+   * we haven't got to it yet, but that doesn't mean it's really done */
+  statemc_check_flushing_done();
+  statemc_check_backlog_done();
+  return OOP_CONTINUE;
+}
+
+static void queue_check_input_done(void) {
+  loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
+}
+
+static void statemc_setstate(StateMachineState newsms, int periods,
+                            const char *forlog, const char *why) {
+  sms= newsms;
+  until_flush= periods;
+
+  const char *xtra= "";
+  switch (sms) {
+  case sm_FLUSHING:
+  case sm_FLUSHFAILED:
+    if (!main_input_file) xtra= "-ABSENT";
+    break;
+  case sm_SEPARATED:
+  case sm_DROPPING:
+    xtra= flushing_input_file->rd ? "-1" : "-2";
+    break;
+  default:;
+  }
+
+  if (periods) {
+    info("state %s%s[%d] %s",forlog,xtra,periods,why);
+  } else {
+    info("state %s%s %s",forlog,xtra,why);
+  }
+}
+
+/*========== flushing the feed ==========*/
+
+pid_t inndcomm_child;
+static int inndcomm_sentinel_fd;
+
+static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
+  assert(inndcomm_child);
+  assert(fd == inndcomm_sentinel_fd);
+  int status= xwaitpid(&inndcomm_child, "inndcomm");
+  inndcomm_child= 0;
+  
+  cancel_fd_read_except(fd);
+  xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
+  inndcomm_sentinel_fd= 0;
+
+  assert(!flushing_input_file);
+
+  if (WIFEXITED(status)) {
+    switch (WEXITSTATUS(status)) {
+
+    case INNDCOMMCHILD_ESTATUS_FAIL:
+      goto failed;
+
+    case INNDCOMMCHILD_ESTATUS_NONESUCH:
+      notice("feed has been dropped by innd, finishing up");
+      flushing_input_file= main_input_file;
+      tailing_make_readable(flushing_input_file);
+       /* we probably previously returned EAGAIN from our fake read method
+        * when in fact we were at EOF, so signal another readable event
+        * so we actually see the EOF */
+
+      main_input_file= 0;
+
+      if (flushing_input_file) {
+       SMS(DROPPING, max_separated_periods,
+           "feed dropped by innd, but must finish last flush");
+      } else {
+       close_defer();
+       SMS(DROPPED, 0, "feed dropped by innd");
+       search_backlog_file();
+      }
+      return OOP_CONTINUE;
+
+    case 0:
+      /* as above */
+      flushing_input_file= main_input_file;
+      tailing_make_readable(flushing_input_file);
+
+      main_input_file= open_input_file(feedfile);
+      if (!main_input_file)
+       crash("flush succeeded but feedfile %s does not exist!"
+             " (this probably means feedfile does not correspond"
+             " to site %s in newsfeeds)", feedfile, sitename);
+
+      if (flushing_input_file) {
+       SMS(SEPARATED, max_separated_periods, "flush complete");
+      } else {
+       close_defer();
+       SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete");
+      }
+      return OOP_CONTINUE;
+
+    default:
+      goto unexpected_exitstatus;
+
+    }
+  } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
+    warn("flush timed out trying to talk to innd");
+    goto failed;
+  } else {
+  unexpected_exitstatus:
+    report_child_status("inndcomm child", status);
+  }
+
+ failed:
+  SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
+  return OOP_CONTINUE;
+}
+
+static void inndcommfail(const char *what) {
+  syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
+  exit(INNDCOMMCHILD_ESTATUS_FAIL);
+}
+
+void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
+  int pipefds[2];
+
+  notice("flushing %s",why);
+
+  assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
+  assert(!inndcomm_child);
+  assert(!inndcomm_sentinel_fd);
+
+  if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel");
+
+  inndcomm_child= xfork("inndcomm child");
+
+  if (!inndcomm_child) {
+    const char *flushargv[2]= { sitename, 0 };
+    char *reply;
+    int r;
+
+    xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
+    /* parent spots the autoclose of pipefds[1] when we die or exit */
+
+    if (simulate_flush>=0) {
+      warn("SIMULATING flush child status %d", simulate_flush);
+      if (simulate_flush>128) raise(simulate_flush-128);
+      else exit(simulate_flush);
+    }
+
+    alarm(inndcomm_flush_timeout);
+    r= ICCopen();                         if (r)   inndcommfail("connect");
+    r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
+    if (!r) exit(0); /* yay! */
+
+    if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
+    syswarn("innd ctlinnd flush failed: innd said %s", reply);
+    exit(INNDCOMMCHILD_ESTATUS_FAIL);
+  }
+
+  simulate_flush= -1;
+
+  xclose(pipefds[1], "inndcomm sentinel child's end",0);
+  inndcomm_sentinel_fd= pipefds[0];
+  assert(inndcomm_sentinel_fd);
+  on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
+
+  SMS(FLUSHING, 0, why);
+}
+
+/*---------- shutdown and signal handling ----------*/
+
+static void preterminate(void) {
+  if (in_child) return;
+  notice_processed(main_input_file,0,"feedfile","");
+  notice_processed(flushing_input_file,0,"flushing","");
+  if (backlog_input_file)
+    notice_processed(backlog_input_file,0, "backlog file ",
+                    backlog_input_file->path);
+}
+
+static int signal_self_pipe[2];
+
+static void raise_default(int signo) {
+  xsigsetdefault(signo);
+  raise(signo);
+  abort();
+}
+
+static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
+  assert(fd=signal_self_pipe[0]);
+  char buf[PIPE_BUF];
+  int r= read(signal_self_pipe[0], buf, sizeof(buf));
+  if (r<0 && !isewouldblock(errno))
+    syscrash("failed to read signal self pipe");
+  if (r==0) crash("eof on signal self pipe");
+  if (terminate_sig_flag) {
+    preterminate();
+    notice("terminating (%s)", strsignal(terminate_sig_flag));
+    raise_default(terminate_sig_flag);
+  }
+  return OOP_CONTINUE;
+}
+
+static void sigarrived_handler(int signum) {
+  static char x;
+  switch (signum) {
+  case SIGTERM:
+  case SIGINT:
+    if (!terminate_sig_flag) terminate_sig_flag= signum;
+    break;
+  default:
+    abort();
+  }
+  write(signal_self_pipe[1],&x,1);
+}
+
+static void init_signals(void) {
+  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+    syscrash("could not ignore SIGPIPE");
+
+  if (pipe(signal_self_pipe)) sysdie("create self-pipe for signals");
+
+  xsetnonblock(signal_self_pipe[0],1);
+  xsetnonblock(signal_self_pipe[1],1);
+
+  struct sigaction sa;
+  memset(&sa,0,sizeof(sa));
+  sa.sa_handler= sigarrived_handler;
+  sa.sa_flags= SA_RESTART;
+  xsigaction(SIGTERM,&sa);
+  xsigaction(SIGINT,&sa);
+
+  on_fd_read_except(signal_self_pipe[0], sigarrived_event);
+}
+