From f4aee95c41a0d6231d115386b8fbb23f6b8e349a Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Thu, 20 May 2010 18:06:49 +0100 Subject: [PATCH] wip split into multiple files and make compile --- Makefile.am | 4 +- Makefile.in | 11 +- README.notes | 62 + README.states | 179 +++ cli.c | 412 ++++++ conn.c | 406 ++++++ decls-junk | 57 + defer.c | 192 +++ filemon.c | 121 ++ help.c | 284 +++++ infile.c | 286 +++++ innduct.c | 3399 +------------------------------------------------ innduct.h | 408 ++++++ recv.c | 237 ++++ statemc.c | 514 ++++++++ xmit.c | 332 +++++ 16 files changed, 3533 insertions(+), 3371 deletions(-) create mode 100644 README.notes create mode 100644 README.states create mode 100644 cli.c create mode 100644 conn.c create mode 100644 decls-junk create mode 100644 defer.c create mode 100644 filemon.c create mode 100644 help.c create mode 100644 infile.c create mode 100644 innduct.h create mode 100644 recv.c create mode 100644 statemc.c create mode 100644 xmit.c diff --git a/Makefile.am b/Makefile.am index e9a82be..cf76515 100644 --- a/Makefile.am +++ b/Makefile.am @@ -16,7 +16,9 @@ WARNINGS= \ CFLAGS = @CFLAGS@ $(WARNINGS) $(WERROR) $(CMDLINE_CFLAGS) bin_PROGRAMS = innduct -innduct_SOURCES = innduct.c __oop-read-copy.c +innduct_SOURCES = innduct.c conn.c filemon.c infile.c recv.c xmit.c \ + cli.c defer.c help.c statemc.c __oop-read-copy.c + innduct_LDADD = -L/usr/lib/news -lstorage -linn -lm -loop autoconf: diff --git a/Makefile.in b/Makefile.in index 4f839a5..756a0ca 100644 --- a/Makefile.in +++ b/Makefile.in @@ -80,7 +80,9 @@ WARNINGS = -Wall -Wformat=2 -Wno-format-zero-length -Wshadow -Wpointer-arith CFLAGS = @CFLAGS@ $(WARNINGS) $(WERROR) $(CMDLINE_CFLAGS) bin_PROGRAMS = innduct -innduct_SOURCES = innduct.c __oop-read-copy.c +innduct_SOURCES = innduct.c conn.c filemon.c infile.c recv.c xmit.c cli.c defer.c help.c statemc.c __oop-read-copy.c + + innduct_LDADD = -L/usr/lib/news -lstorage -linn -lm -loop MAINTAINERCLEANFILES = configure config.h.in 
Makefile.in @@ -95,7 +97,8 @@ DEFS = @DEFS@ -I. -I$(srcdir) -I. CPPFLAGS = @CPPFLAGS@ LDFLAGS = @LDFLAGS@ LIBS = @LIBS@ -innduct_OBJECTS = innduct.o __oop-read-copy.o +innduct_OBJECTS = innduct.o conn.o filemon.o infile.o recv.o xmit.o \ +cli.o defer.o help.o statemc.o __oop-read-copy.o innduct_DEPENDENCIES = innduct_LDFLAGS = COMPILE = $(CC) $(DEFS) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS) @@ -109,7 +112,9 @@ DISTFILES = $(DIST_COMMON) $(SOURCES) $(HEADERS) $(TEXINFOS) $(EXTRA_DIST) TAR = tar GZIP_ENV = --best -DEP_FILES = .deps/__oop-read-copy.P .deps/innduct.P +DEP_FILES = .deps/__oop-read-copy.P .deps/cli.P .deps/conn.P \ +.deps/defer.P .deps/filemon.P .deps/help.P .deps/infile.P \ +.deps/innduct.P .deps/recv.P .deps/statemc.P .deps/xmit.P SOURCES = $(innduct_SOURCES) OBJECTS = $(innduct_OBJECTS) diff --git a/README.notes b/README.notes new file mode 100644 index 0000000..1994bd2 --- /dev/null +++ b/README.notes @@ -0,0 +1,62 @@ +NEWSFEEDS + +Newsfeeds file entries should look like this: + host.name.of.site[/exclude,exclude,...]\ + :pattern,pattern...[/distribution,distribution...]\ + :Tf,Wnm + : +or + sitename[/exclude,exclude,...]\ + :pattern,pattern...[/distribution,distribution...]\ + :Tf,Wnm + :host.name.of.site + + +FILES + +Four files full of + token messageid +or might be blanked out + .... + +F site.name main feed file + opened/created, then written, by innd + read by duct + unlinked by duct + tokens blanked out by duct when processed + site.name_lock lock preventing multiple ducts + to hold lock must open,F_SETLK[W] + and then stat to check that locked file + still has name site.name_lock + holder of this lock is "duct" + (only) lockholder may remove the lockfile +D site.name_flushing temporary feed file during flush (or crash) + hardlink created by duct + unlinked by duct + site.name_defer 431'd articles, still being written, + created, written, used by duct + + site.name_backlog.<date>.<inum> 
+ 431'd articles, ready for innxmit or duct + created (link/mv) by duct + site.name_backlog<anything> (where <anything> does not + contain '#' or '~') eg + site.name_backlog.manual + anything the sysadmin likes (eg, feed files + from old feeds to be merged into this one) + created (link/mv) by admin + may be symlinks (in which case links + may be written through, but only links + will be removed). + + It is safe to remove backlog files manually, + if it's desired to throw away the backlog. + +Backlog files are also processed by innduct. We find the oldest +backlog file which is at least a certain amount old, and feed it +back into our processing. When every article in it has been read +and processed, we unlink it and look for another backlog file. + +If we don't have a backlog file that we're reading, we close the +defer file that we're writing and make it into a backlog file at +the first convenient opportunity. diff --git a/README.states b/README.states new file mode 100644 index 0000000..4676b30 --- /dev/null +++ b/README.states @@ -0,0 +1,179 @@ + + +OVERALL STATES, defining interaction with innd: + + START + | + ,-->--. check F, D + | | | + | | | + | | <----------------<---------------------------------'| + | | F exists | + | | D ENOENT | + | | duct opens F | + | V | + | Normal | + | F: innd writing, duct reading | + | D: ENOENT | + | | | + | | duct decides time to flush | + | | duct makes hardlink | + | | | + | V <------------------------'| + | Hardlinked F==D | + | F == D: innd writing, duct reading both exist | + ^ | | + | | duct unlinks F | + | | <-----------<-------------<--'| + | | open D F ENOENT | + | | if exists | + | | | + | V <---------------------. 
| + | Moved | | + | F: ENOENT | | + | D: innd writing, duct reading; or ENOENT | | + | | | | + | | duct requests flush of feed | | + | | (others can too, harmlessly) | | + | V | | + | Flushing | | + | F: ENOENT | | + | D: innd flushing, duct; or ENOENT | | + | | | | + | | inndcomm flush fails | | + | |`-------------------------->------------------' | + | | | + | | inndcomm reports no such site | + | |`---------------------------------------------------- | -. + | | | | + | | innd finishes writing D, creates F | | + | | inndcomm reports flush successful | | + | | | | + | V | | + | Separated <----------------' | + | F: innd writing F!=D / + | D: duct reading; or ENOENT both exist / + | | / + | | duct gets to the end of D / + | | duct opens F too / + | V / + | Finishing / + | F: innd writing, duct reading | + | D: duct finishing V + | | Dropping + | | duct finishes processing D F: ENOENT + | V duct unlinks D D: duct reading + | | | + `--<--' | duct finishes + | processing D + | duct unlinks D + | duct exits + V + Dropped + F: ENOENT + D: ENOENT + duct not running + + "duct reading" means innduct is reading the file but also + overwriting processed tokens. + + + +We implement this as follows: + + .=======. + ||START|| + `=======' + | + | open F + | + | F ENOENT + |`---------------------------------------------------. 
+ F OPEN OK | | + |`---------------- - - - | + D ENOENT | D EXISTS see OVERALL STATES diagram | + | for full startup logic | + ,--------->| | + | V | + | ============ try to | + | NORMAL open D | + | [Normal] | + | main F tail | + | ============ V + | | | + | | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT | + ^ | hardlink F to D | + | [Hardlinked] | + | | unlink F | + | | our handle onto F is now onto D | + | [Moved] | + | | | + | |<-------------------<---------------------<---------+ + | | | + | | spawn inndcomm flush | + | V | + | ================== | + | FLUSHING[-ABSENT] | + | [Flushing] | + | main D tail/none | + | ================== | + | | | + | | INNDCOMM FLUSH FAILS ^ + | |`----------------------->----------. | + | | | | + | | NO SUCH SITE V | + ^ |`--------------->----. ==================== | + | | \ FLUSHFAILED[-ABSENT] | + | | \ [Moved] | + | | FLUSH OK \ main D tail/none | + | | open F \ ==================== | + | | \ | | + | | \ | TIME TO RETRY | + | |`------->----. ,---<---'\ `----------------' + | | D NONE | | D NONE `----. + | V | | V + | ============= V V ============ + | SEPARATED-1 | | DROPPING-1 + | flsh->rd!=0 | | flsh->rd!=0 + | [Separated] | | [Dropping] + | main F idle | | main none + | flsh D tail | | flsh D tail + | ============= | | ============ + | | | | install | + ^ | EOF ON D | | defer | EOF ON D + | V | | V + | =============== | | =============== + | SEPARATED-2 | | DROPPING-2 + | flsh->rd==0 | V flsh->rd==0 + | [Finishing] | | [Dropping] + | main F tail | `. main none + | flsh D closed | `. flsh D closed + | =============== V `. =============== + | | `. | + | | ALL D PROCESSED `. | ALL D PROCESSED + | V install defer as backlog `. | install defer + ^ | close D `. | close D + | | unlink D `. 
+ | unlink D + | | | | + | | V V + `----------' ============== + DROPPED + [Dropped] + main none + flsh none + some backlog + ============== + | + | ALL BACKLOG DONE + | + | unlink lock + | exit + V + ========== + (ESRCH) + [Dropped] + ========== + + +rune for printing diagrams: + a2ps -R -B -ops README.states diff --git a/cli.c b/cli.c new file mode 100644 index 0000000..05e4827 --- /dev/null +++ b/cli.c @@ -0,0 +1,412 @@ +/*========== command and control (CLI) connections ==========*/ + +static int cli_master; + +typedef struct CliConn CliConn; +struct CliConn { + void (*destroy)(CliConn*); + int fd; + oop_read *rd; + FILE *out; + union { + struct sockaddr sa; + struct sockaddr_un un; + } sa; + socklen_t salen; +}; + +static const oop_rd_style cli_rd_style= { + OOP_RD_DELIM_STRIP, '\n', + OOP_RD_NUL_FORBID, + OOP_RD_SHORTREC_FORBID +}; + +static void cli_destroy(CliConn *cc) { + cc->destroy(cc); +} + +static void cli_checkouterr(CliConn *cc /* may destroy*/) { + if (ferror(cc->out) | fflush(cc->out)) { + info("CTRL%d write error %s", cc->fd, strerror(errno)); + cli_destroy(cc); + } +} + +static void cli_prompt(CliConn *cc /* may destroy*/) { + fprintf(cc->out, "%s| ", sitename); + cli_checkouterr(cc); +} + +struct CliCommand { + const char *cmd; + void (*f)(CliConn *cc, const CliCommand *ccmd, + const char *arg, size_t argsz); + void *xdata; + int xval; +}; + +static const CliCommand cli_commands[]; + +#define CCMD(wh) \ + static void ccmd_##wh(CliConn *cc, const CliCommand *c, \ + const char *arg, size_t argsz) + +CCMD(help) { + fputs("commands:\n", cc->out); + const CliCommand *ccmd; + for (ccmd=cli_commands; ccmd->cmd; ccmd++) + fprintf(cc->out, " %s\n", ccmd->cmd); + fputs("NB: permissible arguments are not shown above." + " Not all commands listed are safe. 
See innduct(8).\n", cc->out); +} + +CCMD(flush) { + int ok= trigger_flush_ok("manual request"); + if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]); +} + +CCMD(stop) { + preterminate(); + notice("terminating (CTRL%d)",cc->fd); + raise_default(SIGTERM); + abort(); +} + +CCMD(dump); + +/* messing with our head: */ +CCMD(period) { period(); } +CCMD(setintarg) { *(int*)c->xdata= atoi(arg); } +CCMD(setint) { *(int*)c->xdata= c->xval; } +CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); } + +static const CliCommand cli_commands[]= { + { "h", ccmd_help }, + { "flush", ccmd_flush }, + { "stop", ccmd_stop }, + { "dump q", ccmd_dump, 0,0 }, + { "dump a", ccmd_dump, 0,1 }, + + { "p", ccmd_period }, + +#define POKES(cmd,func) \ + { cmd "flush", func, &until_flush, 1 }, \ + { cmd "conn", func, &until_connect, 0 }, \ + { cmd "blscan", func, &until_backlog_nextscan, 0 }, +POKES("next ", ccmd_setint) +POKES("prod ", ccmd_setint_period) + + { "pretend flush", ccmd_setintarg, &simulate_flush }, + { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 }, + { 0 } +}; + +static void *cli_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recszu, void *cc_v) { + CliConn *cc= cc_v; + + if (!data) { + info("CTRL%d closed", cc->fd); + cc->destroy(cc); + return OOP_CONTINUE; + } + + if (recszu == 0) goto prompt; + assert(recszu <= INT_MAX); + int recsz= recszu; + + const CliCommand *ccmd; + for (ccmd=cli_commands; ccmd->cmd; ccmd++) { + int l= strlen(ccmd->cmd); + if (recsz < l) continue; + if (recsz > l && data[l] != ' ') continue; + if (memcmp(data, ccmd->cmd, l)) continue; + + int argl= (int)recsz - (l+1); + ccmd->f(cc, ccmd, argl>=0 ? 
data+l+1 : 0, argl); + goto prompt; + } + + fputs("unknown command; h for help\n", cc->out); + + prompt: + cli_prompt(cc); + return OOP_CONTINUE; +} + +static void *cli_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *cc_v) { + CliConn *cc= cc_v; + + info("CTRL%d read error %s", cc->fd, errmsg); + cc->destroy(cc); + return OOP_CONTINUE; +} + +static int cli_conn_startup(CliConn *cc /* may destroy*/, + const char *how) { + cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0); + if (!cc->rd) { warn("oop_rd_new_fd cli failed"); return -1; } + + int er= oop_rd_read(cc->rd, &cli_rd_style, MAX_CLI_COMMAND, + cli_rd_ok, cc, + cli_rd_err, cc); + if (er) { errno= er; syswarn("oop_rd_read cli failed"); return -1; } + + info("CTRL%d %s ready", cc->fd, how); + cli_prompt(cc); + return 0; +} + +static void cli_stdio_destroy(CliConn *cc) { + if (cc->rd) { + oop_rd_cancel(cc->rd); + errno= oop_rd_delete_tidy(cc->rd); + if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)"); + } + free(cc); +} + +static void cli_stdio(void) { + NEW_DECL(CliConn *,cc); + cc->destroy= cli_stdio_destroy; + + cc->fd= 0; + cc->out= stdout; + int r= cli_conn_startup(cc,"stdio"); + if (r) cc->destroy(cc); +} + +static void cli_accepted_destroy(CliConn *cc) { + if (cc->rd) { + oop_rd_cancel(cc->rd); + oop_rd_delete_kill(cc->rd); + } + if (cc->out) { fclose(cc->out); cc->fd=0; } + close_perhaps(&cc->fd); + free(cc); +} + +static void *cli_master_readable(oop_source *lp, int master, + oop_event ev, void *u) { + NEW_DECL(CliConn *,cc); + cc->destroy= cli_accepted_destroy; + + cc->salen= sizeof(cc->sa); + cc->fd= accept(master, &cc->sa.sa, &cc->salen); + if (cc->fd<0) { syswarn("error accepting cli connection"); goto x; } + + cc->out= fdopen(cc->fd, "w"); + if (!cc->out) { syswarn("error fdopening accepted cli connection"); goto x; } + + int r= cli_conn_startup(cc, "accepted"); + if (r) goto x; + + return OOP_CONTINUE; + + 
x: + cc->destroy(cc); + return OOP_CONTINUE; +} + +#define NOCLI(...) do{ \ + syswarn("no cli listener, because failed to " __VA_ARGS__); \ + goto nocli; \ + }while(0) + +static void cli_init(void) { + union { + struct sockaddr sa; + struct sockaddr_un un; + } sa; + + memset(&sa,0,sizeof(sa)); + int maxlen= sizeof(sa.un.sun_path); + + if (!path_cli) { + info("control command line disabled"); + return; + } + + int pathlen= strlen(path_cli); + if (pathlen > maxlen) { + warn("no cli listener, because cli socket path %s too long (%d>%d)", + path_cli, pathlen, maxlen); + return; + } + + if (path_cli_dir) { + int r= mkdir(path_cli_dir, 0700); + if (r && errno!=EEXIST) + NOCLI("create cli socket directory %s", path_cli_dir); + } + + int r= unlink(path_cli); + if (r && errno!=ENOENT) + NOCLI("remove old cli socket %s", path_cli); + + cli_master= socket(PF_UNIX, SOCK_STREAM, 0); + if (cli_master<0) NOCLI("create new cli master socket"); + + int sl= pathlen + offsetof(struct sockaddr_un, sun_path); + sa.un.sun_family= AF_UNIX; + memcpy(sa.un.sun_path, path_cli, pathlen); + + r= bind(cli_master, &sa.sa, sl); + if (r) NOCLI("bind to cli socket path %s", sa.un.sun_path); + + r= listen(cli_master, 5); + if (r) NOCLI("listen to cli master socket"); + + xsetnonblock(cli_master, 1); + + loop->on_fd(loop, cli_master, OOP_READ, cli_master_readable, 0); + info("cli ready, listening on %s", path_cli); + + return; + + nocli: + xclose_perhaps(&cli_master, "cli master",0); + return; +} + +/*========== dumping state ==========*/ + +static void dump_article_list(FILE *f, const CliCommand *c, + const ArticleList *al) { + fprintf(f, " count=%d\n", al->count); + if (!c->xval) return; + + int i; Article *art; + for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) { + fprintf(f," #%05d %-11s", i, artstate_names[art->state]); + DUMPV("%p", art->,ipf); + DUMPV("%d", art->,missing); + DUMPV("%lu", (unsigned long)art->,offset); + DUMPV("%d", art->,blanklen); + DUMPV("%d", art->,midlen); + 
fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid); + } +} + +static void dump_input_file(FILE *f, const CliCommand *c, + InputFile *ipf, const char *wh) { + char *dipf= dbg_report_ipf(ipf); + fprintf(f,"input %s %s", wh, dipf); + free(dipf); + + if (ipf) { + DUMPV("%d", ipf->,readcount_ok); + DUMPV("%d", ipf->,readcount_blank); + DUMPV("%d", ipf->,readcount_err); + DUMPV("%d", ipf->,count_nooffer_missing); + } + fprintf(f,"\n"); + if (ipf) { + ArtState state; const char *const *statename; + for (state=0, statename=artstate_names; *statename; state++,statename++) { +#define RC_DUMP_FMT(x) " " #x "=%d" +#define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x] + fprintf(f,"input %s counts %-11s" + RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n", + wh, *statename + RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL)); + } + fprintf(f,"input %s queue", wh); + dump_article_list(f,c,&ipf->queue); + } +} + +CCMD(dump) { + int i; + fprintf(cc->out, "dumping state to %s\n", path_dump); + FILE *f= fopen(path_dump, "w"); + if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; } + + fprintf(f,"general"); + DUMPV("%s", sms_names,[sms]); + DUMPV("%d", ,until_flush); + DUMPV("%ld", (long),self_pid); + DUMPV("%p", , defer); + DUMPV("%d", , until_connect); + DUMPV("%d", , until_backlog_nextscan); + DUMPV("%d", , simulate_flush); + fprintf(f,"\nnocheck"); + DUMPV("%#.10f", , accept_proportion); + DUMPV("%d", , nocheck); + DUMPV("%d", , nocheck_reported); + fprintf(f,"\n"); + + fprintf(f,"special"); + DUMPV("%ld", (long),connecting_child); + DUMPV("%d", , connecting_fdpass_sock); + DUMPV("%d", , cli_master); + fprintf(f,"\n"); + + fprintf(f,"lowvol"); + DUMPV("%d", , lowvol_circptr); + DUMPV("%d", , lowvol_total); + fprintf(f,":"); + for (i=0; ifd); + DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue); + DUMPV("%d",conn->,stream); DUMPV("\"%s\"",conn->,quitting); + DUMPV("%d",conn->,since_activity); + fprintf(f,"\n"); + + fprintf(f,"C%d waiting", conn->fd); 
dump_article_list(f,c,&conn->waiting); + fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority); + fprintf(f,"C%d sent", conn->fd); dump_article_list(f,c,&conn->sent); + + fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu); + for (i=0; ixmitu; i++) { + const struct iovec *iv= &conn->xmit[i]; + const XmitDetails *xd= &conn->xmitd[i]; + char *dinfo; + switch (xd->kind) { + case xk_Const: dinfo= xasprintf("Const"); break; + case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break; + default: + abort(); + } + fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len, + sanitise(iv->iov_base, iv->iov_len)); + free(dinfo); + } + } + + fprintf(f,"paths"); + DUMPV("%s", , feedfile); + DUMPV("%s", , path_cli); + DUMPV("%s", , path_lock); + DUMPV("%s", , path_flushing); + DUMPV("%s", , path_defer); + DUMPV("%s", , path_dump); + DUMPV("%s", , globpat_backlog); + fprintf(f,"\n"); + + if (!!ferror(f) + !!fclose(f)) { + fprintf(cc->out, "failed: write: %s\n", strerror(errno)); + return; + } +} diff --git a/conn.c b/conn.c new file mode 100644 index 0000000..301d025 --- /dev/null +++ b/conn.c @@ -0,0 +1,406 @@ +/*========== management of connections ==========*/ + +static void reconnect_blocking_event(void) { + until_connect= reconnect_delay_periods; +} + +void conn_closefd(Conn *conn, const char *msgprefix) { + int r= close_perhaps(&conn->fd); + if (r) info("C%d %serror closing socket: %s", + conn->fd, msgprefix, strerror(errno)); +} + +static int conn_busy(Conn *conn) { + return + conn->waiting.count || + conn->priority.count || + conn->sent.count || + conn->xmitu; +} + +static void conn_dispose(Conn *conn) { + if (!conn) return; + if (conn->rd) { + oop_rd_cancel(conn->rd); + oop_rd_delete_kill(conn->rd); + conn->rd= 0; + } + if (conn->fd) { + loop->cancel_fd(loop, conn->fd, OOP_WRITE); + loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION); + } + conn_closefd(conn,""); + free(conn); +} + +static void *conn_exception(oop_source *lp, int fd, + 
oop_event ev, void *conn_v) { + Conn *conn= conn_v; + unsigned char ch; + assert(fd == conn->fd); + assert(ev == OOP_EXCEPTION); + int r= read(conn->fd, &ch, 1); + if (r<0) connfail(conn,"read failed: %s",strerror(errno)); + else connfail(conn,"exceptional condition on socket (peer sent urgent" + " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch); + return OOP_CONTINUE; +} + +static void vconnfail(Conn *conn, const char *fmt, va_list al) { + int requeue[art_MaxState]; + memset(requeue,0,sizeof(requeue)); + + Article *art; + + while ((art= LIST_REMHEAD(conn->priority))) + LIST_ADDTAIL(art->ipf->queue, art); + + while ((art= LIST_REMHEAD(conn->waiting))) + LIST_ADDTAIL(art->ipf->queue, art); + + while ((art= LIST_REMHEAD(conn->sent))) { + requeue[art->state]++; + if (art->state==art_Unsolicited) art->state= art_Unchecked; + LIST_ADDTAIL(art->ipf->queue,art); + check_reading_pause_resume(art->ipf); + } + + int i; + XmitDetails *d; + for (i=0, d=conn->xmitd; ixmitu; i++, d++) + xmit_free(d); + + LIST_REMOVE(conns,conn); + + char *m= xvasprintf(fmt,al); + warn("C%d (now %d) connection failed requeueing " RCI_TRIPLE_FMT_BASE ": %s", + conn->fd, conns.count, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); + free(m); + + reconnect_blocking_event(); + conn_dispose(conn); + check_assign_articles(); +} + +static void connfail(Conn *conn, const char *fmt, ...) 
{ + va_list al; + va_start(al,fmt); + vconnfail(conn,fmt,al); + va_end(al); +} + +static void conn_idle_close(Conn *conn, const char *why) { + static const char quitcmd[]= "QUIT\r\n"; + int todo= sizeof(quitcmd)-1; + const char *p= quitcmd; + for (;;) { + int r= write(conn->fd, p, todo); + if (r<0) { + if (isewouldblock(errno)) + connfail(conn, "blocked writing QUIT to idle connection"); + else + connfail(conn, "failed to write QUIT to idle connection: %s", + strerror(errno)); + break; + } + assert(r<=todo); + todo -= r; + if (!todo) { + conn->quitting= why; + conn->since_activity= 0; + dbg("C%d is idle (%s), quitting", conn->fd, why); + break; + } + } +} + +/* + * For our last connection, we also shut it down if we have had + * less than K in the last L + */ +static void check_idle_conns(void) { + Conn *conn; + + int volthisperiod= lowvol_perperiod[lowvol_circptr]; + lowvol_circptr++; + lowvol_circptr %= lowvol_periods; + lowvol_total += volthisperiod; + lowvol_total -= lowvol_perperiod[lowvol_circptr]; + lowvol_perperiod[lowvol_circptr]= 0; + + FOR_CONN(conn) + conn->since_activity++; + + search_again: + FOR_CONN(conn) { + if (conn->since_activity <= need_activity_periods) continue; + + /* We need to shut this down */ + if (conn->quitting) + connfail(conn,"timed out waiting for response to QUIT (%s)", + conn->quitting); + else if (conn->sent.count) + connfail(conn,"timed out waiting for responses"); + else if (conn->waiting.count || conn->priority.count) + connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); + else if (conn->xmitu) + connfail(conn,"peer has been sending responses" + " before receiving our commands!"); + else + conn_idle_close(conn, "no activity"); + + goto search_again; + } + + conn= LIST_HEAD(conns); + if (!volthisperiod && + conns.count==1 && + lowvol_total < lowvol_thresh && + !conn_busy(conn)) + conn_idle_close(conn, "low volume"); +} + +/*---------- making new connections ----------*/ + +static pid_t connecting_child; +int 
connecting_fdpass_sock; + +static void connect_attempt_discard(void) { + if (connecting_child) { + int status= xwaitpid(&connecting_child, "connect"); + if (!(WIFEXITED(status) || + (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) + report_child_status("connect", status); + } + if (connecting_fdpass_sock) { + cancel_fd_read_except(connecting_fdpass_sock); + xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0); + } +} + +#define PREP_DECL_MSG_CMSG(msg) \ + char msgbyte= 0; \ + struct iovec msgiov; \ + msgiov.iov_base= &msgbyte; \ + msgiov.iov_len= 1; \ + struct msghdr msg; \ + memset(&msg,0,sizeof(msg)); \ + char msg##cbuf[CMSG_SPACE(sizeof(int))]; \ + msg.msg_iov= &msgiov; \ + msg.msg_iovlen= 1; \ + msg.msg_control= msg##cbuf; \ + msg.msg_controllen= sizeof(msg##cbuf); + +static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { + Conn *conn= 0; + + assert(fd == connecting_fdpass_sock); + + PREP_DECL_MSG_CMSG(msg); + + ssize_t rs= recvmsg(fd, &msg, 0); + if (rs<0) { + if (isewouldblock(errno)) return OOP_CONTINUE; + syswarn("failed to read socket from connecting child"); + goto x; + } + + NEW(conn); + LIST_INIT(conn->waiting); + LIST_INIT(conn->priority); + LIST_INIT(conn->sent); + + struct cmsghdr *h= 0; + if (rs >= 0) h= CMSG_FIRSTHDR(&msg); + if (!h) { + int status= xwaitpid(&connecting_child, "connect child (broken)"); + + if (WIFEXITED(status)) { + if (WEXITSTATUS(status) != 0 && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) + /* child already reported the problem */; + else { + if (e == OOP_EXCEPTION) + warn("connect: connection child exited code %d but" + " unexpected exception on fdpass socket", + WEXITSTATUS(status)); + else + warn("connect: connection child exited code %d but" + " no cmsg (rs=%d)", + WEXITSTATUS(status), (int)rs); + } + } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { + warn("connect: connection attempt timed out"); + } 
else { + report_child_status("connect", status); + } + goto x; + } + +#define CHK(field, val) \ + if (h->cmsg_##field != val) { \ + crash("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \ + h->cmsg_##field, val); \ + goto x; \ + } + CHK(level, SOL_SOCKET); + CHK(type, SCM_RIGHTS); + CHK(len, CMSG_LEN(sizeof(conn->fd))); +#undef CHK + + if (CMSG_NXTHDR(&msg,h)) crash("connect: child sent many cmsgs"); + + memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd)); + + int status; + pid_t got= waitpid(connecting_child, &status, 0); + if (got==-1) syscrash("connect: real wait for child"); + assert(got == connecting_child); + connecting_child= 0; + + if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; } + int es= WEXITSTATUS(status); + switch (es) { + case CONNCHILD_ESTATUS_STREAM: conn->stream= 1; break; + case CONNCHILD_ESTATUS_NOSTREAM: conn->stream= 0; break; + default: + die("connect: child gave unexpected exit status %d", es); + } + + /* Phew! */ + conn->max_queue= conn->stream ? max_queue_per_conn : 1; + + loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); + conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */ + if (!conn->fd) crash("oop_rd_new_fd conn failed (fd=%d)",conn->fd); + int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN, + &peer_rd_ok, conn, + &peer_rd_err, conn); + if (r) syscrash("oop_rd_read for peer (fd=%d)",conn->fd); + + LIST_ADDHEAD(conns, conn); + notice("C%d (now %d) connected %s", + conn->fd, conns.count, conn->stream ? 
"streaming" : "plain"); + + connect_attempt_discard(); + check_assign_articles(); + return OOP_CONTINUE; + + x: + conn_dispose(conn); + connect_attempt_discard(); + reconnect_blocking_event(); + return OOP_CONTINUE; +} + +static int allow_connect_start(void) { + return conns.count < max_connections + && !connecting_child + && !until_connect; +} + +static void connect_start(void) { + assert(!connecting_child); + assert(!connecting_fdpass_sock); + + info("starting connection attempt"); + int ok_until_connect= until_connect; + reconnect_blocking_event(); + + int socks[2]; + int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks); + if (r) { syswarn("connect: cannot create socketpair for child"); return; } + + connecting_child= xfork("connection"); + + if (!connecting_child) { + FILE *cn_from, *cn_to; + char buf[NNTP_STRLEN+100]; + int exitstatus= CONNCHILD_ESTATUS_NOSTREAM; + + xclose(socks[0], "(in child) parent's connection fdpass socket",0); + + alarm(connection_setup_timeout); + if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) { + int l= strlen(buf); + int stripped=0; + while (l>0) { + unsigned char c= buf[l-1]; + if (!isspace(c)) break; + if (c=='\n' || c=='\r') stripped=1; + --l; + } + if (!buf[0]) { + sysdie("connect: connection attempt failed"); + } else { + buf[l]= 0; + die("connect: %s: %s", stripped ? 
"rejected" : "failed", + sanitise(buf,-1)); + } + } + if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0) + sysdie("connect: authentication failed"); + if (try_stream) { + if (fputs("MODE STREAM\r\n", cn_to)==EOF || + fflush(cn_to)) + sysdie("connect: could not send MODE STREAM"); + buf[sizeof(buf)-1]= 0; + if (!fgets(buf, sizeof(buf)-1, cn_from)) { + if (ferror(cn_from)) + sysdie("connect: could not read response to MODE STREAM"); + else + die("connect: connection close in response to MODE STREAM"); + } + int l= strlen(buf); + assert(l>=1); + if (buf[l-1]!='\n') + die("connect: response to MODE STREAM is too long: %.100s...", + sanitise(buf,-1)); + l--; if (l>0 && buf[l-1]=='\r') l--; + buf[l]= 0; + char *ep; + int rcode= strtoul(buf,&ep,10); + if (ep != &buf[3]) + die("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1)); + + switch (rcode) { + case 203: + exitstatus= CONNCHILD_ESTATUS_STREAM; + break; + case 480: + case 500: + break; + default: + warn("connect: unexpected response to MODE STREAM: %.50s", + sanitise(buf,-1)); + exitstatus= 2; + break; + } + } + int fd= fileno(cn_from); + + PREP_DECL_MSG_CMSG(msg); + struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level= SOL_SOCKET; + cmsg->cmsg_type= SCM_RIGHTS; + cmsg->cmsg_len= CMSG_LEN(sizeof(fd)); + memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd)); + + msg.msg_controllen= cmsg->cmsg_len; + r= sendmsg(socks[1], &msg, 0); + if (r<0) syscrash("sendmsg failed for new connection"); + if (r!=1) crash("sendmsg for new connection gave wrong result %d",r); + + _exit(exitstatus); + } + + xclose(socks[1], "connecting fdpass child's socket",0); + connecting_fdpass_sock= socks[0]; + xsetnonblock(connecting_fdpass_sock, 1); + on_fd_read_except(connecting_fdpass_sock, connchild_event); + + if (!conns.count) + until_connect= ok_until_connect; +} + diff --git a/decls-junk b/decls-junk new file mode 100644 index 0000000..8bf9a86 --- /dev/null +++ b/decls-junk @@ -0,0 +1,57 @@ + + + +/*----- function 
predeclarations -----*/ + +void conn_maybe_write(Conn *conn); +void conn_make_some_xmits(Conn *conn); +void *conn_write_some_xmits(Conn *conn); + +void xmit_free(XmitDetails *d); + +#define SMS(newstate, periods, why) \ + (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) +void statemc_setstate(StateMachineState newsms, int periods, + const char *forlog, const char *why); + +void statemc_start_flush(const char *why); /* Normal => Flushing */ +void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ +int trigger_flush_ok(const char *why /* 0 means timeout */); + /* => Flushing,FLUSHING, ret 1; or ret 0 */ + +void article_done(Article *art, int whichcount); + +void check_assign_articles(void); +void queue_check_input_done(void); +void check_reading_pause_resume(InputFile *ipf); + +void statemc_check_flushing_done(void); +void statemc_check_backlog_done(void); + +void postfork(void); +void period(void); + +void open_defer(void); +void close_defer(void); +void search_backlog_file(void); +void preterminate(void); +void raise_default(int signo) NORET; + +void inputfile_reading_start(InputFile *ipf); +void inputfile_reading_stop(InputFile *ipf); +void inputfile_reading_pause(InputFile *ipf); +void inputfile_reading_resume(InputFile *ipf); + /* pause and resume are idempotent, and no-op if not done _reading_start */ + +void filemon_start(InputFile *ipf); +void filemon_stop(InputFile *ipf); +void tailing_make_readable(InputFile *ipf); + +void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); +void connfail(Conn *conn, const char *fmt, ...) 
PRINTF(2,3); + +extern const oop_rd_style peer_rd_style; +extern oop_rd_call peer_rd_err, peer_rd_ok; + + + diff --git a/defer.c b/defer.c new file mode 100644 index 0000000..9af9d58 --- /dev/null +++ b/defer.c @@ -0,0 +1,192 @@ +/*---------- defer and backlog files ----------*/ + +static void open_defer(void) { + struct stat stab; + + if (defer) return; + + defer= fopen(path_defer, "a+"); + if (!defer) sysdie("could not open defer file %s", path_defer); + + /* truncate away any half-written records */ + + xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file"); + + if (stab.st_size > LONG_MAX) + crash("defer file %s size is far too large", path_defer); + + if (!stab.st_size) + return; + + long orgsize= stab.st_size; + long truncto= stab.st_size; + for (;;) { + if (!truncto) break; /* was only (if anything) one half-truncated record */ + if (fseek(defer, truncto-1, SEEK_SET) < 0) + syscrash("seek in defer file %s while truncating partial", path_defer); + + int r= getc(defer); + if (r==EOF) { + if (ferror(defer)) + syscrash("failed read from defer file %s", path_defer); + else + crash("defer file %s shrank while we were checking it!", path_defer); + } + if (r=='\n') break; + truncto--; + } + + if (stab.st_size != truncto) { + warn("truncating half-record at end of defer file %s -" + " shrinking by %ld bytes from %ld to %ld", + path_defer, orgsize - truncto, orgsize, truncto); + + if (fflush(defer)) + sysdie("could not flush defer file %s", path_defer); + if (ftruncate(fileno(defer), truncto)) + syscrash("could not truncate defer file %s", path_defer); + + } else { + info("continuing existing defer file %s (%ld bytes)", + path_defer, orgsize); + } + if (fseek(defer, truncto, SEEK_SET)) + syscrash("could not seek to new end of defer file %s", path_defer); +} + +static void close_defer(void) { + if (!defer) + return; + + struct stat stab; + xfstat_isreg(fileno(defer), &stab, path_defer, "defer file"); + + if (fclose(defer)) sysdie("could not close 
defer file %s", path_defer); + defer= 0; + + time_t now= xtime(); + + char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile, + (unsigned long)now, + (unsigned long)stab.st_ino); + if (link(path_defer, backlog)) + sysdie("could not install defer file %s as backlog file %s", + path_defer, backlog); + if (unlink(path_defer)) + syscrash("could not unlink old defer link %s to backlog file %s", + path_defer, backlog); + + free(backlog); + + if (until_backlog_nextscan < 0 || + until_backlog_nextscan > backlog_retry_minperiods + 1) + until_backlog_nextscan= backlog_retry_minperiods + 1; +} + +void poll_backlog_file(void) { + if (until_backlog_nextscan < 0) return; + if (until_backlog_nextscan-- > 0) return; + search_backlog_file(); +} + +static void search_backlog_file(void) { + /* returns non-0 iff there are any backlog files */ + + glob_t gl; + int r; + unsigned ui; + struct stat stab; + const char *oldest_path=0; + time_t oldest_mtime=0, now; + + if (backlog_input_file) return; + + try_again: + + r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl); + + switch (r) { + case GLOB_ABORTED: + sysdie("failed to expand backlog pattern %s", globpat_backlog); + case GLOB_NOSPACE: + die("out of memory expanding backlog pattern %s", globpat_backlog); + case 0: + for (ui=0; ui= 0 && + until_backlog_nextscan > backlog_spontrescan_periods) + until_backlog_nextscan= backlog_spontrescan_periods; + + dbg("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s", + age, age_deficiency, until_backlog_nextscan, oldest_path); + + xfree: + globfree(&gl); + return; +} + diff --git a/filemon.c b/filemon.c new file mode 100644 index 0000000..9a362db --- /dev/null +++ b/filemon.c @@ -0,0 +1,121 @@ +/*---------- filemon implemented with inotify ----------*/ + +#if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON) +#define HAVE_FILEMON + +#include + +static int filemon_inotify_fd; +static int filemon_inotify_wdmax; +static InputFile **filemon_inotify_wd2ipf; + +struct 
Filemon_Perfile { + int wd; +}; + +static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { + int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY); + if (wd < 0) sysdie("inotify_add_watch %s", ipf->path); + + if (wd >= filemon_inotify_wdmax) { + int newmax= wd+2; + filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf, + sizeof(*filemon_inotify_wd2ipf) * newmax); + memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0, + sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax)); + filemon_inotify_wdmax= newmax; + } + + assert(!filemon_inotify_wd2ipf[wd]); + filemon_inotify_wd2ipf[wd]= ipf; + + dbg("filemon inotify startfile %p wd=%d wdmax=%d", + ipf, wd, filemon_inotify_wdmax); + + pf->wd= wd; +} + +static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { + int wd= pf->wd; + dbg("filemon inotify stopfile %p wd=%d", ipf, wd); + int r= inotify_rm_watch(filemon_inotify_fd, wd); + if (r) syscrash("inotify_rm_watch"); + filemon_inotify_wd2ipf[wd]= 0; +} + +static void *filemon_inotify_readable(oop_source *lp, int fd, + oop_event e, void *u) { + struct inotify_event iev; + for (;;) { + int r= read(filemon_inotify_fd, &iev, sizeof(iev)); + if (r==-1) { + if (isewouldblock(errno)) break; + syscrash("read from inotify master"); + } else if (r==sizeof(iev)) { + assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax); + } else { + crash("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev)); + } + InputFile *ipf= filemon_inotify_wd2ipf[iev.wd]; + /*dbg("filemon inotify readable read %p wd=%d", ipf, iev.wd);*/ + tailing_make_readable(ipf); + } + return OOP_CONTINUE; +} + +static int filemon_method_init(void) { + filemon_inotify_fd= inotify_init(); + if (filemon_inotify_fd<0) { + syswarn("filemon/inotify: inotify_init failed"); + return 0; + } + xsetnonblock(filemon_inotify_fd, 1); + loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0); + + dbg("filemon inotify init 
filemon_inotify_fd=%d", filemon_inotify_fd); + return 1; +} + +static void filemon_method_dump_info(FILE *f) { + int i; + fprintf(f,"inotify"); + DUMPV("%d",,filemon_inotify_fd); + DUMPV("%d",,filemon_inotify_wdmax); + for (i=0; ifilemon); + + NEW(ipf->filemon); + filemon_method_startfile(ipf, ipf->filemon); +} + +static void filemon_stop(InputFile *ipf) { + if (!ipf->filemon) return; + filemon_method_stopfile(ipf, ipf->filemon); + free(ipf->filemon); + ipf->filemon= 0; +} + diff --git a/help.c b/help.c new file mode 100644 index 0000000..144ab86 --- /dev/null +++ b/help.c @@ -0,0 +1,284 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * logging and utility functions + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#include "innduct.h" + + +/* for logging, simulation, debugging, etc. */ +int simulate_flush= -1; +int logv_use_syslog; +const char *logv_prefix=""; + +/*========== logging ==========*/ + +static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); +static void logcore(int sysloglevel, const char *fmt, ...) 
{
+  VA;
+  if (logv_use_syslog) {
+    vsyslog(sysloglevel,fmt,al);
+  } else {
+    if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid);
+    vfprintf(stderr,fmt,al);
+    putc('\n',stderr);
+  }
+  va_end(al);
+}
+
+void logv(int sysloglevel, const char *pfx, int errnoval,
+          const char *fmt, va_list al) {
+  char msgbuf[1024]; /* NB do not call xvasprintf here or you'll recurse */
+  vsnprintf(msgbuf,sizeof(msgbuf), fmt,al);
+  msgbuf[sizeof(msgbuf)-1]= 0;
+
+  if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM))
+    sysloglevel= LOG_ERR; /* run by wrong user, probably */
+
+  logcore(sysloglevel, "%s%s: %s%s%s",
+          logv_prefix, pfx, msgbuf,
+          errnoval>=0 ? ": " : "", /* negative errnoval: no strerror suffix */
+          errnoval>=0 ? strerror(errnoval) : "");
+}
+
+#define DEFFATAL(fn, pfx, sysloglevel, err, estatus) \
+  void fn(const char *fmt, ...) { \
+    int saved_err= (err); preterminate(); /* err may be errno: read it before any call can overwrite it */ \
+    VA; \
+    logv(sysloglevel, pfx, saved_err, fmt, al); \
+    exit(estatus); \
+  }
+
+#define DEFLOG(fn, pfx, sysloglevel, err) \
+  static void fn(const char *fmt, ...) { \
+    VA; \
+    logv(sysloglevel, pfx, err, fmt, al); \
+    va_end(al); \
+  }
+
+#define INNLOGSET_DECLARE(fn, pfx, sysloglevel) \
+  static void duct_log_##fn(int l, const char *fmt, va_list al, int errval) \
+    PRINTF(3,0); \
+  static void duct_log_##fn(int l, const char *fmt, va_list al, int errval) { \
+    logv(sysloglevel, pfx, errval ? errval : -1, fmt, al); /* errval 0 => no errno text */ \
+  }
+#define INNLOGSET_CALL(fn, pfx, sysloglevel) \
+  message_handlers_##fn(1, duct_log_##fn);
+
+
+static int innduct_fatal_cleanup(void) { return 12; } /* used for libinn die */
+
+/* We want to extend the set of logging functions from inn, and we
+ * want to prepend the site name to all our messages.
*/ + +DEFFATAL(syscrash, "critical", LOG_CRIT, errno, 16); +DEFFATAL(crash, "critical", LOG_CRIT, -1, 16); + +#define INNLOGSETS(INNLOGSET) \ + INNLOGSET(die, "fatal", LOG_ERR) \ + INNLOGSET(warn, "warning", LOG_WARNING) \ + INNLOGSET(notice, "notice", LOG_NOTICE) \ + INNLOGSET(trace, "trace", LOG_NOTICE) +INNLOGSETS(INNLOGSET_DECLARE) + +DEFLOG(info, "info", LOG_INFO, -1) +DEFLOG(dbg, "debug", LOG_DEBUG, -1) + + +/*========== utility functions etc. ==========*/ + +char *xvasprintf(const char *fmt, va_list al) { + char *str; + int rc= vasprintf(&str,fmt,al); + if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt); + return str; +} + +char *xasprintf(const char *fmt, ...) { + VA; + char *str= xvasprintf(fmt,al); + va_end(al); + return str; +} + +int close_perhaps(int *fd) { + if (*fd <= 0) return 0; + int r= close(*fd); + *fd=0; + return r; +} +void xclose(int fd, const char *what, const char *what2) { + int r= close(fd); + if (r) syscrash("close %s%s",what,what2?what2:""); +} +void xclose_perhaps(int *fd, const char *what, const char *what2) { + if (*fd <= 0) return; + xclose(*fd,what,what2); + *fd=0; +} + +pid_t xfork(const char *what) { + pid_t child; + + child= fork(); + if (child==-1) sysdie("cannot fork for %s",what); + dbg("forked %s %ld", what, (unsigned long)child); + if (!child) postfork(); + return child; +} + +void on_fd_read_except(int fd, oop_call_fd callback) { + loop->on_fd(loop, fd, OOP_READ, callback, 0); + loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0); +} +void cancel_fd_read_except(int fd) { + loop->cancel_fd(loop, fd, OOP_READ); + loop->cancel_fd(loop, fd, OOP_EXCEPTION); +} + +void report_child_status(const char *what, int status) { + if (WIFEXITED(status)) { + int es= WEXITSTATUS(status); + if (es) + warn("%s: child died with error exit status %d", what, es); + } else if (WIFSIGNALED(status)) { + int sig= WTERMSIG(status); + const char *sigstr= strsignal(sig); + const char *coredump= WCOREDUMP(status) ? 
" (core dumped)" : ""; + if (sigstr) + warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump); + else + warn("%s: child died due to unknown fatal signal %d%s", + what, sig, coredump); + } else { + warn("%s: child died with unknown wait status %d", what,status); + } +} + +int xwaitpid(pid_t *pid, const char *what) { + int status; + + int r= kill(*pid, SIGKILL); + if (r) syscrash("cannot kill %s child", what); + + pid_t got= waitpid(*pid, &status, 0); + if (got==-1) syscrash("cannot reap %s child", what); + if (got==0) crash("cannot reap %s child", what); + + *pid= 0; + + return status; +} + +void *zxmalloc(size_t sz) { + void *p= xmalloc(sz); + memset(p,0,sz); + return p; +} + +void xunlink(const char *path, const char *what) { + int r= unlink(path); + if (r) syscrash("can't unlink %s %s", path, what); +} + +time_t xtime(void) { + time_t now= time(0); + if (now==-1) syscrash("time(2) failed"); + return now; +} + +void xsigaction(int signo, const struct sigaction *sa) { + int r= sigaction(signo,sa,0); + if (r) syscrash("sigaction failed for \"%s\"", strsignal(signo)); +} +void xsigsetdefault(int signo) { + struct sigaction sa; + memset(&sa,0,sizeof(sa)); + sa.sa_handler= SIG_DFL; + xsigaction(signo,&sa); +} + +void xgettimeofday(struct timeval *tv_r) { + int r= gettimeofday(tv_r,0); + if (r) syscrash("gettimeofday(2) failed"); +} +void xsetnonblock(int fd, int nonb) { + int errnoval= oop_fd_nonblock(fd, nonb); + if (errnoval) { errno= errnoval; syscrash("setnonblocking"); } +} + +void check_isreg(const struct stat *stab, const char *path, + const char *what) { + if (!S_ISREG(stab->st_mode)) + crash("%s %s not a plain file (mode 0%lo)", + what, path, (unsigned long)stab->st_mode); +} + +static void xfstat(int fd, struct stat *stab_r, const char *what) { + int r= fstat(fd, stab_r); + if (r) syscrash("could not fstat %s", what); +} + +static void xfstat_isreg(int fd, struct stat *stab_r, + const char *path, const char *what) { + xfstat(fd, stab_r, 
what); + check_isreg(stab_r, path, what); +} + +void xlstat_isreg(const char *path, struct stat *stab, + int *enoent_r /* 0 means ENOENT is fatal */, + const char *what) { + int r= lstat(path, stab); + if (r) { + if (errno==ENOENT && enoent_r) { *enoent_r=1; return; } + syscrash("could not lstat %s %s", what, path); + } + if (enoent_r) *enoent_r= 0; + check_isreg(stab, path, what); +} + +int samefile(const struct stat *a, const struct stat *b) { + assert(S_ISREG(a->st_mode)); + assert(S_ISREG(b->st_mode)); + return (a->st_ino == b->st_ino && + a->st_dev == b->st_dev); +} + +char *sanitise(const char *input, int len) { + static char sanibuf[100]; /* returns pointer to this buffer! */ + + const char *p= input; + const char *endp= len>=0 ? input+len : 0; + char *q= sanibuf; + *q++= '`'; + for (;;) { + if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; } + int c= (!endp || p=' ' && c<=126 && c!='\\') { *q++= c; continue; } + sprintf(q,"\\x%02x",c); + q += 4; + } + return sanibuf; +} diff --git a/infile.c b/infile.c new file mode 100644 index 0000000..223846e --- /dev/null +++ b/infile.c @@ -0,0 +1,286 @@ +/*========== monitoring of input files ==========*/ + +static void feedfile_eof(InputFile *ipf) { + assert(ipf != main_input_file); /* promised by tailing_try_read */ + inputfile_reading_stop(ipf); + + if (ipf == flushing_input_file) { + assert(sms==sm_SEPARATED || sms==sm_DROPPING); + if (main_input_file) inputfile_reading_start(main_input_file); + statemc_check_flushing_done(); + } else if (ipf == backlog_input_file) { + statemc_check_backlog_done(); + } else { + abort(); /* supposed to wait rather than get EOF on main input file */ + } +} + +static InputFile *open_input_file(const char *path) { + int fd= open(path, O_RDWR); + if (fd<0) { + if (errno==ENOENT) return 0; + sysdie("unable to open input file %s", path); + } + assert(fd>0); + + InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1); + memset(ipf,0,sizeof(*ipf)); + + ipf->fd= fd; + 
ipf->autodefer= -1; + LIST_INIT(ipf->queue); + strcpy(ipf->path, path); + + return ipf; +} + +static void close_input_file(InputFile *ipf) { /* does not free */ + assert(!ipf->readable_callback); /* must have had ->on_cancel */ + assert(!ipf->filemon); /* must have had inputfile_reading_stop */ + assert(!ipf->rd); /* must have had inputfile_reading_stop */ + assert(!ipf->inprogress); /* no dangling pointers pointing here */ + xclose_perhaps(&ipf->fd, "input file ", ipf->path); +} + + +/*---------- dealing with articles read in the input file ----------*/ + +static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, + const char *data, const char *how) { + warn("corrupted file: %s, offset %lu: %s: in %s", + ipf->path, (unsigned long)offset, how, sanitise(data,-1)); + ipf->readcount_err++; + if (ipf->readcount_err > max_bad_data_initial + + (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio) + crash("too much garbage in input file! (%d errs, %d ok, %d blank)", + ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank); + return OOP_CONTINUE; +} + +static void *feedfile_read_err(oop_source *lp, oop_read *rd, + oop_rd_event ev, const char *errmsg, + int errnoval, const char *data, size_t recsz, + void *ipf_v) { + InputFile *ipf= ipf_v; + assert(ev == OOP_RD_SYSTEM); + errno= errnoval; + syscrash("error reading input file: %s, offset %lu", + ipf->path, (unsigned long)ipf->offset); +} + +static void *feedfile_got_article(oop_source *lp, oop_read *rd, + oop_rd_event ev, const char *errmsg, + int errnoval, const char *data, size_t recsz, + void *ipf_v) { + InputFile *ipf= ipf_v; + Article *art; + char tokentextbuf[sizeof(TOKEN)*2+3]; + + if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; } + + off_t old_offset= ipf->offset; + ipf->offset += recsz + !!(ev == OOP_RD_OK); + +#define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m); + + if (ev==OOP_RD_PARTREC) + feedfile_got_bad_data(ipf,old_offset,data,"missing final newline"); + /* but 
process it anyway */
+
+  if (ipf->skippinglong) {
+    if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */
+    return OOP_CONTINUE;
+  }
+  if (ev==OOP_RD_LONG) {
+    ipf->skippinglong= 1;
+    X_BAD_DATA("overly long line");
+  }
+
+  if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte");
+  if (!recsz) X_BAD_DATA("empty line");
+
+  if (data[0]==' ') {
+    if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked");
+    ipf->readcount_blank++;
+    return OOP_CONTINUE;
+  }
+
+  char *space= strchr(data,' ');
+  int tokenlen= space ? (int)(space-data) : (int)recsz; /* no ' ' at all: midlen becomes -1, rejected below before space is used */
+  int midlen= (int)recsz-tokenlen-1;
+  if (midlen <= 2) X_BAD_DATA("no room for messageid");
+  if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid");
+
+  if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length");
+  memcpy(tokentextbuf, data, tokenlen);
+  tokentextbuf[tokenlen]= 0;
+  if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax");
+
+  ipf->readcount_ok++;
+
+  art= xmalloc(sizeof(*art) - 1 + midlen + 1);
+  memset(art,0,sizeof(*art));
+  art->state= art_Unchecked;
+  art->midlen= midlen;
+  art->ipf= ipf; ipf->inprogress++;
+  art->token= TextToToken(tokentextbuf);
+  art->offset= old_offset;
+  art->blanklen= recsz;
+  strcpy(art->messageid, space+1);
+
+  if (ipf->autodefer >= 0) {
+    article_autodefer(ipf, art);
+  } else {
+    LIST_ADDTAIL(ipf->queue, art);
+
+    if (ipf==backlog_input_file)
+      article_check_expired(art);
+  }
+
+  if (sms==sm_NORMAL && ipf==main_input_file &&
+      ipf->offset >= target_max_feedfile_size)
+    statemc_start_flush("feed file size");
+
+  check_assign_articles(); /* may destroy conn but that's OK */
+  check_reading_pause_resume(ipf);
+  return OOP_CONTINUE;
+}
+
+/*========== tailing input file ==========*/
+
+static void *tailing_rable_call_time(oop_source *lp, struct timeval tv,
+                                     void *user) {
+  /* lifetime of ipf here is OK because destruction will cause
+   * on_cancel which will cancel this callback */
+  InputFile *ipf= user;
+
+  dbg("**TRACT** ipf=%p called",ipf);
+  if (!ipf->fake_readable)
return OOP_CONTINUE;
+
+  /* we just keep calling readable until our caller (oop_rd)
+   * has called try_read, and try_read has found EOF so given EAGAIN */
+  dbg("**TRACT** ipf=%p reschedule",ipf);
+  loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+
+  return ipf->readable_callback(loop, &ipf->readable,
+                                ipf->readable_callback_user);
+}
+
+static void tailing_on_cancel(struct oop_readable *rable) {
+  InputFile *ipf= (void*)rable;
+  dbg("**TOR** ipf=%p on_cancel",ipf);
+
+  if (ipf->filemon) filemon_stop(ipf);
+  dbg("**TRACT** ipf=%p cancel",ipf);
+  loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+  ipf->readable_callback= 0;
+}
+
+static void tailing_make_readable(InputFile *ipf) {
+  dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf,
+      (void*)(ipf ? ipf->readable_callback : 0)); /* cast the result, not the condition */
+  if (!ipf || !ipf->readable_callback) /* so callers can be naive */
+    return;
+  ipf->fake_readable= 1;
+  loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf);
+}
+
+static int tailing_on_readable(struct oop_readable *rable,
+                               oop_readable_call *cb, void *user) {
+  InputFile *ipf= (void*)rable;
+  dbg("**TOR** ipf=%p on_readable",ipf);
+
+  tailing_on_cancel(rable); /* drop any previous registration first */
+  ipf->readable_callback= cb;
+  ipf->readable_callback_user= user;
+  filemon_start(ipf);
+  tailing_make_readable(ipf);
+  return 0;
+}
+
+static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer,
+                                size_t length) {
+  InputFile *ipf= (void*)rable;
+  for (;;) {
+    ssize_t r= read(ipf->fd, buffer, length);
+    if (r==-1) {
+      if (errno==EINTR) continue;
+      ipf->fake_readable= 0;
+      return r;
+    }
+    if (!r) { /* read() returned 0: end of file */
+      if (ipf==main_input_file) {
+        errno=EAGAIN; /* main feed file: report "nothing yet" instead of EOF */
+        ipf->fake_readable= 0;
+        return -1;
+      } else if (ipf==flushing_input_file) {
+        assert(ipf->rd);
+        assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+      } else if (ipf==backlog_input_file) {
+        assert(ipf->rd);
+      } else {
+        abort();
+      }
+    }
+    dbg("**TOR** ipf=%p try_read r=%ld",ipf,(long)r); /* r is ssize_t, not int */
+    return r;
+  }
+}
+
+/*---------- interface to start and
stop an input file ----------*/ + +static const oop_rd_style feedfile_rdstyle= { + OOP_RD_DELIM_STRIP, '\n', + OOP_RD_NUL_PERMIT, + OOP_RD_SHORTREC_LONG, +}; + +static void inputfile_reading_resume(InputFile *ipf) { + if (!ipf->rd) return; + if (!ipf->paused) return; + + int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, + feedfile_got_article,ipf, feedfile_read_err, ipf); + if (r) syscrash("unable start reading feedfile %s",ipf->path); + + ipf->paused= 0; +} + +static void inputfile_reading_pause(InputFile *ipf) { + if (!ipf->rd) return; + if (ipf->paused) return; + oop_rd_cancel(ipf->rd); + ipf->paused= 1; +} + +static void inputfile_reading_start(InputFile *ipf) { + assert(!ipf->rd); + ipf->readable.on_readable= tailing_on_readable; + ipf->readable.on_cancel= tailing_on_cancel; + ipf->readable.try_read= tailing_try_read; + ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */ + ipf->readable.delete_kill= 0; + + ipf->readable_callback= 0; + ipf->readable_callback_user= 0; + + ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0); + assert(ipf->rd); + + ipf->paused= 1; + inputfile_reading_resume(ipf); +} + +static void inputfile_reading_stop(InputFile *ipf) { + assert(ipf->rd); + inputfile_reading_pause(ipf); + oop_rd_delete(ipf->rd); + ipf->rd= 0; + assert(!ipf->filemon); /* we shouldn't be monitoring it now */ +} + +void filepoll(void) { + tailing_make_readable(main_input_file); + tailing_make_readable(flushing_input_file); +} + diff --git a/innduct.c b/innduct.c index 5755855..7ade8b6 100644 --- a/innduct.c +++ b/innduct.c @@ -1,6 +1,7 @@ /* * innduct * tailing reliable realtime streaming feeder for inn + * main program - option parsing and startup * * Copyright (C) 2010 Ian Jackson * @@ -23,3234 +24,42 @@ * with GPLv3. If not then please let me know. -Ian Jackson.) 
*/ -/* - * Newsfeeds file entries should look like this: - * host.name.of.site[/exclude,exclude,...]\ - * :pattern,pattern...[/distribution,distribution...]\ - * :Tf,Wnm - * : - * or - * sitename[/exclude,exclude,...]\ - * :pattern,pattern...[/distribution,distribution...]\ - * :Tf,Wnm - * :host.name.of.site - * - * Four files full of - * token messageid - * or might be blanked out - * .... - * - * F site.name main feed file - * opened/created, then written, by innd - * read by duct - * unlinked by duct - * tokens blanked out by duct when processed - * site.name_lock lock preventing multiple ducts - * to hold lock must open,F_SETLK[W] - * and then stat to check that locked file - * still has name site.name_lock - * holder of this lock is "duct" - * (only) lockholder may remove the lockfile - * D site.name_flushing temporary feed file during flush (or crash) - * hardlink created by duct - * unlinked by duct - * site.name_defer 431'd articles, still being written, - * created, written, used by duct - * - * site.name_backlog.. - * 431'd articles, ready for innxmit or duct - * created (link/mv) by duct - * site.name_backlog (where does not - * contain '#' or '~') eg - * site.name_backlog.manual - * anything the sysadmin likes (eg, feed files - * from old feeds to be merged into this one) - * created (link/mv) by admin - * may be symlinks (in which case links - * may be written through, but only links - * will be removed. - * - * It is safe to remove backlog files manually, - * if it's desired to throw away the backlog. - * - * Backlog files are also processed by innduct. We find the oldest - * backlog file which is at least a certain amount old, and feed it - * back into our processing. When every article in it has been read - * and processed, we unlink it and look for another backlog file. - * - * If we don't have a backlog file that we're reading, we close the - * defer file that we're writing and make it into a backlog file at - * the first convenient opportunity. 
- * -8<- - - - OVERALL STATES: - - START - | - ,-->--. check F, D - | | | - | | | - | | <----------------<---------------------------------'| - | | F exists | - | | D ENOENT | - | | duct opens F | - | V | - | Normal | - | F: innd writing, duct reading | - | D: ENOENT | - | | | - | | duct decides time to flush | - | | duct makes hardlink | - | | | - | V <------------------------'| - | Hardlinked F==D | - | F == D: innd writing, duct reading both exist | - ^ | | - | | duct unlinks F | - | | <-----------<-------------<--'| - | | open D F ENOENT | - | | if exists | - | | | - | V <---------------------. | - | Moved | | - | F: ENOENT | | - | D: innd writing, duct reading; or ENOENT | | - | | | | - | | duct requests flush of feed | | - | | (others can too, harmlessly) | | - | V | | - | Flushing | | - | F: ENOENT | | - | D: innd flushing, duct; or ENOENT | | - | | | | - | | inndcomm flush fails | | - | |`-------------------------->------------------' | - | | | - | | inndcomm reports no such site | - | |`---------------------------------------------------- | -. - | | | | - | | innd finishes writing D, creates F | | - | | inndcomm reports flush successful | | - | | | | - | V | | - | Separated <----------------' | - | F: innd writing F!=D / - | D: duct reading; or ENOENT both exist / - | | / - | | duct gets to the end of D / - | | duct opens F too / - | V / - | Finishing / - | F: innd writing, duct reading | - | D: duct finishing V - | | Dropping - | | duct finishes processing D F: ENOENT - | V duct unlinks D D: duct reading - | | | - `--<--' | duct finishes - | processing D - | duct unlinks D - | duct exits - V - Dropped - F: ENOENT - D: ENOENT - duct not running - - "duct reading" means innduct is reading the file but also - overwriting processed tokens. 
- - * ->8- -^L- - * - * rune for printing diagrams: - -perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops - - * - */ - -/*============================== PROGRAM ==============================*/ - -#define _GNU_SOURCE 1 - -#include "config.h" -#include "storage.h" -#include "nntp.h" -#include "libinn.h" -#include "inndcomm.h" - -#include "inn/list.h" -#include "inn/innconf.h" -#include "inn/messages.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -/*----- general definitions, probably best not changed -----*/ - -#define CONNCHILD_ESTATUS_STREAM 24 -#define CONNCHILD_ESTATUS_NOSTREAM 25 - -#define INNDCOMMCHILD_ESTATUS_FAIL 26 -#define INNDCOMMCHILD_ESTATUS_NONESUCH 27 - -#define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) -#define MAX_CLI_COMMAND 1000 - -#define VA va_list al; va_start(al,fmt) -#define PRINTF(f,a) __attribute__((__format__(printf,f,a))) -#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a))) -#define NORET __attribute__((__noreturn__)) - -#define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr)))) -#define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr))) - -#define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v); - -#define FOR_CONN(conn) \ - for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn))) - -/*----- doubly linked lists -----*/ - -#define ISNODE(T) struct node list_node -#define DEFLIST(T) \ - typedef struct { \ - union { struct list li; T *for_type; } u; \ - int count; \ - } T##List - -#define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node) - -#define LIST_CHECKCANHAVENODE(l,n) \ - ((void)((n) == ((l).u.for_type))) /* just for the type check */ - -#define LIST_ADDSOMEHOW(l,n,list_addsomehow) \ - ( LIST_CHECKCANHAVENODE(l,n), \ - 
list_addsomehow(&(l).u.li, NODE((n))), \ - (void)(l).count++ \ - ) - -#define LIST_REMSOMEHOW(l,list_remsomehow) \ - ( (typeof((l).u.for_type)) \ - ( (l).count \ - ? ( (l).count--, \ - list_remsomehow(&(l).u.li) ) \ - : 0 \ - ) \ - ) - - -#define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead) -#define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail) -#define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) -#define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) - -#define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li)) -#define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l)))) -#define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) -#define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) - -#define LIST_REMOVE(l,n) \ - ( LIST_CHECKCANHAVENODE(l,n), \ - list_remove(NODE((n))), \ - (void)(l).count-- \ - ) - -#define LIST_INSERT(l,n,pred) \ - ( LIST_CHECKCANHAVENODE(l,n), \ - LIST_CHECKCANHAVENODE(l,pred), \ - list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \ - (void)(l).count++ \ - ) - -/*----- type predeclarations -----*/ - -typedef struct Conn Conn; -typedef struct Article Article; -typedef struct InputFile InputFile; -typedef struct XmitDetails XmitDetails; -typedef struct Filemon_Perfile Filemon_Perfile; -typedef enum StateMachineState StateMachineState; -typedef struct CliCommand CliCommand; - -DEFLIST(Conn); -DEFLIST(Article); - -/*----- function predeclarations -----*/ - -static void conn_maybe_write(Conn *conn); -static void conn_make_some_xmits(Conn *conn); -static void *conn_write_some_xmits(Conn *conn); - -static void xmit_free(XmitDetails *d); - -#define SMS(newstate, periods, why) \ - (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) -static void statemc_setstate(StateMachineState newsms, int periods, - const char *forlog, const char *why); - -static void statemc_start_flush(const char *why); /* Normal => Flushing */ -static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ 
-static int trigger_flush_ok(const char *why /* 0 means timeout */); - /* => Flushing,FLUSHING, ret 1; or ret 0 */ - -static void article_done(Article *art, int whichcount); - -static void check_assign_articles(void); -static void queue_check_input_done(void); -static void check_reading_pause_resume(InputFile *ipf); - -static void statemc_check_flushing_done(void); -static void statemc_check_backlog_done(void); - -static void postfork(void); -static void period(void); - -static void open_defer(void); -static void close_defer(void); -static void search_backlog_file(void); -static void preterminate(void); -static void raise_default(int signo) NORET; - -static void inputfile_reading_start(InputFile *ipf); -static void inputfile_reading_stop(InputFile *ipf); -static void inputfile_reading_pause(InputFile *ipf); -static void inputfile_reading_resume(InputFile *ipf); - /* pause and resume are idempotent, and no-op if not done _reading_start */ - -static void filemon_start(InputFile *ipf); -static void filemon_stop(InputFile *ipf); -static void tailing_make_readable(InputFile *ipf); - -static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); -static void connfail(Conn *conn, const char *fmt, ...) 
PRINTF(2,3); - -static const oop_rd_style peer_rd_style; -static oop_rd_call peer_rd_err, peer_rd_ok; - - -/*----- configuration options -----*/ -/* when changing defaults, remember to update the manpage */ - -static const char *sitename, *remote_host; -static const char *feedfile, *path_run, *path_cli, *path_cli_dir; -static int quiet_multiple=0; -static int interactive=0, try_filemon=1; -static int try_stream=1; -static int port=119; -static const char *inndconffile; - -static int max_connections=10; -static int max_queue_per_conn=200; -static int target_max_feedfile_size=100000; -static int period_seconds=30; -static int filepoll_seconds=5; -static int max_queue_per_ipf=-1; - -static int connection_setup_timeout=200; -static int inndcomm_flush_timeout=100; - -static double nocheck_thresh= 95.0; /* converted from percentage by main */ -static double nocheck_decay= 100; /* conv'd from articles to lambda by main */ - -/* all these are initialised to seconds, and converted to periods in main */ -static int reconnect_delay_periods=1000; -static int flushfail_retry_periods=1000; -static int backlog_retry_minperiods=100; -static int backlog_spontrescan_periods=300; -static int spontaneous_flush_periods=100000; -static int max_separated_periods=2000; -static int need_activity_periods=1000; -static int lowvol_thresh=3; -static int lowvol_periods=1000; - -static double max_bad_data_ratio= 1; /* conv'd from percentage by main */ -static int max_bad_data_initial= 30; - /* in one corrupt 4096-byte block the number of newlines has - * mean 16 and standard deviation 3.99. 
30 corresponds to z=+3.5 */ - - -/*----- statistics -----*/ - -typedef enum { /* in queue in conn->sent */ - art_Unchecked, /* not checked, not sent checking */ - art_Wanted, /* checked, wanted sent body as requested */ - art_Unsolicited, /* - sent body without check */ - art_MaxState, -} ArtState; - -static const char *const artstate_names[]= - { "Unchecked", "Wanted", "Unsolicited", 0 }; - -#define RESULT_COUNTS(RCS,RCN) \ - RCS(sent) \ - RCS(accepted) \ - RCN(unwanted) \ - RCN(rejected) \ - RCN(deferred) \ - RCN(missing) \ - RCN(connretry) - -#define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)" -#define RCI_TRIPLE_VALS_BASE(counts,x) \ - counts[art_Unchecked] x \ - + counts[art_Wanted] x \ - + counts[art_Unsolicited] x, \ - counts[art_Unchecked] x \ - , counts[art_Wanted] x \ - , counts[art_Unsolicited] x - -typedef enum { -#define RC_INDEX(x) RC_##x, - RESULT_COUNTS(RC_INDEX, RC_INDEX) - RCI_max -} ResultCountIndex; - - -/*----- transmission buffers -----*/ - -#define CONNIOVS 128 - -typedef enum { - xk_Const, xk_Artdata -} XmitKind; - -struct XmitDetails { - XmitKind kind; - union { - ARTHANDLE *sm_art; - } info; -}; - - -/*----- core operational data structure types -----*/ - -struct InputFile { - /* This is also an instance of struct oop_readable */ - struct oop_readable readable; /* first */ - oop_readable_call *readable_callback; - void *readable_callback_user; - - int fd; - Filemon_Perfile *filemon; - - oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ - off_t offset; - int skippinglong, paused, fake_readable; - - ArticleList queue; - long inprogress; /* includes queue.count and also articles in conns */ - long autodefer; /* -1 means not doing autodefer */ - - int counts[art_MaxState][RCI_max]; - int readcount_ok, readcount_blank, readcount_err, count_nooffer_missing; - char path[]; -}; - -struct Article { - ISNODE(Article); - ArtState state; - int midlen, missing; - InputFile *ipf; - TOKEN token; - off_t offset; - int blanklen; - char 
messageid[1]; -}; - -#define SMS_LIST(X) \ - X(NORMAL) \ - X(FLUSHING) \ - X(FLUSHFAILED) \ - X(SEPARATED) \ - X(DROPPING) \ - X(DROPPED) - -enum StateMachineState { -#define SMS_DEF_ENUM(s) sm_##s, - SMS_LIST(SMS_DEF_ENUM) -}; - -static const char *sms_names[]= { -#define SMS_DEF_NAME(s) #s , - SMS_LIST(SMS_DEF_NAME) - 0 -}; - -struct Conn { - ISNODE(Conn); - int fd; /* may be 0, meaning closed (during construction/destruction) */ - oop_read *rd; /* likewise */ - int oopwriting; /* since on_fd is not idempotent */ - int max_queue, stream; - const char *quitting; - int since_activity; /* periods */ - ArticleList waiting; /* not yet told peer */ - ArticleList priority; /* peer says send it now */ - ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ - struct iovec xmit[CONNIOVS]; - XmitDetails xmitd[CONNIOVS]; - int xmitu; -}; - - -/*----- general operational variables -----*/ - -/* main initialises */ -static oop_source *loop; -static ConnList conns; -static char *path_lock, *path_flushing, *path_defer, *path_dump; -static char *globpat_backlog; -static pid_t self_pid; -static int *lowvol_perperiod; -static int lowvol_circptr; -static int lowvol_total; /* does not include current period */ - -/* statemc_init initialises */ -static StateMachineState sms; -static int until_flush; -static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; -static FILE *defer; - -/* initialisation to 0 is good */ -static int until_connect, until_backlog_nextscan; -static double accept_proportion; -static int nocheck, nocheck_reported, in_child; - -/* for logging, simulation, debugging, etc. */ -int simulate_flush= -1; -int logv_use_syslog; -static const char *logv_prefix=""; - -/*========== logging ==========*/ - -static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); -static void logcore(int sysloglevel, const char *fmt, ...) 
{ - VA; - if (logv_use_syslog) { - vsyslog(sysloglevel,fmt,al); - } else { - if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid); - vfprintf(stderr,fmt,al); - putc('\n',stderr); - } - va_end(al); -} - -static void logv(int sysloglevel, const char *pfx, int errnoval, - const char *fmt, va_list al) PRINTF(5,0); -static void logv(int sysloglevel, const char *pfx, int errnoval, - const char *fmt, va_list al) { - char msgbuf[1024]; /* NB do not call xvasprintf here or you'll recurse */ - vsnprintf(msgbuf,sizeof(msgbuf), fmt,al); - msgbuf[sizeof(msgbuf)-1]= 0; - - if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM)) - sysloglevel= LOG_ERR; /* run by wrong user, probably */ - - logcore(sysloglevel, "%s%s: %s%s%s", - logv_prefix, pfx, msgbuf, - errnoval>=0 ? ": " : "", - errnoval>=0 ? strerror(errnoval) : ""); -} - -#define DEFFATAL(fn, pfx, sysloglevel, err, estatus) \ - static void fn(const char *fmt, ...) NORET_PRINTF(1,2); \ - static void fn(const char *fmt, ...) { \ - preterminate(); \ - VA; \ - logv(sysloglevel, pfx, err, fmt, al); \ - exit(estatus); \ - } - -#define DEFLOG(fn, pfx, sysloglevel, err) \ - static void fn(const char *fmt, ...) PRINTF(1,2); \ - static void fn(const char *fmt, ...) { \ - VA; \ - logv(sysloglevel, pfx, err, fmt, al); \ - va_end(al); \ - } - -#define INNLOGSET_DECLARE(fn, pfx, sysloglevel) \ - static void duct_log_##fn(int l, const char *fmt, va_list al, int errval) \ - PRINTF(3,0); \ - static void duct_log_##fn(int l, const char *fmt, va_list al, int errval) { \ - logv(sysloglevel, pfx, errval ? errval : -1, fmt, al); \ - } -#define INNLOGSET_CALL(fn, pfx, sysloglevel) \ - message_handlers_##fn(1, duct_log_##fn); - - -static int innduct_fatal_cleanup(void) { return 12; } /* used for libinn die */ - -/* We want to extend the set of logging functions from inn, and we - * want to prepend the site name to all our messages. 
*/ - -DEFFATAL(syscrash, "critical", LOG_CRIT, errno, 16); -DEFFATAL(crash, "critical", LOG_CRIT, -1, 16); - -#define INNLOGSETS(INNLOGSET) \ - INNLOGSET(die, "fatal", LOG_ERR) \ - INNLOGSET(warn, "warning", LOG_WARNING) \ - INNLOGSET(notice, "notice", LOG_NOTICE) \ - INNLOGSET(trace, "trace", LOG_NOTICE) -INNLOGSETS(INNLOGSET_DECLARE) - -DEFLOG(info, "info", LOG_INFO, -1) -DEFLOG(dbg, "debug", LOG_DEBUG, -1) - - -/*========== utility functions etc. ==========*/ - -static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0); -static char *xvasprintf(const char *fmt, va_list al) { - char *str; - int rc= vasprintf(&str,fmt,al); - if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt); - return str; -} -static char *xasprintf(const char *fmt, ...) PRINTF(1,2); -static char *xasprintf(const char *fmt, ...) { - VA; - char *str= xvasprintf(fmt,al); - va_end(al); - return str; -} - -static int close_perhaps(int *fd) { - if (*fd <= 0) return 0; - int r= close(*fd); - *fd=0; - return r; -} -static void xclose(int fd, const char *what, const char *what2) { - int r= close(fd); - if (r) syscrash("close %s%s",what,what2?what2:""); -} -static void xclose_perhaps(int *fd, const char *what, const char *what2) { - if (*fd <= 0) return; - xclose(*fd,what,what2); - *fd=0; -} - -static pid_t xfork(const char *what) { - pid_t child; - - child= fork(); - if (child==-1) sysdie("cannot fork for %s",what); - dbg("forked %s %ld", what, (unsigned long)child); - if (!child) postfork(); - return child; -} - -static void on_fd_read_except(int fd, oop_call_fd callback) { - loop->on_fd(loop, fd, OOP_READ, callback, 0); - loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0); -} -static void cancel_fd_read_except(int fd) { - loop->cancel_fd(loop, fd, OOP_READ); - loop->cancel_fd(loop, fd, OOP_EXCEPTION); -} - -static void report_child_status(const char *what, int status) { - if (WIFEXITED(status)) { - int es= WEXITSTATUS(status); - if (es) - warn("%s: child died with error exit status %d", what, 
es); - } else if (WIFSIGNALED(status)) { - int sig= WTERMSIG(status); - const char *sigstr= strsignal(sig); - const char *coredump= WCOREDUMP(status) ? " (core dumped)" : ""; - if (sigstr) - warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump); - else - warn("%s: child died due to unknown fatal signal %d%s", - what, sig, coredump); - } else { - warn("%s: child died with unknown wait status %d", what,status); - } -} - -static int xwaitpid(pid_t *pid, const char *what) { - int status; - - int r= kill(*pid, SIGKILL); - if (r) syscrash("cannot kill %s child", what); - - pid_t got= waitpid(*pid, &status, 0); - if (got==-1) syscrash("cannot reap %s child", what); - if (got==0) crash("cannot reap %s child", what); - - *pid= 0; - - return status; -} - -static void *zxmalloc(size_t sz) { - void *p= xmalloc(sz); - memset(p,0,sz); - return p; -} - -static void xunlink(const char *path, const char *what) { - int r= unlink(path); - if (r) syscrash("can't unlink %s %s", path, what); -} - -static time_t xtime(void) { - time_t now= time(0); - if (now==-1) syscrash("time(2) failed"); - return now; -} - -static void xsigaction(int signo, const struct sigaction *sa) { - int r= sigaction(signo,sa,0); - if (r) syscrash("sigaction failed for \"%s\"", strsignal(signo)); -} - -static void xsigsetdefault(int signo) { - struct sigaction sa; - memset(&sa,0,sizeof(sa)); - sa.sa_handler= SIG_DFL; - xsigaction(signo,&sa); -} - -static void xgettimeofday(struct timeval *tv_r) { - int r= gettimeofday(tv_r,0); - if (r) syscrash("gettimeofday(2) failed"); -} - -static void xsetnonblock(int fd, int nonb) { - int errnoval= oop_fd_nonblock(fd, nonb); - if (errnoval) { errno= errnoval; syscrash("setnonblocking"); } -} - -static void check_isreg(const struct stat *stab, const char *path, - const char *what) { - if (!S_ISREG(stab->st_mode)) - crash("%s %s not a plain file (mode 0%lo)", - what, path, (unsigned long)stab->st_mode); -} - -static void xfstat(int fd, struct stat *stab_r, 
const char *what) { - int r= fstat(fd, stab_r); - if (r) syscrash("could not fstat %s", what); -} - -static void xfstat_isreg(int fd, struct stat *stab_r, - const char *path, const char *what) { - xfstat(fd, stab_r, what); - check_isreg(stab_r, path, what); -} - -static void xlstat_isreg(const char *path, struct stat *stab, - int *enoent_r /* 0 means ENOENT is fatal */, - const char *what) { - int r= lstat(path, stab); - if (r) { - if (errno==ENOENT && enoent_r) { *enoent_r=1; return; } - syscrash("could not lstat %s %s", what, path); - } - if (enoent_r) *enoent_r= 0; - check_isreg(stab, path, what); -} - -static int samefile(const struct stat *a, const struct stat *b) { - assert(S_ISREG(a->st_mode)); - assert(S_ISREG(b->st_mode)); - return (a->st_ino == b->st_ino && - a->st_dev == b->st_dev); -} - -static char *sanitise(const char *input, int len) { - static char sanibuf[100]; /* returns pointer to this buffer! */ - - const char *p= input; - const char *endp= len>=0 ? input+len : 0; - char *q= sanibuf; - *q++= '`'; - for (;;) { - if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; } - int c= (!endp || p<endp) ? *p++ : 0; - if (!c) { *q++= '\''; *q=0; break; } - if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; } - sprintf(q,"\\x%02x",c); - q += 4; - } - return sanibuf; -} - -static int isewouldblock(int errnoval) { - return errnoval==EWOULDBLOCK || errnoval==EAGAIN; -} - -/*========== command and control (CLI) connections ==========*/ - -static int cli_master; - -typedef struct CliConn CliConn; -struct CliConn { - void (*destroy)(CliConn*); - int fd; - oop_read *rd; - FILE *out; - union { - struct sockaddr sa; - struct sockaddr_un un; - } sa; - socklen_t salen; -}; - -static const oop_rd_style cli_rd_style= { - OOP_RD_DELIM_STRIP, '\n', - OOP_RD_NUL_FORBID, - OOP_RD_SHORTREC_FORBID -}; - -static void cli_destroy(CliConn *cc) { - cc->destroy(cc); -} - -static void cli_checkouterr(CliConn *cc /* may destroy*/) { - if (ferror(cc->out) | fflush(cc->out)) { - info("CTRL%d write error %s", cc->fd, strerror(errno)); - 
cli_destroy(cc); - } -} - -static void cli_prompt(CliConn *cc /* may destroy*/) { - fprintf(cc->out, "%s| ", sitename); - cli_checkouterr(cc); -} - -struct CliCommand { - const char *cmd; - void (*f)(CliConn *cc, const CliCommand *ccmd, - const char *arg, size_t argsz); - void *xdata; - int xval; -}; - -static const CliCommand cli_commands[]; - -#define CCMD(wh) \ - static void ccmd_##wh(CliConn *cc, const CliCommand *c, \ - const char *arg, size_t argsz) - -CCMD(help) { - fputs("commands:\n", cc->out); - const CliCommand *ccmd; - for (ccmd=cli_commands; ccmd->cmd; ccmd++) - fprintf(cc->out, " %s\n", ccmd->cmd); - fputs("NB: permissible arguments are not shown above." - " Not all commands listed are safe. See innduct(8).\n", cc->out); -} - -CCMD(flush) { - int ok= trigger_flush_ok("manual request"); - if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]); -} - -CCMD(stop) { - preterminate(); - notice("terminating (CTRL%d)",cc->fd); - raise_default(SIGTERM); - abort(); -} - -CCMD(dump); - -/* messing with our head: */ -CCMD(period) { period(); } -CCMD(setintarg) { *(int*)c->xdata= atoi(arg); } -CCMD(setint) { *(int*)c->xdata= c->xval; } -CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); } - -static const CliCommand cli_commands[]= { - { "h", ccmd_help }, - { "flush", ccmd_flush }, - { "stop", ccmd_stop }, - { "dump q", ccmd_dump, 0,0 }, - { "dump a", ccmd_dump, 0,1 }, - - { "p", ccmd_period }, - -#define POKES(cmd,func) \ - { cmd "flush", func, &until_flush, 1 }, \ - { cmd "conn", func, &until_connect, 0 }, \ - { cmd "blscan", func, &until_backlog_nextscan, 0 }, -POKES("next ", ccmd_setint) -POKES("prod ", ccmd_setint_period) - - { "pretend flush", ccmd_setintarg, &simulate_flush }, - { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 }, - { 0 } -}; - -static void *cli_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, - const char *errmsg, int errnoval, - const char *data, size_t recszu, void *cc_v) { - CliConn *cc= cc_v; 
- - if (!data) { - info("CTRL%d closed", cc->fd); - cc->destroy(cc); - return OOP_CONTINUE; - } - - if (recszu == 0) goto prompt; - assert(recszu <= INT_MAX); - int recsz= recszu; - - const CliCommand *ccmd; - for (ccmd=cli_commands; ccmd->cmd; ccmd++) { - int l= strlen(ccmd->cmd); - if (recsz < l) continue; - if (recsz > l && data[l] != ' ') continue; - if (memcmp(data, ccmd->cmd, l)) continue; - - int argl= (int)recsz - (l+1); - ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl); - goto prompt; - } - - fputs("unknown command; h for help\n", cc->out); - - prompt: - cli_prompt(cc); - return OOP_CONTINUE; -} - -static void *cli_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, - const char *errmsg, int errnoval, - const char *data, size_t recsz, void *cc_v) { - CliConn *cc= cc_v; - - info("CTRL%d read error %s", cc->fd, errmsg); - cc->destroy(cc); - return OOP_CONTINUE; -} - -static int cli_conn_startup(CliConn *cc /* may destroy*/, - const char *how) { - cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0); - if (!cc->rd) { warn("oop_rd_new_fd cli failed"); return -1; } - - int er= oop_rd_read(cc->rd, &cli_rd_style, MAX_CLI_COMMAND, - cli_rd_ok, cc, - cli_rd_err, cc); - if (er) { errno= er; syswarn("oop_rd_read cli failed"); return -1; } - - info("CTRL%d %s ready", cc->fd, how); - cli_prompt(cc); - return 0; -} - -static void cli_stdio_destroy(CliConn *cc) { - if (cc->rd) { - oop_rd_cancel(cc->rd); - errno= oop_rd_delete_tidy(cc->rd); - if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)"); - } - free(cc); -} - -static void cli_stdio(void) { - NEW_DECL(CliConn *,cc); - cc->destroy= cli_stdio_destroy; - - cc->fd= 0; - cc->out= stdout; - int r= cli_conn_startup(cc,"stdio"); - if (r) cc->destroy(cc); -} - -static void cli_accepted_destroy(CliConn *cc) { - if (cc->rd) { - oop_rd_cancel(cc->rd); - oop_rd_delete_kill(cc->rd); - } - if (cc->out) { fclose(cc->out); cc->fd=0; } - close_perhaps(&cc->fd); - free(cc); -} - -static void *cli_master_readable(oop_source 
*lp, int master, - oop_event ev, void *u) { - NEW_DECL(CliConn *,cc); - cc->destroy= cli_accepted_destroy; - - cc->salen= sizeof(cc->sa); - cc->fd= accept(master, &cc->sa.sa, &cc->salen); - if (cc->fd<0) { syswarn("error accepting cli connection"); goto x; } - - cc->out= fdopen(cc->fd, "w"); - if (!cc->out) { syswarn("error fdopening accepted cli connection"); goto x; } - - int r= cli_conn_startup(cc, "accepted"); - if (r) goto x; - - return OOP_CONTINUE; - - x: - cc->destroy(cc); - return OOP_CONTINUE; -} - -#define NOCLI(...) do{ \ - syswarn("no cli listener, because failed to " __VA_ARGS__); \ - goto nocli; \ - }while(0) - -static void cli_init(void) { - union { - struct sockaddr sa; - struct sockaddr_un un; - } sa; - - memset(&sa,0,sizeof(sa)); - int maxlen= sizeof(sa.un.sun_path); - - if (!path_cli) { - info("control command line disabled"); - return; - } - - int pathlen= strlen(path_cli); - if (pathlen > maxlen) { - warn("no cli listener, because cli socket path %s too long (%d>%d)", - path_cli, pathlen, maxlen); - return; - } - - if (path_cli_dir) { - int r= mkdir(path_cli_dir, 0700); - if (r && errno!=EEXIST) - NOCLI("create cli socket directory %s", path_cli_dir); - } - - int r= unlink(path_cli); - if (r && errno!=ENOENT) - NOCLI("remove old cli socket %s", path_cli); - - cli_master= socket(PF_UNIX, SOCK_STREAM, 0); - if (cli_master<0) NOCLI("create new cli master socket"); - - int sl= pathlen + offsetof(struct sockaddr_un, sun_path); - sa.un.sun_family= AF_UNIX; - memcpy(sa.un.sun_path, path_cli, pathlen); - - r= bind(cli_master, &sa.sa, sl); - if (r) NOCLI("bind to cli socket path %s", sa.un.sun_path); - - r= listen(cli_master, 5); - if (r) NOCLI("listen to cli master socket"); - - xsetnonblock(cli_master, 1); - - loop->on_fd(loop, cli_master, OOP_READ, cli_master_readable, 0); - info("cli ready, listening on %s", path_cli); - - return; - - nocli: - xclose_perhaps(&cli_master, "cli master",0); - return; -} - -/*========== management of connections 
==========*/ - -static void reconnect_blocking_event(void) { - until_connect= reconnect_delay_periods; -} - -static void conn_closefd(Conn *conn, const char *msgprefix) { - int r= close_perhaps(&conn->fd); - if (r) info("C%d %serror closing socket: %s", - conn->fd, msgprefix, strerror(errno)); -} - -static int conn_busy(Conn *conn) { - return - conn->waiting.count || - conn->priority.count || - conn->sent.count || - conn->xmitu; -} - -static void conn_dispose(Conn *conn) { - if (!conn) return; - if (conn->rd) { - oop_rd_cancel(conn->rd); - oop_rd_delete_kill(conn->rd); - conn->rd= 0; - } - if (conn->fd) { - loop->cancel_fd(loop, conn->fd, OOP_WRITE); - loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION); - } - conn_closefd(conn,""); - free(conn); -} - -static void *conn_exception(oop_source *lp, int fd, - oop_event ev, void *conn_v) { - Conn *conn= conn_v; - unsigned char ch; - assert(fd == conn->fd); - assert(ev == OOP_EXCEPTION); - int r= read(conn->fd, &ch, 1); - if (r<0) connfail(conn,"read failed: %s",strerror(errno)); - else connfail(conn,"exceptional condition on socket (peer sent urgent" - " data? 
read(,&ch,1)=%d,ch='\\x%02x')",r,ch); - return OOP_CONTINUE; -} - -static void vconnfail(Conn *conn, const char *fmt, va_list al) { - int requeue[art_MaxState]; - memset(requeue,0,sizeof(requeue)); - - Article *art; - - while ((art= LIST_REMHEAD(conn->priority))) - LIST_ADDTAIL(art->ipf->queue, art); - - while ((art= LIST_REMHEAD(conn->waiting))) - LIST_ADDTAIL(art->ipf->queue, art); - - while ((art= LIST_REMHEAD(conn->sent))) { - requeue[art->state]++; - if (art->state==art_Unsolicited) art->state= art_Unchecked; - LIST_ADDTAIL(art->ipf->queue,art); - check_reading_pause_resume(art->ipf); - } - - int i; - XmitDetails *d; - for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++) - xmit_free(d); - - LIST_REMOVE(conns,conn); - - char *m= xvasprintf(fmt,al); - warn("C%d (now %d) connection failed requeueing " RCI_TRIPLE_FMT_BASE ": %s", - conn->fd, conns.count, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); - free(m); - - reconnect_blocking_event(); - conn_dispose(conn); - check_assign_articles(); -} - -static void connfail(Conn *conn, const char *fmt, ...) 
{ - va_list al; - va_start(al,fmt); - vconnfail(conn,fmt,al); - va_end(al); -} - -static void conn_idle_close(Conn *conn, const char *why) { - static const char quitcmd[]= "QUIT\r\n"; - int todo= sizeof(quitcmd)-1; - const char *p= quitcmd; - for (;;) { - int r= write(conn->fd, p, todo); - if (r<0) { - if (isewouldblock(errno)) - connfail(conn, "blocked writing QUIT to idle connection"); - else - connfail(conn, "failed to write QUIT to idle connection: %s", - strerror(errno)); - break; - } - assert(r<=todo); - todo -= r; - if (!todo) { - conn->quitting= why; - conn->since_activity= 0; - dbg("C%d is idle (%s), quitting", conn->fd, why); - break; - } - } -} - -/* - * For our last connection, we also shut it down if we have had - * less than K in the last L - */ -static void check_idle_conns(void) { - Conn *conn; - - int volthisperiod= lowvol_perperiod[lowvol_circptr]; - lowvol_circptr++; - lowvol_circptr %= lowvol_periods; - lowvol_total += volthisperiod; - lowvol_total -= lowvol_perperiod[lowvol_circptr]; - lowvol_perperiod[lowvol_circptr]= 0; - - FOR_CONN(conn) - conn->since_activity++; - - search_again: - FOR_CONN(conn) { - if (conn->since_activity <= need_activity_periods) continue; - - /* We need to shut this down */ - if (conn->quitting) - connfail(conn,"timed out waiting for response to QUIT (%s)", - conn->quitting); - else if (conn->sent.count) - connfail(conn,"timed out waiting for responses"); - else if (conn->waiting.count || conn->priority.count) - connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); - else if (conn->xmitu) - connfail(conn,"peer has been sending responses" - " before receiving our commands!"); - else - conn_idle_close(conn, "no activity"); - - goto search_again; - } - - conn= LIST_HEAD(conns); - if (!volthisperiod && - conns.count==1 && - lowvol_total < lowvol_thresh && - !conn_busy(conn)) - conn_idle_close(conn, "low volume"); -} - -/*---------- making new connections ----------*/ - -static pid_t connecting_child; -static int 
connecting_fdpass_sock; - -static void connect_attempt_discard(void) { - if (connecting_child) { - int status= xwaitpid(&connecting_child, "connect"); - if (!(WIFEXITED(status) || - (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) - report_child_status("connect", status); - } - if (connecting_fdpass_sock) { - cancel_fd_read_except(connecting_fdpass_sock); - xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0); - } -} - -#define PREP_DECL_MSG_CMSG(msg) \ - char msgbyte= 0; \ - struct iovec msgiov; \ - msgiov.iov_base= &msgbyte; \ - msgiov.iov_len= 1; \ - struct msghdr msg; \ - memset(&msg,0,sizeof(msg)); \ - char msg##cbuf[CMSG_SPACE(sizeof(int))]; \ - msg.msg_iov= &msgiov; \ - msg.msg_iovlen= 1; \ - msg.msg_control= msg##cbuf; \ - msg.msg_controllen= sizeof(msg##cbuf); - -static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { - Conn *conn= 0; - - assert(fd == connecting_fdpass_sock); - - PREP_DECL_MSG_CMSG(msg); - - ssize_t rs= recvmsg(fd, &msg, 0); - if (rs<0) { - if (isewouldblock(errno)) return OOP_CONTINUE; - syswarn("failed to read socket from connecting child"); - goto x; - } - - NEW(conn); - LIST_INIT(conn->waiting); - LIST_INIT(conn->priority); - LIST_INIT(conn->sent); - - struct cmsghdr *h= 0; - if (rs >= 0) h= CMSG_FIRSTHDR(&msg); - if (!h) { - int status= xwaitpid(&connecting_child, "connect child (broken)"); - - if (WIFEXITED(status)) { - if (WEXITSTATUS(status) != 0 && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) - /* child already reported the problem */; - else { - if (e == OOP_EXCEPTION) - warn("connect: connection child exited code %d but" - " unexpected exception on fdpass socket", - WEXITSTATUS(status)); - else - warn("connect: connection child exited code %d but" - " no cmsg (rs=%d)", - WEXITSTATUS(status), (int)rs); - } - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { - warn("connect: connection attempt timed out"); - } 
else { - report_child_status("connect", status); - } - goto x; - } - -#define CHK(field, val) \ - if (h->cmsg_##field != val) { \ - crash("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \ - h->cmsg_##field, val); \ - goto x; \ - } - CHK(level, SOL_SOCKET); - CHK(type, SCM_RIGHTS); - CHK(len, CMSG_LEN(sizeof(conn->fd))); -#undef CHK - - if (CMSG_NXTHDR(&msg,h)) crash("connect: child sent many cmsgs"); - - memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd)); - - int status; - pid_t got= waitpid(connecting_child, &status, 0); - if (got==-1) syscrash("connect: real wait for child"); - assert(got == connecting_child); - connecting_child= 0; - - if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; } - int es= WEXITSTATUS(status); - switch (es) { - case CONNCHILD_ESTATUS_STREAM: conn->stream= 1; break; - case CONNCHILD_ESTATUS_NOSTREAM: conn->stream= 0; break; - default: - die("connect: child gave unexpected exit status %d", es); - } - - /* Phew! */ - conn->max_queue= conn->stream ? max_queue_per_conn : 1; - - loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); - conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */ - if (!conn->fd) crash("oop_rd_new_fd conn failed (fd=%d)",conn->fd); - int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN, - &peer_rd_ok, conn, - &peer_rd_err, conn); - if (r) syscrash("oop_rd_read for peer (fd=%d)",conn->fd); - - LIST_ADDHEAD(conns, conn); - notice("C%d (now %d) connected %s", - conn->fd, conns.count, conn->stream ? 
"streaming" : "plain"); - - connect_attempt_discard(); - check_assign_articles(); - return OOP_CONTINUE; - - x: - conn_dispose(conn); - connect_attempt_discard(); - reconnect_blocking_event(); - return OOP_CONTINUE; -} - -static int allow_connect_start(void) { - return conns.count < max_connections - && !connecting_child - && !until_connect; -} - -static void connect_start(void) { - assert(!connecting_child); - assert(!connecting_fdpass_sock); - - info("starting connection attempt"); - int ok_until_connect= until_connect; - reconnect_blocking_event(); - - int socks[2]; - int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks); - if (r) { syswarn("connect: cannot create socketpair for child"); return; } - - connecting_child= xfork("connection"); - - if (!connecting_child) { - FILE *cn_from, *cn_to; - char buf[NNTP_STRLEN+100]; - int exitstatus= CONNCHILD_ESTATUS_NOSTREAM; - - xclose(socks[0], "(in child) parent's connection fdpass socket",0); - - alarm(connection_setup_timeout); - if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) { - int l= strlen(buf); - int stripped=0; - while (l>0) { - unsigned char c= buf[l-1]; - if (!isspace(c)) break; - if (c=='\n' || c=='\r') stripped=1; - --l; - } - if (!buf[0]) { - sysdie("connect: connection attempt failed"); - } else { - buf[l]= 0; - die("connect: %s: %s", stripped ? 
"rejected" : "failed", - sanitise(buf,-1)); - } - } - if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0) - sysdie("connect: authentication failed"); - if (try_stream) { - if (fputs("MODE STREAM\r\n", cn_to)==EOF || - fflush(cn_to)) - sysdie("connect: could not send MODE STREAM"); - buf[sizeof(buf)-1]= 0; - if (!fgets(buf, sizeof(buf)-1, cn_from)) { - if (ferror(cn_from)) - sysdie("connect: could not read response to MODE STREAM"); - else - die("connect: connection close in response to MODE STREAM"); - } - int l= strlen(buf); - assert(l>=1); - if (buf[l-1]!='\n') - die("connect: response to MODE STREAM is too long: %.100s...", - sanitise(buf,-1)); - l--; if (l>0 && buf[l-1]=='\r') l--; - buf[l]= 0; - char *ep; - int rcode= strtoul(buf,&ep,10); - if (ep != &buf[3]) - die("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1)); - - switch (rcode) { - case 203: - exitstatus= CONNCHILD_ESTATUS_STREAM; - break; - case 480: - case 500: - break; - default: - warn("connect: unexpected response to MODE STREAM: %.50s", - sanitise(buf,-1)); - exitstatus= 2; - break; - } - } - int fd= fileno(cn_from); - - PREP_DECL_MSG_CMSG(msg); - struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg); - cmsg->cmsg_level= SOL_SOCKET; - cmsg->cmsg_type= SCM_RIGHTS; - cmsg->cmsg_len= CMSG_LEN(sizeof(fd)); - memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd)); - - msg.msg_controllen= cmsg->cmsg_len; - r= sendmsg(socks[1], &msg, 0); - if (r<0) syscrash("sendmsg failed for new connection"); - if (r!=1) crash("sendmsg for new connection gave wrong result %d",r); - - _exit(exitstatus); - } - - xclose(socks[1], "connecting fdpass child's socket",0); - connecting_fdpass_sock= socks[0]; - xsetnonblock(connecting_fdpass_sock, 1); - on_fd_read_except(connecting_fdpass_sock, connchild_event); - - if (!conns.count) - until_connect= ok_until_connect; -} - -/*---------- assigning articles to conns, and transmitting ----------*/ - -static Article *dequeue_from(int peek, InputFile *ipf) { - if (!ipf) return 0; - if 
(peek) return LIST_HEAD(ipf->queue); - - Article *art= LIST_REMHEAD(ipf->queue); - if (!art) return 0; - check_reading_pause_resume(ipf); - return art; -} - -static Article *dequeue(int peek) { - Article *art; - art= dequeue_from(peek, flushing_input_file); if (art) return art; - art= dequeue_from(peek, backlog_input_file); if (art) return art; - art= dequeue_from(peek, main_input_file); if (art) return art; - return 0; -} - -static void check_assign_articles(void) { - for (;;) { - if (!dequeue(1)) - break; - - Conn *walk, *use=0; - int spare=0, inqueue=0; - - /* Find a connection to offer this article. We prefer a busy - * connection to an idle one, provided it's not full. We take the - * first (oldest) and since that's stable, it will mean we fill up - * connections in order. That way if we have too many - * connections, the spare ones will go away eventually. - */ - FOR_CONN(walk) { - if (walk->quitting) continue; - inqueue= walk->sent.count + walk->priority.count - + walk->waiting.count; - spare= walk->max_queue - inqueue; - assert(inqueue <= max_queue_per_conn); - assert(spare >= 0); - if (inqueue==0) /*idle*/ { if (!use) use= walk; } - else if (spare>0) /*working*/ { use= walk; break; } - } - if (use) { - if (!inqueue) use->since_activity= 0; /* reset idle counter */ - while (spare>0) { - Article *art= dequeue(0); - if (!art) break; - LIST_ADDTAIL(use->waiting, art); - lowvol_perperiod[lowvol_circptr]++; - spare--; - } - conn_maybe_write(use); - } else if (allow_connect_start()) { - connect_start(); - break; - } else { - break; - } - } -} - -static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) { - conn_maybe_write(u); - return OOP_CONTINUE; -} - -static void conn_maybe_write(Conn *conn) { - for (;;) { - conn_make_some_xmits(conn); - if (!conn->xmitu) { - loop->cancel_fd(loop, conn->fd, OOP_WRITE); - conn->oopwriting= 0; - return; - } - - void *rp= conn_write_some_xmits(conn); - if (rp==OOP_CONTINUE) { - if (!conn->oopwriting) { - 
loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn); - conn->oopwriting= 1; - } - return; - } else if (rp==OOP_HALT) { - return; - } else if (!rp) { - /* transmitted everything */ - } else { - abort(); - } - } -} - -/*---------- expiry, flow control and deferral ----------*/ - -/* - * flow control notes - * to ensure articles go away eventually - * separate queue for each input file - * queue expiry - * every period, check head of backlog queue for expiry with SMretrieve - * if too old: discard, and check next article - * also check every backlog article as we read it - * flush expiry - * after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping - * one-off: eat queued articles from flushing and write them to defer - * one-off: connfail all connections which have any articles from flushing - * newly read articles from flushing go straight to defer - * this should take care of it and get us out of this state - * to avoid filling up ram needlessly - * input control - * limit number of queued articles for each ipf - * pause/resume inputfile tailing - */ - -static void check_reading_pause_resume(InputFile *ipf) { - if (ipf->queue.count >= max_queue_per_ipf) - inputfile_reading_pause(ipf); - else - inputfile_reading_resume(ipf); -} - -static void article_defer(Article *art /* not on a queue */, int whichcount) { - open_defer(); - if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0 - || fflush(defer)) - sysdie("write to defer file %s",path_defer); - article_done(art, whichcount); -} - -static int article_check_expired(Article *art /* must be queued, not conn */) { - ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT); - if (artdata) { SMfreearticle(artdata); return 0; } - - LIST_REMOVE(art->ipf->queue, art); - art->missing= 1; - art->ipf->count_nooffer_missing++; - article_done(art,-1); - return 1; -} - -static void inputfile_queue_check_expired(InputFile *ipf) { - if (!ipf) return; - - for (;;) { - Article *art= 
LIST_HEAD(ipf->queue); - int expd= article_check_expired(art); - if (!expd) break; - } - check_reading_pause_resume(ipf); -} - -static void article_autodefer(InputFile *ipf, Article *art) { - ipf->autodefer++; - article_defer(art,-1); -} - -static int has_article_in(const ArticleList *al, InputFile *ipf) { - Article *art; - for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art)) - if (art->ipf == ipf) return 1; - return 0; -} - -static void autodefer_input_file_articles(InputFile *ipf) { - Article *art; - while ((art= LIST_REMHEAD(ipf->queue))) - article_autodefer(ipf, art); -} - -static void autodefer_input_file(InputFile *ipf) { - static const char *const abandon= "stuck"; - ipf->autodefer= 0; - - autodefer_input_file_articles(ipf); - - if (ipf->inprogress) { - Conn *walk; - FOR_CONN(walk) { - if (has_article_in(&walk->waiting, ipf) || - has_article_in(&walk->priority, ipf) || - has_article_in(&walk->sent, ipf)) - walk->quitting= abandon; - } - while (ipf->inprogress) { - FOR_CONN(walk) - if (walk->quitting == abandon) goto found; - abort(); /* where are they ?? 
*/ - - found: - connfail(walk, "connection is stuck or crawling," - " and we need to finish flush"); - autodefer_input_file_articles(ipf); - } - } - - check_reading_pause_resume(ipf); -} - -/*========== article transmission ==========*/ - -static XmitDetails *xmit_core(Conn *conn, const char *data, int len, - XmitKind kind) { /* caller must then fill in details */ - struct iovec *v= &conn->xmit[conn->xmitu]; - XmitDetails *d= &conn->xmitd[conn->xmitu++]; - v->iov_base= (char*)data; - v->iov_len= len; - d->kind= kind; - return d; -} - -static void xmit_noalloc(Conn *conn, const char *data, int len) { - xmit_core(conn,data,len, xk_Const); -} -#define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1)) - -static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) { - XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata); - d->info.sm_art= ah; -} - -static void xmit_free(XmitDetails *d) { - switch (d->kind) { - case xk_Artdata: SMfreearticle(d->info.sm_art); break; - case xk_Const: break; - default: abort(); - } -} - -static void *conn_write_some_xmits(Conn *conn) { - /* return values: - * 0: nothing more to write, no need to call us again - * OOP_CONTINUE: more to write but fd not writeable - * OOP_HALT: disaster, have destroyed conn - */ - for (;;) { - int count= conn->xmitu; - if (!count) return 0; - - if (count > IOV_MAX) count= IOV_MAX; - ssize_t rs= writev(conn->fd, conn->xmit, count); - if (rs < 0) { - if (isewouldblock(errno)) return OOP_CONTINUE; - connfail(conn, "write failed: %s", strerror(errno)); - return OOP_HALT; - } - assert(rs > 0); - - int done; - for (done=0; rs; ) { - assert(donexmitu); - struct iovec *vp= &conn->xmit[done]; - XmitDetails *dp= &conn->xmitd[done]; - assert(vp->iov_len <= SSIZE_MAX); - if ((size_t)rs >= vp->iov_len) { - rs -= vp->iov_len; - xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. 
*/ - done++; - } else { - vp->iov_base= (char*)vp->iov_base + rs; - vp->iov_len -= rs; - break; /* rs -= rs */ - } - } - int newu= conn->xmitu - done; - memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit)); - memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd)); - conn->xmitu= newu; - } -} - -static void conn_make_some_xmits(Conn *conn) { - for (;;) { - if (conn->xmitu+5 > CONNIOVS) - break; - - Article *art= LIST_REMHEAD(conn->priority); - if (!art) art= LIST_REMHEAD(conn->waiting); - if (!art) break; - - if (art->state >= art_Wanted || (conn->stream && nocheck)) { - /* actually send it */ - - ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL); - - art->state= - art->state == art_Unchecked ? art_Unsolicited : - art->state == art_Wanted ? art_Wanted : - (abort(),-1); - - if (!artdata) art->missing= 1; - art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++; - - if (conn->stream) { - if (artdata) { - XMIT_LITERAL("TAKETHIS "); - xmit_noalloc(conn, art->messageid, art->midlen); - XMIT_LITERAL("\r\n"); - xmit_artbody(conn, artdata); - } else { - article_done(art, -1); - continue; - } - } else { - /* we got 235 from IHAVE */ - if (artdata) { - xmit_artbody(conn, artdata); - } else { - XMIT_LITERAL(".\r\n"); - } - } - - LIST_ADDTAIL(conn->sent, art); - - } else { - /* check it */ - - if (conn->stream) - XMIT_LITERAL("CHECK "); - else - XMIT_LITERAL("IHAVE "); - xmit_noalloc(conn, art->messageid, art->midlen); - XMIT_LITERAL("\r\n"); - - assert(art->state == art_Unchecked); - art->ipf->counts[art->state][RC_sent]++; - LIST_ADDTAIL(conn->sent, art); - } - } -} - -/*========== handling responses from peer ==========*/ - -static const oop_rd_style peer_rd_style= { - OOP_RD_DELIM_STRIP, '\n', - OOP_RD_NUL_FORBID, - OOP_RD_SHORTREC_FORBID -}; - -static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, - const char *errmsg, int errnoval, - const char *data, size_t recsz, void *conn_v) { - Conn *conn= conn_v; - 
connfail(conn, "error receiving from peer: %s", errmsg); - return OOP_CONTINUE; -} - -static Article *article_reply_check(Conn *conn, const char *response, - int code_indicates_streaming, - int must_have_sent - /* 1:yes, -1:no, 0:dontcare */, - const char *sanitised_response) { - Article *art= LIST_HEAD(conn->sent); - - if (!art) { - connfail(conn, - "peer gave unexpected response when no commands outstanding: %s", - sanitised_response); - return 0; - } - - if (code_indicates_streaming) { - assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */ - if (!conn->stream) { - connfail(conn, "peer gave streaming response code " - " to IHAVE or subsequent body: %s", sanitised_response); - return 0; - } - const char *got_mid= response+4; - int got_midlen= strcspn(got_mid, " \n\r"); - if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') { - connfail(conn, "peer gave streaming response with syntactically invalid" - " messageid: %s", sanitised_response); - return 0; - } - if (got_midlen != art->midlen || - memcmp(got_mid, art->messageid, got_midlen)) { - connfail(conn, "peer gave streaming response code to wrong article -" - " probable synchronisation problem; we offered: %s;" - " peer said: %s", - art->messageid, sanitised_response); - return 0; - } - } else { - if (conn->stream) { - connfail(conn, "peer gave non-streaming response code to" - " CHECK/TAKETHIS: %s", sanitised_response); - return 0; - } - } - - if (must_have_sent>0 && art->state < art_Wanted) { - connfail(conn, "peer says article accepted but" - " we had not sent the body: %s", sanitised_response); - return 0; - } - if (must_have_sent<0 && art->state >= art_Wanted) { - connfail(conn, "peer says please sent the article but we just did: %s", - sanitised_response); - return 0; - } - - Article *art_again= LIST_REMHEAD(conn->sent); - assert(art_again == art); - return art; -} - -static void update_nocheck(int accepted) { - accept_proportion *= nocheck_decay; - accept_proportion += accepted * (1.0 
- nocheck_decay); - int new_nocheck= accept_proportion >= nocheck_thresh; - if (new_nocheck && !nocheck_reported) { - notice("entering nocheck mode for the first time"); - nocheck_reported= 1; - } else if (new_nocheck != nocheck) { - dbg("nocheck mode %s", new_nocheck ? "start" : "stop"); - } - nocheck= new_nocheck; -} - -static void article_done(Article *art, int whichcount) { - if (whichcount>=0 && !art->missing) - art->ipf->counts[art->state][whichcount]++; - - if (whichcount == RC_accepted) update_nocheck(1); - else if (whichcount == RC_unwanted) update_nocheck(0); - - InputFile *ipf= art->ipf; - - while (art->blanklen) { - static const char spaces[]= - " " - " " - " " - " " - " " - " " - " " - " " - " "; - int nspaces= sizeof(spaces)-1; - int w= art->blanklen; if (w > nspaces) w= nspaces; - int r= pwrite(ipf->fd, spaces, w, art->offset); - if (r==-1) { - if (errno==EINTR) continue; - syscrash("failed to blank entry for %s (length %d at offset %lu) in %s", - art->messageid, art->blanklen, - (unsigned long)art->offset, ipf->path); - } - assert(r>=0 && r<=w); - art->blanklen -= w; - art->offset += w; - } - - ipf->inprogress--; - assert(ipf->inprogress >= 0); - free(art); - - if (!ipf->inprogress && ipf != main_input_file) - queue_check_input_done(); -} - -static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, - const char *errmsg, int errnoval, - const char *data, size_t recsz, void *conn_v) { - Conn *conn= conn_v; - - if (ev == OOP_RD_EOF) { - connfail(conn, "unexpected EOF from peer"); - return OOP_CONTINUE; - } - assert(ev == OOP_RD_OK); - - char *sani= sanitise(data,-1); - - char *ep; - unsigned long code= strtoul(data, &ep, 10); - if (ep != data+3 || *ep != ' ' || data[0]=='0') { - connfail(conn, "badly formatted response from peer: %s", sani); - return OOP_CONTINUE; - } - - int busy= conn_busy(conn); - - if (conn->quitting) { - if (code!=205 && code!=400) { - connfail(conn, "peer gave unexpected response to QUIT (%s): %s", - 
conn->quitting, sani); - } else { - LIST_REMOVE(conns,conn); - notice("C%d (now %d) idle connection closed (%s)", - conn->fd, conns.count, conn->quitting); - assert(!busy); - conn_dispose(conn); - } - return OOP_CONTINUE; - } - - conn->since_activity= 0; - Article *art; - -#define GET_ARTICLE(musthavesent) do{ \ - art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \ - if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \ - }while(0) - -#define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \ - code_streaming= (streaming); \ - GET_ARTICLE(musthavesent); \ - article_done(art, RC_##how); \ - goto dealtwith; \ - }while(0) - -#define PEERBADMSG(m) do { \ - connfail(conn, m ": %s", sani); return OOP_CONTINUE; \ - }while(0) - - int code_streaming= 0; - - switch (code) { - - default: PEERBADMSG("peer sent unexpected message"); - - case 400: - if (busy) - PEERBADMSG("peer timed us out or stopped accepting articles"); - - LIST_REMOVE(conns,conn); - notice("C%d (now %d) idle connection closed by peer", - conns.count, conn->fd); - conn_dispose(conn); - return OOP_CONTINUE; - - case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */ - case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */ - - case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */ - case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */ - - case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */ - case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */ - - case 238: /* CHECK says send it */ - code_streaming= 1; - case 335: /* IHAVE says send it */ - GET_ARTICLE(-1); - assert(art->state == art_Unchecked); - art->ipf->counts[art->state][RC_accepted]++; - art->state= art_Wanted; - LIST_ADDTAIL(conn->priority, art); - break; - - case 431: /* CHECK or TAKETHIS says try later */ - code_streaming= 1; - case 436: /* IHAVE says try later */ - GET_ARTICLE(0); - article_defer(art, 
RC_deferred); - break; - - } -dealtwith: - - conn_maybe_write(conn); - check_assign_articles(); - return OOP_CONTINUE; -} - - -/*========== monitoring of input files ==========*/ - -static void feedfile_eof(InputFile *ipf) { - assert(ipf != main_input_file); /* promised by tailing_try_read */ - inputfile_reading_stop(ipf); - - if (ipf == flushing_input_file) { - assert(sms==sm_SEPARATED || sms==sm_DROPPING); - if (main_input_file) inputfile_reading_start(main_input_file); - statemc_check_flushing_done(); - } else if (ipf == backlog_input_file) { - statemc_check_backlog_done(); - } else { - abort(); /* supposed to wait rather than get EOF on main input file */ - } -} - -static InputFile *open_input_file(const char *path) { - int fd= open(path, O_RDWR); - if (fd<0) { - if (errno==ENOENT) return 0; - sysdie("unable to open input file %s", path); - } - assert(fd>0); - - InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1); - memset(ipf,0,sizeof(*ipf)); - - ipf->fd= fd; - ipf->autodefer= -1; - LIST_INIT(ipf->queue); - strcpy(ipf->path, path); - - return ipf; -} - -static void close_input_file(InputFile *ipf) { /* does not free */ - assert(!ipf->readable_callback); /* must have had ->on_cancel */ - assert(!ipf->filemon); /* must have had inputfile_reading_stop */ - assert(!ipf->rd); /* must have had inputfile_reading_stop */ - assert(!ipf->inprogress); /* no dangling pointers pointing here */ - xclose_perhaps(&ipf->fd, "input file ", ipf->path); -} - - -/*---------- dealing with articles read in the input file ----------*/ - -static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, - const char *data, const char *how) { - warn("corrupted file: %s, offset %lu: %s: in %s", - ipf->path, (unsigned long)offset, how, sanitise(data,-1)); - ipf->readcount_err++; - if (ipf->readcount_err > max_bad_data_initial + - (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio) - crash("too much garbage in input file! 
(%d errs, %d ok, %d blank)", - ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank); - return OOP_CONTINUE; -} - -static void *feedfile_read_err(oop_source *lp, oop_read *rd, - oop_rd_event ev, const char *errmsg, - int errnoval, const char *data, size_t recsz, - void *ipf_v) { - InputFile *ipf= ipf_v; - assert(ev == OOP_RD_SYSTEM); - errno= errnoval; - syscrash("error reading input file: %s, offset %lu", - ipf->path, (unsigned long)ipf->offset); -} - -static void *feedfile_got_article(oop_source *lp, oop_read *rd, - oop_rd_event ev, const char *errmsg, - int errnoval, const char *data, size_t recsz, - void *ipf_v) { - InputFile *ipf= ipf_v; - Article *art; - char tokentextbuf[sizeof(TOKEN)*2+3]; - - if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; } - - off_t old_offset= ipf->offset; - ipf->offset += recsz + !!(ev == OOP_RD_OK); - -#define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m); - - if (ev==OOP_RD_PARTREC) - feedfile_got_bad_data(ipf,old_offset,data,"missing final newline"); - /* but process it anyway */ - - if (ipf->skippinglong) { - if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */ - return OOP_CONTINUE; - } - if (ev==OOP_RD_LONG) { - ipf->skippinglong= 1; - X_BAD_DATA("overly long line"); - } - - if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte"); - if (!recsz) X_BAD_DATA("empty line"); - - if (data[0]==' ') { - if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked"); - ipf->readcount_blank++; - return OOP_CONTINUE; - } - - char *space= strchr(data,' '); - int tokenlen= space-data; - int midlen= (int)recsz-tokenlen-1; - if (midlen <= 2) X_BAD_DATA("no room for messageid"); - if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid"); - - if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length"); - memcpy(tokentextbuf, data, tokenlen); - tokentextbuf[tokenlen]= 0; - if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax"); - - ipf->readcount_ok++; - - art= xmalloc(sizeof(*art) 
- 1 + midlen + 1); - memset(art,0,sizeof(*art)); - art->state= art_Unchecked; - art->midlen= midlen; - art->ipf= ipf; ipf->inprogress++; - art->token= TextToToken(tokentextbuf); - art->offset= old_offset; - art->blanklen= recsz; - strcpy(art->messageid, space+1); - - if (ipf->autodefer >= 0) { - article_autodefer(ipf, art); - } else { - LIST_ADDTAIL(ipf->queue, art); - - if (ipf==backlog_input_file) - article_check_expired(art); - } - - if (sms==sm_NORMAL && ipf==main_input_file && - ipf->offset >= target_max_feedfile_size) - statemc_start_flush("feed file size"); - - check_assign_articles(); /* may destroy conn but that's OK */ - check_reading_pause_resume(ipf); - return OOP_CONTINUE; -} - -/*========== tailing input file ==========*/ - -static void *tailing_rable_call_time(oop_source *lp, struct timeval tv, - void *user) { - /* lifetime of ipf here is OK because destruction will cause - * on_cancel which will cancel this callback */ - InputFile *ipf= user; - - dbg("**TRACT** ipf=%p called",ipf); - if (!ipf->fake_readable) return OOP_CONTINUE; - - /* we just keep calling readable until our caller (oop_rd) - * has called try_read, and try_read has found EOF so given EAGAIN */ - dbg("**TRACT** ipf=%p reschedule",ipf); - loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); - - return ipf->readable_callback(loop, &ipf->readable, - ipf->readable_callback_user); -} - -static void tailing_on_cancel(struct oop_readable *rable) { - InputFile *ipf= (void*)rable; - dbg("**TOR** ipf=%p on_cancel",ipf); - - if (ipf->filemon) filemon_stop(ipf); - dbg("**TRACT** ipf=%p cancel",ipf); - loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); - ipf->readable_callback= 0; -} - -static void tailing_make_readable(InputFile *ipf) { - dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf, - (void*)ipf?ipf->readable_callback:0); - if (!ipf || !ipf->readable_callback) /* so callers can be naive */ - return; - ipf->fake_readable= 1; - loop->on_time(loop, OOP_TIME_NOW, 
tailing_rable_call_time, ipf); -} - -static int tailing_on_readable(struct oop_readable *rable, - oop_readable_call *cb, void *user) { - InputFile *ipf= (void*)rable; - dbg("**TOR** ipf=%p on_readable",ipf); - - tailing_on_cancel(rable); - ipf->readable_callback= cb; - ipf->readable_callback_user= user; - filemon_start(ipf); - tailing_make_readable(ipf); - return 0; -} - -static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, - size_t length) { - InputFile *ipf= (void*)rable; - for (;;) { - ssize_t r= read(ipf->fd, buffer, length); - if (r==-1) { - if (errno==EINTR) continue; - ipf->fake_readable= 0; - return r; - } - if (!r) { - if (ipf==main_input_file) { - errno=EAGAIN; - ipf->fake_readable= 0; - return -1; - } else if (ipf==flushing_input_file) { - assert(ipf->rd); - assert(sms==sm_SEPARATED || sms==sm_DROPPING); - } else if (ipf==backlog_input_file) { - assert(ipf->rd); - } else { - abort(); - } - } - dbg("**TOR** ipf=%p try_read r=%d",ipf,r); - return r; - } -} - -/*---------- filemon implemented with inotify ----------*/ - -#if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON) -#define HAVE_FILEMON - -#include - -static int filemon_inotify_fd; -static int filemon_inotify_wdmax; -static InputFile **filemon_inotify_wd2ipf; - -struct Filemon_Perfile { - int wd; -}; - -static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { - int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY); - if (wd < 0) sysdie("inotify_add_watch %s", ipf->path); - - if (wd >= filemon_inotify_wdmax) { - int newmax= wd+2; - filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf, - sizeof(*filemon_inotify_wd2ipf) * newmax); - memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0, - sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax)); - filemon_inotify_wdmax= newmax; - } - - assert(!filemon_inotify_wd2ipf[wd]); - filemon_inotify_wd2ipf[wd]= ipf; - - dbg("filemon inotify startfile %p wd=%d wdmax=%d", - ipf, wd, 
filemon_inotify_wdmax); - - pf->wd= wd; -} - -static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { - int wd= pf->wd; - dbg("filemon inotify stopfile %p wd=%d", ipf, wd); - int r= inotify_rm_watch(filemon_inotify_fd, wd); - if (r) syscrash("inotify_rm_watch"); - filemon_inotify_wd2ipf[wd]= 0; -} - -static void *filemon_inotify_readable(oop_source *lp, int fd, - oop_event e, void *u) { - struct inotify_event iev; - for (;;) { - int r= read(filemon_inotify_fd, &iev, sizeof(iev)); - if (r==-1) { - if (isewouldblock(errno)) break; - syscrash("read from inotify master"); - } else if (r==sizeof(iev)) { - assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax); - } else { - crash("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev)); - } - InputFile *ipf= filemon_inotify_wd2ipf[iev.wd]; - /*dbg("filemon inotify readable read %p wd=%d", ipf, iev.wd);*/ - tailing_make_readable(ipf); - } - return OOP_CONTINUE; -} - -static int filemon_method_init(void) { - filemon_inotify_fd= inotify_init(); - if (filemon_inotify_fd<0) { - syswarn("filemon/inotify: inotify_init failed"); - return 0; - } - xsetnonblock(filemon_inotify_fd, 1); - loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0); - - dbg("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd); - return 1; -} - -static void filemon_method_dump_info(FILE *f) { - int i; - fprintf(f,"inotify"); - DUMPV("%d",,filemon_inotify_fd); - DUMPV("%d",,filemon_inotify_wdmax); - for (i=0; ifilemon); - - NEW(ipf->filemon); - filemon_method_startfile(ipf, ipf->filemon); -} - -static void filemon_stop(InputFile *ipf) { - if (!ipf->filemon) return; - filemon_method_stopfile(ipf, ipf->filemon); - free(ipf->filemon); - ipf->filemon= 0; -} - -/*---------- interface to start and stop an input file ----------*/ +#include "innduct.h" -static const oop_rd_style feedfile_rdstyle= { - OOP_RD_DELIM_STRIP, '\n', - OOP_RD_NUL_PERMIT, - OOP_RD_SHORTREC_LONG, +const char *sms_names[]= { 
+#define SMS_DEF_NAME(s) #s , + SMS_LIST(SMS_DEF_NAME) + 0 }; -static void inputfile_reading_resume(InputFile *ipf) { - if (!ipf->rd) return; - if (!ipf->paused) return; - - int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, - feedfile_got_article,ipf, feedfile_read_err, ipf); - if (r) syscrash("unable start reading feedfile %s",ipf->path); - - ipf->paused= 0; -} - -static void inputfile_reading_pause(InputFile *ipf) { - if (!ipf->rd) return; - if (ipf->paused) return; - oop_rd_cancel(ipf->rd); - ipf->paused= 1; -} - -static void inputfile_reading_start(InputFile *ipf) { - assert(!ipf->rd); - ipf->readable.on_readable= tailing_on_readable; - ipf->readable.on_cancel= tailing_on_cancel; - ipf->readable.try_read= tailing_try_read; - ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */ - ipf->readable.delete_kill= 0; - - ipf->readable_callback= 0; - ipf->readable_callback_user= 0; - - ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0); - assert(ipf->rd); - - ipf->paused= 1; - inputfile_reading_resume(ipf); -} - -static void inputfile_reading_stop(InputFile *ipf) { - assert(ipf->rd); - inputfile_reading_pause(ipf); - oop_rd_delete(ipf->rd); - ipf->rd= 0; - assert(!ipf->filemon); /* we shouldn't be monitoring it now */ -} - - -/*========== interaction with innd - state machine ==========*/ - -/* See official state diagram at top of file. We implement - * this as follows: - * -8<- - - .=======. - ||START|| - `=======' - | - | open F - | - | F ENOENT - |`---------------------------------------------------. 
- F OPEN OK | | - |`---------------- - - - | - D ENOENT | D EXISTS see OVERALL STATES diagram | - | for full startup logic | - ,--------->| | - | V | - | ============ try to | - | NORMAL open D | - | [Normal] | - | main F tail | - | ============ V - | | | - | | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT | - ^ | hardlink F to D | - | [Hardlinked] | - | | unlink F | - | | our handle onto F is now onto D | - | [Moved] | - | | | - | |<-------------------<---------------------<---------+ - | | | - | | spawn inndcomm flush | - | V | - | ================== | - | FLUSHING[-ABSENT] | - | [Flushing] | - | main D tail/none | - | ================== | - | | | - | | INNDCOMM FLUSH FAILS ^ - | |`----------------------->----------. | - | | | | - | | NO SUCH SITE V | - ^ |`--------------->----. ==================== | - | | \ FLUSHFAILED[-ABSENT] | - | | \ [Moved] | - | | FLUSH OK \ main D tail/none | - | | open F \ ==================== | - | | \ | | - | | \ | TIME TO RETRY | - | |`------->----. ,---<---'\ `----------------' - | | D NONE | | D NONE `----. - | V | | V - | ============= V V ============ - | SEPARATED-1 | | DROPPING-1 - | flsh->rd!=0 | | flsh->rd!=0 - | [Separated] | | [Dropping] - | main F idle | | main none - | flsh D tail | | flsh D tail - | ============= | | ============ - | | | | install | - ^ | EOF ON D | | defer | EOF ON D - | V | | V - | =============== | | =============== - | SEPARATED-2 | | DROPPING-2 - | flsh->rd==0 | V flsh->rd==0 - | [Finishing] | | [Dropping] - | main F tail | `. main none - | flsh D closed | `. flsh D closed - | =============== V `. =============== - | | `. | - | | ALL D PROCESSED `. | ALL D PROCESSED - | V install defer as backlog `. | install defer - ^ | close D `. | close D - | | unlink D `. 
| unlink D - | | | | - | | V V - `----------' ============== - DROPPED - [Dropped] - main none - flsh none - some backlog - ============== - | - | ALL BACKLOG DONE - | - | unlink lock - | exit - V - ========== - (ESRCH) - [Droppped] - ========== - * ->8- - */ - -static void startup_set_input_file(InputFile *f) { - assert(!main_input_file); - main_input_file= f; - inputfile_reading_start(f); -} - -static void statemc_lock(void) { - int lockfd; - struct stat stab, stabf; - - for (;;) { - lockfd= open(path_lock, O_CREAT|O_RDWR, 0600); - if (lockfd<0) sysdie("open lockfile %s", path_lock); - - struct flock fl; - memset(&fl,0,sizeof(fl)); - fl.l_type= F_WRLCK; - fl.l_whence= SEEK_SET; - int r= fcntl(lockfd, F_SETLK, &fl); - if (r==-1) { - if (errno==EACCES || isewouldblock(errno)) { - if (quiet_multiple) exit(0); - die("another duct holds the lockfile"); - } - sysdie("fcntl F_SETLK lockfile %s", path_lock); - } - - xfstat_isreg(lockfd, &stabf, path_lock, "lockfile"); - int lock_noent; - xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile"); - - if (!lock_noent && samefile(&stab, &stabf)) - break; - - xclose(lockfd, "stale lockfile ", path_lock); - } - - FILE *lockfile= fdopen(lockfd, "w"); - if (!lockfile) syscrash("fdopen lockfile"); - - int r= ftruncate(lockfd, 0); - if (r) syscrash("truncate lockfile to write new info"); - - if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n", - (unsigned long)self_pid, - sitename, feedfile, remote_host) == EOF || - fflush(lockfile)) - sysdie("write info to lockfile %s", path_lock); - - dbg("startup: locked"); -} - -static void statemc_init(void) { - struct stat stabdefer; - - search_backlog_file(); - - int defer_noent; - xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file"); - if (defer_noent) { - dbg("startup: ductdefer ENOENT"); - } else { - dbg("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink); - switch (stabdefer.st_nlink==1) { - case 1: - open_defer(); /* so that we will later close it and 
rename it */ - break; - case 2: - xunlink(path_defer, "stale defer file link" - " (presumably hardlink to backlog file)"); - break; - default: - crash("defer file %s has unexpected link count %d", - path_defer, stabdefer.st_nlink); - } - } - - struct stat stab_f, stab_d; - int noent_f; - - InputFile *file_d= open_input_file(path_flushing); - if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file"); - - xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile"); - - if (!noent_f && file_d && samefile(&stab_f, &stab_d)) { - dbg("startup: F==D => Hardlinked"); - xunlink(feedfile, "feed file (during startup)"); /* => Moved */ - noent_f= 1; - } - - if (noent_f) { - dbg("startup: F ENOENT => Moved"); - if (file_d) startup_set_input_file(file_d); - spawn_inndcomm_flush("feedfile missing at startup"); - /* => Flushing, sms:=FLUSHING */ - } else { - if (file_d) { - dbg("startup: F!=D => Separated"); - startup_set_input_file(file_d); - flushing_input_file= main_input_file; - main_input_file= open_input_file(feedfile); - if (!main_input_file) crash("feedfile vanished during startup"); - SMS(SEPARATED, max_separated_periods, - "found both old and current feed files"); - } else { - dbg("startup: F exists, D ENOENT => Normal"); - InputFile *file_f= open_input_file(feedfile); - if (!file_f) crash("feed file vanished during startup"); - startup_set_input_file(file_f); - SMS(NORMAL, spontaneous_flush_periods, "normal startup"); - } - } -} - -static void statemc_start_flush(const char *why) { /* Normal => Flushing */ - assert(sms == sm_NORMAL); - - dbg("starting flush (%s) (%lu >?= %lu) (%d)", - why, - (unsigned long)(main_input_file ? 
main_input_file->offset : 0), - (unsigned long)target_max_feedfile_size, - until_flush); - - int r= link(feedfile, path_flushing); - if (r) sysdie("link feedfile %s to flushing file %s", - feedfile, path_flushing); - /* => Hardlinked */ - - xunlink(feedfile, "old feedfile link"); - /* => Moved */ - - spawn_inndcomm_flush(why); /* => Flushing FLUSHING */ -} - -static int trigger_flush_ok(const char *why) { - switch (sms) { - - case sm_NORMAL: - statemc_start_flush(why ? why : "periodic"); - return 1; /* Normal => Flushing; => FLUSHING */ - - case sm_FLUSHFAILED: - spawn_inndcomm_flush(why ? why : "retry"); - return 1; /* Moved => Flushing; => FLUSHING */ - - case sm_SEPARATED: - case sm_DROPPING: - warn("abandoning old feedfile after flush (%s), autodeferring", - why ? why : "took too long to complete"); - assert(flushing_input_file); - autodefer_input_file(flushing_input_file); - return 1; - - default: - return 0; - } -} - -static void statemc_period_poll(void) { - if (!until_flush) return; - until_flush--; - assert(until_flush>=0); - - if (until_flush) return; - int ok= trigger_flush_ok(0); - assert(ok); -} - -static int inputfile_is_done(InputFile *ipf) { - if (!ipf) return 0; - if (ipf->inprogress) return 0; /* new article in the meantime */ - if (ipf->rd) return 0; /* not had EOF */ - return 1; -} - -static void notice_processed(InputFile *ipf, int completed, - const char *what, const char *spec) { - if (!ipf) return; /* allows preterminate to be lazy */ - -#define RCI_NOTHING(x) /* nothing */ -#define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE -#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x]) - -#define CNT(art,rc) (ipf->counts[art_##art][RC_##rc]) - - char *inprog= completed - ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */ - : xasprintf(" inprogress=%ld", ipf->inprogress); - char *autodefer= ipf->autodefer >= 0 - ? 
xasprintf(" autodeferred=%ld", ipf->autodefer) - : xasprintf("%s",""); - - info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s" - " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)" - RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) - , - completed?"completed":"processed", what, spec, - ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, - inprog, autodefer, ipf->count_nooffer_missing, - CNT(Unchecked,sent) + CNT(Unsolicited,sent) - , CNT(Unchecked,sent), CNT(Unsolicited,sent), - CNT(Wanted,accepted) + CNT(Unsolicited,accepted) - , CNT(Wanted,accepted), CNT(Unsolicited,accepted) - RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS) - ); - - free(inprog); - free(autodefer); - -#undef CNT -} - -static void statemc_check_backlog_done(void) { - InputFile *ipf= backlog_input_file; - if (!inputfile_is_done(ipf)) return; - - const char *slash= strrchr(ipf->path, '/'); - const char *leaf= slash ? slash+1 : ipf->path; - const char *under= strchr(slash, '_'); - const char *rest= under ? under+1 : leaf; - if (!strncmp(rest,"backlog",7)) rest += 7; - notice_processed(ipf,1,"backlog ",rest); - - close_input_file(ipf); - if (unlink(ipf->path)) { - if (errno != ENOENT) - syscrash("could not unlink processed backlog file %s", ipf->path); - warn("backlog file %s vanished while we were reading it" - " so we couldn't remove it (but it's done now, anyway)", - ipf->path); - } - free(ipf); - backlog_input_file= 0; - search_backlog_file(); - return; -} - -static void statemc_check_flushing_done(void) { - InputFile *ipf= flushing_input_file; - if (!inputfile_is_done(ipf)) return; - - assert(sms==sm_SEPARATED || sms==sm_DROPPING); - - notice_processed(ipf,1,"feedfile",""); - - close_defer(); - - xunlink(path_flushing, "old flushing file"); - - close_input_file(flushing_input_file); - free(flushing_input_file); - flushing_input_file= 0; - - if (sms==sm_SEPARATED) { - notice("flush complete"); - SMS(NORMAL, spontaneous_flush_periods, "flush complete"); - } else if (sms==sm_DROPPING) { - 
SMS(DROPPED, max_separated_periods, "old flush complete"); - search_backlog_file(); - notice("feed dropped, but will continue until backlog is finished"); - } -} - -static void *statemc_check_input_done(oop_source *lp, struct timeval now, - void *u) { - /* main input file may be idle but if so that's because - * we haven't got to it yet, but that doesn't mean it's really done */ - statemc_check_flushing_done(); - statemc_check_backlog_done(); - return OOP_CONTINUE; -} - -static void queue_check_input_done(void) { - loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0); -} - -static void statemc_setstate(StateMachineState newsms, int periods, - const char *forlog, const char *why) { - sms= newsms; - until_flush= periods; - - const char *xtra= ""; - switch (sms) { - case sm_FLUSHING: - case sm_FLUSHFAILED: - if (!main_input_file) xtra= "-ABSENT"; - break; - case sm_SEPARATED: - case sm_DROPPING: - xtra= flushing_input_file->rd ? "-1" : "-2"; - break; - default:; - } - - if (periods) { - info("state %s%s[%d] %s",forlog,xtra,periods,why); - } else { - info("state %s%s %s",forlog,xtra,why); - } -} - -/*---------- defer and backlog files ----------*/ - -static void open_defer(void) { - struct stat stab; - - if (defer) return; - - defer= fopen(path_defer, "a+"); - if (!defer) sysdie("could not open defer file %s", path_defer); - - /* truncate away any half-written records */ - - xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file"); - - if (stab.st_size > LONG_MAX) - crash("defer file %s size is far too large", path_defer); - - if (!stab.st_size) - return; - - long orgsize= stab.st_size; - long truncto= stab.st_size; - for (;;) { - if (!truncto) break; /* was only (if anything) one half-truncated record */ - if (fseek(defer, truncto-1, SEEK_SET) < 0) - syscrash("seek in defer file %s while truncating partial", path_defer); - - int r= getc(defer); - if (r==EOF) { - if (ferror(defer)) - syscrash("failed read from defer file %s", path_defer); - 
else - crash("defer file %s shrank while we were checking it!", path_defer); - } - if (r=='\n') break; - truncto--; - } - - if (stab.st_size != truncto) { - warn("truncating half-record at end of defer file %s -" - " shrinking by %ld bytes from %ld to %ld", - path_defer, orgsize - truncto, orgsize, truncto); - - if (fflush(defer)) - sysdie("could not flush defer file %s", path_defer); - if (ftruncate(fileno(defer), truncto)) - syscrash("could not truncate defer file %s", path_defer); - - } else { - info("continuing existing defer file %s (%ld bytes)", - path_defer, orgsize); - } - if (fseek(defer, truncto, SEEK_SET)) - syscrash("could not seek to new end of defer file %s", path_defer); -} - -static void close_defer(void) { - if (!defer) - return; - - struct stat stab; - xfstat_isreg(fileno(defer), &stab, path_defer, "defer file"); - - if (fclose(defer)) sysdie("could not close defer file %s", path_defer); - defer= 0; - - time_t now= xtime(); - - char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile, - (unsigned long)now, - (unsigned long)stab.st_ino); - if (link(path_defer, backlog)) - sysdie("could not install defer file %s as backlog file %s", - path_defer, backlog); - if (unlink(path_defer)) - syscrash("could not unlink old defer link %s to backlog file %s", - path_defer, backlog); - - free(backlog); - - if (until_backlog_nextscan < 0 || - until_backlog_nextscan > backlog_retry_minperiods + 1) - until_backlog_nextscan= backlog_retry_minperiods + 1; -} - -static void poll_backlog_file(void) { - if (until_backlog_nextscan < 0) return; - if (until_backlog_nextscan-- > 0) return; - search_backlog_file(); -} - -static void search_backlog_file(void) { - /* returns non-0 iff there are any backlog files */ - - glob_t gl; - int r; - unsigned ui; - struct stat stab; - const char *oldest_path=0; - time_t oldest_mtime=0, now; - - if (backlog_input_file) return; - - try_again: - - r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl); - - switch (r) { - case 
GLOB_ABORTED: - sysdie("failed to expand backlog pattern %s", globpat_backlog); - case GLOB_NOSPACE: - die("out of memory expanding backlog pattern %s", globpat_backlog); - case 0: - for (ui=0; ui= 0 && - until_backlog_nextscan > backlog_spontrescan_periods) - until_backlog_nextscan= backlog_spontrescan_periods; - - dbg("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s", - age, age_deficiency, until_backlog_nextscan, oldest_path); - - xfree: - globfree(&gl); - return; -} - -/*---------- shutdown and signal handling ----------*/ - -static void preterminate(void) { - if (in_child) return; - notice_processed(main_input_file,0,"feedfile",""); - notice_processed(flushing_input_file,0,"flushing",""); - if (backlog_input_file) - notice_processed(backlog_input_file,0, "backlog file ", - backlog_input_file->path); -} - -static int signal_self_pipe[2]; -static sig_atomic_t terminate_sig_flag; - -static void raise_default(int signo) { - xsigsetdefault(signo); - raise(signo); - abort(); -} - -static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) { - assert(fd=signal_self_pipe[0]); - char buf[PIPE_BUF]; - int r= read(signal_self_pipe[0], buf, sizeof(buf)); - if (r<0 && !isewouldblock(errno)) - syscrash("failed to read signal self pipe"); - if (r==0) crash("eof on signal self pipe"); - if (terminate_sig_flag) { - preterminate(); - notice("terminating (%s)", strsignal(terminate_sig_flag)); - raise_default(terminate_sig_flag); - } - return OOP_CONTINUE; -} - -static void sigarrived_handler(int signum) { - static char x; - switch (signum) { - case SIGTERM: - case SIGINT: - if (!terminate_sig_flag) terminate_sig_flag= signum; - break; - default: - abort(); - } - write(signal_self_pipe[1],&x,1); -} - -static void init_signals(void) { - if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) - syscrash("could not ignore SIGPIPE"); - - if (pipe(signal_self_pipe)) sysdie("create self-pipe for signals"); - - xsetnonblock(signal_self_pipe[0],1); - 
xsetnonblock(signal_self_pipe[1],1); - - struct sigaction sa; - memset(&sa,0,sizeof(sa)); - sa.sa_handler= sigarrived_handler; - sa.sa_flags= SA_RESTART; - xsigaction(SIGTERM,&sa); - xsigaction(SIGINT,&sa); - - on_fd_read_except(signal_self_pipe[0], sigarrived_event); -} - -/*========== flushing the feed ==========*/ - -static pid_t inndcomm_child; -static int inndcomm_sentinel_fd; - -static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { - assert(inndcomm_child); - assert(fd == inndcomm_sentinel_fd); - int status= xwaitpid(&inndcomm_child, "inndcomm"); - inndcomm_child= 0; - - cancel_fd_read_except(fd); - xclose_perhaps(&fd, "inndcomm sentinel pipe",0); - inndcomm_sentinel_fd= 0; - - assert(!flushing_input_file); - - if (WIFEXITED(status)) { - switch (WEXITSTATUS(status)) { - - case INNDCOMMCHILD_ESTATUS_FAIL: - goto failed; - - case INNDCOMMCHILD_ESTATUS_NONESUCH: - notice("feed has been dropped by innd, finishing up"); - flushing_input_file= main_input_file; - tailing_make_readable(flushing_input_file); - /* we probably previously returned EAGAIN from our fake read method - * when in fact we were at EOF, so signal another readable event - * so we actually see the EOF */ - - main_input_file= 0; - - if (flushing_input_file) { - SMS(DROPPING, max_separated_periods, - "feed dropped by innd, but must finish last flush"); - } else { - close_defer(); - SMS(DROPPED, 0, "feed dropped by innd"); - search_backlog_file(); - } - return OOP_CONTINUE; - - case 0: - /* as above */ - flushing_input_file= main_input_file; - tailing_make_readable(flushing_input_file); - - main_input_file= open_input_file(feedfile); - if (!main_input_file) - crash("flush succeeded but feedfile %s does not exist!" 
- " (this probably means feedfile does not correspond" - " to site %s in newsfeeds)", feedfile, sitename); - - if (flushing_input_file) { - SMS(SEPARATED, max_separated_periods, "flush complete"); - } else { - close_defer(); - SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete"); - } - return OOP_CONTINUE; - - default: - goto unexpected_exitstatus; - - } - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { - warn("flush timed out trying to talk to innd"); - goto failed; - } else { - unexpected_exitstatus: - report_child_status("inndcomm child", status); - } - - failed: - SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry"); - return OOP_CONTINUE; -} - -static void inndcommfail(const char *what) { - syswarn("error communicating with innd: %s failed: %s", what, ICCfailure); - exit(INNDCOMMCHILD_ESTATUS_FAIL); -} - -void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ - int pipefds[2]; - - notice("flushing %s",why); - - assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED); - assert(!inndcomm_child); - assert(!inndcomm_sentinel_fd); - - if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel"); - - inndcomm_child= xfork("inndcomm child"); - - if (!inndcomm_child) { - const char *flushargv[2]= { sitename, 0 }; - char *reply; - int r; - - xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0); - /* parent spots the autoclose of pipefds[1] when we die or exit */ - - if (simulate_flush>=0) { - warn("SIMULATING flush child status %d", simulate_flush); - if (simulate_flush>128) raise(simulate_flush-128); - else exit(simulate_flush); - } - - alarm(inndcomm_flush_timeout); - r= ICCopen(); if (r) inndcommfail("connect"); - r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit"); - if (!r) exit(0); /* yay! 
*/ - - if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH); - syswarn("innd ctlinnd flush failed: innd said %s", reply); - exit(INNDCOMMCHILD_ESTATUS_FAIL); - } +struct Conn { + ISNODE(Conn); + int fd; /* may be 0, meaning closed (during construction/destruction) */ + oop_read *rd; /* likewise */ + int oopwriting; /* since on_fd is not idempotent */ + int max_queue, stream; + const char *quitting; + int since_activity; /* periods */ + ArticleList waiting; /* not yet told peer */ + ArticleList priority; /* peer says send it now */ + ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ + struct iovec xmit[CONNIOVS]; + XmitDetails xmitd[CONNIOVS]; + int xmitu; +}; - simulate_flush= -1; - xclose(pipefds[1], "inndcomm sentinel child's end",0); - inndcomm_sentinel_fd= pipefds[0]; - assert(inndcomm_sentinel_fd); - on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event); +/*----- general operational variables -----*/ - SMS(FLUSHING, 0, why); -} +/* main initialises */ +oop_source *loop; +ConnList conns; +char *path_lock, *path_flushing, *path_defer, *path_dump; +char *globpat_backlog; +pid_t self_pid; +int *lowvol_perperiod; +int lowvol_circptr; +int lowvol_total; /* does not include current period */ /*========== main program ==========*/ @@ -3265,7 +74,7 @@ static void postfork_stdio(FILE *f, const char *what, const char *what2) { if (fclose(f)) syscrash("(in child) close %s%s", what, what2?what2:0); } -static void postfork(void) { +void postfork(void) { in_child= 1; xsigsetdefault(SIGTERM); @@ -3317,11 +126,6 @@ static void every(int interval, int fixed_rate, void (*f)(void)) { every_schedule(e, now); } -static void filepoll(void) { - tailing_make_readable(main_input_file); - tailing_make_readable(flushing_input_file); -} - static char *dbg_report_ipf(InputFile *ipf) { if (!ipf) return xasprintf("none"); @@ -3367,145 +171,6 @@ static void period(void) { } -/*========== dumping state ==========*/ - -static void 
dump_article_list(FILE *f, const CliCommand *c, - const ArticleList *al) { - fprintf(f, " count=%d\n", al->count); - if (!c->xval) return; - - int i; Article *art; - for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) { - fprintf(f," #%05d %-11s", i, artstate_names[art->state]); - DUMPV("%p", art->,ipf); - DUMPV("%d", art->,missing); - DUMPV("%lu", (unsigned long)art->,offset); - DUMPV("%d", art->,blanklen); - DUMPV("%d", art->,midlen); - fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid); - } -} - -static void dump_input_file(FILE *f, const CliCommand *c, - InputFile *ipf, const char *wh) { - char *dipf= dbg_report_ipf(ipf); - fprintf(f,"input %s %s", wh, dipf); - free(dipf); - - if (ipf) { - DUMPV("%d", ipf->,readcount_ok); - DUMPV("%d", ipf->,readcount_blank); - DUMPV("%d", ipf->,readcount_err); - DUMPV("%d", ipf->,count_nooffer_missing); - } - fprintf(f,"\n"); - if (ipf) { - ArtState state; const char *const *statename; - for (state=0, statename=artstate_names; *statename; state++,statename++) { -#define RC_DUMP_FMT(x) " " #x "=%d" -#define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x] - fprintf(f,"input %s counts %-11s" - RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n", - wh, *statename - RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL)); - } - fprintf(f,"input %s queue", wh); - dump_article_list(f,c,&ipf->queue); - } -} - -CCMD(dump) { - int i; - fprintf(cc->out, "dumping state to %s\n", path_dump); - FILE *f= fopen(path_dump, "w"); - if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; } - - fprintf(f,"general"); - DUMPV("%s", sms_names,[sms]); - DUMPV("%d", ,until_flush); - DUMPV("%ld", (long),self_pid); - DUMPV("%p", , defer); - DUMPV("%d", , until_connect); - DUMPV("%d", , until_backlog_nextscan); - DUMPV("%d", , simulate_flush); - fprintf(f,"\nnocheck"); - DUMPV("%#.10f", , accept_proportion); - DUMPV("%d", , nocheck); - DUMPV("%d", , nocheck_reported); - fprintf(f,"\n"); - - fprintf(f,"special"); - DUMPV("%ld", 
(long),connecting_child); - DUMPV("%d", , connecting_fdpass_sock); - DUMPV("%d", , cli_master); - fprintf(f,"\n"); - - fprintf(f,"lowvol"); - DUMPV("%d", , lowvol_circptr); - DUMPV("%d", , lowvol_total); - fprintf(f,":"); - for (i=0; ifd); - DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue); - DUMPV("%d",conn->,stream); DUMPV("\"%s\"",conn->,quitting); - DUMPV("%d",conn->,since_activity); - fprintf(f,"\n"); - - fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting); - fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority); - fprintf(f,"C%d sent", conn->fd); dump_article_list(f,c,&conn->sent); - - fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu); - for (i=0; ixmitu; i++) { - const struct iovec *iv= &conn->xmit[i]; - const XmitDetails *xd= &conn->xmitd[i]; - char *dinfo; - switch (xd->kind) { - case xk_Const: dinfo= xasprintf("Const"); break; - case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break; - default: - abort(); - } - fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len, - sanitise(iv->iov_base, iv->iov_len)); - free(dinfo); - } - } - - fprintf(f,"paths"); - DUMPV("%s", , feedfile); - DUMPV("%s", , path_cli); - DUMPV("%s", , path_lock); - DUMPV("%s", , path_flushing); - DUMPV("%s", , path_defer); - DUMPV("%s", , path_dump); - DUMPV("%s", , globpat_backlog); - fprintf(f,"\n"); - - if (!!ferror(f) + !!fclose(f)) { - fprintf(cc->out, "failed: write: %s\n", strerror(errno)); - return; - } -} - /*========== option parsing ==========*/ static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0); diff --git a/innduct.h b/innduct.h new file mode 100644 index 0000000..1033ac7 --- /dev/null +++ b/innduct.h @@ -0,0 +1,408 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free 
Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#ifndef INNDUCT_H +#define INNDUCT_H + +#define _GNU_SOURCE 1 + +#include "config.h" +#include "storage.h" +#include "nntp.h" +#include "libinn.h" +#include "inndcomm.h" + +#include "inn/list.h" +#include "inn/innconf.h" +#include "inn/messages.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +/*----- general definitions, probably best not changed -----*/ + +#define CONNCHILD_ESTATUS_STREAM 24 +#define CONNCHILD_ESTATUS_NOSTREAM 25 + +#define INNDCOMMCHILD_ESTATUS_FAIL 26 +#define INNDCOMMCHILD_ESTATUS_NONESUCH 27 + +#define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) +#define MAX_CLI_COMMAND 1000 + +#define VA va_list al; va_start(al,fmt) +#define PRINTF(f,a) __attribute__((__format__(printf,f,a))) +#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a))) +#define NORET __attribute__((__noreturn__)) + +#define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr)))) +#define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr))) + +#define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v); + +#define 
FOR_CONN(conn) \ + for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn))) + +/*----- doubly linked lists -----*/ + +#define ISNODE(T) struct node list_node +#define DEFLIST(T) \ + typedef struct { \ + union { struct list li; T *for_type; } u; \ + int count; \ + } T##List + +#define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node) + +#define LIST_CHECKCANHAVENODE(l,n) \ + ((void)((n) == ((l).u.for_type))) /* just for the type check */ + +#define LIST_ADDSOMEHOW(l,n,list_addsomehow) \ + ( LIST_CHECKCANHAVENODE(l,n), \ + list_addsomehow(&(l).u.li, NODE((n))), \ + (void)(l).count++ \ + ) + +#define LIST_REMSOMEHOW(l,list_remsomehow) \ + ( (typeof((l).u.for_type)) \ + ( (l).count \ + ? ( (l).count--, \ + list_remsomehow(&(l).u.li) ) \ + : 0 \ + ) \ + ) + + +#define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead) +#define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail) +#define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) +#define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) + +#define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li)) +#define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l)))) +#define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) +#define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) + +#define LIST_REMOVE(l,n) \ + ( LIST_CHECKCANHAVENODE(l,n), \ + list_remove(NODE((n))), \ + (void)(l).count-- \ + ) + +#define LIST_INSERT(l,n,pred) \ + ( LIST_CHECKCANHAVENODE(l,n), \ + LIST_CHECKCANHAVENODE(l,pred), \ + list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \ + (void)(l).count++ \ + ) + +/*----- type predeclarations -----*/ + +typedef struct Conn Conn; +typedef struct Article Article; +typedef struct InputFile InputFile; +typedef struct XmitDetails XmitDetails; +typedef struct Filemon_Perfile Filemon_Perfile; +typedef enum StateMachineState StateMachineState; +typedef struct CliCommand CliCommand; + +DEFLIST(Conn); +DEFLIST(Article); + + +/*----- configuration options -----*/ +/* 
when changing defaults, remember to update the manpage */ + +static const char *sitename, *remote_host; +static const char *feedfile, *path_run, *path_cli, *path_cli_dir; +static int quiet_multiple=0; +static int interactive=0, try_filemon=1; +static int try_stream=1; +static int port=119; +static const char *inndconffile; + +static int max_connections=10; +static int max_queue_per_conn=200; +static int target_max_feedfile_size=100000; +static int period_seconds=30; +static int filepoll_seconds=5; +static int max_queue_per_ipf=-1; + +static int connection_setup_timeout=200; +static int inndcomm_flush_timeout=100; + +static double nocheck_thresh= 95.0; /* converted from percentage by main */ +static double nocheck_decay= 100; /* conv'd from articles to lambda by main */ + +/* all these are initialised to seconds, and converted to periods in main */ +static int reconnect_delay_periods=1000; +static int flushfail_retry_periods=1000; +static int backlog_retry_minperiods=100; +static int backlog_spontrescan_periods=300; +static int spontaneous_flush_periods=100000; +static int max_separated_periods=2000; +static int need_activity_periods=1000; +static int lowvol_thresh=3; +static int lowvol_periods=1000; + +static double max_bad_data_ratio= 1; /* conv'd from percentage by main */ +static int max_bad_data_initial= 30; + /* in one corrupt 4096-byte block the number of newlines has + * mean 16 and standard deviation 3.99. 
30 corresponds to z=+3.5 */ + + +/*----- statistics -----*/ + +typedef enum { /* in queue in conn->sent */ + art_Unchecked, /* not checked, not sent checking */ + art_Wanted, /* checked, wanted sent body as requested */ + art_Unsolicited, /* - sent body without check */ + art_MaxState, +} ArtState; + +static const char *const artstate_names[]= + { "Unchecked", "Wanted", "Unsolicited", 0 }; + +#define RESULT_COUNTS(RCS,RCN) \ + RCS(sent) \ + RCS(accepted) \ + RCN(unwanted) \ + RCN(rejected) \ + RCN(deferred) \ + RCN(missing) \ + RCN(connretry) + +#define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)" +#define RCI_TRIPLE_VALS_BASE(counts,x) \ + counts[art_Unchecked] x \ + + counts[art_Wanted] x \ + + counts[art_Unsolicited] x, \ + counts[art_Unchecked] x \ + , counts[art_Wanted] x \ + , counts[art_Unsolicited] x + +typedef enum { +#define RC_INDEX(x) RC_##x, + RESULT_COUNTS(RC_INDEX, RC_INDEX) + RCI_max +} ResultCountIndex; + + +/*----- transmission buffers -----*/ + +#define CONNIOVS 128 + +typedef enum { + xk_Const, xk_Artdata +} XmitKind; + +struct XmitDetails { + XmitKind kind; + union { + ARTHANDLE *sm_art; + } info; +}; + + +/*----- core operational data structure types -----*/ + +struct InputFile { + /* This is also an instance of struct oop_readable */ + struct oop_readable readable; /* first */ + oop_readable_call *readable_callback; + void *readable_callback_user; + + int fd; + Filemon_Perfile *filemon; + + oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ + off_t offset; + int skippinglong, paused, fake_readable; + + ArticleList queue; + long inprogress; /* includes queue.count and also articles in conns */ + long autodefer; /* -1 means not doing autodefer */ + + int counts[art_MaxState][RCI_max]; + int readcount_ok, readcount_blank, readcount_err, count_nooffer_missing; + char path[]; +}; + +struct Article { + ISNODE(Article); + ArtState state; + int midlen, missing; + InputFile *ipf; + TOKEN token; + off_t offset; + int blanklen; + char 
messageid[1]; +}; + +#define SMS_LIST(X) \ + X(NORMAL) \ + X(FLUSHING) \ + X(FLUSHFAILED) \ + X(SEPARATED) \ + X(DROPPING) \ + X(DROPPED) + +enum StateMachineState { +#define SMS_DEF_ENUM(s) sm_##s, + SMS_LIST(SMS_DEF_ENUM) +}; + +extern const char *sms_names[]; + +/*========== function declarations ==========*/ + +/*----- help.c -----*/ + +static void syscrash(const char *fmt, ...) NORET_PRINTF(1,2); +static void crash(const char *fmt, ...) NORET_PRINTF(1,2); +static void info(const char *fmt, ...) PRINTF(1,2); +static void dbg(const char *fmt, ...) PRINTF(1,2); + +void logv(int sysloglevel, const char *pfx, int errnoval, + const char *fmt, va_list al) PRINTF(5,0); + +char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0); +char *xasprintf(const char *fmt, ...) PRINTF(1,2); + +int close_perhaps(int *fd); +void xclose(int fd, const char *what, const char *what2); +void xclose_perhaps(int *fd, const char *what, const char *what2); +pid_t xfork(const char *what); + +static void on_fd_read_except(int fd, oop_call_fd callback); +void cancel_fd_read_except(int fd); + +void report_child_status(const char *what, int status); +int xwaitpid(pid_t *pid, const char *what); + +void *zxmalloc(size_t sz); +void xunlink(const char *path, const char *what); +time_t xtime(void); +void xsigaction(int signo, const struct sigaction *sa); +void xsigsetdefault(int signo); + +void xgettimeofday(struct timeval *tv_r); +void xsetnonblock(int fd, int nonb); + +void check_isreg(const struct stat *stab, const char *path, + const char *what); +void xfstat(int fd, struct stat *stab_r, const char *what); +void xfstat_isreg(int fd, struct stat *stab_r, + const char *path, const char *what); +void xlstat_isreg(const char *path, struct stat *stab, + int *enoent_r /* 0 means ENOENT is fatal */, + const char *what); +int samefile(const struct stat *a, const struct stat *b); + +char *sanitise(const char *input, int len); + +static inline int isewouldblock(int errnoval) { + return 
errnoval==EWOULDBLOCK || errnoval==EAGAIN; +} + +/*----- innduct.c -----*/ + +void postfork(void); + +/*----- conn.c -----*/ + +void conn_closefd(Conn *conn, const char *msgprefix); + +/*----- defer.c -----*/ + +void poll_backlog_file(void); + +/*----- infile.c -----*/ + +void filepoll(void); + +/*----- statemc.c -----*/ + +extern sig_atomic_t terminate_sig_flag; /* declaration only; the object is defined in statemc.c */ + +/*----- xmit.c -----*/ + +void inputfile_queue_check_expired(InputFile *ipf); + +/*----- external linkage for debug/dump only -----*/ + +pid_t connecting_child; +pid_t inndcomm_child; + +/*========== general operational variables ==========*/ + +/* innduct.c */ +extern oop_source *loop; +extern ConnList conns; +extern char *path_lock, *path_flushing, *path_defer, *path_dump; +extern char *globpat_backlog; +extern pid_t self_pid; +extern int *lowvol_perperiod; +extern int lowvol_circptr; +extern int lowvol_total; /* does not include current period */ + +/* statemc.c */ +extern StateMachineState sms; +extern int until_flush; +extern InputFile *main_input_file, *flushing_input_file, *backlog_input_file; +extern FILE *defer; +extern int until_connect, until_backlog_nextscan; +extern double accept_proportion; +extern int nocheck, nocheck_reported, in_child; + +/* help.c */ +extern int simulate_flush; +extern int logv_use_syslog; +extern const char *logv_prefix; + + +#endif /*INNDUCT_H*/ diff --git a/recv.c b/recv.c new file mode 100644 index 0000000..870e0e5 --- /dev/null +++ b/recv.c @@ -0,0 +1,237 @@ +/*========== handling responses from peer ==========*/ + +static const oop_rd_style peer_rd_style= { + OOP_RD_DELIM_STRIP, '\n', + OOP_RD_NUL_FORBID, + OOP_RD_SHORTREC_FORBID +}; + +static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *conn_v) { + Conn *conn= conn_v; + connfail(conn, "error receiving from peer: %s", errmsg); + return OOP_CONTINUE; +} + +static Article *article_reply_check(Conn *conn, const char
*response, + int code_indicates_streaming, + int must_have_sent + /* 1:yes, -1:no, 0:dontcare */, + const char *sanitised_response) { + Article *art= LIST_HEAD(conn->sent); + + if (!art) { + connfail(conn, + "peer gave unexpected response when no commands outstanding: %s", + sanitised_response); + return 0; + } + + if (code_indicates_streaming) { + assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */ + if (!conn->stream) { + connfail(conn, "peer gave streaming response code " + " to IHAVE or subsequent body: %s", sanitised_response); + return 0; + } + const char *got_mid= response+4; + int got_midlen= strcspn(got_mid, " \n\r"); + if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') { + connfail(conn, "peer gave streaming response with syntactically invalid" + " messageid: %s", sanitised_response); + return 0; + } + if (got_midlen != art->midlen || + memcmp(got_mid, art->messageid, got_midlen)) { + connfail(conn, "peer gave streaming response code to wrong article -" + " probable synchronisation problem; we offered: %s;" + " peer said: %s", + art->messageid, sanitised_response); + return 0; + } + } else { + if (conn->stream) { + connfail(conn, "peer gave non-streaming response code to" + " CHECK/TAKETHIS: %s", sanitised_response); + return 0; + } + } + + if (must_have_sent>0 && art->state < art_Wanted) { + connfail(conn, "peer says article accepted but" + " we had not sent the body: %s", sanitised_response); + return 0; + } + if (must_have_sent<0 && art->state >= art_Wanted) { + connfail(conn, "peer says please sent the article but we just did: %s", + sanitised_response); + return 0; + } + + Article *art_again= LIST_REMHEAD(conn->sent); + assert(art_again == art); + return art; +} + +static void update_nocheck(int accepted) { + accept_proportion *= nocheck_decay; + accept_proportion += accepted * (1.0 - nocheck_decay); + int new_nocheck= accept_proportion >= nocheck_thresh; + if (new_nocheck && !nocheck_reported) { + notice("entering nocheck 
mode for the first time"); + nocheck_reported= 1; + } else if (new_nocheck != nocheck) { + dbg("nocheck mode %s", new_nocheck ? "start" : "stop"); + } + nocheck= new_nocheck; +} + +static void article_done(Article *art, int whichcount) { /* account for art, blank its feedfile entry, and free it */ + if (whichcount>=0 && !art->missing) + art->ipf->counts[art->state][whichcount]++; + + if (whichcount == RC_accepted) update_nocheck(1); + else if (whichcount == RC_unwanted) update_nocheck(0); + + InputFile *ipf= art->ipf; + + while (art->blanklen) { /* overwrite the entry with spaces, one chunk per iteration */ + static const char spaces[]= + " " + " " + " " + " " + " " + " " + " " + " " + " "; + int nspaces= sizeof(spaces)-1; + int w= art->blanklen; if (w > nspaces) w= nspaces; + int r= pwrite(ipf->fd, spaces, w, art->offset); + if (r==-1) { + if (errno==EINTR) continue; + syscrash("failed to blank entry for %s (length %d at offset %lu) in %s", + art->messageid, art->blanklen, + (unsigned long)art->offset, ipf->path); + } + assert(r>=0 && r<=w); + art->blanklen -= r; /* pwrite may be short: advance only by what was actually written */ + art->offset += r; + } + + ipf->inprogress--; + assert(ipf->inprogress >= 0); + free(art); + + if (!ipf->inprogress && ipf != main_input_file) + queue_check_input_done(); +} + +static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *conn_v) { + Conn *conn= conn_v; + + if (ev == OOP_RD_EOF) { + connfail(conn, "unexpected EOF from peer"); + return OOP_CONTINUE; + } + assert(ev == OOP_RD_OK); + + char *sani= sanitise(data,-1); + + char *ep; + unsigned long code= strtoul(data, &ep, 10); + if (ep != data+3 || *ep != ' ' || data[0]=='0') { + connfail(conn, "badly formatted response from peer: %s", sani); + return OOP_CONTINUE; + } + + int busy= conn_busy(conn); + + if (conn->quitting) { + if (code!=205 && code!=400) { + connfail(conn, "peer gave unexpected response to QUIT (%s): %s", + conn->quitting, sani); + } else { + LIST_REMOVE(conns,conn); + notice("C%d (now %d) idle connection closed (%s)", + conn->fd, conns.count, conn->quitting);
+ assert(!busy); + conn_dispose(conn); + } + return OOP_CONTINUE; + } + + conn->since_activity= 0; + Article *art; + +#define GET_ARTICLE(musthavesent) do{ \ + art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \ + if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \ + }while(0) + +#define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \ + code_streaming= (streaming); \ + GET_ARTICLE(musthavesent); \ + article_done(art, RC_##how); \ + goto dealtwith; \ + }while(0) + +#define PEERBADMSG(m) do { \ + connfail(conn, m ": %s", sani); return OOP_CONTINUE; \ + }while(0) + + int code_streaming= 0; + + switch (code) { + + default: PEERBADMSG("peer sent unexpected message"); + + case 400: + if (busy) + PEERBADMSG("peer timed us out or stopped accepting articles"); + + LIST_REMOVE(conns,conn); + notice("C%d (now %d) idle connection closed by peer", + conn->fd, conns.count); /* fd labels the connection; count is what remains after removal */ + conn_dispose(conn); + return OOP_CONTINUE; + + case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */ + case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */ + + case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */ + case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */ + + case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */ + case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */ + + case 238: /* CHECK says send it */ + code_streaming= 1; /* fall through */ + case 335: /* IHAVE says send it */ + GET_ARTICLE(-1); + assert(art->state == art_Unchecked); + art->ipf->counts[art->state][RC_accepted]++; + art->state= art_Wanted; + LIST_ADDTAIL(conn->priority, art); + break; + + case 431: /* CHECK or TAKETHIS says try later */ + code_streaming= 1; /* fall through */ + case 436: /* IHAVE says try later */ + GET_ARTICLE(0); + article_defer(art, RC_deferred); + break; + + } +dealtwith: + + conn_maybe_write(conn); + check_assign_articles(); + return OOP_CONTINUE; +} + + diff --git a/statemc.c b/statemc.c
new file mode 100644
index 0000000..970bb6c
--- /dev/null
+++ b/statemc.c
@@ -0,0 +1,550 @@
+
+
+/* statemc_init initialises */
+StateMachineState sms;
+int until_flush;
+InputFile *main_input_file, *flushing_input_file, *backlog_input_file;
+FILE *defer;
+
+/* initialisation to 0 is good */
+int until_connect, until_backlog_nextscan;
+double accept_proportion;
+int nocheck, nocheck_reported, in_child;
+sig_atomic_t terminate_sig_flag;
+
+
+/* Makes f the main input file (there must not be one yet) and starts reading. */
+static void startup_set_input_file(InputFile *f) {
+  assert(!main_input_file);
+  main_input_file= f;
+  inputfile_reading_start(f);
+}
+
+/* Takes the duct lock: opens (creating if need be) path_lock, write-locks
+ * it with F_SETLK, and retries if the file we locked is no longer the one
+ * named path_lock.  Then records our details in the lockfile.
+ * If someone else holds the lock we exit (silently if quiet_multiple). */
+static void statemc_lock(void) {
+  int lockfd;
+  struct stat stab, stabf;
+
+  for (;;) {
+    lockfd= open(path_lock, O_CREAT|O_RDWR, 0600);
+    if (lockfd<0) sysdie("open lockfile %s", path_lock);
+
+    struct flock fl;
+    memset(&fl,0,sizeof(fl));
+    fl.l_type= F_WRLCK;
+    fl.l_whence= SEEK_SET;
+    int r= fcntl(lockfd, F_SETLK, &fl);
+    if (r==-1) {
+      if (errno==EACCES || isewouldblock(errno)) {
+        if (quiet_multiple) exit(0);
+        die("another duct holds the lockfile");
+      }
+      sysdie("fcntl F_SETLK lockfile %s", path_lock);
+    }
+
+    xfstat_isreg(lockfd, &stabf, path_lock, "lockfile");
+    int lock_noent;
+    xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile");
+
+    if (!lock_noent && samefile(&stab, &stabf))
+      break;
+
+    xclose(lockfd, "stale lockfile ", path_lock);
+  }
+
+  FILE *lockfile= fdopen(lockfd, "w");
+  if (!lockfile) syscrash("fdopen lockfile");
+
+  int r= ftruncate(lockfd, 0);
+  if (r) syscrash("truncate lockfile to write new info");
+
+  if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n",
+              (long)self_pid, /* cast must match %ld */
+              sitename, feedfile, remote_host) < 0 || /* any negative is failure */
+      fflush(lockfile))
+    sysdie("write info to lockfile %s", path_lock);
+
+  dbg("startup: locked");
+}
+
+/* Works out the startup state from which of the feed file (F), the flushing
+ * file (D) and the defer file exist, and enters the corresponding state. */
+static void statemc_init(void) {
+  struct stat stabdefer;
+
+  search_backlog_file();
+
+  int defer_noent;
+  xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file");
+  if (defer_noent) {
+    dbg("startup: ductdefer ENOENT");
+  } else {
+    dbg("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink);
+    /* switch on the link count itself, so that the nlink==2 case is reachable */
+    switch (stabdefer.st_nlink) {
+    case 1:
+      open_defer(); /* so that we will later close it and rename it */
+      break;
+    case 2:
+      xunlink(path_defer, "stale defer file link"
+              " (presumably hardlink to backlog file)");
+      break;
+    default:
+      crash("defer file %s has unexpected link count %ld",
+            path_defer, (long)stabdefer.st_nlink);
+    }
+  }
+
+  struct stat stab_f, stab_d;
+  int noent_f;
+
+  InputFile *file_d= open_input_file(path_flushing);
+  if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file");
+
+  xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile");
+
+  if (!noent_f && file_d && samefile(&stab_f, &stab_d)) {
+    dbg("startup: F==D => Hardlinked");
+    xunlink(feedfile, "feed file (during startup)"); /* => Moved */
+    noent_f= 1;
+  }
+
+  if (noent_f) {
+    dbg("startup: F ENOENT => Moved");
+    if (file_d) startup_set_input_file(file_d);
+    spawn_inndcomm_flush("feedfile missing at startup");
+    /* => Flushing, sms:=FLUSHING */
+  } else {
+    if (file_d) {
+      dbg("startup: F!=D => Separated");
+      startup_set_input_file(file_d);
+      flushing_input_file= main_input_file;
+      main_input_file= open_input_file(feedfile);
+      if (!main_input_file) crash("feedfile vanished during startup");
+      SMS(SEPARATED, max_separated_periods,
+          "found both old and current feed files");
+    } else {
+      dbg("startup: F exists, D ENOENT => Normal");
+      InputFile *file_f= open_input_file(feedfile);
+      if (!file_f) crash("feed file vanished during startup");
+      startup_set_input_file(file_f);
+      SMS(NORMAL, spontaneous_flush_periods, "normal startup");
+    }
+  }
+}
+
+/* Hardlinks the feed file to the flushing file, unlinks the feed file,
+ * and then calls spawn_inndcomm_flush.  Only valid in state NORMAL. */
+static void statemc_start_flush(const char *why) { /* Normal => Flushing */
+  assert(sms == sm_NORMAL);
+
+  dbg("starting flush (%s) (%lu >?= %lu) (%d)",
+        why,
+        (unsigned long)(main_input_file ? main_input_file->offset : 0),
+        (unsigned long)target_max_feedfile_size,
+        until_flush);
+
+  int r= link(feedfile, path_flushing);
+  if (r) sysdie("link feedfile %s to flushing file %s",
+                feedfile, path_flushing);
+  /* => Hardlinked */
+
+  xunlink(feedfile, "old feedfile link");
+  /* => Moved */
+
+  spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
+}
+
+/* Starts, retries or abandons a flush as appropriate for the current state.
+ * why may be 0 (a default description is logged).  Returns 0 if nothing done. */
+static int trigger_flush_ok(const char *why) {
+  switch (sms) {
+
+  case sm_NORMAL:
+    statemc_start_flush(why ? why : "periodic");
+    return 1; /* Normal => Flushing; => FLUSHING */
+
+  case sm_FLUSHFAILED:
+    spawn_inndcomm_flush(why ? why : "retry");
+    return 1; /* Moved => Flushing; => FLUSHING */
+
+  case sm_SEPARATED:
+  case sm_DROPPING:
+    warn("abandoning old feedfile after flush (%s), autodeferring",
+         why ? why : "took too long to complete");
+    assert(flushing_input_file);
+    autodefer_input_file(flushing_input_file);
+    return 1;
+
+  default:
+    return 0;
+  }
+}
+
+/* Counts down until_flush (if nonzero) and triggers a flush when it hits 0. */
+static void statemc_period_poll(void) {
+  if (!until_flush) return;
+  until_flush--;
+  assert(until_flush>=0);
+
+  if (until_flush) return;
+  int ok= trigger_flush_ok(0);
+  assert(ok);
+}
+
+/* True iff ipf exists, has nothing in progress, and has no rd any more. */
+static int inputfile_is_done(InputFile *ipf) {
+  if (!ipf) return 0;
+  if (ipf->inprogress) return 0; /* new article in the meantime */
+  if (ipf->rd) return 0; /* not had EOF */
+  return 1;
+}
+
+/* Logs the read/offered/accepted counts for ipf; a null ipf is ignored. */
+static void notice_processed(InputFile *ipf, int completed,
+                             const char *what, const char *spec) {
+  if (!ipf) return; /* allows preterminate to be lazy */
+
+#define RCI_NOTHING(x) /* nothing */
+#define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE
+#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x])
+
+#define CNT(art,rc) (ipf->counts[art_##art][RC_##rc])
+
+  char *inprog= completed
+    ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */
+    : xasprintf(" inprogress=%ld", ipf->inprogress);
+  char *autodefer= ipf->autodefer >= 0
+    ? xasprintf(" autodeferred=%ld", ipf->autodefer)
+    : xasprintf("%s","");
+
+  info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s"
+       " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)"
+       RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT)
+       ,
+       completed?"completed":"processed", what, spec,
+       ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err,
+       inprog, autodefer, ipf->count_nooffer_missing,
+       CNT(Unchecked,sent) + CNT(Unsolicited,sent)
+       , CNT(Unchecked,sent), CNT(Unsolicited,sent),
+       CNT(Wanted,accepted) + CNT(Unsolicited,accepted)
+       , CNT(Wanted,accepted), CNT(Unsolicited,accepted)
+       RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS)
+       );
+
+  free(inprog);
+  free(autodefer);
+
+#undef CNT
+}
+
+/* If the backlog file is done: logs its statistics, closes and unlinks it,
+ * frees it, and looks for another backlog file. */
+static void statemc_check_backlog_done(void) {
+  InputFile *ipf= backlog_input_file;
+  if (!inputfile_is_done(ipf)) return;
+
+  const char *slash= strrchr(ipf->path, '/');
+  const char *leaf= slash ? slash+1 : ipf->path;
+  const char *under= strchr(leaf, '_'); /* not slash: that may be null */
+  const char *rest= under ? under+1 : leaf;
+  if (!strncmp(rest,"backlog",7)) rest += 7;
+  notice_processed(ipf,1,"backlog ",rest);
+
+  close_input_file(ipf);
+  if (unlink(ipf->path)) {
+    if (errno != ENOENT)
+      syscrash("could not unlink processed backlog file %s", ipf->path);
+    warn("backlog file %s vanished while we were reading it"
+         " so we couldn't remove it (but it's done now, anyway)",
+         ipf->path);
+  }
+  free(ipf);
+  backlog_input_file= 0;
+  search_backlog_file();
+  return;
+}
+
+/* If the flushing file is done: logs its statistics, closes the defer file,
+ * removes the flushing file, and moves on to NORMAL or DROPPED. */
+static void statemc_check_flushing_done(void) {
+  InputFile *ipf= flushing_input_file;
+  if (!inputfile_is_done(ipf)) return;
+
+  assert(sms==sm_SEPARATED || sms==sm_DROPPING);
+
+  notice_processed(ipf,1,"feedfile","");
+
+  close_defer();
+
+  xunlink(path_flushing, "old flushing file");
+
+  close_input_file(flushing_input_file);
+  free(flushing_input_file);
+  flushing_input_file= 0;
+
+  if (sms==sm_SEPARATED) {
+    notice("flush complete");
+    SMS(NORMAL, spontaneous_flush_periods, "flush complete");
+  } else if (sms==sm_DROPPING) {
+    SMS(DROPPED, max_separated_periods, "old flush complete");
+    search_backlog_file();
+    notice("feed dropped, but will continue until backlog is finished");
+  }
+}
+
+/* Timer callback (see queue_check_input_done): checks flushing and backlog. */
+static void *statemc_check_input_done(oop_source *lp, struct timeval now,
+                                      void *u) {
+  /* main input file may be idle but if so that's because
+   * we haven't got to it yet, but that doesn't mean it's really done */
+  statemc_check_flushing_done();
+  statemc_check_backlog_done();
+  return OOP_CONTINUE;
+}
+
+/* Asks the event loop to call statemc_check_input_done at OOP_TIME_NOW. */
+static void queue_check_input_done(void) {
+  loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
+}
+
+/* Sets sms and until_flush, and logs the new state (forlog) and reason (why). */
+static void statemc_setstate(StateMachineState newsms, int periods,
+                             const char *forlog, const char *why) {
+  sms= newsms;
+  until_flush= periods;
+
+  const char *xtra= "";
+  switch (sms) {
+  case sm_FLUSHING:
+  case sm_FLUSHFAILED:
+    if (!main_input_file) xtra= "-ABSENT";
+    break;
+  case sm_SEPARATED:
+  case sm_DROPPING:
+    xtra= flushing_input_file->rd ? "-1" : "-2";
+    break;
+  default:;
+  }
+
+  if (periods) {
+    info("state %s%s[%d] %s",forlog,xtra,periods,why);
+  } else {
+    info("state %s%s %s",forlog,xtra,why);
+  }
+}
+
+/*========== flushing the feed ==========*/
+
+pid_t inndcomm_child;
+static int inndcomm_sentinel_fd;
+
+/* Event callback on the flush child's sentinel pipe: collects the child's
+ * status with xwaitpid and moves to the state implied by that status. */
+static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) {
+  assert(inndcomm_child);
+  assert(fd == inndcomm_sentinel_fd);
+  int status= xwaitpid(&inndcomm_child, "inndcomm");
+  inndcomm_child= 0;
+
+  cancel_fd_read_except(fd);
+  xclose_perhaps(&fd, "inndcomm sentinel pipe",0);
+  inndcomm_sentinel_fd= 0;
+
+  assert(!flushing_input_file);
+
+  if (WIFEXITED(status)) {
+    switch (WEXITSTATUS(status)) {
+
+    case INNDCOMMCHILD_ESTATUS_FAIL:
+      goto failed;
+
+    case INNDCOMMCHILD_ESTATUS_NONESUCH:
+      notice("feed has been dropped by innd, finishing up");
+      flushing_input_file= main_input_file;
+      tailing_make_readable(flushing_input_file);
+        /* we probably previously returned EAGAIN from our fake read method
+         * when in fact we were at EOF, so signal another readable event
+         * so we actually see the EOF */
+
+      main_input_file= 0;
+
+      if (flushing_input_file) {
+        SMS(DROPPING, max_separated_periods,
+            "feed dropped by innd, but must finish last flush");
+      } else {
+        close_defer();
+        SMS(DROPPED, 0, "feed dropped by innd");
+        search_backlog_file();
+      }
+      return OOP_CONTINUE;
+
+    case 0:
+      /* as above */
+      flushing_input_file= main_input_file;
+      tailing_make_readable(flushing_input_file);
+
+      main_input_file= open_input_file(feedfile);
+      if (!main_input_file)
+        crash("flush succeeded but feedfile %s does not exist!"
+              " (this probably means feedfile does not correspond"
+              " to site %s in newsfeeds)", feedfile, sitename);
+
+      if (flushing_input_file) {
+        SMS(SEPARATED, max_separated_periods, "flush complete");
+      } else {
+        close_defer();
+        SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete");
+      }
+      return OOP_CONTINUE;
+
+    default:
+      goto unexpected_exitstatus;
+
+    }
+  } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
+    warn("flush timed out trying to talk to innd");
+    goto failed;
+  } else {
+  unexpected_exitstatus:
+    report_child_status("inndcomm child", status);
+  }
+
+ failed:
+  SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry");
+  return OOP_CONTINUE;
+}
+
+/* (Used in the flush child.)  Logs the failure and exits with ESTATUS_FAIL. */
+static void inndcommfail(const char *what) {
+  syswarn("error communicating with innd: %s failed: %s", what, ICCfailure);
+  exit(INNDCOMMCHILD_ESTATUS_FAIL);
+}
+
+/* Forks a child which sends innd the 'f' command for sitename; the parent
+ * watches a sentinel pipe for the child's exit and enters state FLUSHING. */
+void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */
+  int pipefds[2];
+
+  notice("flushing %s",why);
+
+  assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED);
+  assert(!inndcomm_child);
+  assert(!inndcomm_sentinel_fd);
+
+  if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel");
+
+  inndcomm_child= xfork("inndcomm child");
+
+  if (!inndcomm_child) {
+    const char *flushargv[2]= { sitename, 0 };
+    char *reply;
+    int r;
+
+    xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0);
+    /* parent spots the autoclose of pipefds[1] when we die or exit */
+
+    if (simulate_flush>=0) {
+      warn("SIMULATING flush child status %d", simulate_flush);
+      if (simulate_flush>128) raise(simulate_flush-128);
+      else exit(simulate_flush);
+    }
+
+    alarm(inndcomm_flush_timeout);
+    r= ICCopen();                         if (r)   inndcommfail("connect");
+    r= ICCcommand('f',flushargv,&reply);  if (r<0) inndcommfail("transmit");
+    if (!r) exit(0); /* yay! */
+
+    if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH);
+    syswarn("innd ctlinnd flush failed: innd said %s", reply);
+    exit(INNDCOMMCHILD_ESTATUS_FAIL);
+  }
+
+  simulate_flush= -1;
+
+  xclose(pipefds[1], "inndcomm sentinel child's end",0);
+  inndcomm_sentinel_fd= pipefds[0];
+  assert(inndcomm_sentinel_fd);
+  on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event);
+
+  SMS(FLUSHING, 0, why);
+}
+
+/*---------- shutdown and signal handling ----------*/
+
+/* Logs statistics for each open input file; does nothing if in_child. */
+static void preterminate(void) {
+  if (in_child) return;
+  notice_processed(main_input_file,0,"feedfile","");
+  notice_processed(flushing_input_file,0,"flushing","");
+  if (backlog_input_file)
+    notice_processed(backlog_input_file,0, "backlog file ",
+                     backlog_input_file->path);
+}
+
+static int signal_self_pipe[2];
+
+/* Calls xsigsetdefault for signo, raises it, and aborts if we survive that. */
+static void raise_default(int signo) {
+  xsigsetdefault(signo);
+  raise(signo);
+  abort();
+}
+
+/* Event callback for the signal self-pipe: drains it and, if a terminating
+ * signal has been flagged, logs statistics and dies by that signal. */
+static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
+  assert(fd==signal_self_pipe[0]); /* comparison; was an assignment */
+  char buf[PIPE_BUF];
+  int r= read(signal_self_pipe[0], buf, sizeof(buf));
+  if (r<0 && !isewouldblock(errno))
+    syscrash("failed to read signal self pipe");
+  if (r==0) crash("eof on signal self pipe");
+  if (terminate_sig_flag) {
+    preterminate();
+    notice("terminating (%s)", strsignal(terminate_sig_flag));
+    raise_default(terminate_sig_flag);
+  }
+  return OOP_CONTINUE;
+}
+
+/* Signal handler: notes the first SIGTERM/SIGINT and pokes the self-pipe. */
+static void sigarrived_handler(int signum) {
+  static char x;
+  int esave= errno; /* write() below must not disturb the interrupted code */
+  switch (signum) {
+  case SIGTERM:
+  case SIGINT:
+    if (!terminate_sig_flag) terminate_sig_flag= signum;
+    break;
+  default:
+    abort();
+  }
+  write(signal_self_pipe[1],&x,1);
+  errno= esave;
+}
+
+/* Ignores SIGPIPE, and routes SIGTERM and SIGINT through a nonblocking
+ * self-pipe whose read end is watched by sigarrived_event. */
+static void init_signals(void) {
+  if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
+    syscrash("could not ignore SIGPIPE");
+
+  if (pipe(signal_self_pipe)) sysdie("create self-pipe for signals");
+
+  xsetnonblock(signal_self_pipe[0],1);
+  xsetnonblock(signal_self_pipe[1],1);
+
+  struct sigaction sa;
+  memset(&sa,0,sizeof(sa));
+  sa.sa_handler= sigarrived_handler;
+  sa.sa_flags= SA_RESTART;
+  xsigaction(SIGTERM,&sa);
+  xsigaction(SIGINT,&sa);
+
+  on_fd_read_except(signal_self_pipe[0], sigarrived_event);
+}
+
diff --git a/xmit.c b/xmit.c
new file mode 100644
index 0000000..dd909e4
--- /dev/null
+++ b/xmit.c
@@ -0,0 +1,361 @@
+/*---------- assigning articles to conns, and transmitting ----------*/
+
+/* Returns the head article of ipf's queue (0 if there is none, or no ipf).
+ * Unless peek, the article is also removed from the queue. */
+static Article *dequeue_from(int peek, InputFile *ipf) {
+  if (!ipf) return 0;
+  if (peek) return LIST_HEAD(ipf->queue);
+
+  Article *art= LIST_REMHEAD(ipf->queue);
+  if (!art) return 0;
+  check_reading_pause_resume(ipf);
+  return art;
+}
+
+/* As dequeue_from, trying the flushing, backlog and main files in that order. */
+static Article *dequeue(int peek) {
+  Article *art;
+  art= dequeue_from(peek, flushing_input_file);  if (art) return art;
+  art= dequeue_from(peek, backlog_input_file);   if (art) return art;
+  art= dequeue_from(peek, main_input_file);      if (art) return art;
+  return 0;
+}
+
+/* Hands queued articles to connections with room for them; if no connection
+ * can take any, starts a new one when allow_connect_start() permits. */
+static void check_assign_articles(void) {
+  for (;;) {
+    if (!dequeue(1))
+      break;
+
+    Conn *walk, *use=0;
+    int spare=0, inqueue=0;
+
+    /* Find a connection to offer this article.  We prefer a busy
+     * connection to an idle one, provided it's not full.  We take the
+     * first (oldest) and since that's stable, it will mean we fill up
+     * connections in order.  That way if we have too many
+     * connections, the spare ones will go away eventually.
+     */
+    FOR_CONN(walk) {
+      if (walk->quitting) continue;
+      inqueue= walk->sent.count + walk->priority.count
+             + walk->waiting.count;
+      spare= walk->max_queue - inqueue;
+      assert(inqueue <= max_queue_per_conn);
+      assert(spare >= 0);
+      if (inqueue==0) /*idle*/ { if (!use) use= walk; }
+      else if (spare>0) /*working*/ { use= walk; break; }
+    }
+    if (use) {
+      /* the scan may have gone on past use, so recompute its own figures */
+      inqueue= use->sent.count + use->priority.count
+             + use->waiting.count;
+      spare= use->max_queue - inqueue;
+      if (!inqueue) use->since_activity= 0; /* reset idle counter */
+      while (spare>0) {
+        Article *art= dequeue(0);
+        if (!art) break;
+        LIST_ADDTAIL(use->waiting, art);
+        lowvol_perperiod[lowvol_circptr]++;
+        spare--;
+      }
+      conn_maybe_write(use);
+    } else if (allow_connect_start()) {
+      connect_start();
+      break;
+    } else {
+      break;
+    }
+  }
+}
+
+/* Event callback registered by conn_maybe_write; u is the Conn. */
+static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
+  conn_maybe_write(u);
+  return OOP_CONTINUE;
+}
+
+/* Queues and writes as much as it can on conn.  The OOP_WRITE callback is
+ * registered while a write would block, and cancelled when nothing is left. */
+static void conn_maybe_write(Conn *conn)  {
+  for (;;) {
+    conn_make_some_xmits(conn);
+    if (!conn->xmitu) {
+      loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+      conn->oopwriting= 0;
+      return;
+    }
+
+    void *rp= conn_write_some_xmits(conn);
+    if (rp==OOP_CONTINUE) {
+      if (!conn->oopwriting) {
+        loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
+        conn->oopwriting= 1;
+      }
+      return;
+    } else if (rp==OOP_HALT) {
+      return;
+    } else if (!rp) {
+      /* transmitted everything */
+    } else {
+      abort();
+    }
+  }
+}
+
+/*---------- expiry, flow control and deferral ----------*/
+
+/*
+ * flow control notes
+ * to ensure articles go away eventually
+ * separate queue for each input file
+ *   queue expiry
+ *     every period, check head of backlog queue for expiry with SMretrieve
+ *       if too old: discard, and check next article
+ *     also check every backlog article as we read it
+ *   flush expiry
+ *     after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping
+ *     one-off: eat queued articles from flushing and write them to defer
+ *     one-off: connfail all connections which have any articles from flushing
+ *     newly read articles from flushing go straight to defer
+ *     this should take care of it and get us out of this state
+ * to avoid filling up ram needlessly
+ *   input control
+ *     limit number of queued articles for each ipf
+ *     pause/resume inputfile tailing
+ */
+
+/* Pauses reading ipf once its queue reaches max_queue_per_ipf, else resumes. */
+static void check_reading_pause_resume(InputFile *ipf) {
+  if (ipf->queue.count >= max_queue_per_ipf)
+    inputfile_reading_pause(ipf);
+  else
+    inputfile_reading_resume(ipf);
+}
+
+/* Appends art to the defer file, then calls article_done with whichcount. */
+static void article_defer(Article *art /* not on a queue */, int whichcount) {
+  open_defer();
+  if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
+      || fflush(defer))
+    sysdie("write to defer file %s",path_defer);
+  article_done(art, whichcount);
+}
+
+/* Returns 0 if SMretrieve can still find art.  Otherwise removes art from
+ * its input file's queue, counts it as missing, finishes it and returns 1. */
+static int article_check_expired(Article *art /* must be queued, not conn */) {
+  ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
+  if (artdata) { SMfreearticle(artdata); return 0; }
+
+  LIST_REMOVE(art->ipf->queue, art);
+  art->missing= 1;
+  art->ipf->count_nooffer_missing++;
+  article_done(art,-1);
+  return 1;
+}
+
+/* Discards expired articles from the head of ipf's queue; null ipf is ignored. */
+void inputfile_queue_check_expired(InputFile *ipf) {
+  if (!ipf) return;
+
+  for (;;) {
+    Article *art= LIST_HEAD(ipf->queue);
+    if (!art) break; /* queue is (now) empty; nothing to dereference */
+    if (!article_check_expired(art)) break;
+  }
+  check_reading_pause_resume(ipf);
+}
+
+/* Counts art as autodeferred for ipf and writes it to the defer file. */
+static void article_autodefer(InputFile *ipf, Article *art) {
+  ipf->autodefer++;
+  article_defer(art,-1);
+}
+
+/* True iff the list al contains an article whose input file is ipf. */
+static int has_article_in(const ArticleList *al, InputFile *ipf) {
+  Article *art;
+  for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
+    if (art->ipf == ipf) return 1;
+  return 0;
+}
+
+/* Autodefers every article currently on ipf's queue. */
+static void autodefer_input_file_articles(InputFile *ipf) {
+  Article *art;
+  while ((art= LIST_REMHEAD(ipf->queue)))
+    article_autodefer(ipf, art);
+}
+
+/* Gives up on feeding ipf: autodefers its queue, then fails each connection
+ * holding one of its articles until ipf has nothing left in progress. */
+static void autodefer_input_file(InputFile *ipf) {
+  static const char *const abandon= "stuck";
+  ipf->autodefer= 0;
+
+  autodefer_input_file_articles(ipf);
+
+  if (ipf->inprogress) {
+    Conn *walk;
+    FOR_CONN(walk) {
+      if (has_article_in(&walk->waiting,  ipf) ||
+          has_article_in(&walk->priority, ipf) ||
+          has_article_in(&walk->sent,     ipf))
+        walk->quitting= abandon;
+    }
+    while (ipf->inprogress) {
+      FOR_CONN(walk)
+        if (walk->quitting == abandon) goto found;
+      abort(); /* where are they ?? */
+
+    found:
+      connfail(walk, "connection is stuck or crawling,"
+               " and we need to finish flush");
+      autodefer_input_file_articles(ipf);
+    }
+  }
+
+  check_reading_pause_resume(ipf);
+}
+
+/*========== article transmission ==========*/
+
+/* Appends data to conn's pending iovecs and returns the matching details slot. */
+static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
+                  XmitKind kind) { /* caller must then fill in details */
+  struct iovec *v= &conn->xmit[conn->xmitu];
+  XmitDetails *d= &conn->xmitd[conn->xmitu++];
+  v->iov_base= (char*)data;
+  v->iov_len= len;
+  d->kind= kind;
+  return d;
+}
+
+/* Queues data of kind xk_Const, which xmit_free does not free. */
+static void xmit_noalloc(Conn *conn, const char *data, int len) {
+  xmit_core(conn,data,len, xk_Const);
+}
+#define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
+
+/* Queues the article data in ah; xmit_free later frees ah itself. */
+static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
+  XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
+  d->info.sm_art= ah;
+}
+
+/* Releases whatever a fully-transmitted XmitDetails entry owns. */
+static void xmit_free(XmitDetails *d) {
+  switch (d->kind) {
+  case xk_Artdata: SMfreearticle(d->info.sm_art); break;
+  case xk_Const:                                  break;
+  default: abort();
+  }
+}
+
+/* Writes conn's pending iovecs with writev, dropping those fully sent. */
+static void *conn_write_some_xmits(Conn *conn) {
+  /* return values:
+   *      0:            nothing more to write, no need to call us again
+   *      OOP_CONTINUE: more to write but fd not writeable
+   *      OOP_HALT:     disaster, have destroyed conn
+   */
+  for (;;) {
+    int count= conn->xmitu;
+    if (!count) return 0;
+
+    if (count > IOV_MAX) count= IOV_MAX;
+    ssize_t rs= writev(conn->fd, conn->xmit, count);
+    if (rs < 0) {
+      if (isewouldblock(errno)) return OOP_CONTINUE;
+      connfail(conn, "write failed: %s", strerror(errno));
+      return OOP_HALT;
+    }
+    assert(rs > 0);
+
+    int done;
+    for (done=0; rs; ) {
+      assert(done<conn->xmitu);
+      struct iovec *vp= &conn->xmit[done];
+      XmitDetails *dp= &conn->xmitd[done];
+      assert(vp->iov_len <= SSIZE_MAX);
+      if ((size_t)rs >= vp->iov_len) {
+        rs -= vp->iov_len;
+        xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */
+        done++;
+      } else {
+        vp->iov_base= (char*)vp->iov_base + rs;
+        vp->iov_len -= rs;
+        break; /* rs -= rs */
+      }
+    }
+    int newu= conn->xmitu - done;
+    memmove(conn->xmit,  conn->xmit  + done, newu * sizeof(*conn->xmit));
+    memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
+    conn->xmitu= newu;
+  }
+}
+
+/* Turns articles from conn's priority and waiting lists into queued output
+ * (CHECK/IHAVE/TAKETHIS or article data) while there is iovec room left. */
+static void conn_make_some_xmits(Conn *conn) {
+  for (;;) {
+    if (conn->xmitu+5 > CONNIOVS)
+      break;
+
+    Article *art= LIST_REMHEAD(conn->priority);
+    if (!art) art= LIST_REMHEAD(conn->waiting);
+    if (!art) break;
+
+    if (art->state >= art_Wanted || (conn->stream && nocheck)) {
+      /* actually send it */
+
+      ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
+
+      art->state=
+        art->state == art_Unchecked ? art_Unsolicited :
+        art->state == art_Wanted    ? art_Wanted      :
+        (abort(),-1);
+
+      if (!artdata) art->missing= 1;
+      art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
+
+      if (conn->stream) {
+        if (artdata) {
+          XMIT_LITERAL("TAKETHIS ");
+          xmit_noalloc(conn, art->messageid, art->midlen);
+          XMIT_LITERAL("\r\n");
+          xmit_artbody(conn, artdata);
+        } else {
+          article_done(art, -1);
+          continue;
+        }
+      } else {
+        /* we got 235 from IHAVE */
+        if (artdata) {
+          xmit_artbody(conn, artdata);
+        } else {
+          XMIT_LITERAL(".\r\n");
+        }
+      }
+
+      LIST_ADDTAIL(conn->sent, art);
+
+    } else {
+      /* check it */
+
+      if (conn->stream)
+        XMIT_LITERAL("CHECK ");
+      else
+        XMIT_LITERAL("IHAVE ");
+      xmit_noalloc(conn, art->messageid, art->midlen);
+      XMIT_LITERAL("\r\n");
+
+      assert(art->state == art_Unchecked);
+      art->ipf->counts[art->state][RC_sent]++;
+      LIST_ADDTAIL(conn->sent, art);
+    }
+  }
+}
+
-- 
2.30.2