+/*
+ * innduct
+ * tailing reliable realtime streaming feeder for inn
+ * cli.c - command and control connections
+ *
+ * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * (I believe that when you compile and link this as part of the inn2
+ * build, with the Makefile runes I have provided, all the libraries
+ * and files which end up included in innduct are licence-compatible
+ * with GPLv3. If not then please let me know. -Ian Jackson.)
+ */
+
+#include "innduct.h"
+
/*========== command and control (CLI) connections ==========*/
static int cli_master;
free(cc);
}
-static void cli_stdio(void) {
+void cli_stdio(void) {
NEW_DECL(CliConn *,cc);
cc->destroy= cli_stdio_destroy;
goto nocli; \
}while(0)
-static void cli_init(void) {
+void cli_init(void) {
union {
struct sockaddr sa;
struct sockaddr_un un;
+/*
+ * innduct
+ * tailing reliable realtime streaming feeder for inn
+ * conn.c - connection establishment and teardown
+ *
+ * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * (I believe that when you compile and link this as part of the inn2
+ * build, with the Makefile runes I have provided, all the libraries
+ * and files which end up included in innduct are licence-compatible
+ * with GPLv3. If not then please let me know. -Ian Jackson.)
+ */
+
+#include "innduct.h"
+
/*========== management of connections ==========*/
static void reconnect_blocking_event(void) {
conn->fd, msgprefix, strerror(errno));
}
-static int conn_busy(Conn *conn) {
+int conn_busy(Conn *conn) {
return
conn->waiting.count ||
conn->priority.count ||
conn->xmitu;
}
-static void conn_dispose(Conn *conn) {
+void conn_dispose(Conn *conn) {
if (!conn) return;
if (conn->rd) {
oop_rd_cancel(conn->rd);
return OOP_CONTINUE;
}
-static void vconnfail(Conn *conn, const char *fmt, va_list al) {
+void vconnfail(Conn *conn, const char *fmt, va_list al) {
int requeue[art_MaxState];
memset(requeue,0,sizeof(requeue));
check_assign_articles();
}
-static void connfail(Conn *conn, const char *fmt, ...) {
+void connfail(Conn *conn, const char *fmt, ...) {
va_list al;
va_start(al,fmt);
vconnfail(conn,fmt,al);
* For our last connection, we also shut it down if we have had
* less than K in the last L
*/
-static void check_idle_conns(void) {
+void check_idle_conns(void) {
Conn *conn;
int volthisperiod= lowvol_perperiod[lowvol_circptr];
/*---------- making new connections ----------*/
-static pid_t connecting_child;
+pid_t connecting_child;
int connecting_fdpass_sock;
static void connect_attempt_discard(void) {
return OOP_CONTINUE;
}
-static int allow_connect_start(void) {
+int allow_connect_start(void) {
return conns.count < max_connections
&& !connecting_child
&& !until_connect;
}
-static void connect_start(void) {
+void connect_start(void) {
assert(!connecting_child);
assert(!connecting_fdpass_sock);
+++ /dev/null
-
-
-
-/*----- function predeclarations -----*/
-
-void conn_maybe_write(Conn *conn);
-void conn_make_some_xmits(Conn *conn);
-void *conn_write_some_xmits(Conn *conn);
-
-void xmit_free(XmitDetails *d);
-
-#define SMS(newstate, periods, why) \
- (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
-void statemc_setstate(StateMachineState newsms, int periods,
- const char *forlog, const char *why);
-
-void statemc_start_flush(const char *why); /* Normal => Flushing */
-void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
-int trigger_flush_ok(const char *why /* 0 means timeout */);
- /* => Flushing,FLUSHING, ret 1; or ret 0 */
-
-void article_done(Article *art, int whichcount);
-
-void check_assign_articles(void);
-void queue_check_input_done(void);
-void check_reading_pause_resume(InputFile *ipf);
-
-void statemc_check_flushing_done(void);
-void statemc_check_backlog_done(void);
-
-void postfork(void);
-void period(void);
-
-void open_defer(void);
-void close_defer(void);
-void search_backlog_file(void);
-void preterminate(void);
-void raise_default(int signo) NORET;
-
-void inputfile_reading_start(InputFile *ipf);
-void inputfile_reading_stop(InputFile *ipf);
-void inputfile_reading_pause(InputFile *ipf);
-void inputfile_reading_resume(InputFile *ipf);
- /* pause and resume are idempotent, and no-op if not done _reading_start */
-
-void filemon_start(InputFile *ipf);
-void filemon_stop(InputFile *ipf);
-void tailing_make_readable(InputFile *ipf);
-
-void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
-void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
-
-extern const oop_rd_style peer_rd_style;
-extern oop_rd_call peer_rd_err, peer_rd_ok;
-
-
-
+/*
+ * innduct
+ * tailing reliable realtime streaming feeder for inn
+ * defer.c - handling of defer and backlog files
+ *
+ * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * (I believe that when you compile and link this as part of the inn2
+ * build, with the Makefile runes I have provided, all the libraries
+ * and files which end up included in innduct are licence-compatible
+ * with GPLv3. If not then please let me know. -Ian Jackson.)
+ */
+
+#include "innduct.h"
+
/*---------- defer and backlog files ----------*/
-static void open_defer(void) {
+void open_defer(void) {
struct stat stab;
if (defer) return;
syscrash("could not seek to new end of defer file %s", path_defer);
}
-static void close_defer(void) {
+void close_defer(void) {
if (!defer)
return;
search_backlog_file();
}
-static void search_backlog_file(void) {
+void search_backlog_file(void) {
/* returns non-0 iff there are any backlog files */
glob_t gl;
globfree(&gl);
return;
}
-
+/*
+ * innduct
+ * tailing reliable realtime streaming feeder for inn
+ * filemon.c - file monitoring (inotify, kqueue, poll, etc.)
+ *
+ * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * (I believe that when you compile and link this as part of the inn2
+ * build, with the Makefile runes I have provided, all the libraries
+ * and files which end up included in innduct are licence-compatible
+ * with GPLv3. If not then please let me know. -Ian Jackson.)
+ */
+
+#include "innduct.h"
+
/*---------- filemon implemented with inotify ----------*/
#if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON)
return OOP_CONTINUE;
}
-static int filemon_method_init(void) {
+int filemon_method_init(void) {
filemon_inotify_fd= inotify_init();
if (filemon_inotify_fd<0) {
syswarn("filemon/inotify: inotify_init failed");
return 1;
}
-static void filemon_method_dump_info(FILE *f) {
+void filemon_method_dump_info(FILE *f) {
int i;
fprintf(f,"inotify");
DUMPV("%d",,filemon_inotify_fd);
struct Filemon_Perfile { int dummy; };
-static int filemon_method_init(void) {
+int filemon_method_init(void) {
warn("filemon/dummy: no filemon method compiled in");
return 0;
}
static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { }
static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { }
-static void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
+void filemon_method_dump_info(FILE *f) { fprintf(f,"dummy\n"); }
#endif /* !HAVE_FILEMON */
/*---------- filemon generic interface ----------*/
-static void filemon_start(InputFile *ipf) {
+void filemon_start(InputFile *ipf) {
assert(!ipf->filemon);
NEW(ipf->filemon);
filemon_method_startfile(ipf, ipf->filemon);
}
-static void filemon_stop(InputFile *ipf) {
+void filemon_stop(InputFile *ipf) {
if (!ipf->filemon) return;
filemon_method_stopfile(ipf, ipf->filemon);
free(ipf->filemon);
/*
* innduct
* tailing reliable realtime streaming feeder for inn
- * logging and utility functions
+ * help.c - logging and utility functions
*
* Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
*
exit(estatus); \
}
-#define DEFLOG(fn, pfx, sysloglevel, err) \
- static void fn(const char *fmt, ...) { \
- VA; \
- logv(sysloglevel, pfx, err, fmt, al); \
- va_end(al); \
+#define DEFLOG(fn, pfx, sysloglevel, err) \
+ void fn(const char *fmt, ...) { \
+ VA; \
+ logv(sysloglevel, pfx, err, fmt, al); \
+ va_end(al); \
}
-#define INNLOGSET_DECLARE(fn, pfx, sysloglevel) \
- static void duct_log_##fn(int l, const char *fmt, va_list al, int errval) \
- PRINTF(3,0); \
- static void duct_log_##fn(int l, const char *fmt, va_list al, int errval) { \
- logv(sysloglevel, pfx, errval ? errval : -1, fmt, al); \
+#define INNLOGSET_DEFINE(fn, pfx, sysloglevel) \
+ void duct_log_##fn(int l, const char *fmt, va_list al, int errval) { \
+ logv(sysloglevel, pfx, errval ? errval : -1, fmt, al); \
}
-#define INNLOGSET_CALL(fn, pfx, sysloglevel) \
- message_handlers_##fn(1, duct_log_##fn);
-static int innduct_fatal_cleanup(void) { return 12; } /* used for libinn die */
-
/* We want to extend the set of logging functions from inn, and we
* want to prepend the site name to all our messages. */
DEFFATAL(syscrash, "critical", LOG_CRIT, errno, 16);
DEFFATAL(crash, "critical", LOG_CRIT, -1, 16);
-#define INNLOGSETS(INNLOGSET) \
- INNLOGSET(die, "fatal", LOG_ERR) \
- INNLOGSET(warn, "warning", LOG_WARNING) \
- INNLOGSET(notice, "notice", LOG_NOTICE) \
- INNLOGSET(trace, "trace", LOG_NOTICE)
-INNLOGSETS(INNLOGSET_DECLARE)
+INNLOGSETS(INNLOGSET_DEFINE)
DEFLOG(info, "info", LOG_INFO, -1)
DEFLOG(dbg, "debug", LOG_DEBUG, -1)
sa.sa_handler= SIG_DFL;
xsigaction(signo,&sa);
}
+void raise_default(int signo) {
+ xsigsetdefault(signo);
+ raise(signo);
+ abort();
+}
void xgettimeofday(struct timeval *tv_r) {
int r= gettimeofday(tv_r,0);
what, path, (unsigned long)stab->st_mode);
}
-static void xfstat(int fd, struct stat *stab_r, const char *what) {
+void xfstat(int fd, struct stat *stab_r, const char *what) {
int r= fstat(fd, stab_r);
if (r) syscrash("could not fstat %s", what);
}
-static void xfstat_isreg(int fd, struct stat *stab_r,
- const char *path, const char *what) {
+void xfstat_isreg(int fd, struct stat *stab_r,
+ const char *path, const char *what) {
xfstat(fd, stab_r, what);
check_isreg(stab_r, path, what);
}
+/*
+ * innduct
+ * tailing reliable realtime streaming feeder for inn
+ * infile.c - monitoring and handling of input files
+ *
+ * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * (I believe that when you compile and link this as part of the inn2
+ * build, with the Makefile runes I have provided, all the libraries
+ * and files which end up included in innduct are licence-compatible
+ * with GPLv3. If not then please let me know. -Ian Jackson.)
+ */
+
+#include "innduct.h"
+
/*========== monitoring of input files ==========*/
static void feedfile_eof(InputFile *ipf) {
}
}
-static InputFile *open_input_file(const char *path) {
+InputFile *open_input_file(const char *path) {
int fd= open(path, O_RDWR);
if (fd<0) {
if (errno==ENOENT) return 0;
return ipf;
}
-static void close_input_file(InputFile *ipf) { /* does not free */
+void close_input_file(InputFile *ipf) { /* does not free */
assert(!ipf->readable_callback); /* must have had ->on_cancel */
assert(!ipf->filemon); /* must have had inputfile_reading_stop */
assert(!ipf->rd); /* must have had inputfile_reading_stop */
ipf->readable_callback= 0;
}
-static void tailing_make_readable(InputFile *ipf) {
+void tailing_make_readable(InputFile *ipf) {
dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf,
(void*)ipf?ipf->readable_callback:0);
if (!ipf || !ipf->readable_callback) /* so callers can be naive */
OOP_RD_SHORTREC_LONG,
};
-static void inputfile_reading_resume(InputFile *ipf) {
+void inputfile_reading_resume(InputFile *ipf) {
if (!ipf->rd) return;
if (!ipf->paused) return;
ipf->paused= 0;
}
-static void inputfile_reading_pause(InputFile *ipf) {
+void inputfile_reading_pause(InputFile *ipf) {
if (!ipf->rd) return;
if (ipf->paused) return;
oop_rd_cancel(ipf->rd);
ipf->paused= 1;
}
-static void inputfile_reading_start(InputFile *ipf) {
+void inputfile_reading_start(InputFile *ipf) {
assert(!ipf->rd);
ipf->readable.on_readable= tailing_on_readable;
ipf->readable.on_cancel= tailing_on_cancel;
inputfile_reading_resume(ipf);
}
-static void inputfile_reading_stop(InputFile *ipf) {
+void inputfile_reading_stop(InputFile *ipf) {
assert(ipf->rd);
inputfile_reading_pause(ipf);
oop_rd_delete(ipf->rd);
tailing_make_readable(flushing_input_file);
}
+char *dbg_report_ipf(InputFile *ipf) {
+ if (!ipf) return xasprintf("none");
+
+ const char *slash= strrchr(ipf->path,'/');
+ const char *path= slash ? slash+1 : ipf->path;
+
+ return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
+ ipf, path,
+ ipf->queue.count, ipf->inprogress, ipf->autodefer,
+ (long)ipf->offset, ipf->fd,
+ ipf->rd ? "" : ",!rd",
+ ipf->skippinglong ? "*skiplong" : "",
+ ipf->rd && ipf->paused ? "*paused" : "");
+}
/*
* innduct
* tailing reliable realtime streaming feeder for inn
- * main program - option parsing and startup
+ * innduct.c - main program, option parsing and startup
*
* Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
*
0
};
-struct Conn {
- ISNODE(Conn);
- int fd; /* may be 0, meaning closed (during construction/destruction) */
- oop_read *rd; /* likewise */
- int oopwriting; /* since on_fd is not idempotent */
- int max_queue, stream;
- const char *quitting;
- int since_activity; /* periods */
- ArticleList waiting; /* not yet told peer */
- ArticleList priority; /* peer says send it now */
- ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
- struct iovec xmit[CONNIOVS];
- XmitDetails xmitd[CONNIOVS];
- int xmitu;
-};
-
/*----- general operational variables -----*/
int lowvol_circptr;
int lowvol_total; /* does not include current period */
+/*---------- configuration option variables ----------*/
+/* when changing defaults, remember to update the manpage */
+
+const char *sitename, *remote_host;
+const char *feedfile, *path_run, *path_cli, *path_cli_dir;
+int quiet_multiple=0;
+int interactive=0, try_filemon=1;
+int try_stream=1;
+int port=119;
+const char *inndconffile;
+
+int max_connections=10;
+int max_queue_per_conn=200;
+int target_max_feedfile_size=100000;
+int period_seconds=30;
+int filepoll_seconds=5;
+int max_queue_per_ipf=-1;
+
+int connection_setup_timeout=200;
+int inndcomm_flush_timeout=100;
+
+double nocheck_thresh= 95.0; /* converted from percentage by main */
+double nocheck_decay= 100; /* conv'd from articles to lambda by main */
+
+/* all these are initialised to seconds, and converted to periods in main */
+int reconnect_delay_periods=1000;
+int flushfail_retry_periods=1000;
+int backlog_retry_minperiods=100;
+int backlog_spontrescan_periods=300;
+int spontaneous_flush_periods=100000;
+int max_separated_periods=2000;
+int need_activity_periods=1000;
+int lowvol_thresh=3;
+int lowvol_periods=1000;
+
+double max_bad_data_ratio= 1; /* conv'd from percentage by main */
+int max_bad_data_initial= 30;
+ /* in one corrupt 4096-byte block the number of newlines has
+ * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */
+
/*========== main program ==========*/
static void postfork_inputfile(InputFile *ipf) {
every_schedule(e, now);
}
-static char *dbg_report_ipf(InputFile *ipf) {
- if (!ipf) return xasprintf("none");
-
- const char *slash= strrchr(ipf->path,'/');
- const char *path= slash ? slash+1 : ipf->path;
-
- return xasprintf("%p/%s:queue=%d,ip=%ld,autodef=%ld,off=%ld,fd=%d%s%s%s",
- ipf, path,
- ipf->queue.count, ipf->inprogress, ipf->autodefer,
- (long)ipf->offset, ipf->fd,
- ipf->rd ? "" : ",!rd",
- ipf->skippinglong ? "*skiplong" : "",
- ipf->rd && ipf->paused ? "*paused" : "");
-}
-
-static void period(void) {
+void period(void) {
char *dipf_main= dbg_report_ipf(main_input_file);
char *dipf_flushing= dbg_report_ipf(flushing_input_file);
char *dipf_backlog= dbg_report_ipf(backlog_input_file);
return specified[l-1] == '/';
}
+static int innduct_fatal_cleanup(void) { return 12; } /* used for libinn die */
+
int main(int argc, char **argv) {
/* set up libinn logging */
message_program_name= "innduct";
message_fatal_cleanup= innduct_fatal_cleanup;
+
+#define INNLOGSET_CALL(fn, pfx, sysloglevel) \
+ message_handlers_##fn(1, duct_log_##fn);
INNLOGSETS(INNLOGSET_CALL)
if (!argv[1]) {
/*----- configuration options -----*/
-/* when changing defaults, remember to update the manpage */
-static const char *sitename, *remote_host;
-static const char *feedfile, *path_run, *path_cli, *path_cli_dir;
-static int quiet_multiple=0;
-static int interactive=0, try_filemon=1;
-static int try_stream=1;
-static int port=119;
-static const char *inndconffile;
+extern const char *sitename, *remote_host;
+extern const char *feedfile, *path_run, *path_cli, *path_cli_dir;
+extern int quiet_multiple, interactive, try_filemon, try_stream, port;
+extern const char *inndconffile;
-static int max_connections=10;
-static int max_queue_per_conn=200;
-static int target_max_feedfile_size=100000;
-static int period_seconds=30;
-static int filepoll_seconds=5;
-static int max_queue_per_ipf=-1;
+extern int max_connections, max_queue_per_conn, target_max_feedfile_size;
+extern int period_seconds, filepoll_seconds, max_queue_per_ipf;
+extern int connection_setup_timeout, inndcomm_flush_timeout;
-static int connection_setup_timeout=200;
-static int inndcomm_flush_timeout=100;
-
-static double nocheck_thresh= 95.0; /* converted from percentage by main */
-static double nocheck_decay= 100; /* conv'd from articles to lambda by main */
+extern double nocheck_thresh;
+extern double nocheck_decay;
/* all these are initialised to seconds, and converted to periods in main */
-static int reconnect_delay_periods=1000;
-static int flushfail_retry_periods=1000;
-static int backlog_retry_minperiods=100;
-static int backlog_spontrescan_periods=300;
-static int spontaneous_flush_periods=100000;
-static int max_separated_periods=2000;
-static int need_activity_periods=1000;
-static int lowvol_thresh=3;
-static int lowvol_periods=1000;
-
-static double max_bad_data_ratio= 1; /* conv'd from percentage by main */
-static int max_bad_data_initial= 30;
- /* in one corrupt 4096-byte block the number of newlines has
- * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */
+extern int reconnect_delay_periods;
+extern int flushfail_retry_periods;
+extern int backlog_retry_minperiods;
+extern int backlog_spontrescan_periods;
+extern int spontaneous_flush_periods;
+extern int max_separated_periods;
+extern int need_activity_periods;
+extern int lowvol_thresh;
+extern int lowvol_periods;
+
+extern double max_bad_data_ratio;
+extern int max_bad_data_initial;
/*----- statistics -----*/
art_Unsolicited, /* - sent body without check */
art_MaxState,
} ArtState;
-
-static const char *const artstate_names[]=
- { "Unchecked", "Wanted", "Unsolicited", 0 };
+extern const char *const artstate_names[]; /* xmit.c */
#define RESULT_COUNTS(RCS,RCN) \
RCS(sent) \
char messageid[1];
};
+struct Conn {
+ ISNODE(Conn);
+ int fd; /* may be 0, meaning closed (during construction/destruction) */
+ oop_read *rd; /* likewise */
+ int oopwriting; /* since on_fd is not idempotent */
+ int max_queue, stream;
+ const char *quitting;
+ int since_activity; /* periods */
+ ArticleList waiting; /* not yet told peer */
+ ArticleList priority; /* peer says send it now */
+ ArticleList sent; /* offered/transmitted - in xmit or waiting reply */
+ struct iovec xmit[CONNIOVS];
+ XmitDetails xmitd[CONNIOVS];
+ int xmitu;
+};
+
#define SMS_LIST(X) \
X(NORMAL) \
X(FLUSHING) \
/*----- help.c -----*/
-static void syscrash(const char *fmt, ...) NORET_PRINTF(1,2);
-static void crash(const char *fmt, ...) NORET_PRINTF(1,2);
-static void info(const char *fmt, ...) PRINTF(1,2);
-static void dbg(const char *fmt, ...) PRINTF(1,2);
+void syscrash(const char *fmt, ...) NORET_PRINTF(1,2);
+void crash(const char *fmt, ...) NORET_PRINTF(1,2);
+void info(const char *fmt, ...) PRINTF(1,2);
+void dbg(const char *fmt, ...) PRINTF(1,2);
void logv(int sysloglevel, const char *pfx, int errnoval,
const char *fmt, va_list al) PRINTF(5,0);
void xclose_perhaps(int *fd, const char *what, const char *what2);
pid_t xfork(const char *what);
-static void on_fd_read_except(int fd, oop_call_fd callback);
+void on_fd_read_except(int fd, oop_call_fd callback);
void cancel_fd_read_except(int fd);
void report_child_status(const char *what, int status);
time_t xtime(void);
void xsigaction(int signo, const struct sigaction *sa);
void xsigsetdefault(int signo);
+void raise_default(int signo) NORET;
void xgettimeofday(struct timeval *tv_r);
void xsetnonblock(int fd, int nonb);
return errnoval==EWOULDBLOCK || errnoval==EAGAIN;
}
+#define INNLOGSETS(INNLOGSET) \
+ INNLOGSET(die, "fatal", LOG_ERR) \
+ INNLOGSET(warn, "warning", LOG_WARNING) \
+ INNLOGSET(notice, "notice", LOG_NOTICE) \
+ INNLOGSET(trace, "trace", LOG_NOTICE)
+#define INNLOGSET_DECLARE(fn, pfx, sysloglevel) \
+ void duct_log_##fn(int l, const char *fmt, va_list al, int errval) \
+ PRINTF(3,0);
+INNLOGSETS(INNLOGSET_DECLARE)
+
/*----- innduct.c -----*/
void postfork(void);
+void period(void);
+
+/*----- cli.c -----*/
+
+void cli_init(void);
+void cli_stdio(void);
/*----- conn.c -----*/
void conn_closefd(Conn *conn, const char *msgprefix);
+void check_idle_conns(void);
+int conn_busy(Conn *conn);
+void conn_dispose(Conn *conn);
+
+void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0);
+void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3);
+
+int allow_connect_start(void);
+void connect_start(void);
/*----- defer.c -----*/
void poll_backlog_file(void);
+void search_backlog_file(void);
+void open_defer(void);
+void close_defer(void);
+
+/*----- filemon.c -----*/
+
+int filemon_method_init(void);
+void filemon_method_dump_info(FILE *f);
+
+void filemon_start(InputFile *ipf);
+void filemon_stop(InputFile *ipf);
/*----- infile.c -----*/
void filepoll(void);
+void inputfile_reading_start(InputFile *ipf);
+void inputfile_reading_stop(InputFile *ipf);
+void inputfile_reading_pause(InputFile *ipf);
+void inputfile_reading_resume(InputFile *ipf);
+ /* pause and resume are idempotent, and no-op if not done _reading_start */
+
+InputFile *open_input_file(const char *path);
+void close_input_file(InputFile *ipf); /* does not free */
+char *dbg_report_ipf(InputFile *ipf);
+
+void tailing_make_readable(InputFile *ipf);
+
+/*----- recv.c -----*/
+
+extern const oop_rd_style peer_rd_style;
+oop_rd_call peer_rd_ok, peer_rd_err;
+
+void article_done(Article *art, int whichcount);
+
/*----- statemc.c -----*/
-sig_atomic_t terminate_sig_flag;
+void statemc_check_flushing_done(void);
+void statemc_check_backlog_done(void);
+
+extern sig_atomic_t terminate_sig_flag;
+void statemc_period_poll(void);
+void statemc_lock(void);
+void init_signals(void);
+void statemc_init(void);
+
+#define SMS(newstate, periods, why) \
+ (statemc_setstate(sm_##newstate,(periods),#newstate,(why)))
+void statemc_setstate(StateMachineState newsms, int periods,
+ const char *forlog, const char *why);
+
+void statemc_start_flush(const char *why); /* Normal => Flushing */
+void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */
+int trigger_flush_ok(const char *why /* 0 means timeout */);
+ /* => Flushing,FLUSHING, ret 1; or ret 0 */
+
+void preterminate(void);
/*----- xmit.c -----*/
void inputfile_queue_check_expired(InputFile *ipf);
+void check_assign_articles(void);
+void queue_check_input_done(void);
+void check_reading_pause_resume(InputFile *ipf);
+
+void conn_maybe_write(Conn *conn);
+void conn_make_some_xmits(Conn *conn);
+void *conn_write_some_xmits(Conn *conn);
+
+void xmit_free(XmitDetails *d);
+
+int article_check_expired(Article *art /* must be queued, not conn */);
+void article_autodefer(InputFile *ipf, Article *art);
+void article_defer(Article *art /* not on a queue */, int whichcount);
+void autodefer_input_file(InputFile *ipf);
/*----- external linkage for debug/dump only -----*/
-pid_t connecting_child;
-pid_t inndcomm_child;
+extern pid_t connecting_child;
+extern int connecting_fdpass_sock;
+extern pid_t inndcomm_child;
/*========== general operational variables ==========*/
+/*
+ * innduct
+ * tailing reliable realtime streaming feeder for inn
+ * recv.c - receiving peer responses and disposing of articles
+ *
+ * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * (I believe that when you compile and link this as part of the inn2
+ * build, with the Makefile runes I have provided, all the libraries
+ * and files which end up included in innduct are licence-compatible
+ * with GPLv3. If not then please let me know. -Ian Jackson.)
+ */
+
+#include "innduct.h"
+
/*========== handling responses from peer ==========*/
-static const oop_rd_style peer_rd_style= {
+const oop_rd_style peer_rd_style= {
OOP_RD_DELIM_STRIP, '\n',
OOP_RD_NUL_FORBID,
OOP_RD_SHORTREC_FORBID
};
-static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
- const char *errmsg, int errnoval,
- const char *data, size_t recsz, void *conn_v) {
+void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
+ const char *errmsg, int errnoval,
+ const char *data, size_t recsz, void *conn_v) {
Conn *conn= conn_v;
connfail(conn, "error receiving from peer: %s", errmsg);
return OOP_CONTINUE;
nocheck= new_nocheck;
}
-static void article_done(Article *art, int whichcount) {
+void article_done(Article *art, int whichcount) {
if (whichcount>=0 && !art->missing)
art->ipf->counts[art->state][whichcount]++;
queue_check_input_done();
}
-static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
- const char *errmsg, int errnoval,
- const char *data, size_t recsz, void *conn_v) {
+void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
+ const char *errmsg, int errnoval,
+ const char *data, size_t recsz, void *conn_v) {
Conn *conn= conn_v;
if (ev == OOP_RD_EOF) {
+/*
+ * innduct
+ * tailing reliable realtime streaming feeder for inn
+ * statemc.c - state machine core (see README.states).
+ *
+ * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * (I believe that when you compile and link this as part of the inn2
+ * build, with the Makefile runes I have provided, all the libraries
+ * and files which end up included in innduct are licence-compatible
+ * with GPLv3. If not then please let me know. -Ian Jackson.)
+ */
+
+#include "innduct.h"
/* statemc_init initialises */
inputfile_reading_start(f);
}
-static void statemc_lock(void) {
+void statemc_lock(void) {
int lockfd;
struct stat stab, stabf;
dbg("startup: locked");
}
-static void statemc_init(void) {
+void statemc_init(void) {
struct stat stabdefer;
search_backlog_file();
}
}
-static void statemc_start_flush(const char *why) { /* Normal => Flushing */
+void statemc_start_flush(const char *why) { /* Normal => Flushing */
assert(sms == sm_NORMAL);
dbg("starting flush (%s) (%lu >?= %lu) (%d)",
spawn_inndcomm_flush(why); /* => Flushing FLUSHING */
}
-static int trigger_flush_ok(const char *why) {
+int trigger_flush_ok(const char *why) {
switch (sms) {
case sm_NORMAL:
}
}
-static void statemc_period_poll(void) {
+void statemc_period_poll(void) {
if (!until_flush) return;
until_flush--;
assert(until_flush>=0);
#undef CNT
}
-static void statemc_check_backlog_done(void) {
+void statemc_check_backlog_done(void) {
InputFile *ipf= backlog_input_file;
if (!inputfile_is_done(ipf)) return;
return;
}
-static void statemc_check_flushing_done(void) {
+void statemc_check_flushing_done(void) {
InputFile *ipf= flushing_input_file;
if (!inputfile_is_done(ipf)) return;
return OOP_CONTINUE;
}
-static void queue_check_input_done(void) {
+void queue_check_input_done(void) {
loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0);
}
-static void statemc_setstate(StateMachineState newsms, int periods,
- const char *forlog, const char *why) {
+void statemc_setstate(StateMachineState newsms, int periods,
+ const char *forlog, const char *why) {
sms= newsms;
until_flush= periods;
/*---------- shutdown and signal handling ----------*/
-static void preterminate(void) {
+void preterminate(void) {
if (in_child) return;
notice_processed(main_input_file,0,"feedfile","");
notice_processed(flushing_input_file,0,"flushing","");
static int signal_self_pipe[2];
-static void raise_default(int signo) {
- xsigsetdefault(signo);
- raise(signo);
- abort();
-}
-
static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) {
assert(fd=signal_self_pipe[0]);
char buf[PIPE_BUF];
write(signal_self_pipe[1],&x,1);
}
-static void init_signals(void) {
+void init_signals(void) {
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
syscrash("could not ignore SIGPIPE");
+/*
+ * innduct
+ * tailing reliable realtime streaming feeder for inn
+ * xmit.c - transmitting checks and articles, flow control, expiry
+ *
+ * Copyright (C) 2010 Ian Jackson <ijackson@chiark.greenend.org.uk>
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * (I believe that when you compile and link this as part of the inn2
+ * build, with the Makefile runes I have provided, all the libraries
+ * and files which end up included in innduct are licence-compatible
+ * with GPLv3. If not then please let me know. -Ian Jackson.)
+ */
+
+#include "innduct.h"
+
+const char *const artstate_names[]=
+ { "Unchecked", "Wanted", "Unsolicited", 0 };
+
/*---------- assigning articles to conns, and transmitting ----------*/
static Article *dequeue_from(int peek, InputFile *ipf) {
return 0;
}
-static void check_assign_articles(void) {
+void check_assign_articles(void) {
for (;;) {
if (!dequeue(1))
break;
return OOP_CONTINUE;
}
-static void conn_maybe_write(Conn *conn) {
+void conn_maybe_write(Conn *conn) {
for (;;) {
conn_make_some_xmits(conn);
if (!conn->xmitu) {
* pause/resume inputfile tailing
*/
-static void check_reading_pause_resume(InputFile *ipf) {
+void check_reading_pause_resume(InputFile *ipf) {
if (ipf->queue.count >= max_queue_per_ipf)
inputfile_reading_pause(ipf);
else
inputfile_reading_resume(ipf);
}
-static void article_defer(Article *art /* not on a queue */, int whichcount) {
+void article_defer(Article *art /* not on a queue */, int whichcount) {
open_defer();
if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
|| fflush(defer))
article_done(art, whichcount);
}
-static int article_check_expired(Article *art /* must be queued, not conn */) {
+int article_check_expired(Article *art /* must be queued, not conn */) {
ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
if (artdata) { SMfreearticle(artdata); return 0; }
check_reading_pause_resume(ipf);
}
-static void article_autodefer(InputFile *ipf, Article *art) {
+void article_autodefer(InputFile *ipf, Article *art) {
ipf->autodefer++;
article_defer(art,-1);
}
article_autodefer(ipf, art);
}
-static void autodefer_input_file(InputFile *ipf) {
+void autodefer_input_file(InputFile *ipf) {
static const char *const abandon= "stuck";
ipf->autodefer= 0;
d->info.sm_art= ah;
}
-static void xmit_free(XmitDetails *d) {
+void xmit_free(XmitDetails *d) {
switch (d->kind) {
case xk_Artdata: SMfreearticle(d->info.sm_art); break;
case xk_Const: break;
}
}
-static void *conn_write_some_xmits(Conn *conn) {
+void *conn_write_some_xmits(Conn *conn) {
/* return values:
* 0: nothing more to write, no need to call us again
* OOP_CONTINUE: more to write but fd not writeable
}
}
-static void conn_make_some_xmits(Conn *conn) {
+void conn_make_some_xmits(Conn *conn) {
for (;;) {
if (conn->xmitu+5 > CONNIOVS)
break;