+/*
+ * innduct
+ * tailing reliable realtime streaming feeder for inn
+ * infile.c - monitoring and handling of input files
+ *
+ * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * (I believe that when you compile and link this as part of the inn2
+ * build, with the Makefile runes I have provided, all the libraries
+ * and files which end up included in innduct are licence-compatible
+ * with GPLv3. If not then please let me know. -Ian Jackson.)
+ */
+
+#include "innduct.h"
+
/*========== monitoring of input files ==========*/
static void feedfile_eof(InputFile *ipf) {
}
}
-static InputFile *open_input_file(const char *path) {
+InputFile *open_input_file(const char *path) {
int fd= open(path, O_RDWR);
if (fd<0) {
if (errno==ENOENT) return 0;
LIST_INIT(ipf->queue);
strcpy(ipf->path, path);
+ dbg("started input file %p %s", ipf, path);
+
return ipf;
}
-static void close_input_file(InputFile *ipf) { /* does not free */
+void close_input_file(InputFile *ipf) { /* does not free */
assert(!ipf->readable_callback); /* must have had ->on_cancel */
assert(!ipf->filemon); /* must have had inputfile_reading_stop */
assert(!ipf->rd); /* must have had inputfile_reading_stop */
const char *data, const char *how) {
warn("corrupted file: %s, offset %lu: %s: in %s",
ipf->path, (unsigned long)offset, how, sanitise(data,-1));
- ipf->readcount_err++;
- if (ipf->readcount_err > max_bad_data_initial +
- (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio)
+ ipf->counts.events[read_err]++;
+ if (ipf->counts.events[read_err] > max_bad_data_initial +
+ (ipf->counts.events[read_ok] + ipf->counts.events[read_blank])
+ / max_bad_data_ratio)
crash("too much garbage in input file! (%d errs, %d ok, %d blank)",
- ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank);
+ ipf->counts.events[read_err], ipf->counts.events[read_ok],
+ ipf->counts.events[read_blank]);
return OOP_CONTINUE;
}
if (data[0]==' ') {
if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
- ipf->readcount_blank++;
+ ipf->counts.events[read_blank]++;
return OOP_CONTINUE;
}
tokentextbuf[tokenlen]= 0;
if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
- ipf->readcount_ok++;
+ ipf->counts.events[read_ok]++;
art= xmalloc(sizeof(*art) - 1 + midlen + 1);
memset(art,0,sizeof(*art));
/*========== tailing input file ==========*/
+static void tailing_rable_on_time(InputFile *ipf);
+
static void *tailing_rable_call_time(oop_source *lp, struct timeval tv,
void *user) {
/* lifetime of ipf here is OK because destruction will cause
* on_cancel which will cancel this callback */
InputFile *ipf= user;
- dbg("**TRACT** ipf=%p called",ipf);
+ //dbg("**TRACT** ipf=%p called",ipf);
if (!ipf->fake_readable) return OOP_CONTINUE;
/* we just keep calling readable until our caller (oop_rd)
* has called try_read, and try_read has found EOF so given EAGAIN */
- dbg("**TRACT** ipf=%p reschedule",ipf);
- loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+ //dbg("**TRACT** ipf=%p reschedule",ipf);
+ tailing_rable_on_time(ipf);
+ assert(ipf->readable_callback);
return ipf->readable_callback(loop, &ipf->readable,
ipf->readable_callback_user);
}
+static void tailing_rable_on_time(InputFile *ipf) {
+ loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+ loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+ /* on_time is not idempotent - it counts. So we need this to make
+ * sure we only have one outstanding, as otherwise our cancel doesn't work */
+}
+
static void tailing_on_cancel(struct oop_readable *rable) {
InputFile *ipf= (void*)rable;
- dbg("**TOR** ipf=%p on_cancel",ipf);
+ //dbg("**TOR** ipf=%p on_cancel",ipf);
if (ipf->filemon) filemon_stop(ipf);
- dbg("**TRACT** ipf=%p cancel",ipf);
+ //dbg("**TRACT** ipf=%p cancel",ipf);
loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
ipf->readable_callback= 0;
}
-static void tailing_make_readable(InputFile *ipf) {
- dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf,
- (void*)ipf?ipf->readable_callback:0);
+void tailing_make_readable(InputFile *ipf) {
+ //dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf,
+ // (void*)ipf?ipf->readable_callback:0);
if (!ipf || !ipf->readable_callback) /* so callers can be naive */
return;
ipf->fake_readable= 1;
- loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+ tailing_rable_on_time(ipf);
}
static int tailing_on_readable(struct oop_readable *rable,
oop_readable_call *cb, void *user) {
InputFile *ipf= (void*)rable;
- dbg("**TOR** ipf=%p on_readable",ipf);
+ //dbg("**TOR** ipf=%p on_readable",ipf);
tailing_on_cancel(rable);
ipf->readable_callback= cb;
abort();
}
}
- dbg("**TOR** ipf=%p try_read r=%d",ipf,r);
+ //dbg("**TOR** ipf=%p try_read r=%d",ipf,r);
return r;
}
}
OOP_RD_SHORTREC_LONG,
};
-static void inputfile_reading_resume(InputFile *ipf) {
+void inputfile_reading_resume(InputFile *ipf) {
if (!ipf->rd) return;
if (!ipf->paused) return;
ipf->paused= 0;
}
-static void inputfile_reading_pause(InputFile *ipf) {
+void inputfile_reading_pause(InputFile *ipf) {
if (!ipf->rd) return;
if (ipf->paused) return;
oop_rd_cancel(ipf->rd);
ipf->paused= 1;
}
-static void inputfile_reading_start(InputFile *ipf) {
+void inputfile_reading_start(InputFile *ipf) {
assert(!ipf->rd);
ipf->readable.on_readable= tailing_on_readable;
ipf->readable.on_cancel= tailing_on_cancel;
inputfile_reading_resume(ipf);
}
-static void inputfile_reading_stop(InputFile *ipf) {
+void inputfile_reading_stop(InputFile *ipf) {
assert(ipf->rd);
inputfile_reading_pause(ipf);
oop_rd_delete(ipf->rd);
tailing_make_readable(flushing_input_file);
}
+char *dbg_report_ipf(InputFile *ipf) {
+ if (!ipf) return masprintf("none");
+
+ const char *slash= strrchr(ipf->path,'/');
+ const char *path= slash ? slash+1 : ipf->path;
+
+ return masprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
+ ipf, path,
+ ipf->queue.count, ipf->inprogress, ipf->autodefer,
+ (long)ipf->offset, ipf->fd,
+ ipf->rd ? "" : ",!rd",
+ ipf->skippinglong ? "*skiplong" : "",
+ ipf->rd && ipf->paused ? "*paused" : "");
+}