chiark / gitweb /
775d1c9afcf5da715397fea8075c7176d367f31e
[innduct.git] / defer.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  defer.c - handling of defer and backlog files
5  *
6  *  Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
7  *  and contributors; see LICENCE.txt.
8  *  SPDX-License-Identifier: GPL-3.0-or-later
9  */
10
11 #include "innduct.h"
12
13 /*---------- defer and backlog files ----------*/
14
15 /*
16  * Backlog files are also processed by innduct.  We find the oldest
17  * backlog file which is at least a certain amount old, and feed it
18  * back into our processing.  When every article in it has been read
19  * and processed, we unlink it and look for another backlog file.
20  *
21  * If we don't have a backlog file that we're reading, we close the
22  * defer file that we're writing and make it into a backlog file at
23  * the first convenient opportunity.
24  */
25
26 void open_defer(void) {
27   struct stat stab;
28
29   if (defer) return;
30
31   defer= fopen(path_defer, "a+");
32   if (!defer) sysdie("could not open defer file %s", path_defer);
33
34   /* truncate away any half-written records */
35
36   xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
37
38   if (stab.st_size > LONG_MAX)
39     crash("defer file %s size is far too large", path_defer);
40
41   if (!stab.st_size)
42     return;
43
44   long orgsize= stab.st_size;
45   long truncto= stab.st_size;
46   for (;;) {
47     if (!truncto) break; /* was only (if anything) one half-truncated record */
48     if (fseek(defer, truncto-1, SEEK_SET) < 0)
49       syscrash("seek in defer file %s while truncating partial", path_defer);
50
51     int r= getc(defer);
52     if (r==EOF) {
53       if (ferror(defer))
54         syscrash("failed read from defer file %s", path_defer);
55       else
56         crash("defer file %s shrank while we were checking it!", path_defer);
57     }
58     if (r=='\n') break;
59     truncto--;
60   }
61
62   if (stab.st_size != truncto) {
63     warn("truncating half-record at end of defer file %s -"
64          " shrinking by %ld bytes from %ld to %ld",
65          path_defer, orgsize - truncto, orgsize, truncto);
66
67     if (fflush(defer))
68       sysdie("could not flush defer file %s", path_defer);
69     if (ftruncate(fileno(defer), truncto))
70       syscrash("could not truncate defer file %s", path_defer);
71
72   } else {
73     info("continuing existing defer file %s (%ld bytes)",
74          path_defer, orgsize);
75   }
76   if (fseek(defer, truncto, SEEK_SET))
77     syscrash("could not seek to new end of defer file %s", path_defer);
78 }
79
80 void close_defer(void) {
81   if (!defer)
82     return;
83
84   struct stat stab;
85   xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
86
87   if (fclose(defer)) sysdie("could not close defer file %s", path_defer);
88   defer= 0;
89
90   time_t now= xtime();
91
92   char *backlog= masprintf("%s_backlog_%lu.%lu", feedfile,
93                            (unsigned long)now,
94                            (unsigned long)stab.st_ino);
95   if (link(path_defer, backlog))
96     sysdie("could not install defer file %s as backlog file %s",
97            path_defer, backlog);
98   if (unlink(path_defer))
99     syscrash("could not unlink old defer link %s to backlog file %s",
100              path_defer, backlog);
101
102   free(backlog);
103
104   if (until_backlog_nextscan < 0 ||
105       until_backlog_nextscan > backlog_retry_minperiods + 1)
106     until_backlog_nextscan= backlog_retry_minperiods + 1;
107 }
108
109 void poll_backlog_file(void) {
110   if (until_backlog_nextscan < 0) return;
111   if (until_backlog_nextscan-- > 0) return;
112   search_backlog_file();
113 }
114
115 void search_backlog_file(void) {
116   /* returns non-0 iff there are any backlog files */
117
118   glob_t gl;
119   int r;
120   unsigned ui;
121   struct stat stab;
122   const char *oldest_path=0;
123   time_t oldest_mtime=0, now;
124
125   if (backlog_input_file) return;
126
127  try_again:
128
129   r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
130
131   switch (r) {
132   case GLOB_ABORTED:
133     sysdie("failed to expand backlog pattern %s", globpat_backlog);
134   case GLOB_NOSPACE:
135     die("out of memory expanding backlog pattern %s", globpat_backlog);
136   case 0:
137     for (ui=0; ui<gl.gl_pathc; ui++) {
138       const char *path= gl.gl_pathv[ui];
139
140       if (strchr(path,'#') || strchr(path,'~')) {
141         dbg("backlog file search skipping %s", path);
142         continue;
143       }
144       r= stat(path, &stab);
145       if (r) {
146         syswarn("failed to stat backlog file %s", path);
147         continue;
148       }
149       if (!S_ISREG(stab.st_mode)) {
150         warn("backlog file %s is not a plain file (or link to one)", path);
151         continue;
152       }
153       if (!oldest_path || stab.st_mtime < oldest_mtime) {
154         oldest_path= path;
155         oldest_mtime= stab.st_mtime;
156       }
157     }
158   case GLOB_NOMATCH: /* fall through */
159     break;
160   default:
161     syscrash("glob expansion of backlog pattern %s gave unexpected"
162              " nonzero (error?) return value %d", globpat_backlog, r);
163   }
164
165   if (!oldest_path) {
166     dbg("backlog scan: none");
167
168     if (sms==sm_DROPPED) {
169       preterminate();
170       notice("feed dropped and our work is complete");
171
172       r= unlink(path_cli);
173       if (r && errno!=ENOENT)
174         syswarn("failed to unlink cli socket for old feed");
175
176       xunlink(path_lock, "lockfile for old feed");
177       exit(4);
178     }
179     until_backlog_nextscan= backlog_spontrescan_periods;
180     goto xfree;
181   }
182
183   now= xtime();
184   double age= difftime(now, oldest_mtime);
185   long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
186
187   if (age_deficiency <= 0) {
188     dbg("backlog scan: found age=%f deficiency=%ld oldest=%s",
189           age, age_deficiency, oldest_path);
190
191     backlog_input_file= open_input_file(oldest_path);
192     if (!backlog_input_file) {
193       warn("backlog file %s vanished as we opened it", oldest_path);
194       globfree(&gl);
195       goto try_again;
196     }
197     inputfile_reading_start(backlog_input_file);
198     until_backlog_nextscan= -1;
199     goto xfree;
200   }
201
202   until_backlog_nextscan= age_deficiency / period_seconds;
203
204   if (backlog_spontrescan_periods >= 0 &&
205       until_backlog_nextscan > backlog_spontrescan_periods)
206     until_backlog_nextscan= backlog_spontrescan_periods;
207
208   dbg("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
209         age, age_deficiency, until_backlog_nextscan, oldest_path);
210
211  xfree:
212   globfree(&gl);
213   return;
214 }