From c2c88becbf08aa2b088ee42e6ed3331f733cbaaf Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Mon, 26 Apr 2010 12:05:26 +0100 Subject: [PATCH] wip compile --- backends/innduct.c | 125 ++++++++++++++++++++++++++++++--------------- 1 file changed, 83 insertions(+), 42 deletions(-) diff --git a/backends/innduct.c b/backends/innduct.c index 03737c8..0258ad0 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -192,6 +192,8 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define INNDCOMMCHILD_ESTATUS_FAIL 6 #define INNDCOMMCHILD_ESTATUS_NONESUCH 7 +#define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) + /*----- doubly linked lists -----*/ #define ISNODE(T) struct { T *succ, *pred; } node /* must be at start */ @@ -263,10 +265,12 @@ static void xmit_free(XmitDetails *d); static void statemc_setstate(StateMachineState newsms, int periods, const char *forlog, const char *why); +static void statemc_start_flush(const char *why); /* Normal => Flushing */ +static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ + static void check_master_queue(void); static void queue_check_input_done(void); -static void statemc_start_flush(const char *why); /* Normal => Flushing */ static void statemc_check_flushing_done(void); static void statemc_check_backlog_done(void); @@ -274,6 +278,8 @@ static void postfork(const char *what); static void postfork_inputfile(InputFile *ipf); static void open_defer(void); +static void close_defer(void); +static void search_backlog_file(void); static void inputfile_tailing_start(InputFile *ipf); static void inputfile_tailing_stop(InputFile *ipf); @@ -375,6 +381,7 @@ struct InputFile { oop_read *rd; long inprogress; /* no. of articles read but not processed */ off_t offset; + int skippinglong; int counts[art_MaxState][RCI_max]; int readcount_ok, readcount_blank, readcount_err; @@ -431,7 +438,7 @@ static int until_connect; static ConnList conns; static ArticleList queue; -static char *path_lock, *path_flushing, *path_defer; +static char *path_lock, *path_flushing, *path_defer, *globpat_backlog; #define SMS(newstate, periods, why) \ (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) @@ -1355,8 +1362,7 @@ static void close_input_file(InputFile *ipf) { /*---------- dealing with articles read in the input file ----------*/ static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, - const char *data, size_t recsz, - const char *how) { + const char *data, const char *how) { warn("corrupted file: %s, offset %lu: %s: %s", ipf->path, (unsigned long)offset, how, sanitise(data)); ipf->readcount_err++; @@ -1367,6 +1373,17 @@ static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, return OOP_CONTINUE; } +static void *feedfile_read_err(oop_source *lp, oop_read *rd, + oop_rd_event ev, const char *errmsg, + int errnoval, const char *data, size_t recsz, + void *ipf_v) { + InputFile *ipf= ipf_v; + assert(ev == OOP_RD_SYSTEM); + errno= errnoval; + sysdie("error reading input file: %s, offset %lu", + ipf->path, (unsigned long)ipf->offset); +} + static void *feedfile_got_article(oop_source *lp, oop_read *rd, oop_rd_event ev, const char *errmsg, int errnoval, const char *data, size_t recsz, @@ -1380,13 +1397,26 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, off_t old_offset= ipf->offset; ipf->offset += recsz + 1; -#define BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,recsz,m); +#define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m); - if (memchr(data,'\0',recsz)) BAD_DATA("nul byte"); - if (!recsz) BAD_DATA("empty line"); + if (ev==OOP_RD_PARTREC) + feedfile_got_bad_data(ipf,old_offset,data,"missing final newline"); + /* but process it anyway */ + + if (ipf->skippinglong) { + if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */ + return; + } + if (ev==OOP_RD_LONG) { + ipf->skippinglong= 1; + X_BAD_DATA("overly long line"); + } + + if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte"); + if (!recsz) X_BAD_DATA("empty line"); if (data[0]==' ') { - if (strspn(data," ") != recsz) BAD_DATA("line partially blanked"); + if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked"); ipf->readcount_blank++; return OOP_CONTINUE; } @@ -1394,12 +1424,13 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, char *space= strchr(data,' '); int tokenlen= space-data; int midlen= (int)recsz-tokenlen-1; - if (midlen <= 0) BAD_DATA("no room for messageid"); + if (midlen <= 2) X_BAD_DATA("no room for messageid"); + if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid"); - if (tokenlen != sizeof(TOKEN)*2+2) BAD_DATA("token wrong length"); + if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length"); memcpy(tokentextbuf, data, tokenlen); tokentextbuf[tokenlen]= 0; - if (!IsToken(tokentextbuf)) BAD_DATA("token wrong syntax"); + if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax"); ipf->readcount_ok++; @@ -1601,12 +1632,12 @@ static void filemon_callback(InputFile *ipf) { static const oop_rd_style feedfile_rdstyle= { OOP_RD_DELIM_STRIP, '\n', OOP_RD_NUL_PERMIT, - OOP_RD_SHORTREC_EOF, + OOP_RD_SHORTREC_LONG, }; static void inputfile_tailing_start(InputFile *ipf) { assert(!ipf->fd); - ipf->readable->on_readable= tailing_on_readable; + ipf->readable.on_readable= tailing_on_readable; ipf->readable.on_cancel= tailing_on_cancel; ipf->readable.try_read= tailing_try_read; ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */ @@ -1619,7 +1650,7 @@ static void inputfile_tailing_start(InputFile *ipf) { assert(ipf->fd); int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, - feedfile_got_article,ipf, feedfile_problem,ipf); + feedfile_got_article,ipf, feedfile_read_err, ipf); if (r) sysdie("unable start reading feedfile %s",ipf->path); } @@ -1733,6 +1764,12 @@ static void inputfile_tailing_stop(InputFile *ipf) { * ->8- */ +static void startup_set_input_file(InputFile *f) { + assert(!main_input_file); + main_input_file= f; + inputfile_tailing_start(f); +} + static void statemc_init(void) { struct stat stab, stabf; @@ -1749,7 +1786,7 @@ static void statemc_init(void) { memset(&fl,0,sizeof(fl)); fl.l_type= F_WRLCK; fl.l_whence= SEEK_SET; - r= fcntl(lockfd, F_SETLK, &fl); + int r= fcntl(lockfd, F_SETLK, &fl); if (r==-1) { if (errno==EACCES || errno==EAGAIN) { if (quiet_multiple) exit(0); @@ -1758,7 +1795,7 @@ static void statemc_init(void) { sysdie("fcntl F_SETLK lockfile %s", path_lock); } - xfstat_isreg(lockfd, &stabf, "lockfile"); + xfstat_isreg(lockfd, &stabf, path_lock, "lockfile"); int lock_noent; xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile"); @@ -1796,7 +1833,7 @@ static void statemc_init(void) { int noent_f; InputFile *file_d= open_input_file(path_flushing); - if (file_d) xfstat_isreg(file_d->fd, &stab_d, "flushing file"); + if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file"); xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile"); @@ -1818,7 +1855,7 @@ static void statemc_init(void) { SMS(SEPARATED, 0, "found both old and current feed files"); } else { debug("startup: F exists, D ENOENT => Normal"); - FILE *file_f= open_input_file(feedfile); + InputFile *file_f= open_input_file(feedfile); if (!file_f) die("feed file vanished during startup"); startup_set_input_file(file_f); SMS(NORMAL, flushfail_retry_periods, "normal startup"); @@ -1829,14 +1866,15 @@ static void statemc_init(void) { static void statemc_start_flush(const char *why) { /* Normal => Flushing */ assert(sms == sm_NORMAL); - debug("starting flush (%s) (%lu >= %lu) (%d)", + debug("starting flush (%s) (%lu >?= %lu) (%d)", why, - (unsigned long)ipf->offset, (unsigned long)flush_threshold, + (unsigned long)(main_input_file ? main_input_file->offset : 0), + (unsigned long)target_max_feedfile_size, sm_period_counter); - int r= link(feedfile, duct_path); - if (r) sysdie("link feedfile %s to flushing file %s", feedfile, - path_duct); + int r= link(feedfile, path_flushing); + if (r) sysdie("link feedfile %s to flushing file %s", + feedfile, path_flushing); /* => Hardlinked */ xunlink(feedfile, "old feedfile link"); @@ -1863,12 +1901,6 @@ static void statemc_period_poll(void) { } } -static void startup_set_input_file(InputFile *f) { - assert(!main_input_file); - main_input_file= f; - inputfile_tailing_start(f); -} - static int inputfile_is_done(InputFile *ipf) { if (!ipf) return 0; if (ipf->inprogress) return 0; /* new article in the meantime */ @@ -1880,7 +1912,9 @@ static void notice_processed(InputFile *ipf, const char *what, const char *spec) { #define RCI_NOTHING(x) /* nothing */ #define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE -#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, .x) +#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x]) + +#define CNT(art,rc) (ipf->counts[art_##art][RC_##rc]) info("processed %s%s read=%d(+%dbl,+%derr)" " offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)" @@ -1888,21 +1922,23 @@ static void notice_processed(InputFile *ipf, const char *what, , what, spec, ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, - ipf->counts[art_Unchecked].sent + ipf->counts[art_Unsolicited].sent - , ipf->counts[art_Unchecked].sent, ipf->counts[art_Unsolicited].sent, - ipf->counts[art_Wanted].accepted + ipf->counts[art_Unsolicited].accepted - ,ipf->counts[art_Wanted].accepted,ipf->counts[art_Unsolicited].accepted + CNT(Unchecked,sent) + CNT(Unsolicited,sent) + , CNT(Unchecked,sent), CNT(Unsolicited,sent), + CNT(Wanted,accepted) + CNT(Unsolicited,accepted) + , CNT(Wanted,accepted), CNT(Unsolicited,accepted) RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS) ); + +#undef CNT } static void statemc_check_backlog_done(void) { - InputFile *ipf= backlog_input_file(); + InputFile *ipf= backlog_input_file; if (!inputfile_is_done(ipf)) return; - const char *slash= strrchr(ipf->path, "/"); + const char *slash= strrchr(ipf->path, '/'); const char *leaf= slash ? slash+1 : ipf->path; - const char *under= strchr(slash, "_"); + const char *under= strchr(slash, '_'); const char *rest= under ? under+1 : leaf; if (!strncmp(rest,"backlog",7)) rest += 7; notice_processed(ipf,"backlog:",rest); @@ -1994,7 +2030,7 @@ static void open_defer(void) { /* truncate away any half-written records */ - xfstat_isreg(fileno(defer), &stab, "newly opened defer file"); + xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file"); if (stab.st_size > LONG_MAX) die("defer file %s size is far too large", path_defer); @@ -2009,7 +2045,7 @@ static void open_defer(void) { if (fseek(defer, truncto-1, SEEK_SET) < 0) sysdie("seek in defer file %s while truncating partial", path_defer); - r= getc(defer); + int r= getc(defer); if (r==EOF) { if (ferror(defer)) sysdie("failed read from defer file %s", path_defer); @@ -2042,21 +2078,26 @@ static void close_defer(void) { if (!defer) return; - xfstat(fileno(defer), &stab, "defer file"); + xfstat(fileno(defer), &stab, path_defer, "defer file"); if (fclose(defer)) sysfatal("could not close defer file %s", path_defer); defer= 0; + time_t now= time(0); + if (now==-1) sysdie("time(2) failed"); + char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile, - (unsigned long)now.tv_sec, + (unsigned long)now, (unsigned long)stab.st_ino); - if (link(path_defer, path_backlog)) + if (link(path_defer, backlog)) sysfatal("could not install defer file %s as backlog file %s", path_defer, backlog); if (unlink(path_defer)) sysdie("could not unlink old defer link %s to backlog file %s", path_defer, backlog); + free(backlog); + if (until_backlog_nextscan < 0 || until_backlog_nextscan > backlog_retry_minperiods + 1) until_backlog_nextscan= backlog_retry_minperiods + 1; -- 2.30.2