chiark / gitweb /
73e291b839dcc6b1c82e809d00e602ed956a8465
[innduct.git] / infile.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  infile.c - monitoring and handling of input files
5  *
6  *  Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
7  * 
8  *  This program is free software: you can redistribute it and/or modify
9  *  it under the terms of the GNU General Public License as published by
10  *  the Free Software Foundation, either version 3 of the License, or
11  *  (at your option) any later version.
12  * 
13  *  This program is distributed in the hope that it will be useful,
14  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *  GNU General Public License for more details.
17  * 
18  *  You should have received a copy of the GNU General Public License
19  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
20  *
21  *  (I believe that when you compile and link this as part of the inn2
22  *  build, with the Makefile runes I have provided, all the libraries
23  *  and files which end up included in innduct are licence-compatible
24  *  with GPLv3.  If not then please let me know.  -Ian Jackson.)
25  */
26
27 #include "innduct.h"
28
29 /*========== monitoring of input files ==========*/
30
31 static void feedfile_eof(InputFile *ipf) {
32   assert(ipf != main_input_file); /* promised by tailing_try_read */
33   inputfile_reading_stop(ipf);
34
35   if (ipf == flushing_input_file) {
36     assert(sms==sm_SEPARATED || sms==sm_DROPPING);
37     if (main_input_file) inputfile_reading_start(main_input_file);
38     statemc_check_flushing_done();
39   } else if (ipf == backlog_input_file) {
40     statemc_check_backlog_done();
41   } else {
42     abort(); /* supposed to wait rather than get EOF on main input file */
43   }
44 }
45
46 InputFile *open_input_file(const char *path) {
47   int fd= open(path, O_RDWR);
48   if (fd<0) {
49     if (errno==ENOENT) return 0;
50     sysdie("unable to open input file %s", path);
51   }
52   assert(fd>0);
53
54   InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
55   memset(ipf,0,sizeof(*ipf));
56
57   ipf->fd= fd;
58   ipf->autodefer= -1;
59   LIST_INIT(ipf->queue);
60   strcpy(ipf->path, path);
61
62   dbg("started input file %p %s", ipf, path);
63
64   return ipf;
65 }
66
67 void close_input_file(InputFile *ipf) { /* does not free */
68   assert(!ipf->readable_callback); /* must have had ->on_cancel */
69   assert(!ipf->filemon); /* must have had inputfile_reading_stop */
70   assert(!ipf->rd); /* must have had inputfile_reading_stop */
71   assert(!ipf->inprogress); /* no dangling pointers pointing here */
72   xclose_perhaps(&ipf->fd, "input file ", ipf->path);
73 }
74
75
76 /*---------- dealing with articles read in the input file ----------*/
77
78 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
79                                    const char *data, const char *how) {
80   warn("corrupted file: %s, offset %lu: %s: in %s",
81        ipf->path, (unsigned long)offset, how, sanitise(data,-1));
82   ipf->counts.events[read_err]++;
83   if (ipf->counts.events[read_err] > max_bad_data_initial +
84       (ipf->counts.events[read_ok] + ipf->counts.events[read_blank])
85                                                   / max_bad_data_ratio)
86     crash("too much garbage in input file!  (%d errs, %d ok, %d blank)",
87           ipf->counts.events[read_err], ipf->counts.events[read_ok],
88           ipf->counts.events[read_blank]);
89   return OOP_CONTINUE;
90 }
91
92 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
93                                oop_rd_event ev, const char *errmsg,
94                                int errnoval, const char *data, size_t recsz,
95                                void *ipf_v) {
96   InputFile *ipf= ipf_v;
97   assert(ev == OOP_RD_SYSTEM);
98   errno= errnoval;
99   syscrash("error reading input file: %s, offset %lu",
100            ipf->path, (unsigned long)ipf->offset);
101 }
102
103 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
104                                   oop_rd_event ev, const char *errmsg,
105                                   int errnoval, const char *data, size_t recsz,
106                                   void *ipf_v) {
107   InputFile *ipf= ipf_v;
108   Article *art;
109   char tokentextbuf[sizeof(TOKEN)*2+3];
110
111   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
112
113   off_t old_offset= ipf->offset;
114   ipf->offset += recsz + !!(ev == OOP_RD_OK);
115
116 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
117
118   if (ev==OOP_RD_PARTREC)
119     feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
120     /* but process it anyway */
121
122   if (ipf->skippinglong) {
123     if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
124     return OOP_CONTINUE;
125   }
126   if (ev==OOP_RD_LONG) {
127     ipf->skippinglong= 1;
128     X_BAD_DATA("overly long line");
129   }
130
131   if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
132   if (!recsz) X_BAD_DATA("empty line");
133
134   if (data[0]==' ') {
135     if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
136     ipf->counts.events[read_blank]++;
137     return OOP_CONTINUE;
138   }
139
140   char *space= strchr(data,' ');
141   int tokenlen= space-data;
142   int midlen= (int)recsz-tokenlen-1;
143   if (midlen <= 2) X_BAD_DATA("no room for messageid");
144   if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
145
146   if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
147   memcpy(tokentextbuf, data, tokenlen);
148   tokentextbuf[tokenlen]= 0;
149   if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
150
151   ipf->counts.events[read_ok]++;
152
153   art= xmalloc(sizeof(*art) - 1 + midlen + 1);
154   memset(art,0,sizeof(*art));
155   art->state= art_Unchecked;
156   art->midlen= midlen;
157   art->ipf= ipf;  ipf->inprogress++;
158   art->token= TextToToken(tokentextbuf);
159   art->offset= old_offset;
160   art->blanklen= recsz;
161   strcpy(art->messageid, space+1);
162
163   if (ipf->autodefer >= 0) {
164     article_autodefer(ipf, art);
165   } else {
166     LIST_ADDTAIL(ipf->queue, art);
167
168     if (ipf==backlog_input_file)
169       article_check_expired(art);
170   }
171
172   if (sms==sm_NORMAL && ipf==main_input_file &&
173       ipf->offset >= target_max_feedfile_size)
174     statemc_start_flush("feed file size");
175
176   check_assign_articles(); /* may destroy conn but that's OK */
177   check_reading_pause_resume(ipf);
178   return OOP_CONTINUE;
179 }
180
181 /*========== tailing input file ==========*/
182
183 static void tailing_rable_on_time(InputFile *ipf);
184
185 static void *tailing_rable_call_time(oop_source *lp, struct timeval tv,
186                                      void *user) {
187   /* lifetime of ipf here is OK because destruction will cause
188    * on_cancel which will cancel this callback */
189   InputFile *ipf= user;
190
191   //dbg("**TRACT** ipf=%p called",ipf);
192   if (!ipf->fake_readable) return OOP_CONTINUE;
193
194   /* we just keep calling readable until our caller (oop_rd)
195    * has called try_read, and try_read has found EOF so given EAGAIN */
196   //dbg("**TRACT** ipf=%p reschedule",ipf);
197   tailing_rable_on_time(ipf);
198
199   assert(ipf->readable_callback);
200   return ipf->readable_callback(loop, &ipf->readable,
201                                 ipf->readable_callback_user);
202 }
203
204 static void tailing_rable_on_time(InputFile *ipf) {
205   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
206   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
207   /* on_time is not idempotent - it counts.   So we need this to make
208    * sure we only have one outstanding, as otherwise our cancel doesn't work */
209 }
210
211 static void tailing_on_cancel(struct oop_readable *rable) {
212   InputFile *ipf= (void*)rable;
213   //dbg("**TOR** ipf=%p on_cancel",ipf);
214
215   if (ipf->filemon) filemon_stop(ipf);
216   //dbg("**TRACT** ipf=%p cancel",ipf);
217   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
218   ipf->readable_callback= 0;
219 }
220
221 void tailing_make_readable(InputFile *ipf) {
222   //dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf,
223   //    (void*)ipf?ipf->readable_callback:0);
224   if (!ipf || !ipf->readable_callback) /* so callers can be naive */
225     return;
226   ipf->fake_readable= 1;
227   tailing_rable_on_time(ipf);
228 }
229
230 static int tailing_on_readable(struct oop_readable *rable,
231                                 oop_readable_call *cb, void *user) {
232   InputFile *ipf= (void*)rable;
233   //dbg("**TOR** ipf=%p on_readable",ipf);
234
235   tailing_on_cancel(rable);
236   ipf->readable_callback= cb;
237   ipf->readable_callback_user= user;
238   filemon_start(ipf);
239   tailing_make_readable(ipf);
240   return 0;
241 }
242
243 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
244                                 size_t length) {
245   InputFile *ipf= (void*)rable;
246   for (;;) {
247     ssize_t r= read(ipf->fd, buffer, length);
248     if (r==-1) {
249       if (errno==EINTR) continue;
250       ipf->fake_readable= 0;
251       return r;
252     }
253     if (!r) {
254       if (ipf==main_input_file) {
255         errno=EAGAIN;
256         ipf->fake_readable= 0;
257         return -1;
258       } else if (ipf==flushing_input_file) {
259         assert(ipf->rd);
260         assert(sms==sm_SEPARATED || sms==sm_DROPPING);
261       } else if (ipf==backlog_input_file) {
262         assert(ipf->rd);
263       } else {
264         abort();
265       }
266     }
267     //dbg("**TOR** ipf=%p try_read r=%d",ipf,r);
268     return r;
269   }
270 }
271
272 /*---------- interface to start and stop an input file ----------*/
273
274 static const oop_rd_style feedfile_rdstyle= {
275   OOP_RD_DELIM_STRIP, '\n',
276   OOP_RD_NUL_PERMIT,
277   OOP_RD_SHORTREC_LONG,
278 };
279
280 void inputfile_reading_resume(InputFile *ipf) {
281   if (!ipf->rd) return;
282   if (!ipf->paused) return;
283
284   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
285                      feedfile_got_article,ipf, feedfile_read_err, ipf);
286   if (r) syscrash("unable start reading feedfile %s",ipf->path);
287
288   ipf->paused= 0;
289 }
290
291 void inputfile_reading_pause(InputFile *ipf) {
292   if (!ipf->rd) return;
293   if (ipf->paused) return;
294   oop_rd_cancel(ipf->rd);
295   ipf->paused= 1;
296 }
297
298 void inputfile_reading_start(InputFile *ipf) {
299   assert(!ipf->rd);
300   ipf->readable.on_readable= tailing_on_readable;
301   ipf->readable.on_cancel=   tailing_on_cancel;
302   ipf->readable.try_read=    tailing_try_read;
303   ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
304   ipf->readable.delete_kill= 0;
305
306   ipf->readable_callback= 0;
307   ipf->readable_callback_user= 0;
308
309   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
310   assert(ipf->rd);
311
312   ipf->paused= 1;
313   inputfile_reading_resume(ipf);
314 }
315
316 void inputfile_reading_stop(InputFile *ipf) {
317   assert(ipf->rd);
318   inputfile_reading_pause(ipf);
319   oop_rd_delete(ipf->rd);
320   ipf->rd= 0;
321   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
322 }
323
324 void filepoll(void) {
325   tailing_make_readable(main_input_file);
326   tailing_make_readable(flushing_input_file);
327 }
328
329 char *dbg_report_ipf(InputFile *ipf) {
330   if (!ipf) return xasprintf("none");
331
332   const char *slash= strrchr(ipf->path,'/');
333   const char *path= slash ? slash+1 : ipf->path;
334
335   return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
336                    ipf, path,
337                    ipf->queue.count, ipf->inprogress, ipf->autodefer,
338                    (long)ipf->offset, ipf->fd,
339                    ipf->rd ? "" : ",!rd",
340                    ipf->skippinglong ? "*skiplong" : "",
341                    ipf->rd && ipf->paused ? "*paused" : "");
342 }