3 * tailing reliable realtime streaming feeder for inn
4 * defer.c - handling of defer and backlog files
6 * Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
7 * and contributors; see LICENCE.txt.
8 * SPDX-License-Identifier: GPL-3.0-or-later
13 /*---------- defer and backlog files ----------*/
16 * Backlog files are also processed by innduct. We find the oldest
17 * backlog file which has reached a minimum age, and feed it
18 * back into our processing. When every article in it has been read
19 * and processed, we unlink it and look for another backlog file.
21 * If we don't have a backlog file that we're reading, we close the
22 * defer file that we're writing and make it into a backlog file at
23 * the first convenient opportunity.
/*
 * open_defer - open the defer file path_defer (mode "a+", so it is
 * created if absent) and trim any partial record left at its end.
 *
 * In "a+" mode the C library forces every write to the current end of
 * file regardless of fseek(); the fseek() at the end only positions the
 * stream for reading.
 *
 * Dies (sysdie) if the file cannot be opened or flushed.  Crashes if its
 * size does not fit in a long, or on a failed seek, a failed read, the
 * file shrinking while it is being scanned, or a failed ftruncate().
 * On return the stream `defer` is left open.
 *
 * NOTE(review): this listing omits several source lines of this
 * function (for example the declaration of `stab`, the header of the
 * backward-scan loop, the per-byte read and its end-of-record test,
 * and the closing braces), so it will not compile as shown; confirm
 * against the full source.
 */
26 void open_defer(void) {
31 defer= fopen(path_defer, "a+");
32 if (!defer) sysdie("could not open defer file %s", path_defer);
34 /* truncate away any half-written records */
/* Fetch the open file's metadata into stab (size used below).
 * xfstat_isreg is a project helper; presumably it also rejects
 * non-regular files -- confirm. */
36 xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
/* orgsize/truncto are longs, so refuse sizes that cannot be held. */
38 if (stab.st_size > LONG_MAX)
39 crash("defer file %s size is far too large", path_defer);
/* orgsize remembers the size for the log message; truncto is moved
 * backwards from the end until it marks the end of the last complete
 * record (the loop and its byte test are not shown in this listing). */
44 long orgsize= stab.st_size;
45 long truncto= stab.st_size;
47 if (!truncto) break; /* was only (if anything) one half-truncated record */
/* Look at the byte just before the current candidate end. */
48 if (fseek(defer, truncto-1, SEEK_SET) < 0)
49 syscrash("seek in defer file %s while truncating partial", path_defer);
/* A read error, or hitting EOF inside a region that fstat said
 * existed, is treated as fatal. */
54 syscrash("failed read from defer file %s", path_defer);
56 crash("defer file %s shrank while we were checking it!", path_defer);
/* Something needs trimming: report it, flush the stdio stream (the
 * fflush call is on a line not shown), then cut the underlying
 * descriptor back to the end of the last complete record. */
62 if (stab.st_size != truncto) {
63 warn("truncating half-record at end of defer file %s -"
64 " shrinking by %ld bytes from %ld to %ld",
65 path_defer, orgsize - truncto, orgsize, truncto);
68 sysdie("could not flush defer file %s", path_defer);
69 if (ftruncate(fileno(defer), truncto))
70 syscrash("could not truncate defer file %s", path_defer);
/* Presumably the no-trim branch: the existing file is reused as-is. */
73 info("continuing existing defer file %s (%ld bytes)",
/* Position the stream at the (possibly new) end of file. */
76 if (fseek(defer, truncto, SEEK_SET))
77 syscrash("could not seek to new end of defer file %s", path_defer);
/*
 * close_defer - retire the current defer file as a backlog file.
 *
 * Closes the `defer` stream, gives the file a second name of the form
 * <feedfile>_backlog_<N>.<inode> (N comes from an argument line not
 * shown here), removes the path_defer name, and then makes sure a
 * backlog scan is scheduled within backlog_retry_minperiods + 1
 * periods.
 *
 * Dies if the stream cannot be closed or the backlog name cannot be
 * created; crashes if the old defer name cannot be removed afterwards.
 *
 * NOTE(review): several lines of this function are missing from this
 * listing -- e.g. the declaration of `stab`, the first %lu argument to
 * masprintf, the argument line of the sysdie() after link(), any
 * free() of the masprintf result, and the closing brace -- so it will
 * not compile as shown; confirm against the full source.
 */
80 void close_defer(void) {
/* stab.st_ino is used below to make the backlog name unique. */
85 xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
87 if (fclose(defer)) sysdie("could not close defer file %s", path_defer);
/* masprintf is a project helper; presumably it returns heap memory
 * that the caller must free -- confirm. */
92 char *backlog= masprintf("%s_backlog_%lu.%lu", feedfile,
94 (unsigned long)stab.st_ino);
/* link()+unlink() rather than rename(): link() fails (EEXIST) instead
 * of replacing a file that already has the backlog name, so an
 * existing backlog file is never clobbered. */
95 if (link(path_defer, backlog))
96 sysdie("could not install defer file %s as backlog file %s",
98 if (unlink(path_defer))
99 syscrash("could not unlink old defer link %s to backlog file %s",
100 path_defer, backlog);
/* A negative countdown means no scan is scheduled (cf.
 * poll_backlog_file).  Either way, make sure a scan happens within
 * backlog_retry_minperiods + 1 periods -- presumably so the new backlog
 * file has reached the minimum age by the time it is looked at. */
104 if (until_backlog_nextscan < 0 ||
105 until_backlog_nextscan > backlog_retry_minperiods + 1)
106 until_backlog_nextscan= backlog_retry_minperiods + 1;
109 void poll_backlog_file(void) {
110 if (until_backlog_nextscan < 0) return;
111 if (until_backlog_nextscan-- > 0) return;
112 search_backlog_file();
115 void search_backlog_file(void) {
116 /* returns non-0 iff there are any backlog files */
122 const char *oldest_path=0;
123 time_t oldest_mtime=0, now;
125 if (backlog_input_file) return;
129 r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
133 sysdie("failed to expand backlog pattern %s", globpat_backlog);
135 die("out of memory expanding backlog pattern %s", globpat_backlog);
137 for (ui=0; ui<gl.gl_pathc; ui++) {
138 const char *path= gl.gl_pathv[ui];
140 if (strchr(path,'#') || strchr(path,'~')) {
141 dbg("backlog file search skipping %s", path);
144 r= stat(path, &stab);
146 syswarn("failed to stat backlog file %s", path);
149 if (!S_ISREG(stab.st_mode)) {
150 warn("backlog file %s is not a plain file (or link to one)", path);
153 if (!oldest_path || stab.st_mtime < oldest_mtime) {
155 oldest_mtime= stab.st_mtime;
158 case GLOB_NOMATCH: /* fall through */
161 syscrash("glob expansion of backlog pattern %s gave unexpected"
162 " nonzero (error?) return value %d", globpat_backlog, r);
166 dbg("backlog scan: none");
168 if (sms==sm_DROPPED) {
170 notice("feed dropped and our work is complete");
173 if (r && errno!=ENOENT)
174 syswarn("failed to unlink cli socket for old feed");
176 xunlink(path_lock, "lockfile for old feed");
179 until_backlog_nextscan= backlog_spontrescan_periods;
184 double age= difftime(now, oldest_mtime);
185 long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
187 if (age_deficiency <= 0) {
188 dbg("backlog scan: found age=%f deficiency=%ld oldest=%s",
189 age, age_deficiency, oldest_path);
191 backlog_input_file= open_input_file(oldest_path);
192 if (!backlog_input_file) {
193 warn("backlog file %s vanished as we opened it", oldest_path);
197 inputfile_reading_start(backlog_input_file);
198 until_backlog_nextscan= -1;
202 until_backlog_nextscan= age_deficiency / period_seconds;
204 if (backlog_spontrescan_periods >= 0 &&
205 until_backlog_nextscan > backlog_spontrescan_periods)
206 until_backlog_nextscan= backlog_spontrescan_periods;
208 dbg("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
209 age, age_deficiency, until_backlog_nextscan, oldest_path);