1 /*========== monitoring of input files ==========*/
/* Called from feedfile_got_article when oop_rd delivers no more data
 * (data == NULL) for ipf, which must not be the main input file.
 * Stops reading ipf, then:
 *  - flushing input file: restarts reading on the main input file (if
 *    there is one) and calls statemc_check_flushing_done();
 *  - backlog input file: calls statemc_check_backlog_done();
 *  - any other file: abort().
 * NOTE(review): some lines of this function (e.g. the branch leading to
 * abort() and the closing braces) are not visible in this listing. */
3 static void feedfile_eof(InputFile *ipf) {
4 assert(ipf != main_input_file); /* promised by tailing_try_read */
5 inputfile_reading_stop(ipf);
7 if (ipf == flushing_input_file) {
8 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
/* the flushing file is exhausted: pick up reading the main file again */
9 if (main_input_file) inputfile_reading_start(main_input_file);
10 statemc_check_flushing_done();
11 } else if (ipf == backlog_input_file) {
12 statemc_check_backlog_done();
14 abort(); /* supposed to wait rather than get EOF on main input file */
/* Open the feed file at path read-write and allocate an InputFile for it.
 * The struct is zeroed, its article queue is initialised empty, and path
 * is copied into storage allocated immediately after the struct
 * (sizeof(*ipf) + strlen(path) + 1 bytes in total).
 * If open() fails with ENOENT the function returns 0 (no such file); any
 * other failure is fatal via sysdie().
 * NOTE(review): the fd<0 guard around the errno checks, the store of fd
 * and the final return are not visible in this listing -- confirm. */
18 static InputFile *open_input_file(const char *path) {
19 int fd= open(path, O_RDWR);
21 if (errno==ENOENT) return 0;
22 sysdie("unable to open input file %s", path);
26 InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
27 memset(ipf,0,sizeof(*ipf));
31 LIST_INIT(ipf->queue);
/* ipf->path lives in the extra bytes allocated past the struct */
32 strcpy(ipf->path, path);
/* Close ipf's file descriptor via xclose_perhaps(); ipf itself is NOT
 * freed.  The asserts state the caller's obligations:
 *  - the readable callback has been cancelled (->on_cancel ran);
 *  - reading has been stopped with inputfile_reading_stop (no filemon,
 *    no oop_read);
 *  - no articles still point at ipf (->inprogress, which
 *    feedfile_got_article increments for each Article it creates, is 0). */
37 static void close_input_file(InputFile *ipf) { /* does not free */
38 assert(!ipf->readable_callback); /* must have had ->on_cancel */
39 assert(!ipf->filemon); /* must have had inputfile_reading_stop */
40 assert(!ipf->rd); /* must have had inputfile_reading_stop */
41 assert(!ipf->inprogress); /* no dangling pointers pointing here */
42 xclose_perhaps(&ipf->fd, "input file ", ipf->path);
46 /*---------- dealing with articles read in the input file ----------*/
/* Report a corrupt record read from ipf at the given offset.
 * Logs a warning with the file's path, the offset, the reason (how) and a
 * sanitised copy of data.  If ipf->readcount_err exceeds
 * max_bad_data_initial plus (ok + blank records) / max_bad_data_ratio,
 * the input is treated as hopeless and crash() is called.
 * feedfile_got_article's X_BAD_DATA macro returns this function's result
 * as its own oop_rd callback result.
 * NOTE(review): the return statement is not visible in this listing;
 * confirm where readcount_err is incremented. */
48 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
49 const char *data, const char *how) {
50 warn("corrupted file: %s, offset %lu: %s: in %s",
51 ipf->path, (unsigned long)offset, how, sanitise(data,-1));
/* tolerate a fixed number of bad records plus a proportion of the
 * good and blank ones seen so far */
53 if (ipf->readcount_err > max_bad_data_initial +
54 (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
55 crash("too much garbage in input file! (%d errs, %d ok, %d blank)",
56 ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
/* oop_rd error callback for feed files, registered together with
 * feedfile_got_article by inputfile_reading_resume (ipf_v is the
 * InputFile).  Only OOP_RD_SYSTEM, i.e. a failed read, is expected here
 * (asserted); it is fatal via syscrash(), naming the file and the offset
 * reached so far.
 * NOTE(review): the line declaring the ipf_v parameter is not visible in
 * this listing. */
60 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
61 oop_rd_event ev, const char *errmsg,
62 int errnoval, const char *data, size_t recsz,
64 InputFile *ipf= ipf_v;
65 assert(ev == OOP_RD_SYSTEM);
67 syscrash("error reading input file: %s, offset %lu",
68 ipf->path, (unsigned long)ipf->offset);
/* oop_rd record callback for feed files, registered by
 * inputfile_reading_resume with ipf_v = the InputFile being read.
 *
 * Each record (newline already stripped by oop_rd) is expected to be
 *     TOKEN <message-id>
 * or a line made up entirely of spaces (a blanked-out entry, which is only
 * counted in ipf->readcount_blank).  Malformed records are reported with
 * X_BAD_DATA, which returns feedfile_got_bad_data()'s result from this
 * callback.  A valid record becomes a new Article.  The Article is handed
 * to article_autodefer when ipf->autodefer >= 0; otherwise it is
 * appended to ipf->queue, and backlog articles are also checked for
 * expiry.  Afterwards a flush may be started if the main feed file has
 * reached target_max_feedfile_size, and articles are (re)assigned.
 *
 * data == NULL signals end of data and is handed to feedfile_eof().
 *
 * NOTE(review): several lines of this function are not visible in this
 * listing; only the missing-space check was added to the code. */
71 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
72 oop_rd_event ev, const char *errmsg,
73 int errnoval, const char *data, size_t recsz,
75 InputFile *ipf= ipf_v;
77 char tokentextbuf[sizeof(TOKEN)*2+3];
79 if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
81 off_t old_offset= ipf->offset;
/* recsz excludes the '\n' that oop_rd stripped; count it back in for
 * complete (OOP_RD_OK) records so the offset tracks the file position */
82 ipf->offset += recsz + !!(ev == OOP_RD_OK);
84 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
86 if (ev==OOP_RD_PARTREC)
87 feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
88 /* but process it anyway */
/* after an overlong line, the flag is cleared again at the first complete
 * (OOP_RD_OK) record */
90 if (ipf->skippinglong) {
91 if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
94 if (ev==OOP_RD_LONG) {
96 X_BAD_DATA("overly long line");
99 if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
100 if (!recsz) X_BAD_DATA("empty line");
103 if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
104 ipf->readcount_blank++;
108 char *space= strchr(data,' ');
/* A corrupt record with no space at all leaves space NULL.  The pointer
 * subtraction and space[1] below would then be undefined behaviour (in
 * practice a crash), so report the record as bad data instead. */
if (!space) X_BAD_DATA("no space after token");
109 int tokenlen= space-data;
110 int midlen= (int)recsz-tokenlen-1;
111 if (midlen <= 2) X_BAD_DATA("no room for messageid");
/* space[midlen] is the last byte of the record */
112 if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
114 if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
115 memcpy(tokentextbuf, data, tokenlen);
116 tokentextbuf[tokenlen]= 0;
117 if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
/* the message-id (midlen bytes plus NUL) is stored inline at the end of
 * the Article; the "- 1" presumably accounts for a one-byte messageid[]
 * array in the struct -- confirm */
121 art= xmalloc(sizeof(*art) - 1 + midlen + 1);
122 memset(art,0,sizeof(*art));
123 art->state= art_Unchecked;
125 art->ipf= ipf; ipf->inprogress++;
126 art->token= TextToToken(tokentextbuf);
127 art->offset= old_offset;
128 art->blanklen= recsz;
129 strcpy(art->messageid, space+1);
/* queueing below presumably runs only when autodefer is off (the else
 * line is not visible here) */
131 if (ipf->autodefer >= 0) {
132 article_autodefer(ipf, art);
134 LIST_ADDTAIL(ipf->queue, art);
136 if (ipf==backlog_input_file)
137 article_check_expired(art);
/* in normal state, start a flush once the main feed file is big enough */
140 if (sms==sm_NORMAL && ipf==main_input_file &&
141 ipf->offset >= target_max_feedfile_size)
142 statemc_start_flush("feed file size");
144 check_assign_articles(); /* may destroy conn but that's OK */
145 check_reading_pause_resume(ipf);
149 /*========== tailing input file ==========*/
/* Timer callback, scheduled for OOP_TIME_NOW by tailing_make_readable,
 * that makes ipf->readable look readable.  user is the InputFile.
 * If ipf->fake_readable has been cleared (tailing_try_read clears it),
 * it just returns OOP_CONTINUE.  Otherwise it reschedules itself for
 * OOP_TIME_NOW and calls the registered readable callback, returning that
 * callback's result.
 * NOTE(review): the line declaring the user parameter is not visible in
 * this listing. */
151 static void *tailing_rable_call_time(oop_source *lp, struct timeval tv,
153 /* lifetime of ipf here is OK because destruction will cause
154 * on_cancel which will cancel this callback */
155 InputFile *ipf= user;
157 dbg("**TRACT** ipf=%p called",ipf);
158 if (!ipf->fake_readable) return OOP_CONTINUE;
160 /* we just keep calling readable until our caller (oop_rd)
161 * has called try_read, and try_read has found EOF so given EAGAIN */
162 dbg("**TRACT** ipf=%p reschedule",ipf);
163 loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
165 return ipf->readable_callback(loop, &ipf->readable,
166 ipf->readable_callback_user);
/* on_cancel hook of ipf->readable (installed by inputfile_reading_start).
 * Stops file monitoring if it is active, cancels any pending
 * tailing_rable_call_time timer for ipf, and forgets the readable
 * callback.
 * NOTE(review): the (void*) cast from rable to InputFile* assumes
 * `readable` is the first member of InputFile -- confirm. */
169 static void tailing_on_cancel(struct oop_readable *rable) {
170 InputFile *ipf= (void*)rable;
171 dbg("**TOR** ipf=%p on_cancel",ipf);
173 if (ipf->filemon) filemon_stop(ipf);
174 dbg("**TRACT** ipf=%p cancel",ipf);
175 loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
176 ipf->readable_callback= 0;
/* Pretend ipf has become readable: set ipf->fake_readable and schedule
 * tailing_rable_call_time for OOP_TIME_NOW, which calls the registered
 * readable callback.  The early check for a null ipf or a missing
 * readable callback lets callers invoke this unconditionally.
 * In the dbg() call the whole conditional is converted to void*, because
 * %p requires a void* argument; a function pointer must not be passed to
 * a variadic %p directly. */
179 static void tailing_make_readable(InputFile *ipf) {
180 dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf,
181 ipf ? (void*)ipf->readable_callback : (void*)0);
182 if (!ipf || !ipf->readable_callback) /* so callers can be naive */
184 ipf->fake_readable= 1;
185 loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
/* on_readable hook of ipf->readable (installed by inputfile_reading_start):
 * (re)registers cb/user as the readable callback.  Any previous
 * registration is first cancelled via tailing_on_cancel, and a fake
 * readable event is scheduled immediately with tailing_make_readable.
 * NOTE(review): the return statement is not visible in this listing. */
188 static int tailing_on_readable(struct oop_readable *rable,
189 oop_readable_call *cb, void *user) {
190 InputFile *ipf= (void*)rable;
191 dbg("**TOR** ipf=%p on_readable",ipf);
193 tailing_on_cancel(rable);
194 ipf->readable_callback= cb;
195 ipf->readable_callback_user= user;
197 tailing_make_readable(ipf);
/* try_read hook of ipf->readable, through which oop_rd fetches feed-file
 * data.  Reads from ipf->fd and retries when errno is EINTR.
 * ipf->fake_readable is cleared after a read failure, and also at the
 * main input file's branch.  The main input file is tailed: feedfile_eof
 * asserts it never sees EOF for it.  For the flushing input file the state
 * machine must be SEPARATED or DROPPING.
 * NOTE(review): the retry loop header, the r<0 / r==0 tests, the return
 * statements and the backlog branch body are not visible in this
 * listing.
 * r is ssize_t, which %d does not match on LP64 systems, so the debug
 * message prints it as long. */
201 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
203 InputFile *ipf= (void*)rable;
205 ssize_t r= read(ipf->fd, buffer, length);
207 if (errno==EINTR) continue;
208 ipf->fake_readable= 0;
212 if (ipf==main_input_file) {
214 ipf->fake_readable= 0;
216 } else if (ipf==flushing_input_file) {
218 assert(sms==sm_SEPARATED || sms==sm_DROPPING);
219 } else if (ipf==backlog_input_file) {
225 dbg("**TOR** ipf=%p try_read r=%ld",ipf,(long)r);
230 /*---------- interface to start and stop an input file ----------*/
/* oop_rd record style used for feed files by inputfile_reading_resume:
 * records are delimited by '\n' and the delimiter is stripped
 * (feedfile_got_article adds it back when advancing ipf->offset), with
 * short-record mode OOP_RD_SHORTREC_LONG (see the liboop oop_rd docs).
 * NOTE(review): one initializer line and the closing brace are not
 * visible in this listing. */
232 static const oop_rd_style feedfile_rdstyle= {
233 OOP_RD_DELIM_STRIP, '\n',
235 OOP_RD_SHORTREC_LONG,
/* Resume delivery of records from ipf.  Does nothing unless ipf has an
 * oop_read (ipf->rd) and is currently paused.  Otherwise it starts
 * oop_rd_read with feedfile_rdstyle and a maximum record size of
 * MAX_LINE_FEEDFILE.  Records go to feedfile_got_article and errors to
 * feedfile_read_err, both with ipf as user data.  Failure to start is
 * fatal (syscrash).
 * NOTE(review): the update of ipf->paused is not visible in this
 * listing. */
238 static void inputfile_reading_resume(InputFile *ipf) {
239 if (!ipf->rd) return;
240 if (!ipf->paused) return;
242 int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
243 feedfile_got_article,ipf, feedfile_read_err, ipf);
244 if (r) syscrash("unable start reading feedfile %s",ipf->path);
/* Stop delivery of records from ipf by cancelling the outstanding
 * oop_rd_read.  Does nothing if ipf has no oop_read or is already paused.
 * inputfile_reading_resume restarts delivery.
 * NOTE(review): the update of ipf->paused is not visible in this
 * listing. */
249 static void inputfile_reading_pause(InputFile *ipf) {
250 if (!ipf->rd) return;
251 if (ipf->paused) return;
252 oop_rd_cancel(ipf->rd);
/* Begin reading ipf.  ipf->readable is set up as a custom oop_readable
 * whose hooks (tailing_on_readable, tailing_on_cancel, tailing_try_read)
 * fake readiness so that the main input file can be tailed instead of
 * hitting EOF.  No readable callback is registered yet.  An oop_read is
 * created on top of ipf->readable and reading is then resumed.
 * NOTE(review): inputfile_reading_resume does nothing unless ipf->rd is
 * set and ipf->paused is true; the lines between oop_rd_new and the
 * resume call (and any assertions at the top) are not visible here. */
256 static void inputfile_reading_start(InputFile *ipf) {
258 ipf->readable.on_readable= tailing_on_readable;
259 ipf->readable.on_cancel= tailing_on_cancel;
260 ipf->readable.try_read= tailing_try_read;
261 ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
262 ipf->readable.delete_kill= 0;
264 ipf->readable_callback= 0;
265 ipf->readable_callback_user= 0;
267 ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
271 inputfile_reading_resume(ipf);
/* Stop reading ipf: pause any outstanding read, then delete its oop_read.
 * File monitoring must have stopped by this point (asserted), presumably
 * through tailing_on_cancel, which calls filemon_stop -- confirm.
 * NOTE(review): close_input_file asserts ipf->rd is null afterwards; the
 * line that clears it is not visible in this listing. */
274 static void inputfile_reading_stop(InputFile *ipf) {
276 inputfile_reading_pause(ipf);
277 oop_rd_delete(ipf->rd);
279 assert(!ipf->filemon); /* we shouldn't be monitoring it now */
282 void filepoll(void) {
283 tailing_make_readable(main_input_file);
284 tailing_make_readable(flushing_input_file);