chiark / gitweb /
Cope with NNTP_MSGID_MAXLEN abolishment (replaced with NNTP_MAXLEN_MSGID)
[innduct.git] / defer.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  defer.c - handling of defer and backlog files
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29 /*---------- defer and backlog files ----------*/
30
31 /*
32  * Backlog files are also processed by innduct.  We find the oldest
33  * backlog file which is at least a certain amount old, and feed it
34  * back into our processing.  When every article in it has been read
35  * and processed, we unlink it and look for another backlog file.
36  *
37  * If we don't have a backlog file that we're reading, we close the
38  * defer file that we're writing and make it into a backlog file at
39  * the first convenient opportunity.
40  */
41
42 void open_defer(void) {
43   struct stat stab;
44
45   if (defer) return;
46
47   defer= fopen(path_defer, "a+");
48   if (!defer) sysdie("could not open defer file %s", path_defer);
49
50   /* truncate away any half-written records */
51
52   xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
53
54   if (stab.st_size > LONG_MAX)
55     crash("defer file %s size is far too large", path_defer);
56
57   if (!stab.st_size)
58     return;
59
60   long orgsize= stab.st_size;
61   long truncto= stab.st_size;
62   for (;;) {
63     if (!truncto) break; /* was only (if anything) one half-truncated record */
64     if (fseek(defer, truncto-1, SEEK_SET) < 0)
65       syscrash("seek in defer file %s while truncating partial", path_defer);
66
67     int r= getc(defer);
68     if (r==EOF) {
69       if (ferror(defer))
70         syscrash("failed read from defer file %s", path_defer);
71       else
72         crash("defer file %s shrank while we were checking it!", path_defer);
73     }
74     if (r=='\n') break;
75     truncto--;
76   }
77
78   if (stab.st_size != truncto) {
79     warn("truncating half-record at end of defer file %s -"
80          " shrinking by %ld bytes from %ld to %ld",
81          path_defer, orgsize - truncto, orgsize, truncto);
82
83     if (fflush(defer))
84       sysdie("could not flush defer file %s", path_defer);
85     if (ftruncate(fileno(defer), truncto))
86       syscrash("could not truncate defer file %s", path_defer);
87
88   } else {
89     info("continuing existing defer file %s (%ld bytes)",
90          path_defer, orgsize);
91   }
92   if (fseek(defer, truncto, SEEK_SET))
93     syscrash("could not seek to new end of defer file %s", path_defer);
94 }
95
96 void close_defer(void) {
97   if (!defer)
98     return;
99
100   struct stat stab;
101   xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
102
103   if (fclose(defer)) sysdie("could not close defer file %s", path_defer);
104   defer= 0;
105
106   time_t now= xtime();
107
108   char *backlog= masprintf("%s_backlog_%lu.%lu", feedfile,
109                            (unsigned long)now,
110                            (unsigned long)stab.st_ino);
111   if (link(path_defer, backlog))
112     sysdie("could not install defer file %s as backlog file %s",
113            path_defer, backlog);
114   if (unlink(path_defer))
115     syscrash("could not unlink old defer link %s to backlog file %s",
116              path_defer, backlog);
117
118   free(backlog);
119
120   if (until_backlog_nextscan < 0 ||
121       until_backlog_nextscan > backlog_retry_minperiods + 1)
122     until_backlog_nextscan= backlog_retry_minperiods + 1;
123 }
124
125 void poll_backlog_file(void) {
126   if (until_backlog_nextscan < 0) return;
127   if (until_backlog_nextscan-- > 0) return;
128   search_backlog_file();
129 }
130
131 void search_backlog_file(void) {
132   /* returns non-0 iff there are any backlog files */
133
134   glob_t gl;
135   int r;
136   unsigned ui;
137   struct stat stab;
138   const char *oldest_path=0;
139   time_t oldest_mtime=0, now;
140
141   if (backlog_input_file) return;
142
143  try_again:
144
145   r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
146
147   switch (r) {
148   case GLOB_ABORTED:
149     sysdie("failed to expand backlog pattern %s", globpat_backlog);
150   case GLOB_NOSPACE:
151     die("out of memory expanding backlog pattern %s", globpat_backlog);
152   case 0:
153     for (ui=0; ui<gl.gl_pathc; ui++) {
154       const char *path= gl.gl_pathv[ui];
155
156       if (strchr(path,'#') || strchr(path,'~')) {
157         dbg("backlog file search skipping %s", path);
158         continue;
159       }
160       r= stat(path, &stab);
161       if (r) {
162         syswarn("failed to stat backlog file %s", path);
163         continue;
164       }
165       if (!S_ISREG(stab.st_mode)) {
166         warn("backlog file %s is not a plain file (or link to one)", path);
167         continue;
168       }
169       if (!oldest_path || stab.st_mtime < oldest_mtime) {
170         oldest_path= path;
171         oldest_mtime= stab.st_mtime;
172       }
173     }
174   case GLOB_NOMATCH: /* fall through */
175     break;
176   default:
177     syscrash("glob expansion of backlog pattern %s gave unexpected"
178              " nonzero (error?) return value %d", globpat_backlog, r);
179   }
180
181   if (!oldest_path) {
182     dbg("backlog scan: none");
183
184     if (sms==sm_DROPPED) {
185       preterminate();
186       notice("feed dropped and our work is complete");
187
188       r= unlink(path_cli);
189       if (r && errno!=ENOENT)
190         syswarn("failed to unlink cli socket for old feed");
191
192       xunlink(path_lock, "lockfile for old feed");
193       exit(4);
194     }
195     until_backlog_nextscan= backlog_spontrescan_periods;
196     goto xfree;
197   }
198
199   now= xtime();
200   double age= difftime(now, oldest_mtime);
201   long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
202
203   if (age_deficiency <= 0) {
204     dbg("backlog scan: found age=%f deficiency=%ld oldest=%s",
205           age, age_deficiency, oldest_path);
206
207     backlog_input_file= open_input_file(oldest_path);
208     if (!backlog_input_file) {
209       warn("backlog file %s vanished as we opened it", oldest_path);
210       globfree(&gl);
211       goto try_again;
212     }
213     inputfile_reading_start(backlog_input_file);
214     until_backlog_nextscan= -1;
215     goto xfree;
216   }
217
218   until_backlog_nextscan= age_deficiency / period_seconds;
219
220   if (backlog_spontrescan_periods >= 0 &&
221       until_backlog_nextscan > backlog_spontrescan_periods)
222     until_backlog_nextscan= backlog_spontrescan_periods;
223
224   dbg("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
225         age, age_deficiency, until_backlog_nextscan, oldest_path);
226
227  xfree:
228   globfree(&gl);
229   return;
230 }