chiark / gitweb /
rename innduct.c to duct.c
[inn-innduct.git] / defer.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  defer.c - handling of defer and backlog files
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29 /*---------- defer and backlog files ----------*/
30
31 void open_defer(void) {
32   struct stat stab;
33
34   if (defer) return;
35
36   defer= fopen(path_defer, "a+");
37   if (!defer) sysdie("could not open defer file %s", path_defer);
38
39   /* truncate away any half-written records */
40
41   xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file");
42
43   if (stab.st_size > LONG_MAX)
44     crash("defer file %s size is far too large", path_defer);
45
46   if (!stab.st_size)
47     return;
48
49   long orgsize= stab.st_size;
50   long truncto= stab.st_size;
51   for (;;) {
52     if (!truncto) break; /* was only (if anything) one half-truncated record */
53     if (fseek(defer, truncto-1, SEEK_SET) < 0)
54       syscrash("seek in defer file %s while truncating partial", path_defer);
55
56     int r= getc(defer);
57     if (r==EOF) {
58       if (ferror(defer))
59         syscrash("failed read from defer file %s", path_defer);
60       else
61         crash("defer file %s shrank while we were checking it!", path_defer);
62     }
63     if (r=='\n') break;
64     truncto--;
65   }
66
67   if (stab.st_size != truncto) {
68     warn("truncating half-record at end of defer file %s -"
69          " shrinking by %ld bytes from %ld to %ld",
70          path_defer, orgsize - truncto, orgsize, truncto);
71
72     if (fflush(defer))
73       sysdie("could not flush defer file %s", path_defer);
74     if (ftruncate(fileno(defer), truncto))
75       syscrash("could not truncate defer file %s", path_defer);
76
77   } else {
78     info("continuing existing defer file %s (%ld bytes)",
79          path_defer, orgsize);
80   }
81   if (fseek(defer, truncto, SEEK_SET))
82     syscrash("could not seek to new end of defer file %s", path_defer);
83 }
84
85 void close_defer(void) {
86   if (!defer)
87     return;
88
89   struct stat stab;
90   xfstat_isreg(fileno(defer), &stab, path_defer, "defer file");
91
92   if (fclose(defer)) sysdie("could not close defer file %s", path_defer);
93   defer= 0;
94
95   time_t now= xtime();
96
97   char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile,
98                            (unsigned long)now,
99                            (unsigned long)stab.st_ino);
100   if (link(path_defer, backlog))
101     sysdie("could not install defer file %s as backlog file %s",
102            path_defer, backlog);
103   if (unlink(path_defer))
104     syscrash("could not unlink old defer link %s to backlog file %s",
105              path_defer, backlog);
106
107   free(backlog);
108
109   if (until_backlog_nextscan < 0 ||
110       until_backlog_nextscan > backlog_retry_minperiods + 1)
111     until_backlog_nextscan= backlog_retry_minperiods + 1;
112 }
113
114 void poll_backlog_file(void) {
115   if (until_backlog_nextscan < 0) return;
116   if (until_backlog_nextscan-- > 0) return;
117   search_backlog_file();
118 }
119
120 void search_backlog_file(void) {
121   /* returns non-0 iff there are any backlog files */
122
123   glob_t gl;
124   int r;
125   unsigned ui;
126   struct stat stab;
127   const char *oldest_path=0;
128   time_t oldest_mtime=0, now;
129
130   if (backlog_input_file) return;
131
132  try_again:
133
134   r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl);
135
136   switch (r) {
137   case GLOB_ABORTED:
138     sysdie("failed to expand backlog pattern %s", globpat_backlog);
139   case GLOB_NOSPACE:
140     die("out of memory expanding backlog pattern %s", globpat_backlog);
141   case 0:
142     for (ui=0; ui<gl.gl_pathc; ui++) {
143       const char *path= gl.gl_pathv[ui];
144
145       if (strchr(path,'#') || strchr(path,'~')) {
146         dbg("backlog file search skipping %s", path);
147         continue;
148       }
149       r= stat(path, &stab);
150       if (r) {
151         syswarn("failed to stat backlog file %s", path);
152         continue;
153       }
154       if (!S_ISREG(stab.st_mode)) {
155         warn("backlog file %s is not a plain file (or link to one)", path);
156         continue;
157       }
158       if (!oldest_path || stab.st_mtime < oldest_mtime) {
159         oldest_path= path;
160         oldest_mtime= stab.st_mtime;
161       }
162     }
163   case GLOB_NOMATCH: /* fall through */
164     break;
165   default:
166     syscrash("glob expansion of backlog pattern %s gave unexpected"
167              " nonzero (error?) return value %d", globpat_backlog, r);
168   }
169
170   if (!oldest_path) {
171     dbg("backlog scan: none");
172
173     if (sms==sm_DROPPED) {
174       preterminate();
175       notice("feed dropped and our work is complete");
176
177       r= unlink(path_cli);
178       if (r && errno!=ENOENT)
179         syswarn("failed to unlink cli socket for old feed");
180
181       xunlink(path_lock, "lockfile for old feed");
182       exit(4);
183     }
184     until_backlog_nextscan= backlog_spontrescan_periods;
185     goto xfree;
186   }
187
188   now= xtime();
189   double age= difftime(now, oldest_mtime);
190   long age_deficiency= (backlog_retry_minperiods * period_seconds) - age;
191
192   if (age_deficiency <= 0) {
193     dbg("backlog scan: found age=%f deficiency=%ld oldest=%s",
194           age, age_deficiency, oldest_path);
195
196     backlog_input_file= open_input_file(oldest_path);
197     if (!backlog_input_file) {
198       warn("backlog file %s vanished as we opened it", oldest_path);
199       globfree(&gl);
200       goto try_again;
201     }
202     inputfile_reading_start(backlog_input_file);
203     until_backlog_nextscan= -1;
204     goto xfree;
205   }
206
207   until_backlog_nextscan= age_deficiency / period_seconds;
208
209   if (backlog_spontrescan_periods >= 0 &&
210       until_backlog_nextscan > backlog_spontrescan_periods)
211     until_backlog_nextscan= backlog_spontrescan_periods;
212
213   dbg("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s",
214         age, age_deficiency, until_backlog_nextscan, oldest_path);
215
216  xfree:
217   globfree(&gl);
218   return;
219 }