chiark / gitweb /
changelog: Finalise 2.2
[innduct.git] / infile.c
1 /*
2  *  innduct
3  *  tailing reliable realtime streaming feeder for inn
4  *  infile.c - monitoring and handling of input files
5  *
6  *  Copyright Ian Jackson <ijackson@chiark.greenend.org.uk>
7  *  and contributors; see LICENCE.txt.
8  *  SPDX-License-Identifier: GPL-3.0-or-later
9  */
10
11 #include "innduct.h"
12
13 /*========== monitoring of input files ==========*/
14
15 static void feedfile_eof(InputFile *ipf) {
16   assert(ipf != main_input_file); /* promised by tailing_try_read */
17   inputfile_reading_stop(ipf);
18
19   if (ipf == flushing_input_file) {
20     assert(sms==sm_SEPARATED || sms==sm_DROPPING);
21     if (main_input_file) inputfile_reading_start(main_input_file);
22     statemc_check_flushing_done();
23   } else if (ipf == backlog_input_file) {
24     statemc_check_backlog_done();
25   } else {
26     abort(); /* supposed to wait rather than get EOF on main input file */
27   }
28 }
29
30 InputFile *open_input_file(const char *path) {
31   int fd= open(path, O_RDWR);
32   if (fd<0) {
33     if (errno==ENOENT) return 0;
34     sysdie("unable to open input file %s", path);
35   }
36   assert(fd>0);
37
38   InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
39   memset(ipf,0,sizeof(*ipf));
40
41   ipf->fd= fd;
42   ipf->autodefer= -1;
43   LIST_INIT(ipf->queue);
44   strcpy(ipf->path, path);
45
46   dbg("started input file %p %s", ipf, path);
47
48   return ipf;
49 }
50
51 void close_input_file(InputFile *ipf) { /* does not free */
52   assert(!ipf->readable_callback); /* must have had ->on_cancel */
53   assert(!ipf->filemon); /* must have had inputfile_reading_stop */
54   assert(!ipf->rd); /* must have had inputfile_reading_stop */
55   assert(!ipf->inprogress); /* no dangling pointers pointing here */
56   xclose_perhaps(&ipf->fd, "input file ", ipf->path);
57 }
58
59
60 /*---------- dealing with articles read in the input file ----------*/
61
62 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
63                                    const char *data, const char *how) {
64   warn("corrupted file: %s, offset %lu: %s: in %s",
65        ipf->path, (unsigned long)offset, how, sanitise(data,-1));
66   ipf->counts.events[read_err]++;
67   if (ipf->counts.events[read_err] > max_bad_data_initial +
68       (ipf->counts.events[read_ok] + ipf->counts.events[read_blank])
69                                                   / max_bad_data_ratio)
70     crash("too much garbage in input file!  (%d errs, %d ok, %d blank)",
71           ipf->counts.events[read_err], ipf->counts.events[read_ok],
72           ipf->counts.events[read_blank]);
73   return OOP_CONTINUE;
74 }
75
76 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
77                                oop_rd_event ev, const char *errmsg,
78                                int errnoval, const char *data, size_t recsz,
79                                void *ipf_v) {
80   InputFile *ipf= ipf_v;
81   assert(ev == OOP_RD_SYSTEM);
82   errno= errnoval;
83   syscrash("error reading input file: %s, offset %lu",
84            ipf->path, (unsigned long)ipf->offset);
85 }
86
87 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
88                                   oop_rd_event ev, const char *errmsg,
89                                   int errnoval, const char *data, size_t recsz,
90                                   void *ipf_v) {
91   InputFile *ipf= ipf_v;
92   Article *art;
93   char tokentextbuf[sizeof(TOKEN)*2+3];
94
95   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
96
97   off_t old_offset= ipf->offset;
98   ipf->offset += recsz + !!(ev == OOP_RD_OK);
99
100 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
101
102   if (ev==OOP_RD_PARTREC)
103     feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
104     /* but process it anyway */
105
106   if (ipf->skippinglong) {
107     if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
108     return OOP_CONTINUE;
109   }
110   if (ev==OOP_RD_LONG) {
111     ipf->skippinglong= 1;
112     X_BAD_DATA("overly long line");
113   }
114
115   if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
116   if (!recsz) X_BAD_DATA("empty line");
117
118   if (data[0]==' ') {
119     if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
120     ipf->counts.events[read_blank]++;
121     return OOP_CONTINUE;
122   }
123
124   char *space= strchr(data,' ');
125   int tokenlen= space-data;
126   int midlen= (int)recsz-tokenlen-1;
127   if (midlen <= 2) X_BAD_DATA("no room for messageid");
128   if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
129
130   if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
131   memcpy(tokentextbuf, data, tokenlen);
132   tokentextbuf[tokenlen]= 0;
133   if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
134
135   ipf->counts.events[read_ok]++;
136
137   art= xmalloc(sizeof(*art) - 1 + midlen + 1);
138   memset(art,0,sizeof(*art));
139   art->state= art_Unchecked;
140   art->midlen= midlen;
141   art->ipf= ipf;  ipf->inprogress++;
142   art->token= TextToToken(tokentextbuf);
143   art->offset= old_offset;
144   art->blanklen= recsz;
145   strcpy(art->messageid, space+1);
146
147   if (ipf->autodefer >= 0) {
148     article_autodefer(ipf, art);
149   } else {
150     LIST_ADDTAIL(ipf->queue, art);
151
152     if (ipf==backlog_input_file)
153       article_check_expired(art);
154   }
155
156   if (sms==sm_NORMAL && ipf==main_input_file &&
157       ipf->offset >= target_max_feedfile_size)
158     statemc_start_flush("feed file size");
159
160   check_assign_articles(); /* may destroy conn but that's OK */
161   check_reading_pause_resume(ipf);
162   return OOP_CONTINUE;
163 }
164
165 /*========== tailing input file ==========*/
166
167 static void tailing_rable_on_time(InputFile *ipf);
168
169 static void *tailing_rable_call_time(oop_source *lp, struct timeval tv,
170                                      void *user) {
171   /* lifetime of ipf here is OK because destruction will cause
172    * on_cancel which will cancel this callback */
173   InputFile *ipf= user;
174
175   //dbg("**TRACT** ipf=%p called",ipf);
176   if (!ipf->fake_readable) return OOP_CONTINUE;
177
178   /* we just keep calling readable until our caller (oop_rd)
179    * has called try_read, and try_read has found EOF so given EAGAIN */
180   //dbg("**TRACT** ipf=%p reschedule",ipf);
181   tailing_rable_on_time(ipf);
182
183   assert(ipf->readable_callback);
184   return ipf->readable_callback(loop, &ipf->readable,
185                                 ipf->readable_callback_user);
186 }
187
188 static void tailing_rable_on_time(InputFile *ipf) {
189   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
190   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
191   /* on_time is not idempotent - it counts.   So we need this to make
192    * sure we only have one outstanding, as otherwise our cancel doesn't work */
193 }
194
195 static void tailing_on_cancel(struct oop_readable *rable) {
196   InputFile *ipf= (void*)rable;
197   //dbg("**TOR** ipf=%p on_cancel",ipf);
198
199   if (ipf->filemon) filemon_stop(ipf);
200   //dbg("**TRACT** ipf=%p cancel",ipf);
201   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
202   ipf->readable_callback= 0;
203 }
204
205 void tailing_make_readable(InputFile *ipf) {
206   //dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf,
207   //    (void*)ipf?ipf->readable_callback:0);
208   if (!ipf || !ipf->readable_callback) /* so callers can be naive */
209     return;
210   ipf->fake_readable= 1;
211   tailing_rable_on_time(ipf);
212 }
213
214 static int tailing_on_readable(struct oop_readable *rable,
215                                 oop_readable_call *cb, void *user) {
216   InputFile *ipf= (void*)rable;
217   //dbg("**TOR** ipf=%p on_readable",ipf);
218
219   tailing_on_cancel(rable);
220   ipf->readable_callback= cb;
221   ipf->readable_callback_user= user;
222   filemon_start(ipf);
223   tailing_make_readable(ipf);
224   return 0;
225 }
226
227 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
228                                 size_t length) {
229   InputFile *ipf= (void*)rable;
230   for (;;) {
231     ssize_t r= read(ipf->fd, buffer, length);
232     if (r==-1) {
233       if (errno==EINTR) continue;
234       ipf->fake_readable= 0;
235       return r;
236     }
237     if (!r) {
238       if (ipf==main_input_file) {
239         errno=EAGAIN;
240         ipf->fake_readable= 0;
241         return -1;
242       } else if (ipf==flushing_input_file) {
243         assert(ipf->rd);
244         assert(sms==sm_SEPARATED || sms==sm_DROPPING);
245       } else if (ipf==backlog_input_file) {
246         assert(ipf->rd);
247       } else {
248         abort();
249       }
250     }
251     //dbg("**TOR** ipf=%p try_read r=%d",ipf,r);
252     return r;
253   }
254 }
255
256 /*---------- interface to start and stop an input file ----------*/
257
258 static const oop_rd_style feedfile_rdstyle= {
259   OOP_RD_DELIM_STRIP, '\n',
260   OOP_RD_NUL_PERMIT,
261   OOP_RD_SHORTREC_LONG,
262 };
263
264 void inputfile_reading_resume(InputFile *ipf) {
265   if (!ipf->rd) return;
266   if (!ipf->paused) return;
267
268   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
269                      feedfile_got_article,ipf, feedfile_read_err, ipf);
270   if (r) syscrash("unable start reading feedfile %s",ipf->path);
271
272   ipf->paused= 0;
273 }
274
275 void inputfile_reading_pause(InputFile *ipf) {
276   if (!ipf->rd) return;
277   if (ipf->paused) return;
278   oop_rd_cancel(ipf->rd);
279   ipf->paused= 1;
280 }
281
282 void inputfile_reading_start(InputFile *ipf) {
283   assert(!ipf->rd);
284   ipf->readable.on_readable= tailing_on_readable;
285   ipf->readable.on_cancel=   tailing_on_cancel;
286   ipf->readable.try_read=    tailing_try_read;
287   ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
288   ipf->readable.delete_kill= 0;
289
290   ipf->readable_callback= 0;
291   ipf->readable_callback_user= 0;
292
293   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
294   assert(ipf->rd);
295
296   ipf->paused= 1;
297   inputfile_reading_resume(ipf);
298 }
299
300 void inputfile_reading_stop(InputFile *ipf) {
301   assert(ipf->rd);
302   inputfile_reading_pause(ipf);
303   oop_rd_delete(ipf->rd);
304   ipf->rd= 0;
305   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
306 }
307
308 void filepoll(void) {
309   tailing_make_readable(main_input_file);
310   tailing_make_readable(flushing_input_file);
311 }
312
313 char *dbg_report_ipf(InputFile *ipf) {
314   if (!ipf) return masprintf("none");
315
316   const char *slash= strrchr(ipf->path,'/');
317   const char *path= slash ? slash+1 : ipf->path;
318
319   return masprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
320                    ipf, path,
321                    ipf->queue.count, ipf->inprogress, ipf->autodefer,
322                    (long)ipf->offset, ipf->fd,
323                    ipf->rd ? "" : ",!rd",
324                    ipf->skippinglong ? "*skiplong" : "",
325                    ipf->rd && ipf->paused ? "*paused" : "");
326 }