/*
 * innduct
 * tailing reliable realtime streaming feeder for inn
 * infile.c - monitoring and handling of input files
 *
 * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * (I believe that when you compile and link this as part of the inn2
 * build, with the Makefile runes I have provided, all the libraries
 * and files which end up included in innduct are licence-compatible
 * with GPLv3. If not then please let me know. -Ian Jackson.)
 */
29 /*========== monitoring of input files ==========*/
31 static void feedfile_eof(InputFile *ipf) {
32 assert(ipf != main_input_file); /* promised by tailing_try_read */
33 inputfile_reading_stop(ipf);
35 if (ipf == flushing_input_file) {
36 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
37 if (main_input_file) inputfile_reading_start(main_input_file);
38 statemc_check_flushing_done();
39 } else if (ipf == backlog_input_file) {
40 statemc_check_backlog_done();
42 abort(); /* supposed to wait rather than get EOF on main input file */
46 InputFile *open_input_file(const char *path) {
47 int fd= open(path, O_RDWR);
49 if (errno==ENOENT) return 0;
50 sysdie("unable to open input file %s", path);
54 InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
55 memset(ipf,0,sizeof(*ipf));
59 LIST_INIT(ipf->queue);
60 strcpy(ipf->path, path);
62 dbg("started input file %p %s", ipf, path);
67 void close_input_file(InputFile *ipf) { /* does not free */
68 assert(!ipf->readable_callback); /* must have had ->on_cancel */
69 assert(!ipf->filemon); /* must have had inputfile_reading_stop */
70 assert(!ipf->rd); /* must have had inputfile_reading_stop */
71 assert(!ipf->inprogress); /* no dangling pointers pointing here */
72 xclose_perhaps(&ipf->fd, "input file ", ipf->path);
76 /*---------- dealing with articles read in the input file ----------*/
78 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
79 const char *data, const char *how) {
80 warn("corrupted file: %s, offset %lu: %s: in %s",
81 ipf->path, (unsigned long)offset, how, sanitise(data,-1));
82 ipf->counts.events[read_err]++;
83 if (ipf->counts.events[read_err] > max_bad_data_initial +
84 (ipf->counts.events[read_ok] + ipf->counts.events[read_blank])
86 crash("too much garbage in input file! (%d errs, %d ok, %d blank)",
87 ipf->counts.events[read_err], ipf->counts.events[read_ok],
88 ipf->counts.events[read_blank]);
92 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
93 oop_rd_event ev, const char *errmsg,
94 int errnoval, const char *data, size_t recsz,
96 InputFile *ipf= ipf_v;
97 assert(ev == OOP_RD_SYSTEM);
99 syscrash("error reading input file: %s, offset %lu",
100 ipf->path, (unsigned long)ipf->offset);
103 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
104 oop_rd_event ev, const char *errmsg,
105 int errnoval, const char *data, size_t recsz,
107 InputFile *ipf= ipf_v;
109 char tokentextbuf[sizeof(TOKEN)*2+3];
111 if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
113 off_t old_offset= ipf->offset;
114 ipf->offset += recsz + !!(ev == OOP_RD_OK);
116 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
118 if (ev==OOP_RD_PARTREC)
119 feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
120 /* but process it anyway */
122 if (ipf->skippinglong) {
123 if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
126 if (ev==OOP_RD_LONG) {
127 ipf->skippinglong= 1;
128 X_BAD_DATA("overly long line");
131 if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
132 if (!recsz) X_BAD_DATA("empty line");
135 if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
136 ipf->counts.events[read_blank]++;
140 char *space= strchr(data,' ');
141 int tokenlen= space-data;
142 int midlen= (int)recsz-tokenlen-1;
143 if (midlen <= 2) X_BAD_DATA("no room for messageid");
144 if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
146 if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
147 memcpy(tokentextbuf, data, tokenlen);
148 tokentextbuf[tokenlen]= 0;
149 if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
151 ipf->counts.events[read_ok]++;
153 art= xmalloc(sizeof(*art) - 1 + midlen + 1);
154 memset(art,0,sizeof(*art));
155 art->state= art_Unchecked;
157 art->ipf= ipf; ipf->inprogress++;
158 art->token= TextToToken(tokentextbuf);
159 art->offset= old_offset;
160 art->blanklen= recsz;
161 strcpy(art->messageid, space+1);
163 if (ipf->autodefer >= 0) {
164 article_autodefer(ipf, art);
166 LIST_ADDTAIL(ipf->queue, art);
168 if (ipf==backlog_input_file)
169 article_check_expired(art);
172 if (sms==sm_NORMAL && ipf==main_input_file &&
173 ipf->offset >= target_max_feedfile_size)
174 statemc_start_flush("feed file size");
176 check_assign_articles(); /* may destroy conn but that's OK */
177 check_reading_pause_resume(ipf);
181 /*========== tailing input file ==========*/
183 static void tailing_rable_on_time(InputFile *ipf);
185 static void *tailing_rable_call_time(oop_source *lp, struct timeval tv,
187 /* lifetime of ipf here is OK because destruction will cause
188 * on_cancel which will cancel this callback */
189 InputFile *ipf= user;
191 //dbg("**TRACT** ipf=%p called",ipf);
192 if (!ipf->fake_readable) return OOP_CONTINUE;
194 /* we just keep calling readable until our caller (oop_rd)
195 * has called try_read, and try_read has found EOF so given EAGAIN */
196 //dbg("**TRACT** ipf=%p reschedule",ipf);
197 tailing_rable_on_time(ipf);
199 assert(ipf->readable_callback);
200 return ipf->readable_callback(loop, &ipf->readable,
201 ipf->readable_callback_user);
204 static void tailing_rable_on_time(InputFile *ipf) {
205 loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
206 loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
207 /* on_time is not idempotent - it counts. So we need this to make
208 * sure we only have one outstanding, as otherwise our cancel doesn't work */
211 static void tailing_on_cancel(struct oop_readable *rable) {
212 InputFile *ipf= (void*)rable;
213 //dbg("**TOR** ipf=%p on_cancel",ipf);
215 if (ipf->filemon) filemon_stop(ipf);
216 //dbg("**TRACT** ipf=%p cancel",ipf);
217 loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
218 ipf->readable_callback= 0;
221 void tailing_make_readable(InputFile *ipf) {
222 //dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf,
223 // (void*)ipf?ipf->readable_callback:0);
224 if (!ipf || !ipf->readable_callback) /* so callers can be naive */
226 ipf->fake_readable= 1;
227 tailing_rable_on_time(ipf);
230 static int tailing_on_readable(struct oop_readable *rable,
231 oop_readable_call *cb, void *user) {
232 InputFile *ipf= (void*)rable;
233 //dbg("**TOR** ipf=%p on_readable",ipf);
235 tailing_on_cancel(rable);
236 ipf->readable_callback= cb;
237 ipf->readable_callback_user= user;
239 tailing_make_readable(ipf);
243 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
245 InputFile *ipf= (void*)rable;
247 ssize_t r= read(ipf->fd, buffer, length);
249 if (errno==EINTR) continue;
250 ipf->fake_readable= 0;
254 if (ipf==main_input_file) {
256 ipf->fake_readable= 0;
258 } else if (ipf==flushing_input_file) {
260 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
261 } else if (ipf==backlog_input_file) {
267 //dbg("**TOR** ipf=%p try_read r=%d",ipf,r);
272 /*---------- interface to start and stop an input file ----------*/
274 static const oop_rd_style feedfile_rdstyle= {
275 OOP_RD_DELIM_STRIP, '\n',
277 OOP_RD_SHORTREC_LONG,
280 void inputfile_reading_resume(InputFile *ipf) {
281 if (!ipf->rd) return;
282 if (!ipf->paused) return;
284 int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
285 feedfile_got_article,ipf, feedfile_read_err, ipf);
286 if (r) syscrash("unable start reading feedfile %s",ipf->path);
291 void inputfile_reading_pause(InputFile *ipf) {
292 if (!ipf->rd) return;
293 if (ipf->paused) return;
294 oop_rd_cancel(ipf->rd);
298 void inputfile_reading_start(InputFile *ipf) {
300 ipf->readable.on_readable= tailing_on_readable;
301 ipf->readable.on_cancel= tailing_on_cancel;
302 ipf->readable.try_read= tailing_try_read;
303 ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
304 ipf->readable.delete_kill= 0;
306 ipf->readable_callback= 0;
307 ipf->readable_callback_user= 0;
309 ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
313 inputfile_reading_resume(ipf);
316 void inputfile_reading_stop(InputFile *ipf) {
318 inputfile_reading_pause(ipf);
319 oop_rd_delete(ipf->rd);
321 assert(!ipf->filemon); /* we shouldn't be monitoring it now */
324 void filepoll(void) {
325 tailing_make_readable(main_input_file);
326 tailing_make_readable(flushing_input_file);
329 char *dbg_report_ipf(InputFile *ipf) {
330 if (!ipf) return masprintf("none");
332 const char *slash= strrchr(ipf->path,'/');
333 const char *path= slash ? slash+1 : ipf->path;
335 return masprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
337 ipf->queue.count, ipf->inprogress, ipf->autodefer,
338 (long)ipf->offset, ipf->fd,
339 ipf->rd ? "" : ",!rd",
340 ipf->skippinglong ? "*skiplong" : "",
341 ipf->rd && ipf->paused ? "*paused" : "");