X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=innduct.git;a=blobdiff_plain;f=infile.c;h=a8f0558cfaa67e5d2d94257a1b8cb8409e5e396d;hp=223846eecc66635ab2d779b4b5f50b7fba4e7d1b;hb=066ebb56423371b95b755803cbaec910a035b0ee;hpb=f4aee95c41a0d6231d115386b8fbb23f6b8e349a diff --git a/infile.c b/infile.c index 223846e..a8f0558 100644 --- a/infile.c +++ b/infile.c @@ -1,3 +1,31 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * infile.c - monitoring and handling of input files + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#include "innduct.h" + /*========== monitoring of input files ==========*/ static void feedfile_eof(InputFile *ipf) { @@ -15,7 +43,7 @@ static void feedfile_eof(InputFile *ipf) { } } -static InputFile *open_input_file(const char *path) { +InputFile *open_input_file(const char *path) { int fd= open(path, O_RDWR); if (fd<0) { if (errno==ENOENT) return 0; @@ -34,7 +62,7 @@ static InputFile *open_input_file(const char *path) { return ipf; } -static void close_input_file(InputFile *ipf) { /* does not free */ +void close_input_file(InputFile *ipf) { /* does not free */ assert(!ipf->readable_callback); /* must have had ->on_cancel */ assert(!ipf->filemon); /* must have had inputfile_reading_stop */ assert(!ipf->rd); /* must have had inputfile_reading_stop */ @@ -49,11 +77,13 @@ static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, const char *data, const char *how) { warn("corrupted file: %s, offset %lu: %s: in %s", ipf->path, (unsigned long)offset, how, sanitise(data,-1)); - ipf->readcount_err++; - if (ipf->readcount_err > max_bad_data_initial + - (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio) + ipf->counts.events[read_err]++; + if (ipf->counts.events[read_err] > max_bad_data_initial + + (ipf->counts.events[read_ok] + ipf->counts.events[read_blank]) + / max_bad_data_ratio) crash("too much garbage in input file! (%d errs, %d ok, %d blank)", - ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank); + ipf->counts.events[read_err], ipf->counts.events[read_ok], + ipf->counts.events[read_blank]); return OOP_CONTINUE; } @@ -101,7 +131,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, if (data[0]==' ') { if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked"); - ipf->readcount_blank++; + ipf->counts.events[read_blank]++; return OOP_CONTINUE; } @@ -116,7 +146,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, tokentextbuf[tokenlen]= 0; if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax"); - ipf->readcount_ok++; + ipf->counts.events[read_ok]++; art= xmalloc(sizeof(*art) - 1 + midlen + 1); memset(art,0,sizeof(*art)); @@ -148,47 +178,57 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, /*========== tailing input file ==========*/ +static void tailing_rable_on_time(InputFile *ipf); + static void *tailing_rable_call_time(oop_source *lp, struct timeval tv, void *user) { /* lifetime of ipf here is OK because destruction will cause * on_cancel which will cancel this callback */ InputFile *ipf= user; - dbg("**TRACT** ipf=%p called",ipf); + //dbg("**TRACT** ipf=%p called",ipf); if (!ipf->fake_readable) return OOP_CONTINUE; /* we just keep calling readable until our caller (oop_rd) * has called try_read, and try_read has found EOF so given EAGAIN */ - dbg("**TRACT** ipf=%p reschedule",ipf); - loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); + //dbg("**TRACT** ipf=%p reschedule",ipf); + tailing_rable_on_time(ipf); + assert(ipf->readable_callback); return ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); } +static void tailing_rable_on_time(InputFile *ipf) { + loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); + loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); + /* on_time is not idempotent - it counts. So we need this to make + * sure we only have one outstanding, as otherwise our cancel doesn't work */ +} + static void tailing_on_cancel(struct oop_readable *rable) { InputFile *ipf= (void*)rable; - dbg("**TOR** ipf=%p on_cancel",ipf); + //dbg("**TOR** ipf=%p on_cancel",ipf); if (ipf->filemon) filemon_stop(ipf); - dbg("**TRACT** ipf=%p cancel",ipf); + //dbg("**TRACT** ipf=%p cancel",ipf); loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); ipf->readable_callback= 0; } -static void tailing_make_readable(InputFile *ipf) { - dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf, - (void*)ipf?ipf->readable_callback:0); +void tailing_make_readable(InputFile *ipf) { + //dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf, + // (void*)ipf?ipf->readable_callback:0); if (!ipf || !ipf->readable_callback) /* so callers can be naive */ return; ipf->fake_readable= 1; - loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); + tailing_rable_on_time(ipf); } static int tailing_on_readable(struct oop_readable *rable, oop_readable_call *cb, void *user) { InputFile *ipf= (void*)rable; - dbg("**TOR** ipf=%p on_readable",ipf); + //dbg("**TOR** ipf=%p on_readable",ipf); tailing_on_cancel(rable); ipf->readable_callback= cb; @@ -222,7 +262,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, abort(); } } - dbg("**TOR** ipf=%p try_read r=%d",ipf,r); + //dbg("**TOR** ipf=%p try_read r=%d",ipf,r); return r; } } @@ -235,7 +275,7 @@ static const oop_rd_style feedfile_rdstyle= { OOP_RD_SHORTREC_LONG, }; -static void inputfile_reading_resume(InputFile *ipf) { +void inputfile_reading_resume(InputFile *ipf) { if (!ipf->rd) return; if (!ipf->paused) return; @@ -246,14 +286,14 @@ static void inputfile_reading_resume(InputFile *ipf) { ipf->paused= 0; } -static void inputfile_reading_pause(InputFile *ipf) { +void inputfile_reading_pause(InputFile *ipf) { if (!ipf->rd) return; if (ipf->paused) return; oop_rd_cancel(ipf->rd); ipf->paused= 1; } -static void inputfile_reading_start(InputFile *ipf) { +void inputfile_reading_start(InputFile *ipf) { assert(!ipf->rd); ipf->readable.on_readable= tailing_on_readable; ipf->readable.on_cancel= tailing_on_cancel; @@ -271,7 +311,7 @@ static void inputfile_reading_start(InputFile *ipf) { inputfile_reading_resume(ipf); } -static void inputfile_reading_stop(InputFile *ipf) { +void inputfile_reading_stop(InputFile *ipf) { assert(ipf->rd); inputfile_reading_pause(ipf); oop_rd_delete(ipf->rd); @@ -284,3 +324,17 @@ void filepoll(void) { tailing_make_readable(flushing_input_file); } +char *dbg_report_ipf(InputFile *ipf) { + if (!ipf) return xasprintf("none"); + + const char *slash= strrchr(ipf->path,'/'); + const char *path= slash ? slash+1 : ipf->path; + + return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s", + ipf, path, + ipf->queue.count, ipf->inprogress, ipf->autodefer, + (long)ipf->offset, ipf->fd, + ipf->rd ? "" : ",!rd", + ipf->skippinglong ? "*skiplong" : "", + ipf->rd && ipf->paused ? "*paused" : ""); +}