chiark / gitweb /
wip split into multiple files and make compile
[innduct.git] / infile.c
1 /*========== monitoring of input files ==========*/
2
3 static void feedfile_eof(InputFile *ipf) {
4   assert(ipf != main_input_file); /* promised by tailing_try_read */
5   inputfile_reading_stop(ipf);
6
7   if (ipf == flushing_input_file) {
8     assert(sms==sm_SEPARATED || sms==sm_DROPPING);
9     if (main_input_file) inputfile_reading_start(main_input_file);
10     statemc_check_flushing_done();
11   } else if (ipf == backlog_input_file) {
12     statemc_check_backlog_done();
13   } else {
14     abort(); /* supposed to wait rather than get EOF on main input file */
15   }
16 }
17
18 static InputFile *open_input_file(const char *path) {
19   int fd= open(path, O_RDWR);
20   if (fd<0) {
21     if (errno==ENOENT) return 0;
22     sysdie("unable to open input file %s", path);
23   }
24   assert(fd>0);
25
26   InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1);
27   memset(ipf,0,sizeof(*ipf));
28
29   ipf->fd= fd;
30   ipf->autodefer= -1;
31   LIST_INIT(ipf->queue);
32   strcpy(ipf->path, path);
33
34   return ipf;
35 }
36
37 static void close_input_file(InputFile *ipf) { /* does not free */
38   assert(!ipf->readable_callback); /* must have had ->on_cancel */
39   assert(!ipf->filemon); /* must have had inputfile_reading_stop */
40   assert(!ipf->rd); /* must have had inputfile_reading_stop */
41   assert(!ipf->inprogress); /* no dangling pointers pointing here */
42   xclose_perhaps(&ipf->fd, "input file ", ipf->path);
43 }
44
45
46 /*---------- dealing with articles read in the input file ----------*/
47
48 static void *feedfile_got_bad_data(InputFile *ipf, off_t offset,
49                                    const char *data, const char *how) {
50   warn("corrupted file: %s, offset %lu: %s: in %s",
51        ipf->path, (unsigned long)offset, how, sanitise(data,-1));
52   ipf->readcount_err++;
53   if (ipf->readcount_err > max_bad_data_initial +
54       (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
55     crash("too much garbage in input file!  (%d errs, %d ok, %d blank)",
56           ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
57   return OOP_CONTINUE;
58 }
59
60 static void *feedfile_read_err(oop_source *lp, oop_read *rd,
61                                oop_rd_event ev, const char *errmsg,
62                                int errnoval, const char *data, size_t recsz,
63                                void *ipf_v) {
64   InputFile *ipf= ipf_v;
65   assert(ev == OOP_RD_SYSTEM);
66   errno= errnoval;
67   syscrash("error reading input file: %s, offset %lu",
68            ipf->path, (unsigned long)ipf->offset);
69 }
70
71 static void *feedfile_got_article(oop_source *lp, oop_read *rd,
72                                   oop_rd_event ev, const char *errmsg,
73                                   int errnoval, const char *data, size_t recsz,
74                                   void *ipf_v) {
75   InputFile *ipf= ipf_v;
76   Article *art;
77   char tokentextbuf[sizeof(TOKEN)*2+3];
78
79   if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; }
80
81   off_t old_offset= ipf->offset;
82   ipf->offset += recsz + !!(ev == OOP_RD_OK);
83
84 #define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m);
85
86   if (ev==OOP_RD_PARTREC)
87     feedfile_got_bad_data(ipf,old_offset,data,"missing final newline");
88     /* but process it anyway */
89
90   if (ipf->skippinglong) {
91     if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
92     return OOP_CONTINUE;
93   }
94   if (ev==OOP_RD_LONG) {
95     ipf->skippinglong= 1;
96     X_BAD_DATA("overly long line");
97   }
98
99   if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
100   if (!recsz) X_BAD_DATA("empty line");
101
102   if (data[0]==' ') {
103     if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
104     ipf->readcount_blank++;
105     return OOP_CONTINUE;
106   }
107
108   char *space= strchr(data,' ');
109   int tokenlen= space-data;
110   int midlen= (int)recsz-tokenlen-1;
111   if (midlen <= 2) X_BAD_DATA("no room for messageid");
112   if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
113
114   if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
115   memcpy(tokentextbuf, data, tokenlen);
116   tokentextbuf[tokenlen]= 0;
117   if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
118
119   ipf->readcount_ok++;
120
121   art= xmalloc(sizeof(*art) - 1 + midlen + 1);
122   memset(art,0,sizeof(*art));
123   art->state= art_Unchecked;
124   art->midlen= midlen;
125   art->ipf= ipf;  ipf->inprogress++;
126   art->token= TextToToken(tokentextbuf);
127   art->offset= old_offset;
128   art->blanklen= recsz;
129   strcpy(art->messageid, space+1);
130
131   if (ipf->autodefer >= 0) {
132     article_autodefer(ipf, art);
133   } else {
134     LIST_ADDTAIL(ipf->queue, art);
135
136     if (ipf==backlog_input_file)
137       article_check_expired(art);
138   }
139
140   if (sms==sm_NORMAL && ipf==main_input_file &&
141       ipf->offset >= target_max_feedfile_size)
142     statemc_start_flush("feed file size");
143
144   check_assign_articles(); /* may destroy conn but that's OK */
145   check_reading_pause_resume(ipf);
146   return OOP_CONTINUE;
147 }
148
149 /*========== tailing input file ==========*/
150
151 static void *tailing_rable_call_time(oop_source *lp, struct timeval tv,
152                                      void *user) {
153   /* lifetime of ipf here is OK because destruction will cause
154    * on_cancel which will cancel this callback */
155   InputFile *ipf= user;
156
157   dbg("**TRACT** ipf=%p called",ipf);
158   if (!ipf->fake_readable) return OOP_CONTINUE;
159
160   /* we just keep calling readable until our caller (oop_rd)
161    * has called try_read, and try_read has found EOF so given EAGAIN */
162   dbg("**TRACT** ipf=%p reschedule",ipf);
163   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
164
165   return ipf->readable_callback(loop, &ipf->readable,
166                                 ipf->readable_callback_user);
167 }
168
169 static void tailing_on_cancel(struct oop_readable *rable) {
170   InputFile *ipf= (void*)rable;
171   dbg("**TOR** ipf=%p on_cancel",ipf);
172
173   if (ipf->filemon) filemon_stop(ipf);
174   dbg("**TRACT** ipf=%p cancel",ipf);
175   loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
176   ipf->readable_callback= 0;
177 }
178
179 static void tailing_make_readable(InputFile *ipf) {
180   dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf,
181       (void*)ipf?ipf->readable_callback:0);
182   if (!ipf || !ipf->readable_callback) /* so callers can be naive */
183     return;
184   ipf->fake_readable= 1;
185   loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
186 }
187
188 static int tailing_on_readable(struct oop_readable *rable,
189                                 oop_readable_call *cb, void *user) {
190   InputFile *ipf= (void*)rable;
191   dbg("**TOR** ipf=%p on_readable",ipf);
192
193   tailing_on_cancel(rable);
194   ipf->readable_callback= cb;
195   ipf->readable_callback_user= user;
196   filemon_start(ipf);
197   tailing_make_readable(ipf);
198   return 0;
199 }
200
201 static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
202                                 size_t length) {
203   InputFile *ipf= (void*)rable;
204   for (;;) {
205     ssize_t r= read(ipf->fd, buffer, length);
206     if (r==-1) {
207       if (errno==EINTR) continue;
208       ipf->fake_readable= 0;
209       return r;
210     }
211     if (!r) {
212       if (ipf==main_input_file) {
213         errno=EAGAIN;
214         ipf->fake_readable= 0;
215         return -1;
216       } else if (ipf==flushing_input_file) {
217         assert(ipf->rd);
218         assert(sms==sm_SEPARATED || sms==sm_DROPPING);
219       } else if (ipf==backlog_input_file) {
220         assert(ipf->rd);
221       } else {
222         abort();
223       }
224     }
225     dbg("**TOR** ipf=%p try_read r=%d",ipf,r);
226     return r;
227   }
228 }
229
230 /*---------- interface to start and stop an input file ----------*/
231
232 static const oop_rd_style feedfile_rdstyle= {
233   OOP_RD_DELIM_STRIP, '\n',
234   OOP_RD_NUL_PERMIT,
235   OOP_RD_SHORTREC_LONG,
236 };
237
238 static void inputfile_reading_resume(InputFile *ipf) {
239   if (!ipf->rd) return;
240   if (!ipf->paused) return;
241
242   int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE,
243                      feedfile_got_article,ipf, feedfile_read_err, ipf);
244   if (r) syscrash("unable start reading feedfile %s",ipf->path);
245
246   ipf->paused= 0;
247 }
248
249 static void inputfile_reading_pause(InputFile *ipf) {
250   if (!ipf->rd) return;
251   if (ipf->paused) return;
252   oop_rd_cancel(ipf->rd);
253   ipf->paused= 1;
254 }
255
256 static void inputfile_reading_start(InputFile *ipf) {
257   assert(!ipf->rd);
258   ipf->readable.on_readable= tailing_on_readable;
259   ipf->readable.on_cancel=   tailing_on_cancel;
260   ipf->readable.try_read=    tailing_try_read;
261   ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */
262   ipf->readable.delete_kill= 0;
263
264   ipf->readable_callback= 0;
265   ipf->readable_callback_user= 0;
266
267   ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0);
268   assert(ipf->rd);
269
270   ipf->paused= 1;
271   inputfile_reading_resume(ipf);
272 }
273
274 static void inputfile_reading_stop(InputFile *ipf) {
275   assert(ipf->rd);
276   inputfile_reading_pause(ipf);
277   oop_rd_delete(ipf->rd);
278   ipf->rd= 0;
279   assert(!ipf->filemon); /* we shouldn't be monitoring it now */
280 }
281
282 void filepoll(void) {
283   tailing_make_readable(main_input_file);
284   tailing_make_readable(flushing_input_file);
285 }
286