From 0d0b89542d5657ecb96b02f35b1bfdd53c635cb2 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Thu, 20 May 2010 19:08:57 +0100 Subject: [PATCH] split program compiles --- cli.c | 32 +++++++++- conn.c | 44 ++++++++++--- decls-junk | 57 ----------------- defer.c | 35 +++++++++-- filemon.c | 40 ++++++++++-- help.c | 42 ++++++------- infile.c | 56 ++++++++++++++--- innduct.c | 80 ++++++++++++++---------- innduct.h | 180 ++++++++++++++++++++++++++++++++++++++++------------- recv.c | 44 ++++++++++--- statemc.c | 57 +++++++++++------ xmit.c | 51 ++++++++++++--- 12 files changed, 497 insertions(+), 221 deletions(-) delete mode 100644 decls-junk diff --git a/cli.c b/cli.c index 05e4827..abb84a9 100644 --- a/cli.c +++ b/cli.c @@ -1,3 +1,31 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * cli.c - command and control connections + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#include "innduct.h" + /*========== command and control (CLI) connections ==========*/ static int cli_master; @@ -169,7 +197,7 @@ static void cli_stdio_destroy(CliConn *cc) { free(cc); } -static void cli_stdio(void) { +void cli_stdio(void) { NEW_DECL(CliConn *,cc); cc->destroy= cli_stdio_destroy; @@ -216,7 +244,7 @@ static void *cli_master_readable(oop_source *lp, int master, goto nocli; \ }while(0) -static void cli_init(void) { +void cli_init(void) { union { struct sockaddr sa; struct sockaddr_un un; diff --git a/conn.c b/conn.c index 301d025..c99174c 100644 --- a/conn.c +++ b/conn.c @@ -1,3 +1,31 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * conn.c - connection establishment and teardown + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#include "innduct.h" + /*========== management of connections ==========*/ static void reconnect_blocking_event(void) { @@ -10,7 +38,7 @@ void conn_closefd(Conn *conn, const char *msgprefix) { conn->fd, msgprefix, strerror(errno)); } -static int conn_busy(Conn *conn) { +int conn_busy(Conn *conn) { return conn->waiting.count || conn->priority.count || @@ -18,7 +46,7 @@ static int conn_busy(Conn *conn) { conn->xmitu; } -static void conn_dispose(Conn *conn) { +void conn_dispose(Conn *conn) { if (!conn) return; if (conn->rd) { oop_rd_cancel(conn->rd); @@ -46,7 +74,7 @@ static void *conn_exception(oop_source *lp, int fd, return OOP_CONTINUE; } -static void vconnfail(Conn *conn, const char *fmt, va_list al) { +void vconnfail(Conn *conn, const char *fmt, va_list al) { int requeue[art_MaxState]; memset(requeue,0,sizeof(requeue)); @@ -82,7 +110,7 @@ static void vconnfail(Conn *conn, const char *fmt, va_list al) { check_assign_articles(); } -static void connfail(Conn *conn, const char *fmt, ...) { +void connfail(Conn *conn, const char *fmt, ...) { va_list al; va_start(al,fmt); vconnfail(conn,fmt,al); @@ -118,7 +146,7 @@ static void conn_idle_close(Conn *conn, const char *why) { * For our last connection, we also shut it down if we have had * less than K in the last L */ -static void check_idle_conns(void) { +void check_idle_conns(void) { Conn *conn; int volthisperiod= lowvol_perperiod[lowvol_circptr]; @@ -162,7 +190,7 @@ static void check_idle_conns(void) { /*---------- making new connections ----------*/ -static pid_t connecting_child; +pid_t connecting_child; int connecting_fdpass_sock; static void connect_attempt_discard(void) { @@ -294,13 +322,13 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { return OOP_CONTINUE; } -static int allow_connect_start(void) { +int allow_connect_start(void) { return conns.count < max_connections && !connecting_child && !until_connect; } -static void connect_start(void) { +void connect_start(void) { assert(!connecting_child); assert(!connecting_fdpass_sock); diff --git a/decls-junk b/decls-junk deleted file mode 100644 index 8bf9a86..0000000 --- a/decls-junk +++ /dev/null @@ -1,57 +0,0 @@ - - - -/*----- function predeclarations -----*/ - -void conn_maybe_write(Conn *conn); -void conn_make_some_xmits(Conn *conn); -void *conn_write_some_xmits(Conn *conn); - -void xmit_free(XmitDetails *d); - -#define SMS(newstate, periods, why) \ - (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) -void statemc_setstate(StateMachineState newsms, int periods, - const char *forlog, const char *why); - -void statemc_start_flush(const char *why); /* Normal => Flushing */ -void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ -int trigger_flush_ok(const char *why /* 0 means timeout */); - /* => Flushing,FLUSHING, ret 1; or ret 0 */ - -void article_done(Article *art, int whichcount); - -void check_assign_articles(void); -void queue_check_input_done(void); -void check_reading_pause_resume(InputFile *ipf); - -void statemc_check_flushing_done(void); -void statemc_check_backlog_done(void); - -void postfork(void); -void period(void); - -void open_defer(void); -void close_defer(void); -void search_backlog_file(void); -void preterminate(void); -void raise_default(int signo) NORET; - -void inputfile_reading_start(InputFile *ipf); -void inputfile_reading_stop(InputFile *ipf); -void inputfile_reading_pause(InputFile *ipf); -void inputfile_reading_resume(InputFile *ipf); - /* pause and resume are idempotent, and no-op if not done _reading_start */ - -void filemon_start(InputFile *ipf); -void filemon_stop(InputFile *ipf); -void tailing_make_readable(InputFile *ipf); - -void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); -void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); - -extern const oop_rd_style peer_rd_style; -extern oop_rd_call peer_rd_err, peer_rd_ok; - - - diff --git a/defer.c b/defer.c index 9af9d58..811907e 100644 --- a/defer.c +++ b/defer.c @@ -1,6 +1,34 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * defer.c - handling of defer and backlog files + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#include "innduct.h" + /*---------- defer and backlog files ----------*/ -static void open_defer(void) { +void open_defer(void) { struct stat stab; if (defer) return; @@ -54,7 +82,7 @@ static void open_defer(void) { syscrash("could not seek to new end of defer file %s", path_defer); } -static void close_defer(void) { +void close_defer(void) { if (!defer) return; @@ -89,7 +117,7 @@ void poll_backlog_file(void) { search_backlog_file(); } -static void search_backlog_file(void) { +void search_backlog_file(void) { /* returns non-0 iff there are any backlog files */ glob_t gl; @@ -189,4 +217,3 @@ static void search_backlog_file(void) { globfree(&gl); return; } - diff --git a/filemon.c b/filemon.c index 9a362db..b06bfaa 100644 --- a/filemon.c +++ b/filemon.c @@ -1,3 +1,31 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * filemon.c - file monitoring (inotify, kqueue, poll, etc.) + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#include "innduct.h" + /*---------- filemon implemented with inotify ----------*/ #if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON) @@ -63,7 +91,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, return OOP_CONTINUE; } -static int filemon_method_init(void) { +int filemon_method_init(void) { filemon_inotify_fd= inotify_init(); if (filemon_inotify_fd<0) { syswarn("filemon/inotify: inotify_init failed"); @@ -76,7 +104,7 @@ static int filemon_method_init(void) { return 1; } -static void filemon_method_dump_info(FILE *f) { +void filemon_method_dump_info(FILE *f) { int i; fprintf(f,"inotify"); DUMPV("%d",,filemon_inotify_fd); @@ -93,26 +121,26 @@ static void filemon_method_dump_info(FILE *f) { struct Filemon_Perfile { int dummy; }; -static int filemon_method_init(void) { +int filemon_method_init(void) { warn("filemon/dummy: no filemon method compiled in"); return 0; } static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { } static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { } -static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); } +void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); } #endif /* !HAVE_FILEMON */ /*---------- filemon generic interface ----------*/ -static void filemon_start(InputFile *ipf) { +void filemon_start(InputFile *ipf) { assert(!ipf->filemon); NEW(ipf->filemon); filemon_method_startfile(ipf, ipf->filemon); } -static void filemon_stop(InputFile *ipf) { +void filemon_stop(InputFile *ipf) { if (!ipf->filemon) return; filemon_method_stopfile(ipf, ipf->filemon); free(ipf->filemon); diff --git a/help.c b/help.c index 144ab86..34b1018 100644 --- a/help.c +++ b/help.c @@ -1,7 +1,7 @@ /* * innduct * tailing reliable realtime streaming feeder for inn - * logging and utility functions + * help.c - logging and utility functions * * Copyright (C) 2010 Ian Jackson * @@ -70,37 +70,26 @@ void logv(int sysloglevel, const char *pfx, int errnoval, exit(estatus); \ } -#define DEFLOG(fn, pfx, sysloglevel, err) \ - static void fn(const char *fmt, ...) { \ - VA; \ - logv(sysloglevel, pfx, err, fmt, al); \ - va_end(al); \ +#define DEFLOG(fn, pfx, sysloglevel, err) \ + void fn(const char *fmt, ...) { \ + VA; \ + logv(sysloglevel, pfx, err, fmt, al); \ + va_end(al); \ } -#define INNLOGSET_DECLARE(fn, pfx, sysloglevel) \ - static void duct_log_##fn(int l, const char *fmt, va_list al, int errval) \ - PRINTF(3,0); \ - static void duct_log_##fn(int l, const char *fmt, va_list al, int errval) { \ - logv(sysloglevel, pfx, errval ? errval : -1, fmt, al); \ +#define INNLOGSET_DEFINE(fn, pfx, sysloglevel) \ + void duct_log_##fn(int l, const char *fmt, va_list al, int errval) { \ + logv(sysloglevel, pfx, errval ? errval : -1, fmt, al); \ } -#define INNLOGSET_CALL(fn, pfx, sysloglevel) \ - message_handlers_##fn(1, duct_log_##fn); -static int innduct_fatal_cleanup(void) { return 12; } /* used for libinn die */ - /* We want to extend the set of logging functions from inn, and we * want to prepend the site name to all our messages. */ DEFFATAL(syscrash, "critical", LOG_CRIT, errno, 16); DEFFATAL(crash, "critical", LOG_CRIT, -1, 16); -#define INNLOGSETS(INNLOGSET) \ - INNLOGSET(die, "fatal", LOG_ERR) \ - INNLOGSET(warn, "warning", LOG_WARNING) \ - INNLOGSET(notice, "notice", LOG_NOTICE) \ - INNLOGSET(trace, "trace", LOG_NOTICE) -INNLOGSETS(INNLOGSET_DECLARE) +INNLOGSETS(INNLOGSET_DEFINE) DEFLOG(info, "info", LOG_INFO, -1) DEFLOG(dbg, "debug", LOG_DEBUG, -1) @@ -218,6 +207,11 @@ void xsigsetdefault(int signo) { sa.sa_handler= SIG_DFL; xsigaction(signo,&sa); } +void raise_default(int signo) { + xsigsetdefault(signo); + raise(signo); + abort(); +} void xgettimeofday(struct timeval *tv_r) { int r= gettimeofday(tv_r,0); @@ -235,13 +229,13 @@ void check_isreg(const struct stat *stab, const char *path, what, path, (unsigned long)stab->st_mode); } -static void xfstat(int fd, struct stat *stab_r, const char *what) { +void xfstat(int fd, struct stat *stab_r, const char *what) { int r= fstat(fd, stab_r); if (r) syscrash("could not fstat %s", what); } -static void xfstat_isreg(int fd, struct stat *stab_r, - const char *path, const char *what) { +void xfstat_isreg(int fd, struct stat *stab_r, + const char *path, const char *what) { xfstat(fd, stab_r, what); check_isreg(stab_r, path, what); } diff --git a/infile.c b/infile.c index 223846e..3fc1ff6 100644 --- a/infile.c +++ b/infile.c @@ -1,3 +1,31 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * infile.c - monitoring and handling of input files + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#include "innduct.h" + /*========== monitoring of input files ==========*/ static void feedfile_eof(InputFile *ipf) { @@ -15,7 +43,7 @@ static void feedfile_eof(InputFile *ipf) { } } -static InputFile *open_input_file(const char *path) { +InputFile *open_input_file(const char *path) { int fd= open(path, O_RDWR); if (fd<0) { if (errno==ENOENT) return 0; @@ -34,7 +62,7 @@ static InputFile *open_input_file(const char *path) { return ipf; } -static void close_input_file(InputFile *ipf) { /* does not free */ +void close_input_file(InputFile *ipf) { /* does not free */ assert(!ipf->readable_callback); /* must have had ->on_cancel */ assert(!ipf->filemon); /* must have had inputfile_reading_stop */ assert(!ipf->rd); /* must have had inputfile_reading_stop */ @@ -176,7 +204,7 @@ static void tailing_on_cancel(struct oop_readable *rable) { ipf->readable_callback= 0; } -static void tailing_make_readable(InputFile *ipf) { +void tailing_make_readable(InputFile *ipf) { dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf, (void*)ipf?ipf->readable_callback:0); if (!ipf || !ipf->readable_callback) /* so callers can be naive */ @@ -235,7 +263,7 @@ static const oop_rd_style feedfile_rdstyle= { OOP_RD_SHORTREC_LONG, }; -static void inputfile_reading_resume(InputFile *ipf) { +void inputfile_reading_resume(InputFile *ipf) { if (!ipf->rd) return; if (!ipf->paused) return; @@ -246,14 +274,14 @@ static void inputfile_reading_resume(InputFile *ipf) { ipf->paused= 0; } -static void inputfile_reading_pause(InputFile *ipf) { +void inputfile_reading_pause(InputFile *ipf) { if (!ipf->rd) return; if (ipf->paused) return; oop_rd_cancel(ipf->rd); ipf->paused= 1; } -static void inputfile_reading_start(InputFile *ipf) { +void inputfile_reading_start(InputFile *ipf) { assert(!ipf->rd); ipf->readable.on_readable= tailing_on_readable; ipf->readable.on_cancel= tailing_on_cancel; @@ -271,7 +299,7 @@ static void inputfile_reading_start(InputFile *ipf) { inputfile_reading_resume(ipf); } -static void inputfile_reading_stop(InputFile *ipf) { +void inputfile_reading_stop(InputFile *ipf) { assert(ipf->rd); inputfile_reading_pause(ipf); oop_rd_delete(ipf->rd); @@ -284,3 +312,17 @@ void filepoll(void) { tailing_make_readable(flushing_input_file); } +char *dbg_report_ipf(InputFile *ipf) { + if (!ipf) return xasprintf("none"); + + const char *slash= strrchr(ipf->path,'/'); + const char *path= slash ? slash+1 : ipf->path; + + return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s", + ipf, path, + ipf->queue.count, ipf->inprogress, ipf->autodefer, + (long)ipf->offset, ipf->fd, + ipf->rd ? "" : ",!rd", + ipf->skippinglong ? "*skiplong" : "", + ipf->rd && ipf->paused ? "*paused" : ""); +} diff --git a/innduct.c b/innduct.c index 7ade8b6..6ff7bad 100644 --- a/innduct.c +++ b/innduct.c @@ -1,7 +1,7 @@ /* * innduct * tailing reliable realtime streaming feeder for inn - * main program - option parsing and startup + * innduct.c - main program, option parsing and startup * * Copyright (C) 2010 Ian Jackson * @@ -32,22 +32,6 @@ const char *sms_names[]= { 0 }; -struct Conn { - ISNODE(Conn); - int fd; /* may be 0, meaning closed (during construction/destruction) */ - oop_read *rd; /* likewise */ - int oopwriting; /* since on_fd is not idempotent */ - int max_queue, stream; - const char *quitting; - int since_activity; /* periods */ - ArticleList waiting; /* not yet told peer */ - ArticleList priority; /* peer says send it now */ - ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ - struct iovec xmit[CONNIOVS]; - XmitDetails xmitd[CONNIOVS]; - int xmitu; -}; - /*----- general operational variables -----*/ @@ -61,6 +45,46 @@ int *lowvol_perperiod; int lowvol_circptr; int lowvol_total; /* does not include current period */ +/*---------- configuration option variables ----------*/ +/* when changing defaults, remember to update the manpage */ + +const char *sitename, *remote_host; +const char *feedfile, *path_run, *path_cli, *path_cli_dir; +int quiet_multiple=0; +int interactive=0, try_filemon=1; +int try_stream=1; +int port=119; +const char *inndconffile; + +int max_connections=10; +int max_queue_per_conn=200; +int target_max_feedfile_size=100000; +int period_seconds=30; +int filepoll_seconds=5; +int max_queue_per_ipf=-1; + +int connection_setup_timeout=200; +int inndcomm_flush_timeout=100; + +double nocheck_thresh= 95.0; /* converted from percentage by main */ +double nocheck_decay= 100; /* conv'd from articles to lambda by main */ + +/* all these are initialised to seconds, and converted to periods in main */ +int reconnect_delay_periods=1000; +int flushfail_retry_periods=1000; +int backlog_retry_minperiods=100; +int backlog_spontrescan_periods=300; +int spontaneous_flush_periods=100000; +int max_separated_periods=2000; +int need_activity_periods=1000; +int lowvol_thresh=3; +int lowvol_periods=1000; + +double max_bad_data_ratio= 1; /* conv'd from percentage by main */ +int max_bad_data_initial= 30; + /* in one corrupt 4096-byte block the number of newlines has + * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */ + /*========== main program ==========*/ static void postfork_inputfile(InputFile *ipf) { @@ -126,22 +150,7 @@ static void every(int interval, int fixed_rate, void (*f)(void)) { every_schedule(e, now); } -static char *dbg_report_ipf(InputFile *ipf) { - if (!ipf) return xasprintf("none"); - - const char *slash= strrchr(ipf->path,'/'); - const char *path= slash ? slash+1 : ipf->path; - - return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s", - ipf, path, - ipf->queue.count, ipf->inprogress, ipf->autodefer, - (long)ipf->offset, ipf->fd, - ipf->rd ? "" : ",!rd", - ipf->skippinglong ? "*skiplong" : "", - ipf->rd && ipf->paused ? "*paused" : ""); -} - -static void period(void) { +void period(void) { char *dipf_main= dbg_report_ipf(main_input_file); char *dipf_flushing= dbg_report_ipf(flushing_input_file); char *dipf_backlog= dbg_report_ipf(backlog_input_file); @@ -404,10 +413,15 @@ static int path_ends_slash(const char *specified) { return specified[l-1] == '/'; } +static int innduct_fatal_cleanup(void) { return 12; } /* used for libinn die */ + int main(int argc, char **argv) { /* set up libinn logging */ message_program_name= "innduct"; message_fatal_cleanup= innduct_fatal_cleanup; + +#define INNLOGSET_CALL(fn, pfx, sysloglevel) \ + message_handlers_##fn(1, duct_log_##fn); INNLOGSETS(INNLOGSET_CALL) if (!argv[1]) { diff --git a/innduct.h b/innduct.h index 1033ac7..4ccae7a 100644 --- a/innduct.h +++ b/innduct.h @@ -155,44 +155,32 @@ DEFLIST(Article); /*----- configuration options -----*/ -/* when changing defaults, remember to update the manpage */ -static const char *sitename, *remote_host; -static const char *feedfile, *path_run, *path_cli, *path_cli_dir; -static int quiet_multiple=0; -static int interactive=0, try_filemon=1; -static int try_stream=1; -static int port=119; -static const char *inndconffile; +extern const char *sitename, *remote_host; +extern const char *feedfile, *path_run, *path_cli, *path_cli_dir; +extern int quiet_multiple, interactive, try_filemon, try_stream, port; +extern const char *inndconffile; -static int max_connections=10; -static int max_queue_per_conn=200; -static int target_max_feedfile_size=100000; -static int period_seconds=30; -static int filepoll_seconds=5; -static int max_queue_per_ipf=-1; +extern int max_connections, max_queue_per_conn, target_max_feedfile_size; +extern int period_seconds, filepoll_seconds, max_queue_per_ipf; +extern int connection_setup_timeout, inndcomm_flush_timeout; -static int connection_setup_timeout=200; -static int inndcomm_flush_timeout=100; - -static double nocheck_thresh= 95.0; /* converted from percentage by main */ -static double nocheck_decay= 100; /* conv'd from articles to lambda by main */ +extern double nocheck_thresh; +extern double nocheck_decay; /* all these are initialised to seconds, and converted to periods in main */ -static int reconnect_delay_periods=1000; -static int flushfail_retry_periods=1000; -static int backlog_retry_minperiods=100; -static int backlog_spontrescan_periods=300; -static int spontaneous_flush_periods=100000; -static int max_separated_periods=2000; -static int need_activity_periods=1000; -static int lowvol_thresh=3; -static int lowvol_periods=1000; - -static double max_bad_data_ratio= 1; /* conv'd from percentage by main */ -static int max_bad_data_initial= 30; - /* in one corrupt 4096-byte block the number of newlines has - * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */ +extern int reconnect_delay_periods; +extern int flushfail_retry_periods; +extern int backlog_retry_minperiods; +extern int backlog_spontrescan_periods; +extern int spontaneous_flush_periods; +extern int max_separated_periods; +extern int need_activity_periods; +extern int lowvol_thresh; +extern int lowvol_periods; + +extern double max_bad_data_ratio; +extern int max_bad_data_initial; /*----- statistics -----*/ @@ -203,9 +191,7 @@ typedef enum { /* in queue in conn->sent */ art_Unsolicited, /* - sent body without check */ art_MaxState, } ArtState; - -static const char *const artstate_names[]= - { "Unchecked", "Wanted", "Unsolicited", 0 }; +extern const char *const artstate_names[]; /* xmit.c */ #define RESULT_COUNTS(RCS,RCN) \ RCS(sent) \ @@ -283,6 +269,22 @@ struct Article { char messageid[1]; }; +struct Conn { + ISNODE(Conn); + int fd; /* may be 0, meaning closed (during construction/destruction) */ + oop_read *rd; /* likewise */ + int oopwriting; /* since on_fd is not idempotent */ + int max_queue, stream; + const char *quitting; + int since_activity; /* periods */ + ArticleList waiting; /* not yet told peer */ + ArticleList priority; /* peer says send it now */ + ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ + struct iovec xmit[CONNIOVS]; + XmitDetails xmitd[CONNIOVS]; + int xmitu; +}; + #define SMS_LIST(X) \ X(NORMAL) \ X(FLUSHING) \ @@ -302,10 +304,10 @@ extern const char *sms_names[]; /*----- help.c -----*/ -static void syscrash(const char *fmt, ...) NORET_PRINTF(1,2); -static void crash(const char *fmt, ...) NORET_PRINTF(1,2); -static void info(const char *fmt, ...) PRINTF(1,2); -static void dbg(const char *fmt, ...) PRINTF(1,2); +void syscrash(const char *fmt, ...) NORET_PRINTF(1,2); +void crash(const char *fmt, ...) NORET_PRINTF(1,2); +void info(const char *fmt, ...) PRINTF(1,2); +void dbg(const char *fmt, ...) PRINTF(1,2); void logv(int sysloglevel, const char *pfx, int errnoval, const char *fmt, va_list al) PRINTF(5,0); @@ -318,7 +320,7 @@ void xclose(int fd, const char *what, const char *what2); void xclose_perhaps(int *fd, const char *what, const char *what2); pid_t xfork(const char *what); -static void on_fd_read_except(int fd, oop_call_fd callback); +void on_fd_read_except(int fd, oop_call_fd callback); void cancel_fd_read_except(int fd); void report_child_status(const char *what, int status); @@ -329,6 +331,7 @@ void xunlink(const char *path, const char *what); time_t xtime(void); void xsigaction(int signo, const struct sigaction *sa); void xsigsetdefault(int signo); +void raise_default(int signo) NORET; void xgettimeofday(struct timeval *tv_r); void xsetnonblock(int fd, int nonb); @@ -349,34 +352,123 @@ static inline int isewouldblock(int errnoval) { return errnoval==EWOULDBLOCK || errnoval==EAGAIN; } +#define INNLOGSETS(INNLOGSET) \ + INNLOGSET(die, "fatal", LOG_ERR) \ + INNLOGSET(warn, "warning", LOG_WARNING) \ + INNLOGSET(notice, "notice", LOG_NOTICE) \ + INNLOGSET(trace, "trace", LOG_NOTICE) +#define INNLOGSET_DECLARE(fn, pfx, sysloglevel) \ + void duct_log_##fn(int l, const char *fmt, va_list al, int errval) \ + PRINTF(3,0); +INNLOGSETS(INNLOGSET_DECLARE) + /*----- innduct.c -----*/ void postfork(void); +void period(void); + +/*----- cli.c -----*/ + +void cli_init(void); +void cli_stdio(void); /*----- conn.c -----*/ void conn_closefd(Conn *conn, const char *msgprefix); +void check_idle_conns(void); +int conn_busy(Conn *conn); +void conn_dispose(Conn *conn); + +void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); +void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); + +int allow_connect_start(void); +void connect_start(void); /*----- defer.c -----*/ void poll_backlog_file(void); +void search_backlog_file(void); +void open_defer(void); +void close_defer(void); + +/*----- filemon.c -----*/ + +int filemon_method_init(void); +void filemon_method_dump_info(FILE *f); + +void filemon_start(InputFile *ipf); +void filemon_stop(InputFile *ipf); /*----- infile.c -----*/ void filepoll(void); +void inputfile_reading_start(InputFile *ipf); +void inputfile_reading_stop(InputFile *ipf); +void inputfile_reading_pause(InputFile *ipf); +void inputfile_reading_resume(InputFile *ipf); + /* pause and resume are idempotent, and no-op if not done _reading_start */ + +InputFile *open_input_file(const char *path); +void close_input_file(InputFile *ipf); /* does not free */ +char *dbg_report_ipf(InputFile *ipf); + +void tailing_make_readable(InputFile *ipf); + +/*----- recv.c -----*/ + +extern const oop_rd_style peer_rd_style; +oop_rd_call peer_rd_ok, peer_rd_err; + +void article_done(Article *art, int whichcount); + /*----- statemc.c -----*/ -sig_atomic_t terminate_sig_flag; +void statemc_check_flushing_done(void); +void statemc_check_backlog_done(void); + +extern sig_atomic_t terminate_sig_flag; +void statemc_period_poll(void); +void statemc_lock(void); +void init_signals(void); +void statemc_init(void); + +#define SMS(newstate, periods, why) \ + (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) +void statemc_setstate(StateMachineState newsms, int periods, + const char *forlog, const char *why); + +void statemc_start_flush(const char *why); /* Normal => Flushing */ +void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ +int trigger_flush_ok(const char *why /* 0 means timeout */); + /* => Flushing,FLUSHING, ret 1; or ret 0 */ + +void preterminate(void); /*----- xmit.c -----*/ void inputfile_queue_check_expired(InputFile *ipf); +void check_assign_articles(void); +void queue_check_input_done(void); +void check_reading_pause_resume(InputFile *ipf); + +void conn_maybe_write(Conn *conn); +void conn_make_some_xmits(Conn *conn); +void *conn_write_some_xmits(Conn *conn); + +void xmit_free(XmitDetails *d); + +int article_check_expired(Article *art /* must be queued, not conn */); +void article_autodefer(InputFile *ipf, Article *art); +void article_defer(Article *art /* not on a queue */, int whichcount); +void autodefer_input_file(InputFile *ipf); /*----- external linkage for debug/dump only -----*/ -pid_t connecting_child; -pid_t inndcomm_child; +extern pid_t connecting_child; +extern int connecting_fdpass_sock; +extern pid_t inndcomm_child; /*========== general operational variables ==========*/ diff --git a/recv.c b/recv.c index 870e0e5..dbeb4b6 100644 --- a/recv.c +++ b/recv.c @@ -1,14 +1,42 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * recv.c - receiving peer responses and disposing of articles + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#include "innduct.h" + /*========== handling responses from peer ==========*/ -static const oop_rd_style peer_rd_style= { +const oop_rd_style peer_rd_style= { OOP_RD_DELIM_STRIP, '\n', OOP_RD_NUL_FORBID, OOP_RD_SHORTREC_FORBID }; -static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, - const char *errmsg, int errnoval, - const char *data, size_t recsz, void *conn_v) { +void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *conn_v) { Conn *conn= conn_v; connfail(conn, "error receiving from peer: %s", errmsg); return OOP_CONTINUE; @@ -87,7 +115,7 @@ static void update_nocheck(int accepted) { nocheck= new_nocheck; } -static void article_done(Article *art, int whichcount) { +void article_done(Article *art, int whichcount) { if (whichcount>=0 && !art->missing) art->ipf->counts[art->state][whichcount]++; @@ -129,9 +157,9 @@ static void article_done(Article *art, int whichcount) { queue_check_input_done(); } -static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, - const char *errmsg, int errnoval, - const char *data, size_t recsz, void *conn_v) { +void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *conn_v) { Conn *conn= conn_v; if (ev == OOP_RD_EOF) { diff --git a/statemc.c b/statemc.c index 970bb6c..27ae861 100644 --- a/statemc.c +++ b/statemc.c @@ -1,3 +1,30 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * statemc.c - state machine core (see README.states). + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#include "innduct.h" /* statemc_init initialises */ @@ -19,7 +46,7 @@ static void startup_set_input_file(InputFile *f) { inputfile_reading_start(f); } -static void statemc_lock(void) { +void statemc_lock(void) { int lockfd; struct stat stab, stabf; @@ -65,7 +92,7 @@ static void statemc_lock(void) { dbg("startup: locked"); } -static void statemc_init(void) { +void statemc_init(void) { struct stat stabdefer; search_backlog_file(); @@ -128,7 +155,7 @@ static void statemc_init(void) { } } -static void statemc_start_flush(const char *why) { /* Normal => Flushing */ +void statemc_start_flush(const char *why) { /* Normal => Flushing */ assert(sms == sm_NORMAL); dbg("starting flush (%s) (%lu >?= %lu) (%d)", @@ -148,7 +175,7 @@ static void statemc_start_flush(const char *why) { /* Normal => Flushing */ spawn_inndcomm_flush(why); /* => Flushing FLUSHING */ } -static int trigger_flush_ok(const char *why) { +int trigger_flush_ok(const char *why) { switch (sms) { case sm_NORMAL: @@ -172,7 +199,7 @@ static int trigger_flush_ok(const char *why) { } } -static void statemc_period_poll(void) { +void statemc_period_poll(void) { if (!until_flush) return; until_flush--; assert(until_flush>=0); @@ -226,7 +253,7 @@ static void notice_processed(InputFile *ipf, int completed, #undef CNT } -static void statemc_check_backlog_done(void) { +void statemc_check_backlog_done(void) { InputFile *ipf= backlog_input_file; if (!inputfile_is_done(ipf)) return; @@ -251,7 +278,7 @@ static void statemc_check_backlog_done(void) { return; } -static void statemc_check_flushing_done(void) { +void statemc_check_flushing_done(void) { InputFile *ipf= flushing_input_file; if (!inputfile_is_done(ipf)) return; @@ -286,12 +313,12 @@ static void *statemc_check_input_done(oop_source *lp, struct timeval now, return OOP_CONTINUE; } -static void queue_check_input_done(void) { +void queue_check_input_done(void) { loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0); } -static void statemc_setstate(StateMachineState newsms, int periods, - const char *forlog, const char *why) { +void statemc_setstate(StateMachineState newsms, int periods, + const char *forlog, const char *why) { sms= newsms; until_flush= periods; @@ -448,7 +475,7 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ /*---------- shutdown and signal handling ----------*/ -static void preterminate(void) { +void preterminate(void) { if (in_child) return; notice_processed(main_input_file,0,"feedfile",""); notice_processed(flushing_input_file,0,"flushing",""); @@ -459,12 +486,6 @@ static void preterminate(void) { static int signal_self_pipe[2]; -static void raise_default(int signo) { - xsigsetdefault(signo); - raise(signo); - abort(); -} - static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) { assert(fd=signal_self_pipe[0]); char buf[PIPE_BUF]; @@ -493,7 +514,7 @@ static void sigarrived_handler(int signum) { write(signal_self_pipe[1],&x,1); } -static void init_signals(void) { +void init_signals(void) { if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) syscrash("could not ignore SIGPIPE"); diff --git a/xmit.c b/xmit.c index dd909e4..4f873f3 100644 --- a/xmit.c +++ b/xmit.c @@ -1,3 +1,34 @@ +/* + * innduct + * tailing reliable realtime streaming feeder for inn + * xmit.c - transmitting checks and articles, flow control, expiry + * + * Copyright (C) 2010 Ian Jackson + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * (I believe that when you compile and link this as part of the inn2 + * build, with the Makefile runes I have provided, all the libraries + * and files which end up included in innduct are licence-compatible + * with GPLv3. If not then please let me know. -Ian Jackson.) + */ + +#include "innduct.h" + +const char *const artstate_names[]= + { "Unchecked", "Wanted", "Unsolicited", 0 }; + /*---------- assigning articles to conns, and transmitting ----------*/ static Article *dequeue_from(int peek, InputFile *ipf) { @@ -18,7 +49,7 @@ static Article *dequeue(int peek) { return 0; } -static void check_assign_articles(void) { +void check_assign_articles(void) { for (;;) { if (!dequeue(1)) break; @@ -66,7 +97,7 @@ static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) { return OOP_CONTINUE; } -static void conn_maybe_write(Conn *conn) { +void conn_maybe_write(Conn *conn) { for (;;) { conn_make_some_xmits(conn); if (!conn->xmitu) { @@ -114,14 +145,14 @@ static void conn_maybe_write(Conn *conn) { * pause/resume inputfile tailing */ -static void check_reading_pause_resume(InputFile *ipf) { +void check_reading_pause_resume(InputFile *ipf) { if (ipf->queue.count >= max_queue_per_ipf) inputfile_reading_pause(ipf); else inputfile_reading_resume(ipf); } -static void article_defer(Article *art /* not on a queue */, int whichcount) { +void article_defer(Article *art /* not on a queue */, int whichcount) { open_defer(); if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0 || fflush(defer)) @@ -129,7 +160,7 @@ static void article_defer(Article *art /* not on a queue */, int whichcount) { article_done(art, whichcount); } -static int article_check_expired(Article *art /* must be queued, not conn */) { +int article_check_expired(Article *art /* must be queued, not conn */) { ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT); if (artdata) { SMfreearticle(artdata); return 0; } @@ -151,7 +182,7 @@ void inputfile_queue_check_expired(InputFile *ipf) { check_reading_pause_resume(ipf); } -static void article_autodefer(InputFile *ipf, Article *art) { +void article_autodefer(InputFile *ipf, Article *art) { ipf->autodefer++; article_defer(art,-1); } @@ -169,7 +200,7 @@ static void autodefer_input_file_articles(InputFile *ipf) { article_autodefer(ipf, art); } -static void autodefer_input_file(InputFile *ipf) { +void autodefer_input_file(InputFile *ipf) { static const char *const abandon= "stuck"; ipf->autodefer= 0; @@ -220,7 +251,7 @@ static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) { d->info.sm_art= ah; } -static void xmit_free(XmitDetails *d) { +void xmit_free(XmitDetails *d) { switch (d->kind) { case xk_Artdata: SMfreearticle(d->info.sm_art); break; case xk_Const: break; @@ -228,7 +259,7 @@ static void xmit_free(XmitDetails *d) { } } -static void *conn_write_some_xmits(Conn *conn) { +void *conn_write_some_xmits(Conn *conn) { /* return values: * 0: nothing more to write, no need to call us again * OOP_CONTINUE: more to write but fd not writeable @@ -270,7 +301,7 @@ static void *conn_write_some_xmits(Conn *conn) { } } -static void conn_make_some_xmits(Conn *conn) { +void conn_make_some_xmits(Conn *conn) { for (;;) { if (conn->xmitu+5 > CONNIOVS) break; -- 2.30.2