3 * tailing reliable realtime streaming feeder for inn
4 * defer.c - handling of defer and backlog files
6 * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
8 * This program is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation, either version 3 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 * (I believe that when you compile and link this as part of the inn2
22 * build, with the Makefile runes I have provided, all the libraries
23 * and files which end up included in innduct are licence-compatible
24 * with GPLv3. If not then please let me know. -Ian Jackson.)
29 /*---------- defer and backlog files ----------*/
/*
 * open_defer: open the defer file named by path_defer in "a+" mode,
 * trim any half-written record from its end, and seek the stream to
 * the resulting end offset (truncto).
 *
 * NOTE(review): this copy of the function is missing lines (among them
 * the declaration of stab, the loop header around the probing code and
 * the closing braces); the comments describe only what is visible.
 */
31 void open_defer(void) {
36 defer= fopen(path_defer, "a+");
37 if (!defer) sysdie("could not open defer file %s", path_defer);
39 /* truncate away any half-written records */
/* fill in stab for the freshly opened stream; st_size drives everything
 * below.  Presumably the helper also insists on a regular file, going
 * by its name - TODO confirm against xfstat_isreg. */
41 xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
/* the size is copied into long variables and printed with %ld below,
 * so a size that does not fit in a long is treated as fatal */
43 if (stab.st_size > LONG_MAX)
44 crash("defer file %s size is far too large", path_defer);
/* orgsize keeps the size as found (for the messages); truncto is the
 * candidate end offset.  NOTE(review): truncto is presumably lowered
 * by code not visible here - confirm. */
49 long orgsize= stab.st_size;
50 long truncto= stab.st_size;
/* the break implies an enclosing loop whose header is not visible */
52 if (!truncto) break; /* was only (if anything) one half-truncated record */
/* move to the last byte before the candidate end offset */
53 if (fseek(defer, truncto-1, SEEK_SET) < 0)
54 syscrash("seek in defer file %s while truncating partial", path_defer);
/* two distinct fatal reports follow: a failed read, and the file having
 * become shorter while being examined (the tests that select between
 * them are not visible here) */
59 syscrash("failed read from defer file %s", path_defer);
61 crash("defer file %s shrank while we were checking it!", path_defer);
/* the end offset differs from the size as found: report how many bytes
 * are being dropped, then cut the file at truncto */
67 if (stab.st_size != truncto) {
68 warn("truncating half-record at end of defer file %s -"
69 " shrinking by %ld bytes from %ld to %ld",
70 path_defer, orgsize - truncto, orgsize, truncto);
/* NOTE(review): the call guarded by this message (presumably a flush of
 * the stream before ftruncate) is not visible - confirm */
73 sysdie("could not flush defer file %s", path_defer);
74 if (ftruncate(fileno(defer), truncto))
75 syscrash("could not truncate defer file %s", path_defer);
/* informational message about carrying on with an existing file; its
 * arguments are on a line not visible here */
78 info("continuing existing defer file %s (%ld bytes)",
/* leave the stream positioned at offset truncto */
81 if (fseek(defer, truncto, SEEK_SET))
82 syscrash("could not seek to new end of defer file %s", path_defer);
/*
 * close_defer: close the defer stream, give the defer file a backlog
 * name with link() and remove the path_defer name with unlink(), then
 * make sure the backlog scan countdown is no longer than
 * backlog_retry_minperiods + 1.
 *
 * NOTE(review): some lines of this function are missing from this copy
 * (for instance the declaration of stab and the closing brace).
 */
85 void close_defer(void) {
/* stat through the still-open stream; st_ino is used for the backlog
 * file name below */
90 xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
92 if (fclose(defer)) sysdie("could not close defer file %s", path_defer);
/* backlog name is feedfile, "_backlog_", and two unsigned longs joined
 * by a dot; the second is the inode number, the first is supplied on a
 * line not visible here */
97 char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
99 (unsigned long)stab.st_ino);
/* add the new name first ... */
100 if (link(path_defer, backlog))
101 sysdie("could not install defer file %s as backlog file %s",
102 path_defer, backlog);
/* ... then drop the old one, so the file always has at least one name */
103 if (unlink(path_defer))
104 syscrash("could not unlink old defer link %s to backlog file %s",
105 path_defer, backlog);
/* until_backlog_nextscan is counted down by poll_backlog_file, which
 * does nothing while it is negative.  If it is negative, or larger
 * than backlog_retry_minperiods + 1, set it to that value. */
109 if (until_backlog_nextscan < 0 ||
110 until_backlog_nextscan > backlog_retry_minperiods + 1)
111 until_backlog_nextscan= backlog_retry_minperiods + 1;
/*
 * poll_backlog_file: count down until_backlog_nextscan by one per call
 * and run search_backlog_file() when the countdown reaches zero.
 */
114 void poll_backlog_file(void) {
/* negative: no scan is wanted, leave the counter alone */
115 if (until_backlog_nextscan < 0) return;
/* post-decrement: a positive counter is reduced and we return; a zero
 * counter fails the test, becomes -1, and falls through to the scan */
116 if (until_backlog_nextscan-- > 0) return;
117 search_backlog_file();
/*
 * search_backlog_file: expand globpat_backlog, pick the matching regular
 * file with the oldest mtime, and either start reading it (if it is old
 * enough) or set until_backlog_nextscan for a later attempt.  Does
 * nothing if backlog_input_file is already set.
 *
 * NOTE(review): this copy of the function is missing lines (local
 * declarations, the switch on glob's result, the assignment of now, the
 * unlink of the cli socket, and everything after the final dbg call);
 * the comments describe only what is visible.
 */
120 void search_backlog_file(void) {
121 /* NOTE(review): declared void, so nothing is returned; earlier wording here ("returns non-0 iff there are any backlog files") did not match the declaration */
127 const char *oldest_path=0;
128 time_t oldest_mtime=0, now;
/* a backlog file is already being read: nothing to do */
130 if (backlog_input_file) return;
/* GLOB_ERR: stop on unreadable directories; GLOB_MARK: append a slash to
 * directories; GLOB_NOSORT: order is irrelevant as the oldest is chosen
 * by mtime below */
134 r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
/* fatal reports for glob failures (the case labels selecting them are
 * not visible here) */
138 sysdie("failed to expand backlog pattern %s", globpat_backlog);
140 die("out of memory expanding backlog pattern %s", globpat_backlog);
/* examine every match, remembering the oldest usable one */
142 for (ui=0; ui<gl.gl_pathc; ui++) {
143 const char *path= gl.gl_pathv[ui];
/* names containing '#' or '~' are passed over with a debug message */
145 if (strchr(path,'#') || strchr(path,'~')) {
146 dbg("backlog file search skipping %s", path);
149 r= stat(path, &stab);
/* a stat failure is only warned about */
151 syswarn("failed to stat backlog file %s", path);
/* stat follows symlinks, hence "or link to one" in the message */
154 if (!S_ISREG(stab.st_mode)) {
155 warn("backlog file %s is not a plain file (or link to one)", path);
/* first candidate, or strictly older than the best so far */
158 if (!oldest_path || stab.st_mtime < oldest_mtime) {
160 oldest_mtime= stab.st_mtime;
163 case GLOB_NOMATCH: /* fall through */
/* any other glob return value is unexpected and fatal */
166 syscrash("glob expansion of backlog pattern %s gave unexpected"
167 " nonzero (error?) return value %d", globpat_backlog, r);
171 dbg("backlog scan: none");
/* when sms is sm_DROPPED the visible statements log completion and
 * remove the old feed's cli socket and lockfile */
173 if (sms==sm_DROPPED) {
175 notice("feed dropped and our work is complete");
/* ENOENT is tolerated; only other failures are warned about (the call
 * that sets r is not visible here) */
178 if (r && errno!=ENOENT)
179 syswarn("failed to unlink cli socket for old feed");
181 xunlink(path_lock, "lockfile for old feed");
/* restart the countdown from backlog_spontrescan_periods */
184 until_backlog_nextscan= backlog_spontrescan_periods;
/* age: seconds between oldest_mtime and now (difftime); age_deficiency:
 * how far the file still is from backlog_retry_minperiods *
 * period_seconds, zero or negative once it has reached that age */
189 double age= difftime(now, oldest_mtime);
190 long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
/* old enough: open it as the backlog input */
192 if (age_deficiency <= 0) {
193 dbg("backlog scan: found age=%f deficiency=%ld oldest=%s",
194 age, age_deficiency, oldest_path);
196 backlog_input_file= open_input_file(oldest_path);
/* a null result is reported as the file having vanished */
197 if (!backlog_input_file) {
198 warn("backlog file %s vanished as we opened it", oldest_path);
202 inputfile_reading_start(backlog_input_file);
/* negative countdown: poll_backlog_file will not scan */
203 until_backlog_nextscan= -1;
/* too young: turn the deficiency into a whole number of periods
 * (integer division) ... */
207 until_backlog_nextscan= age_deficiency / period_seconds;
/* ... and cap it at backlog_spontrescan_periods when that is
 * non-negative */
209 if (backlog_spontrescan_periods >= 0 &&
210 until_backlog_nextscan > backlog_spontrescan_periods)
211 until_backlog_nextscan= backlog_spontrescan_periods;
213 dbg("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
214 age, age_deficiency, until_backlog_nextscan, oldest_path);