chiark / gitweb /
wip split into multiple files and make compile
[innduct.git] / defer.c
diff --git a/defer.c b/defer.c
new file mode 100644 (file)
index 0000000..9af9d58
--- /dev/null
+++ b/defer.c
@@ -0,0 +1,192 @@
+/*---------- defer and backlog files ----------*/
+
+static void open_defer(void) {
+  struct stat stab;
+
+  if (defer) return;
+
+  defer= fopen(path_defer, "a+");
+  if (!defer) sysdie("could not open defer file %s", path_defer);
+
+  /* truncate away any half-written records */
+
+  xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
+
+  if (stab.st_size > LONG_MAX)
+    crash("defer file %s size is far too large", path_defer);
+
+  if (!stab.st_size)
+    return;
+
+  long orgsize= stab.st_size;
+  long truncto= stab.st_size;
+  for (;;) {
+    if (!truncto) break; /* was only (if anything) one half-truncated record */
+    if (fseek(defer, truncto-1, SEEK_SET) < 0)
+      syscrash("seek in defer file %s while truncating partial", path_defer);
+
+    int r= getc(defer);
+    if (r==EOF) {
+      if (ferror(defer))
+       syscrash("failed read from defer file %s", path_defer);
+      else
+       crash("defer file %s shrank while we were checking it!", path_defer);
+    }
+    if (r=='\n') break;
+    truncto--;
+  }
+
+  if (stab.st_size != truncto) {
+    warn("truncating half-record at end of defer file %s -"
+        " shrinking by %ld bytes from %ld to %ld",
+        path_defer, orgsize - truncto, orgsize, truncto);
+
+    if (fflush(defer))
+      sysdie("could not flush defer file %s", path_defer);
+    if (ftruncate(fileno(defer), truncto))
+      syscrash("could not truncate defer file %s", path_defer);
+
+  } else {
+    info("continuing existing defer file %s (%ld bytes)",
+        path_defer, orgsize);
+  }
+  if (fseek(defer, truncto, SEEK_SET))
+    syscrash("could not seek to new end of defer file %s", path_defer);
+}
+
+static void close_defer(void) {
+  if (!defer)
+    return;
+
+  struct stat stab;
+  xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
+
+  if (fclose(defer)) sysdie("could not close defer file %s", path_defer);
+  defer= 0;
+
+  time_t now= xtime();
+
+  char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
+                          (unsigned long)now,
+                          (unsigned long)stab.st_ino);
+  if (link(path_defer, backlog))
+    sysdie("could not install defer file %s as backlog file %s",
+          path_defer, backlog);
+  if (unlink(path_defer))
+    syscrash("could not unlink old defer link %s to backlog file %s",
+            path_defer, backlog);
+
+  free(backlog);
+
+  if (until_backlog_nextscan < 0 ||
+      until_backlog_nextscan > backlog_retry_minperiods + 1)
+    until_backlog_nextscan= backlog_retry_minperiods + 1;
+}
+
+void poll_backlog_file(void) {
+  if (until_backlog_nextscan < 0) return;
+  if (until_backlog_nextscan-- > 0) return;
+  search_backlog_file();
+}
+
+static void search_backlog_file(void) {
+  /* returns non-0 iff there are any backlog files */
+
+  glob_t gl;
+  int r;
+  unsigned ui;
+  struct stat stab;
+  const char *oldest_path=0;
+  time_t oldest_mtime=0, now;
+
+  if (backlog_input_file) return;
+
+ try_again:
+
+  r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
+
+  switch (r) {
+  case GLOB_ABORTED:
+    sysdie("failed to expand backlog pattern %s", globpat_backlog);
+  case GLOB_NOSPACE:
+    die("out of memory expanding backlog pattern %s", globpat_backlog);
+  case 0:
+    for (ui=0; ui<gl.gl_pathc; ui++) {
+      const char *path= gl.gl_pathv[ui];
+
+      if (strchr(path,'#') || strchr(path,'~')) {
+       dbg("backlog file search skipping %s", path);
+       continue;
+      }
+      r= stat(path, &stab);
+      if (r) {
+       syswarn("failed to stat backlog file %s", path);
+       continue;
+      }
+      if (!S_ISREG(stab.st_mode)) {
+       warn("backlog file %s is not a plain file (or link to one)", path);
+       continue;
+      }
+      if (!oldest_path || stab.st_mtime < oldest_mtime) {
+       oldest_path= path;
+       oldest_mtime= stab.st_mtime;
+      }
+    }
+  case GLOB_NOMATCH: /* fall through */
+    break;
+  default:
+    syscrash("glob expansion of backlog pattern %s gave unexpected"
+            " nonzero (error?) return value %d", globpat_backlog, r);
+  }
+
+  if (!oldest_path) {
+    dbg("backlog scan: none");
+
+    if (sms==sm_DROPPED) {
+      preterminate();
+      notice("feed dropped and our work is complete");
+
+      r= unlink(path_cli);
+      if (r && errno!=ENOENT)
+       syswarn("failed to unlink cli socket for old feed");
+
+      xunlink(path_lock, "lockfile for old feed");
+      exit(4);
+    }
+    until_backlog_nextscan= backlog_spontrescan_periods;
+    goto xfree;
+  }
+
+  now= xtime();
+  double age= difftime(now, oldest_mtime);
+  long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
+
+  if (age_deficiency <= 0) {
+    dbg("backlog scan: found age=%f deficiency=%ld oldest=%s",
+         age, age_deficiency, oldest_path);
+
+    backlog_input_file= open_input_file(oldest_path);
+    if (!backlog_input_file) {
+      warn("backlog file %s vanished as we opened it", oldest_path);
+      globfree(&gl);
+      goto try_again;
+    }
+    inputfile_reading_start(backlog_input_file);
+    until_backlog_nextscan= -1;
+    goto xfree;
+  }
+
+  until_backlog_nextscan= age_deficiency / period_seconds;
+
+  if (backlog_spontrescan_periods >= 0 &&
+      until_backlog_nextscan > backlog_spontrescan_periods)
+    until_backlog_nextscan= backlog_spontrescan_periods;
+
+  dbg("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
+       age, age_deficiency, until_backlog_nextscan, oldest_path);
+
+ xfree:
+  globfree(&gl);
+  return;
+}
+