From: Ian Jackson Date: Mon, 26 Apr 2010 10:38:24 +0000 (+0100) Subject: wip compile X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=commitdiff_plain;h=a25ae985e43ba31d8bcd0f7dbbf30d08c53b66f1 wip compile --- diff --git a/backends/innduct.c b/backends/innduct.c index 6cb8eb4..03737c8 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -247,6 +247,7 @@ typedef struct Conn Conn; typedef struct Article Article; typedef struct InputFile InputFile; typedef struct XmitDetails XmitDetails; +typedef struct Filemon_Perfile Filemon_Perfile; typedef enum StateMachineState StateMachineState; DEFLIST(Conn); @@ -260,10 +261,6 @@ static void *conn_write_some_xmits(Conn *conn); static void xmit_free(XmitDetails *d); -static int filemon_init(void); -static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path); -static void filemon_callback(void); - static void statemc_setstate(StateMachineState newsms, int periods, const char *forlog, const char *why); static void check_master_queue(void); @@ -281,6 +278,11 @@ static void open_defer(void); static void inputfile_tailing_start(InputFile *ipf); static void inputfile_tailing_stop(InputFile *ipf); +static int filemon_init(void); +static void filemon_start(InputFile *ipf); +static void filemon_stop(InputFile *ipf); +static void filemon_callback(InputFile *ipf); + /*----- configuration options -----*/ static char *sitename, *feedfile; @@ -290,6 +292,11 @@ static int quiet_multiple=0, become_daemon=1; static int max_connections=10, max_queue_per_conn=200; static int target_max_feedfile_size=100000; +static double max_bad_data_ratio= 0.01; +static int max_bad_data_initial= 30; + /* in one corrupt 4096-byte block the number of newlines has + * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */ + static int connection_setup_timeout=200, port=119, try_stream=1; static int inndcomm_flush_timeout=100; static int reconnect_delay_periods, flushfail_retry_periods, open_wait_periods; @@ -363,13 +370,14 @@ struct InputFile { void *readable_callback_user; int fd; - struct Filemon_Perfile *filemon; + Filemon_Perfile *filemon; oop_read *rd; long inprogress; /* no. of articles read but not processed */ off_t offset; int counts[art_MaxState][RCI_max]; + int readcount_ok, readcount_blank, readcount_err; char path[]; }; @@ -740,7 +748,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { setnonblock(conn->fd, 1); conn->max_queue= conn->stream ? max_queue_per_conn : 1; LIST_ADDHEAD(conns, conn); - notice("#%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); + notice("C%d connected %s", conn->fd, conn->stream ? "streaming" : "plain"); connect_attempt_discard(); check_master_queue(); return 0; @@ -938,7 +946,7 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { xmit_free(d); char *m= xvasprintf(fmt,al); - warn("#%d connection failed, requeueing " RCI_TRIPLE_FMT_BASE ": %s", + warn("C%d connection failed (requeueing " RCI_TRIPLE_FMT_BASE "): %s", conn->fd, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); free(m); @@ -1226,7 +1234,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, if (code!=205 && code!=503) { connfail(conn, "peer gave unexpected response to QUIT: %s", sani); } else { - notice("#%d idle connection closed\n"); + notice("C%d idle connection closed\n"); assert(!conn->waiting.count); assert(!conn->priority.count); assert(!conn->sent.count); @@ -1346,77 +1354,86 @@ static void close_input_file(InputFile *ipf) { /*---------- dealing with articles read in the input file ----------*/ -void *feedfile_got_article(oop_source *lp, oop_read *rd, - oop_rd_event ev, const char *errmsg, - int errnoval, const char *data, size_t recsz, - void *ipf_v) { +static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, + const char *data, size_t recsz, + const char *how) { + warn("corrupted file: %s, offset %lu: %s: %s", + ipf->path, (unsigned long)offset, how, sanitise(data)); + ipf->readcount_err++; + if (ipf->readcount_err > max_bad_data_initial + + (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio) + die("too much garbage in input file! (%d errs, %d ok, %d blank)", + ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank); + return OOP_CONTINUE; +} + +static void *feedfile_got_article(oop_source *lp, oop_read *rd, + oop_rd_event ev, const char *errmsg, + int errnoval, const char *data, size_t recsz, + void *ipf_v) { InputFile *ipf= ipf_v; Article *art; char tokentextbuf[sizeof(TOKEN)*2+3]; if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; } - if (data[0] && data[0]!=' ') { - char *space= strchr(data,' '); - int tokenlen= space-data; - int midlen= (int)recsz-tokenlen-1; - if (midlen < 0) goto bad_data; - - if (tokenlen != sizeof(TOKEN)*2+2) goto bad_data; - memcpy(tokentextbuf, data, tokenlen); - tokentextbuf[tokenlen]= 0; - if (!IsToken(tokentextbuf)) goto bad_data; - - art= xmalloc(sizeof(*art) - 1 + midlen + 1); - art->offset= ipf->offset; - art->blanklen= recsz; - art->midlen= midlen; - art->state= art_Unchecked; - art->ipf= ipf; ipf->inprogress++; - art->token= TextToToken(tokentextbuf); - strcpy(art->messageid, space+1); - LIST_ADDTAIL(queue, art); - } + off_t old_offset= ipf->offset; ipf->offset += recsz + 1; +#define BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,recsz,m); + + if (memchr(data,'\0',recsz)) BAD_DATA("nul byte"); + if (!recsz) BAD_DATA("empty line"); + + if (data[0]==' ') { + if (strspn(data," ") != recsz) BAD_DATA("line partially blanked"); + ipf->readcount_blank++; + return OOP_CONTINUE; + } + + char *space= strchr(data,' '); + int tokenlen= space-data; + int midlen= (int)recsz-tokenlen-1; + if (midlen <= 0) BAD_DATA("no room for messageid"); + + if (tokenlen != sizeof(TOKEN)*2+2) BAD_DATA("token wrong length"); + memcpy(tokentextbuf, data, tokenlen); + tokentextbuf[tokenlen]= 0; + if (!IsToken(tokentextbuf)) BAD_DATA("token wrong syntax"); + + ipf->readcount_ok++; + + art= xmalloc(sizeof(*art) - 1 + midlen + 1); + art->offset= ipf->offset; + art->blanklen= recsz; + art->midlen= midlen; + art->state= art_Unchecked; + art->ipf= ipf; ipf->inprogress++; + art->token= TextToToken(tokentextbuf); + strcpy(art->messageid, space+1); + LIST_ADDTAIL(queue, art); + if (sms==sm_NORMAL && ipf==main_input_file && ipf->offset >= target_max_feedfile_size) statemc_start_flush("feed file size"); check_master_queue(); + return OOP_CONTINUE; } /*========== tailing input file ==========*/ -static void filemon_start(InputFile *ipf) { - assert(!ipf->filemon); - - ipf->filemon= xmalloc(sizeof(*ipf->filemon)); - memset(ipf->filemon, 0, sizeof(*ipf->filemon)); - filemon_method_startfile(ipf, ipf->filemon); -} - -static void filemon_stop(InputFile *ipf) { - if (!ipf->filemon) return; - filemon_method_stopfile(ipf, ipf->filemon); - free(ipf->filemon); - ipf->filemon= 0; -} - -static void filemon_callback(InputFile *ipf) { - ipf->readable_callback(ipf->readable_callback_user); -} - static void *tailing_rable_call_time(oop_source *loop, struct timeval tv, void *user) { InputFile *ipf= user; - return ipf->readable_callback(ipf->readable_callback_user); + return ipf->readable_callback(loop, &ipf->readable, + ipf->readable_callback_user); } -static void on_cancel(struct oop_readable *rable) { +static void tailing_on_cancel(struct oop_readable *rable) { InputFile *ipf= (void*)rable; - if (ipf->filemon) filemon_stopfile(ipf); + if (ipf->filemon) filemon_stop(ipf); loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); ipf->readable_callback= 0; } @@ -1434,7 +1451,7 @@ static int tailing_on_readable(struct oop_readable *rable, tailing_on_cancel(rable); ipf->readable_callback= cb; ipf->readable_callback_user= user; - filemon_startfile(ipf); + filemon_start(ipf); tailing_queue_readable(ipf); return 0; @@ -1477,9 +1494,9 @@ static int filemon_inotify_fd; static int filemon_inotify_wdmax; static InputFile **filemon_inotify_wd2ipf; -typedef struct Filemon_Perfile { +struct Filemon_Perfile { int wd; -} Filemon_Inotify_Perfile; +}; static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY); @@ -1544,13 +1561,13 @@ static int filemon_method_init(void) { return 1; } -#endif /* HAVE_INOTIFY && !HAVE_FILEMON *// +#endif /* HAVE_INOTIFY && !HAVE_FILEMON */ /*---------- filemon dummy implementation ----------*/ #if !defined(HAVE_FILEMON) -typedef struct Filemon_Perfile { int dummy; } Filemon_Dummy_Perfile; +struct Filemon_Perfile { int dummy; }; static int filemon_method_init(void) { return 0; } static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { } @@ -1558,21 +1575,42 @@ static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { } #endif /* !HAVE_FILEMON */ +/*---------- filemon generic interface ----------*/ + +static void filemon_start(InputFile *ipf) { + assert(!ipf->filemon); + + ipf->filemon= xmalloc(sizeof(*ipf->filemon)); + memset(ipf->filemon, 0, sizeof(*ipf->filemon)); + filemon_method_startfile(ipf, ipf->filemon); +} + +static void filemon_stop(InputFile *ipf) { + if (!ipf->filemon) return; + filemon_method_stopfile(ipf, ipf->filemon); + free(ipf->filemon); + ipf->filemon= 0; +} + +static void filemon_callback(InputFile *ipf) { + ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); +} + /*---------- interface to start and stop an input file ----------*/ static const oop_rd_style feedfile_rdstyle= { OOP_RD_DELIM_STRIP, '\n', - OOP_RD_NUL_FORBID, + OOP_RD_NUL_PERMIT, OOP_RD_SHORTREC_EOF, }; static void inputfile_tailing_start(InputFile *ipf) { assert(!ipf->fd); ipf->readable->on_readable= tailing_on_readable; - ipf->readable->on_cancel= tailing_on_cancel; - ipf->readable->try_read= tailing_try_read; - ipf->readable->delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */ - ipf->readable->delete_kill= 0; + ipf->readable.on_cancel= tailing_on_cancel; + ipf->readable.try_read= tailing_try_read; + ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */ + ipf->readable.delete_kill= 0; ipf->readable_callback= 0; ipf->readable_callback_user= 0; @@ -1844,10 +1882,12 @@ static void notice_processed(InputFile *ipf, const char *what, #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE #define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, .x) - info("processed %s%s offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)" + info("processed %s%s read=%d(+%dbl,+%derr)" + " offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)" RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) , - what,spec, + what, spec, + ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, ipf->counts[art_Unchecked].sent + ipf->counts[art_Unsolicited].sent , ipf->counts[art_Unchecked].sent, ipf->counts[art_Unsolicited].sent, ipf->counts[art_Wanted].accepted + ipf->counts[art_Unsolicited].accepted