3 * tailing reliable realtime streaming feeder for inn
4 * defer.c - handling of defer and backlog files
6 * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
8 * This program is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 * (I believe that when you compile and link this as part of the inn2
22 * build, with the Makefile runes I have provided, all the libraries
23 * and files which end up included in innduct are licence-compatible
24 * with GPLv3. If not then please let me know. -Ian Jackson.)
29 /*---------- defer and backlog files ----------*/
32 * Backlog files are also processed by innduct. We find the oldest
33 * backlog file which is at least a certain amount old, and feed it
34 * back into our processing. When every article in it has been read
35 * and processed, we unlink it and look for another backlog file.
37 * If we don't have a backlog file that we're reading, we close the
38 * defer file that we're writing and make it into a backlog file at
39 * the first convenient opportunity.
42 void open_defer(void) {
47 defer= fopen(path_defer, "a+");
48 if (!defer) sysdie("could not open defer file %s", path_defer);
50 /* truncate away any half-written records */
52 xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
54 if (stab.st_size > LONG_MAX)
55 crash("defer file %s size is far too large", path_defer);
60 long orgsize= stab.st_size;
61 long truncto= stab.st_size;
63 if (!truncto) break; /* was only (if anything) one half-truncated record */
64 if (fseek(defer, truncto-1, SEEK_SET) < 0)
65 syscrash("seek in defer file %s while truncating partial", path_defer);
70 syscrash("failed read from defer file %s", path_defer);
72 crash("defer file %s shrank while we were checking it!", path_defer);
78 if (stab.st_size != truncto) {
79 warn("truncating half-record at end of defer file %s -"
80 " shrinking by %ld bytes from %ld to %ld",
81 path_defer, orgsize - truncto, orgsize, truncto);
84 sysdie("could not flush defer file %s", path_defer);
85 if (ftruncate(fileno(defer), truncto))
86 syscrash("could not truncate defer file %s", path_defer);
89 info("continuing existing defer file %s (%ld bytes)",
92 if (fseek(defer, truncto, SEEK_SET))
93 syscrash("could not seek to new end of defer file %s", path_defer);
96 void close_defer(void) {
101 xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
103 if (fclose(defer)) sysdie("could not close defer file %s", path_defer);
108 char *backlog= masprintf("%s_backlog_%lu.%lu", feedfile,
110 (unsigned long)stab.st_ino);
111 if (link(path_defer, backlog))
112 sysdie("could not install defer file %s as backlog file %s",
113 path_defer, backlog);
114 if (unlink(path_defer))
115 syscrash("could not unlink old defer link %s to backlog file %s",
116 path_defer, backlog);
120 if (until_backlog_nextscan < 0 ||
121 until_backlog_nextscan > backlog_retry_minperiods + 1)
122 until_backlog_nextscan= backlog_retry_minperiods + 1;
125 void poll_backlog_file(void) {
126 if (until_backlog_nextscan < 0) return;
127 if (until_backlog_nextscan-- > 0) return;
128 search_backlog_file();
131 void search_backlog_file(void) {
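132 /* no return value: unless a backlog file is already being read, starts reading the oldest backlog file that is old enough, otherwise sets until_backlog_nextscan to schedule a later rescan */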
138 const char *oldest_path=0;
139 time_t oldest_mtime=0, now;
141 if (backlog_input_file) return;
145 r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
149 sysdie("failed to expand backlog pattern %s", globpat_backlog);
151 die("out of memory expanding backlog pattern %s", globpat_backlog);
153 for (ui=0; ui<gl.gl_pathc; ui++) {
154 const char *path= gl.gl_pathv[ui];
156 if (strchr(path,'#') || strchr(path,'~')) {
157 dbg("backlog file search skipping %s", path);
160 r= stat(path, &stab);
162 syswarn("failed to stat backlog file %s", path);
165 if (!S_ISREG(stab.st_mode)) {
166 warn("backlog file %s is not a plain file (or link to one)", path);
169 if (!oldest_path || stab.st_mtime < oldest_mtime) {
171 oldest_mtime= stab.st_mtime;
174 case GLOB_NOMATCH: /* fall through */
177 syscrash("glob expansion of backlog pattern %s gave unexpected"
178 " nonzero (error?) return value %d", globpat_backlog, r);
182 dbg("backlog scan: none");
184 if (sms==sm_DROPPED) {
186 notice("feed dropped and our work is complete");
189 if (r && errno!=ENOENT)
190 syswarn("failed to unlink cli socket for old feed");
192 xunlink(path_lock, "lockfile for old feed");
195 until_backlog_nextscan= backlog_spontrescan_periods;
200 double age= difftime(now, oldest_mtime);
201 long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
203 if (age_deficiency <= 0) {
204 dbg("backlog scan: found age=%f deficiency=%ld oldest=%s",
205 age, age_deficiency, oldest_path);
207 backlog_input_file= open_input_file(oldest_path);
208 if (!backlog_input_file) {
209 warn("backlog file %s vanished as we opened it", oldest_path);
213 inputfile_reading_start(backlog_input_file);
214 until_backlog_nextscan= -1;
218 until_backlog_nextscan= age_deficiency / period_seconds;
220 if (backlog_spontrescan_periods >= 0 &&
221 until_backlog_nextscan > backlog_spontrescan_periods)
222 until_backlog_nextscan= backlog_spontrescan_periods;
224 dbg("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
225 age, age_deficiency, until_backlog_nextscan, oldest_path);