X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=innduct.c;h=7ade8b6ac3291c35f8b6195f77fdd06eb094e20e;hb=f4aee95c41a0d6231d115386b8fbb23f6b8e349a;hp=445157da1527c16fe557d2da91b97ae65cf78ae6;hpb=f8e8b4f4647f437f8931663fcfb5efe711d98d5e;p=innduct.git diff --git a/innduct.c b/innduct.c index 445157d..7ade8b6 100644 --- a/innduct.c +++ b/innduct.c @@ -1,6 +1,7 @@ /* * innduct * tailing reliable realtime streaming feeder for inn + * main program - option parsing and startup * * Copyright (C) 2010 Ian Jackson * @@ -23,3227 +24,42 @@ * with GPLv3. If not then please let me know. -Ian Jackson.) */ -/* - * Newsfeeds file entries should look like this: - * host.name.of.site[/exclude,exclude,...]\ - * :pattern,pattern...[/distribution,distribution...]\ - * :Tf,Wnm - * : - * or - * sitename[/exclude,exclude,...]\ - * :pattern,pattern...[/distribution,distribution...]\ - * :Tf,Wnm - * :host.name.of.site - * - * Four files full of - * token messageid - * or might be blanked out - * .... - * - * F site.name main feed file - * opened/created, then written, by innd - * read by duct - * unlinked by duct - * tokens blanked out by duct when processed - * site.name_lock lock preventing multiple ducts - * to hold lock must open,F_SETLK[W] - * and then stat to check that locked file - * still has name site.name_lock - * holder of this lock is "duct" - * (only) lockholder may remove the lockfile - * D site.name_flushing temporary feed file during flush (or crash) - * hardlink created by duct - * unlinked by duct - * site.name_defer 431'd articles, still being written, - * created, written, used by duct - * - * site.name_backlog.. - * 431'd articles, ready for innxmit or duct - * created (link/mv) by duct - * site.name_backlog (where does not - * contain '#' or '~') eg - * site.name_backlog.manual - * anything the sysadmin likes (eg, feed files - * from old feeds to be merged into this one) - * created (link/mv) by admin - * may be symlinks (in which case links - * may be written through, but only links - * will be removed. - * - * It is safe to remove backlog files manually, - * if it's desired to throw away the backlog. - * - * Backlog files are also processed by innduct. We find the oldest - * backlog file which is at least a certain amount old, and feed it - * back into our processing. When every article in it has been read - * and processed, we unlink it and look for another backlog file. - * - * If we don't have a backlog file that we're reading, we close the - * defer file that we're writing and make it into a backlog file at - * the first convenient opportunity. - * -8<- - - - OVERALL STATES: - - START - | - ,-->--. check F, D - | | | - | | | - | | <----------------<---------------------------------'| - | | F exists | - | | D ENOENT | - | | duct opens F | - | V | - | Normal | - | F: innd writing, duct reading | - | D: ENOENT | - | | | - | | duct decides time to flush | - | | duct makes hardlink | - | | | - | V <------------------------'| - | Hardlinked F==D | - | F == D: innd writing, duct reading both exist | - ^ | | - | | duct unlinks F | - | | <-----------<-------------<--'| - | | open D F ENOENT | - | | if exists | - | | | - | V <---------------------. | - | Moved | | - | F: ENOENT | | - | D: innd writing, duct reading; or ENOENT | | - | | | | - | | duct requests flush of feed | | - | | (others can too, harmlessly) | | - | V | | - | Flushing | | - | F: ENOENT | | - | D: innd flushing, duct; or ENOENT | | - | | | | - | | inndcomm flush fails | | - | |`-------------------------->------------------' | - | | | - | | inndcomm reports no such site | - | |`---------------------------------------------------- | -. - | | | | - | | innd finishes writing D, creates F | | - | | inndcomm reports flush successful | | - | | | | - | V | | - | Separated <----------------' | - | F: innd writing F!=D / - | D: duct reading; or ENOENT both exist / - | | / - | | duct gets to the end of D / - | | duct opens F too / - | V / - | Finishing / - | F: innd writing, duct reading | - | D: duct finishing V - | | Dropping - | | duct finishes processing D F: ENOENT - | V duct unlinks D D: duct reading - | | | - `--<--' | duct finishes - | processing D - | duct unlinks D - | duct exits - V - Dropped - F: ENOENT - D: ENOENT - duct not running - - "duct reading" means innduct is reading the file but also - overwriting processed tokens. - - * ->8- -^L- - * - * rune for printing diagrams: - -perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct.c |a2ps -R -B -ops - - * - */ - -/*============================== PROGRAM ==============================*/ - -#define _GNU_SOURCE 1 - -#include "config.h" -#include "storage.h" -#include "nntp.h" -#include "libinn.h" -#include "inndcomm.h" - -#include "inn/list.h" -#include "inn/innconf.h" -#include "inn/messages.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -/*----- general definitions, probably best not changed -----*/ - -#define CONNCHILD_ESTATUS_STREAM 24 -#define CONNCHILD_ESTATUS_NOSTREAM 25 - -#define INNDCOMMCHILD_ESTATUS_FAIL 26 -#define INNDCOMMCHILD_ESTATUS_NONESUCH 27 - -#define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) -#define MAX_CLI_COMMAND 1000 - -#define VA va_list al; va_start(al,fmt) -#define PRINTF(f,a) __attribute__((__format__(printf,f,a))) -#define NORET_PRINTF(f,a) __attribute__((__noreturn__,__format__(printf,f,a))) -#define NORET __attribute__((__noreturn__)) - -#define NEW(ptr) ((ptr)= zxmalloc(sizeof(*(ptr)))) -#define NEW_DECL(type,ptr) type ptr = zxmalloc(sizeof(*(ptr))) - -#define DUMPV(fmt,pfx,v) fprintf(f, " " #v "=" fmt, pfx v); - -#define FOR_CONN(conn) \ - for ((conn)=LIST_HEAD(conns); (conn); (conn)=LIST_NEXT((conn))) - -/*----- doubly linked lists -----*/ - -#define ISNODE(T) struct node list_node -#define DEFLIST(T) \ - typedef struct { \ - union { struct list li; T *for_type; } u; \ - int count; \ - } T##List - -#define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node) - -#define LIST_CHECKCANHAVENODE(l,n) \ - ((void)((n) == ((l).u.for_type))) /* just for the type check */ - -#define LIST_ADDSOMEHOW(l,n,list_addsomehow) \ - ( LIST_CHECKCANHAVENODE(l,n), \ - list_addsomehow(&(l).u.li, NODE((n))), \ - (void)(l).count++ \ - ) - -#define LIST_REMSOMEHOW(l,list_remsomehow) \ - ( (typeof((l).u.for_type)) \ - ( (l).count \ - ? ( (l).count--, \ - list_remsomehow(&(l).u.li) ) \ - : 0 \ - ) \ - ) - - -#define LIST_ADDHEAD(l,n) LIST_ADDSOMEHOW((l),(n),list_addhead) -#define LIST_ADDTAIL(l,n) LIST_ADDSOMEHOW((l),(n),list_addtail) -#define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) -#define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) - -#define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li)) -#define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l)))) -#define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) -#define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) - -#define LIST_REMOVE(l,n) \ - ( LIST_CHECKCANHAVENODE(l,n), \ - list_remove(NODE((n))), \ - (void)(l).count-- \ - ) - -#define LIST_INSERT(l,n,pred) \ - ( LIST_CHECKCANHAVENODE(l,n), \ - LIST_CHECKCANHAVENODE(l,pred), \ - list_insert((struct list*)&(l), NODE((n)), NODE((pred))), \ - (void)(l).count++ \ - ) - -/*----- type predeclarations -----*/ - -typedef struct Conn Conn; -typedef struct Article Article; -typedef struct InputFile InputFile; -typedef struct XmitDetails XmitDetails; -typedef struct Filemon_Perfile Filemon_Perfile; -typedef enum StateMachineState StateMachineState; -typedef struct CliCommand CliCommand; - -DEFLIST(Conn); -DEFLIST(Article); - -/*----- function predeclarations -----*/ - -static void conn_maybe_write(Conn *conn); -static void conn_make_some_xmits(Conn *conn); -static void *conn_write_some_xmits(Conn *conn); - -static void xmit_free(XmitDetails *d); - -#define SMS(newstate, periods, why) \ - (statemc_setstate(sm_##newstate,(periods),#newstate,(why))) -static void statemc_setstate(StateMachineState newsms, int periods, - const char *forlog, const char *why); - -static void statemc_start_flush(const char *why); /* Normal => Flushing */ -static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ -static int trigger_flush_ok(const char *why /* 0 means timeout */); - /* => Flushing,FLUSHING, ret 1; or ret 0 */ - -static void article_done(Article *art, int whichcount); - -static void check_assign_articles(void); -static void queue_check_input_done(void); -static void check_reading_pause_resume(InputFile *ipf); - -static void statemc_check_flushing_done(void); -static void statemc_check_backlog_done(void); - -static void postfork(void); -static void period(void); - -static void open_defer(void); -static void close_defer(void); -static void search_backlog_file(void); -static void preterminate(void); -static void raise_default(int signo) NORET; - -static void inputfile_reading_start(InputFile *ipf); -static void inputfile_reading_stop(InputFile *ipf); -static void inputfile_reading_pause(InputFile *ipf); -static void inputfile_reading_resume(InputFile *ipf); - /* pause and resume are idempotent, and no-op if not done _reading_start */ - -static void filemon_start(InputFile *ipf); -static void filemon_stop(InputFile *ipf); -static void tailing_make_readable(InputFile *ipf); - -static void vconnfail(Conn *conn, const char *fmt, va_list al) PRINTF(2,0); -static void connfail(Conn *conn, const char *fmt, ...) PRINTF(2,3); - -static const oop_rd_style peer_rd_style; -static oop_rd_call peer_rd_err, peer_rd_ok; - - -/*----- configuration options -----*/ -/* when changing defaults, remember to update the manpage */ - -static const char *sitename, *remote_host; -static const char *feedfile, *path_run, *path_cli, *path_cli_dir; -static int quiet_multiple=0; -static int interactive=0, try_filemon=1; -static int try_stream=1; -static int port=119; -static const char *inndconffile; - -static int max_connections=10; -static int max_queue_per_conn=200; -static int target_max_feedfile_size=100000; -static int period_seconds=30; -static int filepoll_seconds=5; -static int max_queue_per_ipf=-1; - -static int connection_setup_timeout=200; -static int inndcomm_flush_timeout=100; - -static double nocheck_thresh= 95.0; /* converted from percentage by main */ -static double nocheck_decay= 100; /* conv'd from articles to lambda by main */ - -/* all these are initialised to seconds, and converted to periods in main */ -static int reconnect_delay_periods=1000; -static int flushfail_retry_periods=1000; -static int backlog_retry_minperiods=100; -static int backlog_spontrescan_periods=300; -static int spontaneous_flush_periods=100000; -static int max_separated_periods=2000; -static int need_activity_periods=1000; -static int lowvol_thresh=3; -static int lowvol_periods=1000; - -static double max_bad_data_ratio= 1; /* conv'd from percentage by main */ -static int max_bad_data_initial= 30; - /* in one corrupt 4096-byte block the number of newlines has - * mean 16 and standard deviation 3.99. 30 corresponds to z=+3.5 */ - - -/*----- statistics -----*/ - -typedef enum { /* in queue in conn->sent */ - art_Unchecked, /* not checked, not sent checking */ - art_Wanted, /* checked, wanted sent body as requested */ - art_Unsolicited, /* - sent body without check */ - art_MaxState, -} ArtState; - -static const char *const artstate_names[]= - { "Unchecked", "Wanted", "Unsolicited", 0 }; - -#define RESULT_COUNTS(RCS,RCN) \ - RCS(sent) \ - RCS(accepted) \ - RCN(unwanted) \ - RCN(rejected) \ - RCN(deferred) \ - RCN(missing) \ - RCN(connretry) - -#define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)" -#define RCI_TRIPLE_VALS_BASE(counts,x) \ - counts[art_Unchecked] x \ - + counts[art_Wanted] x \ - + counts[art_Unsolicited] x, \ - counts[art_Unchecked] x \ - , counts[art_Wanted] x \ - , counts[art_Unsolicited] x - -typedef enum { -#define RC_INDEX(x) RC_##x, - RESULT_COUNTS(RC_INDEX, RC_INDEX) - RCI_max -} ResultCountIndex; - - -/*----- transmission buffers -----*/ - -#define CONNIOVS 128 - -typedef enum { - xk_Const, xk_Artdata -} XmitKind; - -struct XmitDetails { - XmitKind kind; - union { - ARTHANDLE *sm_art; - } info; -}; - - -/*----- core operational data structure types -----*/ - -struct InputFile { - /* This is also an instance of struct oop_readable */ - struct oop_readable readable; /* first */ - oop_readable_call *readable_callback; - void *readable_callback_user; - - int fd; - Filemon_Perfile *filemon; - - oop_read *rd; /* non-0: reading; 0: constructing, or had EOF */ - off_t offset; - int skippinglong, paused, fake_readable; - - ArticleList queue; - long inprogress; /* includes queue.count and also articles in conns */ - long autodefer; /* -1 means not doing autodefer */ - - int counts[art_MaxState][RCI_max]; - int readcount_ok, readcount_blank, readcount_err, count_nooffer_missing; - char path[]; -}; - -struct Article { - ISNODE(Article); - ArtState state; - int midlen, missing; - InputFile *ipf; - TOKEN token; - off_t offset; - int blanklen; - char messageid[1]; -}; - -#define SMS_LIST(X) \ - X(NORMAL) \ - X(FLUSHING) \ - X(FLUSHFAILED) \ - X(SEPARATED) \ - X(DROPPING) \ - X(DROPPED) - -enum StateMachineState { -#define SMS_DEF_ENUM(s) sm_##s, - SMS_LIST(SMS_DEF_ENUM) -}; - -static const char *sms_names[]= { -#define SMS_DEF_NAME(s) #s , - SMS_LIST(SMS_DEF_NAME) - 0 -}; - -struct Conn { - ISNODE(Conn); - int fd; /* may be 0, meaning closed (during construction/destruction) */ - oop_read *rd; /* likewise */ - int oopwriting; /* since on_fd is not idempotent */ - int max_queue, stream; - const char *quitting; - int since_activity; /* periods */ - ArticleList waiting; /* not yet told peer */ - ArticleList priority; /* peer says send it now */ - ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ - struct iovec xmit[CONNIOVS]; - XmitDetails xmitd[CONNIOVS]; - int xmitu; -}; - - -/*----- general operational variables -----*/ - -/* main initialises */ -static oop_source *loop; -static ConnList conns; -static char *path_lock, *path_flushing, *path_defer, *path_dump; -static char *globpat_backlog; -static pid_t self_pid; -static int *lowvol_perperiod; -static int lowvol_circptr; -static int lowvol_total; /* does not include current period */ - -/* statemc_init initialises */ -static StateMachineState sms; -static int until_flush; -static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; -static FILE *defer; - -/* initialisation to 0 is good */ -static int until_connect, until_backlog_nextscan; -static double accept_proportion; -static int nocheck, nocheck_reported, in_child; - -/* for logging, simulation, debugging, etc. */ -int simulate_flush= -1; -int logv_use_syslog; -static const char *logv_prefix=""; - -/*========== logging ==========*/ - -static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); -static void logcore(int sysloglevel, const char *fmt, ...) { - VA; - if (logv_use_syslog) { - vsyslog(sysloglevel,fmt,al); - } else { - if (self_pid) fprintf(stderr,"[%lu] ",(unsigned long)self_pid); - vfprintf(stderr,fmt,al); - putc('\n',stderr); - } - va_end(al); -} - -static void logv(int sysloglevel, const char *pfx, int errnoval, - const char *fmt, va_list al) PRINTF(5,0); -static void logv(int sysloglevel, const char *pfx, int errnoval, - const char *fmt, va_list al) { - char msgbuf[1024]; /* NB do not call xvasprintf here or you'll recurse */ - vsnprintf(msgbuf,sizeof(msgbuf), fmt,al); - msgbuf[sizeof(msgbuf)-1]= 0; - - if (sysloglevel >= LOG_ERR && (errnoval==EACCES || errnoval==EPERM)) - sysloglevel= LOG_ERR; /* run by wrong user, probably */ - - logcore(sysloglevel, "%s%s: %s%s%s", - logv_prefix, pfx, msgbuf, - errnoval>=0 ? ": " : "", - errnoval>=0 ? strerror(errnoval) : ""); -} - -#define DEFFATAL(fn, pfx, sysloglevel, err, estatus) \ - static void fn(const char *fmt, ...) NORET_PRINTF(1,2); \ - static void fn(const char *fmt, ...) { \ - preterminate(); \ - VA; \ - logv(sysloglevel, pfx, err, fmt, al); \ - exit(estatus); \ - } - -#define DEFLOG(fn, pfx, sysloglevel, err) \ - static void fn(const char *fmt, ...) PRINTF(1,2); \ - static void fn(const char *fmt, ...) { \ - VA; \ - logv(sysloglevel, pfx, err, fmt, al); \ - va_end(al); \ - } - -#define INNLOGSET_DECLARE(fn, pfx, sysloglevel) \ - static void duct_log_##fn(int l, const char *fmt, va_list al, int errval) { \ - logv(sysloglevel, pfx, errval ? errval : -1, fmt, al); \ - } -#define INNLOGSET_CALL(fn, pfx, sysloglevel) \ - message_handlers_##fn(1, duct_log_##fn); - - -static int innduct_fatal_cleanup(void) { return 12; } /* used for libinn die */ - -/* We want to extend the set of logging functions from inn, and we - * want to prepend the site name to all our messages. */ - -DEFFATAL(syscrash, "critical", LOG_CRIT, errno, 16); -DEFFATAL(crash, "critical", LOG_CRIT, -1, 16); - -#define INNLOGSETS(INNLOGSET) \ - INNLOGSET(die, "fatal", LOG_ERR) \ - INNLOGSET(warn, "warning", LOG_WARNING) \ - INNLOGSET(notice, "notice", LOG_NOTICE) \ - INNLOGSET(trace, "trace", LOG_NOTICE) -INNLOGSETS(INNLOGSET_DECLARE) - -DEFLOG(info, "info", LOG_INFO, -1) -DEFLOG(dbg, "debug", LOG_DEBUG, -1) - - -/*========== utility functions etc. ==========*/ - -static char *xvasprintf(const char *fmt, va_list al) PRINTF(1,0); -static char *xvasprintf(const char *fmt, va_list al) { - char *str; - int rc= vasprintf(&str,fmt,al); - if (rc<0) sysdie("vasprintf(\"%s\",...) failed", fmt); - return str; -} -static char *xasprintf(const char *fmt, ...) PRINTF(1,2); -static char *xasprintf(const char *fmt, ...) { - VA; - char *str= xvasprintf(fmt,al); - va_end(al); - return str; -} - -static int close_perhaps(int *fd) { - if (*fd <= 0) return 0; - int r= close(*fd); - *fd=0; - return r; -} -static void xclose(int fd, const char *what, const char *what2) { - int r= close(fd); - if (r) syscrash("close %s%s",what,what2?what2:""); -} -static void xclose_perhaps(int *fd, const char *what, const char *what2) { - if (*fd <= 0) return; - xclose(*fd,what,what2); - *fd=0; -} - -static pid_t xfork(const char *what) { - pid_t child; - - child= fork(); - if (child==-1) sysdie("cannot fork for %s",what); - dbg("forked %s %ld", what, (unsigned long)child); - if (!child) postfork(); - return child; -} - -static void on_fd_read_except(int fd, oop_call_fd callback) { - loop->on_fd(loop, fd, OOP_READ, callback, 0); - loop->on_fd(loop, fd, OOP_EXCEPTION, callback, 0); -} -static void cancel_fd_read_except(int fd) { - loop->cancel_fd(loop, fd, OOP_READ); - loop->cancel_fd(loop, fd, OOP_EXCEPTION); -} - -static void report_child_status(const char *what, int status) { - if (WIFEXITED(status)) { - int es= WEXITSTATUS(status); - if (es) - warn("%s: child died with error exit status %d", what, es); - } else if (WIFSIGNALED(status)) { - int sig= WTERMSIG(status); - const char *sigstr= strsignal(sig); - const char *coredump= WCOREDUMP(status) ? " (core dumped)" : ""; - if (sigstr) - warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump); - else - warn("%s: child died due to unknown fatal signal %d%s", - what, sig, coredump); - } else { - warn("%s: child died with unknown wait status %d", what,status); - } -} - -static int xwaitpid(pid_t *pid, const char *what) { - int status; - - int r= kill(*pid, SIGKILL); - if (r) syscrash("cannot kill %s child", what); - - pid_t got= waitpid(*pid, &status, 0); - if (got==-1) syscrash("cannot reap %s child", what); - if (got==0) crash("cannot reap %s child", what); - - *pid= 0; - - return status; -} - -static void *zxmalloc(size_t sz) { - void *p= xmalloc(sz); - memset(p,0,sz); - return p; -} - -static void xunlink(const char *path, const char *what) { - int r= unlink(path); - if (r) syscrash("can't unlink %s %s", path, what); -} - -static time_t xtime(void) { - time_t now= time(0); - if (now==-1) syscrash("time(2) failed"); - return now; -} - -static void xsigaction(int signo, const struct sigaction *sa) { - int r= sigaction(signo,sa,0); - if (r) syscrash("sigaction failed for \"%s\"", strsignal(signo)); -} - -static void xsigsetdefault(int signo) { - struct sigaction sa; - memset(&sa,0,sizeof(sa)); - sa.sa_handler= SIG_DFL; - xsigaction(signo,&sa); -} - -static void xgettimeofday(struct timeval *tv_r) { - int r= gettimeofday(tv_r,0); - if (r) syscrash("gettimeofday(2) failed"); -} - -static void xsetnonblock(int fd, int nonblocking) { - int errnoval= oop_fd_nonblock(fd, nonblocking); - if (errnoval) { errno= errnoval; syscrash("setnonblocking"); } -} - -static void check_isreg(const struct stat *stab, const char *path, - const char *what) { - if (!S_ISREG(stab->st_mode)) - crash("%s %s not a plain file (mode 0%lo)", - what, path, (unsigned long)stab->st_mode); -} - -static void xfstat(int fd, struct stat *stab_r, const char *what) { - int r= fstat(fd, stab_r); - if (r) syscrash("could not fstat %s", what); -} - -static void xfstat_isreg(int fd, struct stat *stab_r, - const char *path, const char *what) { - xfstat(fd, stab_r, what); - check_isreg(stab_r, path, what); -} - -static void xlstat_isreg(const char *path, struct stat *stab, - int *enoent_r /* 0 means ENOENT is fatal */, - const char *what) { - int r= lstat(path, stab); - if (r) { - if (errno==ENOENT && enoent_r) { *enoent_r=1; return; } - syscrash("could not lstat %s %s", what, path); - } - if (enoent_r) *enoent_r= 0; - check_isreg(stab, path, what); -} - -static int samefile(const struct stat *a, const struct stat *b) { - assert(S_ISREG(a->st_mode)); - assert(S_ISREG(b->st_mode)); - return (a->st_ino == b->st_ino && - a->st_dev == b->st_dev); -} - -static char *sanitise(const char *input, int len) { - static char sanibuf[100]; /* returns pointer to this buffer! */ - - const char *p= input; - const char *endp= len>=0 ? input+len : 0; - char *q= sanibuf; - *q++= '`'; - for (;;) { - if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"'.."); break; } - int c= (!endp || p=' ' && c<=126 && c!='\\') { *q++= c; continue; } - sprintf(q,"\\x%02x",c); - q += 4; - } - return sanibuf; -} - -static int isewouldblock(int errnoval) { - return errnoval==EWOULDBLOCK || errnoval==EAGAIN; -} - -/*========== command and control (CLI) connections ==========*/ - -static int cli_master; - -typedef struct CliConn CliConn; -struct CliConn { - void (*destroy)(CliConn*); - int fd; - oop_read *rd; - FILE *out; - union { - struct sockaddr sa; - struct sockaddr_un un; - } sa; - socklen_t salen; -}; - -static const oop_rd_style cli_rd_style= { - OOP_RD_DELIM_STRIP, '\n', - OOP_RD_NUL_FORBID, - OOP_RD_SHORTREC_FORBID -}; - -static void cli_destroy(CliConn *cc) { - cc->destroy(cc); -} - -static void cli_checkouterr(CliConn *cc /* may destroy*/) { - if (ferror(cc->out) | fflush(cc->out)) { - info("CTRL%d write error %s", cc->fd, strerror(errno)); - cli_destroy(cc); - } -} - -static void cli_prompt(CliConn *cc /* may destroy*/) { - fprintf(cc->out, "%s| ", sitename); - cli_checkouterr(cc); -} - -struct CliCommand { - const char *cmd; - void (*f)(CliConn *cc, const CliCommand *ccmd, - const char *arg, size_t argsz); - void *xdata; - int xval; -}; - -static const CliCommand cli_commands[]; - -#define CCMD(wh) \ - static void ccmd_##wh(CliConn *cc, const CliCommand *c, \ - const char *arg, size_t argsz) - -CCMD(help) { - fputs("commands:\n", cc->out); - const CliCommand *ccmd; - for (ccmd=cli_commands; ccmd->cmd; ccmd++) - fprintf(cc->out, " %s\n", ccmd->cmd); - fputs("NB: permissible arguments are not shown above." - " Not all commands listed are safe. See innduct(8).\n", cc->out); -} - -CCMD(flush) { - int ok= trigger_flush_ok("manual request"); - if (!ok) fprintf(cc->out,"already flushing (state is %s)\n", sms_names[sms]); -} - -CCMD(stop) { - preterminate(); - notice("terminating (CTRL%d)",cc->fd); - raise_default(SIGTERM); - abort(); -} - -CCMD(dump); - -/* messing with our head: */ -CCMD(period) { period(); } -CCMD(setintarg) { *(int*)c->xdata= atoi(arg); } -CCMD(setint) { *(int*)c->xdata= c->xval; } -CCMD(setint_period) { *(int*)c->xdata= c->xval; period(); } - -static const CliCommand cli_commands[]= { - { "h", ccmd_help }, - { "flush", ccmd_flush }, - { "stop", ccmd_stop }, - { "dump q", ccmd_dump, 0,0 }, - { "dump a", ccmd_dump, 0,1 }, - - { "p", ccmd_period }, - -#define POKES(cmd,func) \ - { cmd "flush", func, &until_flush, 1 }, \ - { cmd "conn", func, &until_connect, 0 }, \ - { cmd "blscan", func, &until_backlog_nextscan, 0 }, -POKES("next ", ccmd_setint) -POKES("prod ", ccmd_setint_period) - - { "pretend flush", ccmd_setintarg, &simulate_flush }, - { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 }, - { 0 } -}; - -static void *cli_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, - const char *errmsg, int errnoval, - const char *data, size_t recsz, void *cc_v) { - CliConn *cc= cc_v; - - if (!data) { - info("CTRL%d closed", cc->fd); - cc->destroy(cc); - return OOP_CONTINUE; - } - - if (recsz == 0) goto prompt; - - const CliCommand *ccmd; - for (ccmd=cli_commands; ccmd->cmd; ccmd++) { - int l= strlen(ccmd->cmd); - if (recsz < l) continue; - if (recsz > l && data[l] != ' ') continue; - if (memcmp(data, ccmd->cmd, l)) continue; - - int argl= (int)recsz - (l+1); - ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl); - goto prompt; - } - - fputs("unknown command; h for help\n", cc->out); - - prompt: - cli_prompt(cc); - return OOP_CONTINUE; -} - -static void *cli_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, - const char *errmsg, int errnoval, - const char *data, size_t recsz, void *cc_v) { - CliConn *cc= cc_v; - - info("CTRL%d read error %s", cc->fd, errmsg); - cc->destroy(cc); - return OOP_CONTINUE; -} - -static int cli_conn_startup(CliConn *cc /* may destroy*/, - const char *how) { - cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0); - if (!cc->rd) { warn("oop_rd_new_fd cli failed"); return -1; } - - int er= oop_rd_read(cc->rd, &cli_rd_style, MAX_CLI_COMMAND, - cli_rd_ok, cc, - cli_rd_err, cc); - if (er) { errno= er; syswarn("oop_rd_read cli failed"); return -1; } - - info("CTRL%d %s ready", cc->fd, how); - cli_prompt(cc); - return 0; -} - -static void cli_stdio_destroy(CliConn *cc) { - if (cc->rd) { - oop_rd_cancel(cc->rd); - errno= oop_rd_delete_tidy(cc->rd); - if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)"); - } - free(cc); -} - -static void cli_stdio(void) { - NEW_DECL(CliConn *,cc); - cc->destroy= cli_stdio_destroy; - - cc->fd= 0; - cc->out= stdout; - int r= cli_conn_startup(cc,"stdio"); - if (r) cc->destroy(cc); -} - -static void cli_accepted_destroy(CliConn *cc) { - if (cc->rd) { - oop_rd_cancel(cc->rd); - oop_rd_delete_kill(cc->rd); - } - if (cc->out) { fclose(cc->out); cc->fd=0; } - close_perhaps(&cc->fd); - free(cc); -} - -static void *cli_master_readable(oop_source *lp, int master, - oop_event ev, void *u) { - NEW_DECL(CliConn *,cc); - cc->destroy= cli_accepted_destroy; - - cc->salen= sizeof(cc->sa); - cc->fd= accept(master, &cc->sa.sa, &cc->salen); - if (cc->fd<0) { syswarn("error accepting cli connection"); goto x; } - - cc->out= fdopen(cc->fd, "w"); - if (!cc->out) { syswarn("error fdopening accepted cli connection"); goto x; } - - int r= cli_conn_startup(cc, "accepted"); - if (r) goto x; - - return OOP_CONTINUE; - - x: - cc->destroy(cc); - return OOP_CONTINUE; -} - -#define NOCLI(...) do{ \ - syswarn("no cli listener, because failed to " __VA_ARGS__); \ - goto nocli; \ - }while(0) - -static void cli_init(void) { - union { - struct sockaddr sa; - struct sockaddr_un un; - } sa; - - memset(&sa,0,sizeof(sa)); - int maxlen= sizeof(sa.un.sun_path); - - if (!path_cli) { - info("control command line disabled"); - return; - } - - int pathlen= strlen(path_cli); - if (pathlen > maxlen) { - warn("no cli listener, because cli socket path %s too long (%d>%d)", - path_cli, pathlen, maxlen); - return; - } - - if (path_cli_dir) { - int r= mkdir(path_cli_dir, 0700); - if (r && errno!=EEXIST) - NOCLI("create cli socket directory %s", path_cli_dir); - } - - int r= unlink(path_cli); - if (r && errno!=ENOENT) - NOCLI("remove old cli socket %s", path_cli); - - cli_master= socket(PF_UNIX, SOCK_STREAM, 0); - if (cli_master<0) NOCLI("create new cli master socket"); - - int sl= pathlen + offsetof(struct sockaddr_un, sun_path); - sa.un.sun_family= AF_UNIX; - memcpy(sa.un.sun_path, path_cli, pathlen); - - r= bind(cli_master, &sa.sa, sl); - if (r) NOCLI("bind to cli socket path %s", sa.un.sun_path); - - r= listen(cli_master, 5); - if (r) NOCLI("listen to cli master socket"); - - xsetnonblock(cli_master, 1); - - loop->on_fd(loop, cli_master, OOP_READ, cli_master_readable, 0); - info("cli ready, listening on %s", path_cli); - - return; - - nocli: - xclose_perhaps(&cli_master, "cli master",0); - return; -} - -/*========== management of connections ==========*/ - -static void reconnect_blocking_event(void) { - until_connect= reconnect_delay_periods; -} - -static void conn_closefd(Conn *conn, const char *msgprefix) { - int r= close_perhaps(&conn->fd); - if (r) info("C%d %serror closing socket: %s", - conn->fd, msgprefix, strerror(errno)); -} - -static int conn_busy(Conn *conn) { - return - conn->waiting.count || - conn->priority.count || - conn->sent.count || - conn->xmitu; -} - -static void conn_dispose(Conn *conn) { - if (!conn) return; - if (conn->rd) { - oop_rd_cancel(conn->rd); - oop_rd_delete_kill(conn->rd); - conn->rd= 0; - } - if (conn->fd) { - loop->cancel_fd(loop, conn->fd, OOP_WRITE); - loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION); - } - conn_closefd(conn,""); - free(conn); -} - -static void *conn_exception(oop_source *lp, int fd, - oop_event ev, void *conn_v) { - Conn *conn= conn_v; - unsigned char ch; - assert(fd == conn->fd); - assert(ev == OOP_EXCEPTION); - int r= read(conn->fd, &ch, 1); - if (r<0) connfail(conn,"read failed: %s",strerror(errno)); - else connfail(conn,"exceptional condition on socket (peer sent urgent" - " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch); - return OOP_CONTINUE; -} - -static void vconnfail(Conn *conn, const char *fmt, va_list al) { - int requeue[art_MaxState]; - memset(requeue,0,sizeof(requeue)); - - Article *art; - - while ((art= LIST_REMHEAD(conn->priority))) - LIST_ADDTAIL(art->ipf->queue, art); - - while ((art= LIST_REMHEAD(conn->waiting))) - LIST_ADDTAIL(art->ipf->queue, art); - - while ((art= LIST_REMHEAD(conn->sent))) { - requeue[art->state]++; - if (art->state==art_Unsolicited) art->state= art_Unchecked; - LIST_ADDTAIL(art->ipf->queue,art); - check_reading_pause_resume(art->ipf); - } - - int i; - XmitDetails *d; - for (i=0, d=conn->xmitd; ixmitu; i++, d++) - xmit_free(d); - - LIST_REMOVE(conns,conn); - - char *m= xvasprintf(fmt,al); - warn("C%d (now %d) connection failed requeueing " RCI_TRIPLE_FMT_BASE ": %s", - conn->fd, conns.count, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m); - free(m); - - reconnect_blocking_event(); - conn_dispose(conn); - check_assign_articles(); -} - -static void connfail(Conn *conn, const char *fmt, ...) { - va_list al; - va_start(al,fmt); - vconnfail(conn,fmt,al); - va_end(al); -} - -static void conn_idle_close(Conn *conn, const char *why) { - static const char quitcmd[]= "QUIT\r\n"; - int todo= sizeof(quitcmd)-1; - const char *p= quitcmd; - for (;;) { - int r= write(conn->fd, p, todo); - if (r<0) { - if (isewouldblock(errno)) - connfail(conn, "blocked writing QUIT to idle connection"); - else - connfail(conn, "failed to write QUIT to idle connection: %s", - strerror(errno)); - break; - } - assert(r<=todo); - todo -= r; - if (!todo) { - conn->quitting= why; - conn->since_activity= 0; - dbg("C%d is idle (%s), quitting", conn->fd, why); - break; - } - } -} - -/* - * For our last connection, we also shut it down if we have had - * less than K in the last L - */ -static void check_idle_conns(void) { - Conn *conn; - - int volthisperiod= lowvol_perperiod[lowvol_circptr]; - lowvol_circptr++; - lowvol_circptr %= lowvol_periods; - lowvol_total += volthisperiod; - lowvol_total -= lowvol_perperiod[lowvol_circptr]; - lowvol_perperiod[lowvol_circptr]= 0; - - FOR_CONN(conn) - conn->since_activity++; - - search_again: - FOR_CONN(conn) { - if (conn->since_activity <= need_activity_periods) continue; - - /* We need to shut this down */ - if (conn->quitting) - connfail(conn,"timed out waiting for response to QUIT (%s)", - conn->quitting); - else if (conn->sent.count) - connfail(conn,"timed out waiting for responses"); - else if (conn->waiting.count || conn->priority.count) - connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent"); - else if (conn->xmitu) - connfail(conn,"peer has been sending responses" - " before receiving our commands!"); - else - conn_idle_close(conn, "no activity"); - - goto search_again; - } - - conn= LIST_HEAD(conns); - if (!volthisperiod && - conns.count==1 && - lowvol_total < lowvol_thresh && - !conn_busy(conn)) - conn_idle_close(conn, "low volume"); -} - -/*---------- making new connections ----------*/ - -static pid_t connecting_child; -static int connecting_fdpass_sock; - -static void connect_attempt_discard(void) { - if (connecting_child) { - int status= xwaitpid(&connecting_child, "connect"); - if (!(WIFEXITED(status) || - (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) - report_child_status("connect", status); - } - if (connecting_fdpass_sock) { - cancel_fd_read_except(connecting_fdpass_sock); - xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0); - } -} - -#define PREP_DECL_MSG_CMSG(msg) \ - char msgbyte= 0; \ - struct iovec msgiov; \ - msgiov.iov_base= &msgbyte; \ - msgiov.iov_len= 1; \ - struct msghdr msg; \ - memset(&msg,0,sizeof(msg)); \ - char msg##cbuf[CMSG_SPACE(sizeof(int))]; \ - msg.msg_iov= &msgiov; \ - msg.msg_iovlen= 1; \ - msg.msg_control= msg##cbuf; \ - msg.msg_controllen= sizeof(msg##cbuf); - -static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { - Conn *conn= 0; - - assert(fd == connecting_fdpass_sock); - - PREP_DECL_MSG_CMSG(msg); - - ssize_t rs= recvmsg(fd, &msg, 0); - if (rs<0) { - if (isewouldblock(errno)) return OOP_CONTINUE; - syswarn("failed to read socket from connecting child"); - goto x; - } - - NEW(conn); - LIST_INIT(conn->waiting); - LIST_INIT(conn->priority); - LIST_INIT(conn->sent); - - struct cmsghdr *h= 0; - if (rs >= 0) h= CMSG_FIRSTHDR(&msg); - if (!h) { - int status= xwaitpid(&connecting_child, "connect child (broken)"); - - if (WIFEXITED(status)) { - if (WEXITSTATUS(status) != 0 && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) - /* child already reported the problem */; - else { - if (e == OOP_EXCEPTION) - warn("connect: connection child exited code %d but" - " unexpected exception on fdpass socket", - WEXITSTATUS(status)); - else - warn("connect: connection child exited code %d but" - " no cmsg (rs=%d)", - WEXITSTATUS(status), (int)rs); - } - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { - warn("connect: connection attempt timed out"); - } else { - report_child_status("connect", status); - } - goto x; - } - -#define CHK(field, val) \ - if (h->cmsg_##field != val) { \ - crash("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \ - h->cmsg_##field, val); \ - goto x; \ - } - CHK(level, SOL_SOCKET); - CHK(type, SCM_RIGHTS); - CHK(len, CMSG_LEN(sizeof(conn->fd))); -#undef CHK - - if (CMSG_NXTHDR(&msg,h)) crash("connect: child sent many cmsgs"); - - memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd)); - - int status; - pid_t got= waitpid(connecting_child, &status, 0); - if (got==-1) syscrash("connect: real wait for child"); - assert(got == connecting_child); - connecting_child= 0; - - if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; } - int es= WEXITSTATUS(status); - switch (es) { - case CONNCHILD_ESTATUS_STREAM: conn->stream= 1; break; - case CONNCHILD_ESTATUS_NOSTREAM: conn->stream= 0; break; - default: - die("connect: child gave unexpected exit status %d", es); - } - - /* Phew! */ - conn->max_queue= conn->stream ? max_queue_per_conn : 1; - - loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); - conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */ - if (!conn->fd) crash("oop_rd_new_fd conn failed (fd=%d)",conn->fd); - int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN, - &peer_rd_ok, conn, - &peer_rd_err, conn); - if (r) syscrash("oop_rd_read for peer (fd=%d)",conn->fd); - - LIST_ADDHEAD(conns, conn); - notice("C%d (now %d) connected %s", - conn->fd, conns.count, conn->stream ? "streaming" : "plain"); - - connect_attempt_discard(); - check_assign_articles(); - return OOP_CONTINUE; - - x: - conn_dispose(conn); - connect_attempt_discard(); - reconnect_blocking_event(); - return OOP_CONTINUE; -} - -static int allow_connect_start(void) { - return conns.count < max_connections - && !connecting_child - && !until_connect; -} - -static void connect_start(void) { - assert(!connecting_child); - assert(!connecting_fdpass_sock); - - info("starting connection attempt"); - int ok_until_connect= until_connect; - reconnect_blocking_event(); - - int socks[2]; - int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks); - if (r) { syswarn("connect: cannot create socketpair for child"); return; } - - connecting_child= xfork("connection"); - - if (!connecting_child) { - FILE *cn_from, *cn_to; - char buf[NNTP_STRLEN+100]; - int exitstatus= CONNCHILD_ESTATUS_NOSTREAM; - - xclose(socks[0], "(in child) parent's connection fdpass socket",0); - - alarm(connection_setup_timeout); - if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) { - int l= strlen(buf); - int stripped=0; - while (l>0) { - unsigned char c= buf[l-1]; - if (!isspace(c)) break; - if (c=='\n' || c=='\r') stripped=1; - --l; - } - if (!buf[0]) { - sysdie("connect: connection attempt failed"); - } else { - buf[l]= 0; - die("connect: %s: %s", stripped ? "rejected" : "failed", - sanitise(buf,-1)); - } - } - if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0) - sysdie("connect: authentication failed"); - if (try_stream) { - if (fputs("MODE STREAM\r\n", cn_to)==EOF || - fflush(cn_to)) - sysdie("connect: could not send MODE STREAM"); - buf[sizeof(buf)-1]= 0; - if (!fgets(buf, sizeof(buf)-1, cn_from)) { - if (ferror(cn_from)) - sysdie("connect: could not read response to MODE STREAM"); - else - die("connect: connection close in response to MODE STREAM"); - } - int l= strlen(buf); - assert(l>=1); - if (buf[l-1]!='\n') - die("connect: response to MODE STREAM is too long: %.100s...", - sanitise(buf,-1)); - l--; if (l>0 && buf[l-1]=='\r') l--; - buf[l]= 0; - char *ep; - int rcode= strtoul(buf,&ep,10); - if (ep != &buf[3]) - die("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1)); - - switch (rcode) { - case 203: - exitstatus= CONNCHILD_ESTATUS_STREAM; - break; - case 480: - case 500: - break; - default: - warn("connect: unexpected response to MODE STREAM: %.50s", - sanitise(buf,-1)); - exitstatus= 2; - break; - } - } - int fd= fileno(cn_from); - - PREP_DECL_MSG_CMSG(msg); - struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg); - cmsg->cmsg_level= SOL_SOCKET; - cmsg->cmsg_type= SCM_RIGHTS; - cmsg->cmsg_len= CMSG_LEN(sizeof(fd)); - memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd)); - - msg.msg_controllen= cmsg->cmsg_len; - r= sendmsg(socks[1], &msg, 0); - if (r<0) syscrash("sendmsg failed for new connection"); - if (r!=1) crash("sendmsg for new connection gave wrong result %d",r); - - _exit(exitstatus); - } - - xclose(socks[1], "connecting fdpass child's socket",0); - connecting_fdpass_sock= socks[0]; - xsetnonblock(connecting_fdpass_sock, 1); - on_fd_read_except(connecting_fdpass_sock, connchild_event); - - if (!conns.count) - until_connect= ok_until_connect; -} - -/*---------- assigning articles to conns, and transmitting ----------*/ - -static Article *dequeue_from(int peek, InputFile *ipf) { - if (!ipf) return 0; - if (peek) return LIST_HEAD(ipf->queue); - - Article *art= LIST_REMHEAD(ipf->queue); - if (!art) return 0; - check_reading_pause_resume(ipf); - return art; -} - -static Article *dequeue(int peek) { - Article *art; - art= dequeue_from(peek, flushing_input_file); if (art) return art; - art= dequeue_from(peek, backlog_input_file); if (art) return art; - art= dequeue_from(peek, main_input_file); if (art) return art; - return 0; -} - -static void check_assign_articles(void) { - for (;;) { - if (!dequeue(1)) - break; - - Conn *walk, *use=0; - int spare=0, inqueue=0; - - /* Find a connection to offer this article. We prefer a busy - * connection to an idle one, provided it's not full. We take the - * first (oldest) and since that's stable, it will mean we fill up - * connections in order. That way if we have too many - * connections, the spare ones will go away eventually. - */ - FOR_CONN(walk) { - if (walk->quitting) continue; - inqueue= walk->sent.count + walk->priority.count - + walk->waiting.count; - spare= walk->max_queue - inqueue; - assert(inqueue <= max_queue_per_conn); - assert(spare >= 0); - if (inqueue==0) /*idle*/ { if (!use) use= walk; } - else if (spare>0) /*working*/ { use= walk; break; } - } - if (use) { - if (!inqueue) use->since_activity= 0; /* reset idle counter */ - while (spare>0) { - Article *art= dequeue(0); - if (!art) break; - LIST_ADDTAIL(use->waiting, art); - lowvol_perperiod[lowvol_circptr]++; - spare--; - } - conn_maybe_write(use); - } else if (allow_connect_start()) { - connect_start(); - break; - } else { - break; - } - } -} - -static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) { - conn_maybe_write(u); - return OOP_CONTINUE; -} - -static void conn_maybe_write(Conn *conn) { - for (;;) { - conn_make_some_xmits(conn); - if (!conn->xmitu) { - loop->cancel_fd(loop, conn->fd, OOP_WRITE); - conn->oopwriting= 0; - return; - } - - void *rp= conn_write_some_xmits(conn); - if (rp==OOP_CONTINUE) { - if (!conn->oopwriting) { - loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn); - conn->oopwriting= 1; - } - return; - } else if (rp==OOP_HALT) { - return; - } else if (!rp) { - /* transmitted everything */ - } else { - abort(); - } - } -} - -/*---------- expiry, flow control and deferral ----------*/ - -/* - * flow control notes - * to ensure articles go away eventually - * separate queue for each input file - * queue expiry - * every period, check head of backlog queue for expiry with SMretrieve - * if too old: discard, and check next article - * also check every backlog article as we read it - * flush expiry - * after too long in SEPARATED/DROPPING ie Separated/Finishing/Dropping - * one-off: eat queued articles from flushing and write them to defer - * one-off: connfail all connections which have any articles from flushing - * newly read articles from flushing go straight to defer - * this should take care of it and get us out of this state - * to avoid filling up ram needlessly - * input control - * limit number of queued articles for each ipf - * pause/resume inputfile tailing - */ - -static void check_reading_pause_resume(InputFile *ipf) { - if (ipf->queue.count >= max_queue_per_ipf) - inputfile_reading_pause(ipf); - else - inputfile_reading_resume(ipf); -} - -static void article_defer(Article *art /* not on a queue */, int whichcount) { - open_defer(); - if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0 - || fflush(defer)) - sysdie("write to defer file %s",path_defer); - article_done(art, whichcount); -} - -static int article_check_expired(Article *art /* must be queued, not conn */) { - ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT); - if (artdata) { SMfreearticle(artdata); return 0; } - - LIST_REMOVE(art->ipf->queue, art); - art->missing= 1; - art->ipf->count_nooffer_missing++; - article_done(art,-1); - return 1; -} - -static void inputfile_queue_check_expired(InputFile *ipf) { - if (!ipf) return; - - for (;;) { - Article *art= LIST_HEAD(ipf->queue); - int exp= article_check_expired(art); - if (!exp) break; - } - check_reading_pause_resume(ipf); -} - -static void article_autodefer(InputFile *ipf, Article *art) { - ipf->autodefer++; - article_defer(art,-1); -} - -static int has_article_in(const ArticleList *al, InputFile *ipf) { - Article *art; - for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art)) - if (art->ipf == ipf) return 1; - return 0; -} - -static void autodefer_input_file_articles(InputFile *ipf) { - Article *art; - while ((art= LIST_REMHEAD(ipf->queue))) - article_autodefer(ipf, art); -} - -static void autodefer_input_file(InputFile *ipf) { - static const char *const abandon= "stuck"; - ipf->autodefer= 0; - - autodefer_input_file_articles(ipf); - - if (ipf->inprogress) { - Conn *walk; - FOR_CONN(walk) { - if (has_article_in(&walk->waiting, ipf) || - has_article_in(&walk->priority, ipf) || - has_article_in(&walk->sent, ipf)) - walk->quitting= abandon; - } - while (ipf->inprogress) { - FOR_CONN(walk) - if (walk->quitting == abandon) goto found; - abort(); /* where are they ?? */ - - found: - connfail(walk, "connection is stuck or crawling," - " and we need to finish flush"); - autodefer_input_file_articles(ipf); - } - } - - check_reading_pause_resume(ipf); -} - -/*========== article transmission ==========*/ - -static XmitDetails *xmit_core(Conn *conn, const char *data, int len, - XmitKind kind) { /* caller must then fill in details */ - struct iovec *v= &conn->xmit[conn->xmitu]; - XmitDetails *d= &conn->xmitd[conn->xmitu++]; - v->iov_base= (char*)data; - v->iov_len= len; - d->kind= kind; - return d; -} - -static void xmit_noalloc(Conn *conn, const char *data, int len) { - xmit_core(conn,data,len, xk_Const); -} -#define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1)) - -static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) { - XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata); - d->info.sm_art= ah; -} - -static void xmit_free(XmitDetails *d) { - switch (d->kind) { - case xk_Artdata: SMfreearticle(d->info.sm_art); break; - case xk_Const: break; - default: abort(); - } -} - -static void *conn_write_some_xmits(Conn *conn) { - /* return values: - * 0: nothing more to write, no need to call us again - * OOP_CONTINUE: more to write but fd not writeable - * OOP_HALT: disaster, have destroyed conn - */ - for (;;) { - int count= conn->xmitu; - if (!count) return 0; - - if (count > IOV_MAX) count= IOV_MAX; - ssize_t rs= writev(conn->fd, conn->xmit, count); - if (rs < 0) { - if (isewouldblock(errno)) return OOP_CONTINUE; - connfail(conn, "write failed: %s", strerror(errno)); - return OOP_HALT; - } - assert(rs > 0); - - int done; - for (done=0; rs; ) { - assert(donexmitu); - struct iovec *vp= &conn->xmit[done]; - XmitDetails *dp= &conn->xmitd[done]; - if (rs >= vp->iov_len) { - rs -= vp->iov_len; - xmit_free(dp); /* vp->iov_len -= vp->iov_len, etc. */ - done++; - } else { - vp->iov_base= (char*)vp->iov_base + rs; - vp->iov_len -= rs; - break; /* rs -= rs */ - } - } - int newu= conn->xmitu - done; - memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit)); - memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd)); - conn->xmitu= newu; - } -} - -static void conn_make_some_xmits(Conn *conn) { - for (;;) { - if (conn->xmitu+5 > CONNIOVS) - break; - - Article *art= LIST_REMHEAD(conn->priority); - if (!art) art= LIST_REMHEAD(conn->waiting); - if (!art) break; - - if (art->state >= art_Wanted || (conn->stream && nocheck)) { - /* actually send it */ - - ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL); - - art->state= - art->state == art_Unchecked ? art_Unsolicited : - art->state == art_Wanted ? art_Wanted : - (abort(),-1); - - if (!artdata) art->missing= 1; - art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++; - - if (conn->stream) { - if (artdata) { - XMIT_LITERAL("TAKETHIS "); - xmit_noalloc(conn, art->messageid, art->midlen); - XMIT_LITERAL("\r\n"); - xmit_artbody(conn, artdata); - } else { - article_done(art, -1); - continue; - } - } else { - /* we got 235 from IHAVE */ - if (artdata) { - xmit_artbody(conn, artdata); - } else { - XMIT_LITERAL(".\r\n"); - } - } - - LIST_ADDTAIL(conn->sent, art); - - } else { - /* check it */ - - if (conn->stream) - XMIT_LITERAL("CHECK "); - else - XMIT_LITERAL("IHAVE "); - xmit_noalloc(conn, art->messageid, art->midlen); - XMIT_LITERAL("\r\n"); - - assert(art->state == art_Unchecked); - art->ipf->counts[art->state][RC_sent]++; - LIST_ADDTAIL(conn->sent, art); - } - } -} - -/*========== handling responses from peer ==========*/ - -static const oop_rd_style peer_rd_style= { - OOP_RD_DELIM_STRIP, '\n', - OOP_RD_NUL_FORBID, - OOP_RD_SHORTREC_FORBID -}; - -static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, - const char *errmsg, int errnoval, - const char *data, size_t recsz, void *conn_v) { - Conn *conn= conn_v; - connfail(conn, "error receiving from peer: %s", errmsg); - return OOP_CONTINUE; -} - -static Article *article_reply_check(Conn *conn, const char *response, - int code_indicates_streaming, - int must_have_sent - /* 1:yes, -1:no, 0:dontcare */, - const char *sanitised_response) { - Article *art= LIST_HEAD(conn->sent); - - if (!art) { - connfail(conn, - "peer gave unexpected response when no commands outstanding: %s", - sanitised_response); - return 0; - } - - if (code_indicates_streaming) { - assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */ - if (!conn->stream) { - connfail(conn, "peer gave streaming response code " - " to IHAVE or subsequent body: %s", sanitised_response); - return 0; - } - const char *got_mid= response+4; - int got_midlen= strcspn(got_mid, " \n\r"); - if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') { - connfail(conn, "peer gave streaming response with syntactically invalid" - " messageid: %s", sanitised_response); - return 0; - } - if (got_midlen != art->midlen || - memcmp(got_mid, art->messageid, got_midlen)) { - connfail(conn, "peer gave streaming response code to wrong article -" - " probable synchronisation problem; we offered: %s;" - " peer said: %s", - art->messageid, sanitised_response); - return 0; - } - } else { - if (conn->stream) { - connfail(conn, "peer gave non-streaming response code to" - " CHECK/TAKETHIS: %s", sanitised_response); - return 0; - } - } - - if (must_have_sent>0 && art->state < art_Wanted) { - connfail(conn, "peer says article accepted but" - " we had not sent the body: %s", sanitised_response); - return 0; - } - if (must_have_sent<0 && art->state >= art_Wanted) { - connfail(conn, "peer says please sent the article but we just did: %s", - sanitised_response); - return 0; - } - - Article *art_again= LIST_REMHEAD(conn->sent); - assert(art_again == art); - return art; -} - -static void update_nocheck(int accepted) { - accept_proportion *= nocheck_decay; - accept_proportion += accepted * (1.0 - nocheck_decay); - int new_nocheck= accept_proportion >= nocheck_thresh; - if (new_nocheck && !nocheck_reported) { - notice("entering nocheck mode for the first time"); - nocheck_reported= 1; - } else if (new_nocheck != nocheck) { - dbg("nocheck mode %s", new_nocheck ? "start" : "stop"); - } - nocheck= new_nocheck; -} - -static void article_done(Article *art, int whichcount) { - if (whichcount>=0 && !art->missing) - art->ipf->counts[art->state][whichcount]++; - - if (whichcount == RC_accepted) update_nocheck(1); - else if (whichcount == RC_unwanted) update_nocheck(0); - - InputFile *ipf= art->ipf; - - while (art->blanklen) { - static const char spaces[]= - " " - " " - " " - " " - " " - " " - " " - " " - " "; - int w= art->blanklen; if (w >= sizeof(spaces)) w= sizeof(spaces)-1; - int r= pwrite(ipf->fd, spaces, w, art->offset); - if (r==-1) { - if (errno==EINTR) continue; - syscrash("failed to blank entry for %s (length %d at offset %lu) in %s", - art->messageid, art->blanklen, - (unsigned long)art->offset, ipf->path); - } - assert(r>=0 && r<=w); - art->blanklen -= w; - art->offset += w; - } - - ipf->inprogress--; - assert(ipf->inprogress >= 0); - free(art); - - if (!ipf->inprogress && ipf != main_input_file) - queue_check_input_done(); -} - -static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, - const char *errmsg, int errnoval, - const char *data, size_t recsz, void *conn_v) { - Conn *conn= conn_v; - - if (ev == OOP_RD_EOF) { - connfail(conn, "unexpected EOF from peer"); - return OOP_CONTINUE; - } - assert(ev == OOP_RD_OK); - - char *sani= sanitise(data,-1); - - char *ep; - unsigned long code= strtoul(data, &ep, 10); - if (ep != data+3 || *ep != ' ' || data[0]=='0') { - connfail(conn, "badly formatted response from peer: %s", sani); - return OOP_CONTINUE; - } - - int busy= conn_busy(conn); - - if (conn->quitting) { - if (code!=205 && code!=400) { - connfail(conn, "peer gave unexpected response to QUIT (%s): %s", - conn->quitting, sani); - } else { - LIST_REMOVE(conns,conn); - notice("C%d (now %d) idle connection closed (%s)", - conn->fd, conns.count, conn->quitting); - assert(!busy); - conn_dispose(conn); - } - return OOP_CONTINUE; - } - - conn->since_activity= 0; - Article *art; - -#define GET_ARTICLE(musthavesent) do{ \ - art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \ - if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \ - }while(0) - -#define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \ - code_streaming= (streaming); \ - GET_ARTICLE(musthavesent); \ - article_done(art, RC_##how); \ - goto dealtwith; \ - }while(0) - -#define PEERBADMSG(m) do { \ - connfail(conn, m ": %s", sani); return OOP_CONTINUE; \ - }while(0) - - int code_streaming= 0; - - switch (code) { - - default: PEERBADMSG("peer sent unexpected message"); - - case 400: - if (busy) - PEERBADMSG("peer timed us out or stopped accepting articles"); - - LIST_REMOVE(conns,conn); - notice("C%d (now %d) idle connection closed by peer", - conns.count, conn->fd); - conn_dispose(conn); - return OOP_CONTINUE; - - case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */ - case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */ - - case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */ - case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */ - - case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */ - case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */ - - case 238: /* CHECK says send it */ - code_streaming= 1; - case 335: /* IHAVE says send it */ - GET_ARTICLE(-1); - assert(art->state == art_Unchecked); - art->ipf->counts[art->state][RC_accepted]++; - art->state= art_Wanted; - LIST_ADDTAIL(conn->priority, art); - break; - - case 431: /* CHECK or TAKETHIS says try later */ - code_streaming= 1; - case 436: /* IHAVE says try later */ - GET_ARTICLE(0); - article_defer(art, RC_deferred); - break; - - } -dealtwith: - - conn_maybe_write(conn); - check_assign_articles(); - return OOP_CONTINUE; -} - - -/*========== monitoring of input files ==========*/ - -static void feedfile_eof(InputFile *ipf) { - assert(ipf != main_input_file); /* promised by tailing_try_read */ - inputfile_reading_stop(ipf); - - if (ipf == flushing_input_file) { - assert(sms==sm_SEPARATED || sms==sm_DROPPING); - if (main_input_file) inputfile_reading_start(main_input_file); - statemc_check_flushing_done(); - } else if (ipf == backlog_input_file) { - statemc_check_backlog_done(); - } else { - abort(); /* supposed to wait rather than get EOF on main input file */ - } -} - -static InputFile *open_input_file(const char *path) { - int fd= open(path, O_RDWR); - if (fd<0) { - if (errno==ENOENT) return 0; - sysdie("unable to open input file %s", path); - } - assert(fd>0); - - InputFile *ipf= xmalloc(sizeof(*ipf) + strlen(path) + 1); - memset(ipf,0,sizeof(*ipf)); - - ipf->fd= fd; - ipf->autodefer= -1; - LIST_INIT(ipf->queue); - strcpy(ipf->path, path); - - return ipf; -} - -static void close_input_file(InputFile *ipf) { /* does not free */ - assert(!ipf->readable_callback); /* must have had ->on_cancel */ - assert(!ipf->filemon); /* must have had inputfile_reading_stop */ - assert(!ipf->rd); /* must have had inputfile_reading_stop */ - assert(!ipf->inprogress); /* no dangling pointers pointing here */ - xclose_perhaps(&ipf->fd, "input file ", ipf->path); -} - - -/*---------- dealing with articles read in the input file ----------*/ - -static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, - const char *data, const char *how) { - warn("corrupted file: %s, offset %lu: %s: in %s", - ipf->path, (unsigned long)offset, how, sanitise(data,-1)); - ipf->readcount_err++; - if (ipf->readcount_err > max_bad_data_initial + - (ipf->readcount_ok+ipf->readcount_blank) / max_bad_data_ratio) - crash("too much garbage in input file! (%d errs, %d ok, %d blank)", - ipf->readcount_err, ipf->readcount_ok, ipf->readcount_blank); - return OOP_CONTINUE; -} - -static void *feedfile_read_err(oop_source *lp, oop_read *rd, - oop_rd_event ev, const char *errmsg, - int errnoval, const char *data, size_t recsz, - void *ipf_v) { - InputFile *ipf= ipf_v; - assert(ev == OOP_RD_SYSTEM); - errno= errnoval; - syscrash("error reading input file: %s, offset %lu", - ipf->path, (unsigned long)ipf->offset); -} - -static void *feedfile_got_article(oop_source *lp, oop_read *rd, - oop_rd_event ev, const char *errmsg, - int errnoval, const char *data, size_t recsz, - void *ipf_v) { - InputFile *ipf= ipf_v; - Article *art; - char tokentextbuf[sizeof(TOKEN)*2+3]; - - if (!data) { feedfile_eof(ipf); return OOP_CONTINUE; } - - off_t old_offset= ipf->offset; - ipf->offset += recsz + !!(ev == OOP_RD_OK); - -#define X_BAD_DATA(m) return feedfile_got_bad_data(ipf,old_offset,data,m); - - if (ev==OOP_RD_PARTREC) - feedfile_got_bad_data(ipf,old_offset,data,"missing final newline"); - /* but process it anyway */ - - if (ipf->skippinglong) { - if (ev==OOP_RD_OK) ipf->skippinglong= 0; /* fine now */ - return OOP_CONTINUE; - } - if (ev==OOP_RD_LONG) { - ipf->skippinglong= 1; - X_BAD_DATA("overly long line"); - } - - if (memchr(data,'\0',recsz)) X_BAD_DATA("nul byte"); - if (!recsz) X_BAD_DATA("empty line"); - - if (data[0]==' ') { - if (strspn(data," ") != recsz) X_BAD_DATA("line partially blanked"); - ipf->readcount_blank++; - return OOP_CONTINUE; - } - - char *space= strchr(data,' '); - int tokenlen= space-data; - int midlen= (int)recsz-tokenlen-1; - if (midlen <= 2) X_BAD_DATA("no room for messageid"); - if (space[1]!='<' || space[midlen]!='>') X_BAD_DATA("invalid messageid"); - - if (tokenlen != sizeof(TOKEN)*2+2) X_BAD_DATA("token wrong length"); - memcpy(tokentextbuf, data, tokenlen); - tokentextbuf[tokenlen]= 0; - if (!IsToken(tokentextbuf)) X_BAD_DATA("token wrong syntax"); - - ipf->readcount_ok++; - - art= xmalloc(sizeof(*art) - 1 + midlen + 1); - memset(art,0,sizeof(*art)); - art->state= art_Unchecked; - art->midlen= midlen; - art->ipf= ipf; ipf->inprogress++; - art->token= TextToToken(tokentextbuf); - art->offset= old_offset; - art->blanklen= recsz; - strcpy(art->messageid, space+1); - - if (ipf->autodefer >= 0) { - article_autodefer(ipf, art); - } else { - LIST_ADDTAIL(ipf->queue, art); - - if (ipf==backlog_input_file) - article_check_expired(art); - } - - if (sms==sm_NORMAL && ipf==main_input_file && - ipf->offset >= target_max_feedfile_size) - statemc_start_flush("feed file size"); - - check_assign_articles(); /* may destroy conn but that's OK */ - check_reading_pause_resume(ipf); - return OOP_CONTINUE; -} - -/*========== tailing input file ==========*/ - -static void *tailing_rable_call_time(oop_source *loop, struct timeval tv, - void *user) { - /* lifetime of ipf here is OK because destruction will cause - * on_cancel which will cancel this callback */ - InputFile *ipf= user; - - dbg("**TRACT** ipf=%p called",ipf); - if (!ipf->fake_readable) return OOP_CONTINUE; - - /* we just keep calling readable until our caller (oop_rd) - * has called try_read, and try_read has found EOF so given EAGAIN */ - dbg("**TRACT** ipf=%p reschedule",ipf); - loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); - - return ipf->readable_callback(loop, &ipf->readable, - ipf->readable_callback_user); -} - -static void tailing_on_cancel(struct oop_readable *rable) { - InputFile *ipf= (void*)rable; - dbg("**TOR** ipf=%p on_cancel",ipf); - - if (ipf->filemon) filemon_stop(ipf); - dbg("**TRACT** ipf=%p cancel",ipf); - loop->cancel_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); - ipf->readable_callback= 0; -} - -static void tailing_make_readable(InputFile *ipf) { - dbg("**TRACT** ipf=%p makereadable rcb=%p",ipf, - (void*)ipf?ipf->readable_callback:0); - if (!ipf || !ipf->readable_callback) /* so callers can be naive */ - return; - ipf->fake_readable= 1; - loop->on_time(loop, OOP_TIME_NOW, tailing_rable_call_time, ipf); -} - -static int tailing_on_readable(struct oop_readable *rable, - oop_readable_call *cb, void *user) { - InputFile *ipf= (void*)rable; - dbg("**TOR** ipf=%p on_readable",ipf); - - tailing_on_cancel(rable); - ipf->readable_callback= cb; - ipf->readable_callback_user= user; - filemon_start(ipf); - tailing_make_readable(ipf); - return 0; -} - -static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, - size_t length) { - InputFile *ipf= (void*)rable; - for (;;) { - ssize_t r= read(ipf->fd, buffer, length); - if (r==-1) { - if (errno==EINTR) continue; - ipf->fake_readable= 0; - return r; - } - if (!r) { - if (ipf==main_input_file) { - errno=EAGAIN; - ipf->fake_readable= 0; - return -1; - } else if (ipf==flushing_input_file) { - assert(ipf->rd); - assert(sms==sm_SEPARATED || sms==sm_DROPPING); - } else if (ipf==backlog_input_file) { - assert(ipf->rd); - } else { - abort(); - } - } - dbg("**TOR** ipf=%p try_read r=%d",ipf,r); - return r; - } -} - -/*---------- filemon implemented with inotify ----------*/ - -#if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON) -#define HAVE_FILEMON - -#include - -static int filemon_inotify_fd; -static int filemon_inotify_wdmax; -static InputFile **filemon_inotify_wd2ipf; - -struct Filemon_Perfile { - int wd; -}; - -static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { - int wd= inotify_add_watch(filemon_inotify_fd, ipf->path, IN_MODIFY); - if (wd < 0) sysdie("inotify_add_watch %s", ipf->path); - - if (wd >= filemon_inotify_wdmax) { - int newmax= wd+2; - filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf, - sizeof(*filemon_inotify_wd2ipf) * newmax); - memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0, - sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax)); - filemon_inotify_wdmax= newmax; - } - - assert(!filemon_inotify_wd2ipf[wd]); - filemon_inotify_wd2ipf[wd]= ipf; - - dbg("filemon inotify startfile %p wd=%d wdmax=%d", - ipf, wd, filemon_inotify_wdmax); - - pf->wd= wd; -} - -static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { - int wd= pf->wd; - dbg("filemon inotify stopfile %p wd=%d", ipf, wd); - int r= inotify_rm_watch(filemon_inotify_fd, wd); - if (r) syscrash("inotify_rm_watch"); - filemon_inotify_wd2ipf[wd]= 0; -} - -static void *filemon_inotify_readable(oop_source *lp, int fd, - oop_event e, void *u) { - struct inotify_event iev; - for (;;) { - int r= read(filemon_inotify_fd, &iev, sizeof(iev)); - if (r==-1) { - if (isewouldblock(errno)) break; - syscrash("read from inotify master"); - } else if (r==sizeof(iev)) { - assert(iev.wd >= 0 && iev.wd < filemon_inotify_wdmax); - } else { - crash("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev)); - } - InputFile *ipf= filemon_inotify_wd2ipf[iev.wd]; - /*dbg("filemon inotify readable read %p wd=%d", ipf, iev.wd);*/ - tailing_make_readable(ipf); - } - return OOP_CONTINUE; -} - -static int filemon_method_init(void) { - filemon_inotify_fd= inotify_init(); - if (filemon_inotify_fd<0) { - syswarn("filemon/inotify: inotify_init failed"); - return 0; - } - xsetnonblock(filemon_inotify_fd, 1); - loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0); - - dbg("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd); - return 1; -} - -static void filemon_method_dump_info(FILE *f) { - int i; - fprintf(f,"inotify"); - DUMPV("%d",,filemon_inotify_fd); - DUMPV("%d",,filemon_inotify_wdmax); - for (i=0; ifilemon); - - NEW(ipf->filemon); - filemon_method_startfile(ipf, ipf->filemon); -} - -static void filemon_stop(InputFile *ipf) { - if (!ipf->filemon) return; - filemon_method_stopfile(ipf, ipf->filemon); - free(ipf->filemon); - ipf->filemon= 0; -} - -/*---------- interface to start and stop an input file ----------*/ +#include "innduct.h" -static const oop_rd_style feedfile_rdstyle= { - OOP_RD_DELIM_STRIP, '\n', - OOP_RD_NUL_PERMIT, - OOP_RD_SHORTREC_LONG, +const char *sms_names[]= { +#define SMS_DEF_NAME(s) #s , + SMS_LIST(SMS_DEF_NAME) + 0 }; -static void inputfile_reading_resume(InputFile *ipf) { - if (!ipf->rd) return; - if (!ipf->paused) return; - - int r= oop_rd_read(ipf->rd, &feedfile_rdstyle, MAX_LINE_FEEDFILE, - feedfile_got_article,ipf, feedfile_read_err, ipf); - if (r) syscrash("unable start reading feedfile %s",ipf->path); - - ipf->paused= 0; -} - -static void inputfile_reading_pause(InputFile *ipf) { - if (!ipf->rd) return; - if (ipf->paused) return; - oop_rd_cancel(ipf->rd); - ipf->paused= 1; -} - -static void inputfile_reading_start(InputFile *ipf) { - assert(!ipf->rd); - ipf->readable.on_readable= tailing_on_readable; - ipf->readable.on_cancel= tailing_on_cancel; - ipf->readable.try_read= tailing_try_read; - ipf->readable.delete_tidy= 0; /* we never call oop_rd_delete_{tidy,kill} */ - ipf->readable.delete_kill= 0; - - ipf->readable_callback= 0; - ipf->readable_callback_user= 0; - - ipf->rd= oop_rd_new(loop, &ipf->readable, 0,0); - assert(ipf->rd); - - ipf->paused= 1; - inputfile_reading_resume(ipf); -} - -static void inputfile_reading_stop(InputFile *ipf) { - assert(ipf->rd); - inputfile_reading_pause(ipf); - oop_rd_delete(ipf->rd); - ipf->rd= 0; - assert(!ipf->filemon); /* we shouldn't be monitoring it now */ -} - - -/*========== interaction with innd - state machine ==========*/ - -/* See official state diagram at top of file. We implement - * this as follows: - * -8<- - - .=======. - ||START|| - `=======' - | - | open F - | - | F ENOENT - |`---------------------------------------------------. - F OPEN OK | | - |`---------------- - - - | - D ENOENT | D EXISTS see OVERALL STATES diagram | - | for full startup logic | - ,--------->| | - | V | - | ============ try to | - | NORMAL open D | - | [Normal] | - | main F tail | - | ============ V - | | | - | | F IS SO BIG WE SHOULD FLUSH, OR TIMEOUT | - ^ | hardlink F to D | - | [Hardlinked] | - | | unlink F | - | | our handle onto F is now onto D | - | [Moved] | - | | | - | |<-------------------<---------------------<---------+ - | | | - | | spawn inndcomm flush | - | V | - | ================== | - | FLUSHING[-ABSENT] | - | [Flushing] | - | main D tail/none | - | ================== | - | | | - | | INNDCOMM FLUSH FAILS ^ - | |`----------------------->----------. | - | | | | - | | NO SUCH SITE V | - ^ |`--------------->----. ==================== | - | | \ FLUSHFAILED[-ABSENT] | - | | \ [Moved] | - | | FLUSH OK \ main D tail/none | - | | open F \ ==================== | - | | \ | | - | | \ | TIME TO RETRY | - | |`------->----. ,---<---'\ `----------------' - | | D NONE | | D NONE `----. - | V | | V - | ============= V V ============ - | SEPARATED-1 | | DROPPING-1 - | flsh->rd!=0 | | flsh->rd!=0 - | [Separated] | | [Dropping] - | main F idle | | main none - | flsh D tail | | flsh D tail - | ============= | | ============ - | | | | install | - ^ | EOF ON D | | defer | EOF ON D - | V | | V - | =============== | | =============== - | SEPARATED-2 | | DROPPING-2 - | flsh->rd==0 | V flsh->rd==0 - | [Finishing] | | [Dropping] - | main F tail | `. main none - | flsh D closed | `. flsh D closed - | =============== V `. =============== - | | `. | - | | ALL D PROCESSED `. | ALL D PROCESSED - | V install defer as backlog `. | install defer - ^ | close D `. | close D - | | unlink D `. | unlink D - | | | | - | | V V - `----------' ============== - DROPPED - [Dropped] - main none - flsh none - some backlog - ============== - | - | ALL BACKLOG DONE - | - | unlink lock - | exit - V - ========== - (ESRCH) - [Droppped] - ========== - * ->8- - */ - -static void startup_set_input_file(InputFile *f) { - assert(!main_input_file); - main_input_file= f; - inputfile_reading_start(f); -} - -static void statemc_lock(void) { - int lockfd; - struct stat stab, stabf; - - for (;;) { - lockfd= open(path_lock, O_CREAT|O_RDWR, 0600); - if (lockfd<0) sysdie("open lockfile %s", path_lock); - - struct flock fl; - memset(&fl,0,sizeof(fl)); - fl.l_type= F_WRLCK; - fl.l_whence= SEEK_SET; - int r= fcntl(lockfd, F_SETLK, &fl); - if (r==-1) { - if (errno==EACCES || isewouldblock(errno)) { - if (quiet_multiple) exit(0); - die("another duct holds the lockfile"); - } - sysdie("fcntl F_SETLK lockfile %s", path_lock); - } - - xfstat_isreg(lockfd, &stabf, path_lock, "lockfile"); - int lock_noent; - xlstat_isreg(path_lock, &stab, &lock_noent, "lockfile"); - - if (!lock_noent && samefile(&stab, &stabf)) - break; - - xclose(lockfd, "stale lockfile ", path_lock); - } - - FILE *lockfile= fdopen(lockfd, "w"); - if (!lockfile) syscrash("fdopen lockfile"); - - int r= ftruncate(lockfd, 0); - if (r) syscrash("truncate lockfile to write new info"); - - if (fprintf(lockfile, "pid %ld\nsite %s\nfeedfile %s\nfqdn %s\n", - (unsigned long)self_pid, - sitename, feedfile, remote_host) == EOF || - fflush(lockfile)) - sysdie("write info to lockfile %s", path_lock); - - dbg("startup: locked"); -} - -static void statemc_init(void) { - struct stat stabdefer; - - search_backlog_file(); - - int defer_noent; - xlstat_isreg(path_defer, &stabdefer, &defer_noent, "defer file"); - if (defer_noent) { - dbg("startup: ductdefer ENOENT"); - } else { - dbg("startup: ductdefer nlink=%ld", (long)stabdefer.st_nlink); - switch (stabdefer.st_nlink==1) { - case 1: - open_defer(); /* so that we will later close it and rename it */ - break; - case 2: - xunlink(path_defer, "stale defer file link" - " (presumably hardlink to backlog file)"); - break; - default: - crash("defer file %s has unexpected link count %d", - path_defer, stabdefer.st_nlink); - } - } - - struct stat stab_f, stab_d; - int noent_f; - - InputFile *file_d= open_input_file(path_flushing); - if (file_d) xfstat_isreg(file_d->fd, &stab_d, path_flushing,"flushing file"); - - xlstat_isreg(feedfile, &stab_f, &noent_f, "feedfile"); - - if (!noent_f && file_d && samefile(&stab_f, &stab_d)) { - dbg("startup: F==D => Hardlinked"); - xunlink(feedfile, "feed file (during startup)"); /* => Moved */ - noent_f= 1; - } - - if (noent_f) { - dbg("startup: F ENOENT => Moved"); - if (file_d) startup_set_input_file(file_d); - spawn_inndcomm_flush("feedfile missing at startup"); - /* => Flushing, sms:=FLUSHING */ - } else { - if (file_d) { - dbg("startup: F!=D => Separated"); - startup_set_input_file(file_d); - flushing_input_file= main_input_file; - main_input_file= open_input_file(feedfile); - if (!main_input_file) crash("feedfile vanished during startup"); - SMS(SEPARATED, max_separated_periods, - "found both old and current feed files"); - } else { - dbg("startup: F exists, D ENOENT => Normal"); - InputFile *file_f= open_input_file(feedfile); - if (!file_f) crash("feed file vanished during startup"); - startup_set_input_file(file_f); - SMS(NORMAL, spontaneous_flush_periods, "normal startup"); - } - } -} - -static void statemc_start_flush(const char *why) { /* Normal => Flushing */ - assert(sms == sm_NORMAL); - - dbg("starting flush (%s) (%lu >?= %lu) (%d)", - why, - (unsigned long)(main_input_file ? main_input_file->offset : 0), - (unsigned long)target_max_feedfile_size, - until_flush); - - int r= link(feedfile, path_flushing); - if (r) sysdie("link feedfile %s to flushing file %s", - feedfile, path_flushing); - /* => Hardlinked */ - - xunlink(feedfile, "old feedfile link"); - /* => Moved */ - - spawn_inndcomm_flush(why); /* => Flushing FLUSHING */ -} - -static int trigger_flush_ok(const char *why) { - switch (sms) { - - case sm_NORMAL: - statemc_start_flush(why ? why : "periodic"); - return 1; /* Normal => Flushing; => FLUSHING */ - - case sm_FLUSHFAILED: - spawn_inndcomm_flush(why ? why : "retry"); - return 1; /* Moved => Flushing; => FLUSHING */ - - case sm_SEPARATED: - case sm_DROPPING: - warn("abandoning old feedfile after flush (%s), autodeferring", - why ? why : "took too long to complete"); - assert(flushing_input_file); - autodefer_input_file(flushing_input_file); - return 1; - - default: - return 0; - } -} - -static void statemc_period_poll(void) { - if (!until_flush) return; - until_flush--; - assert(until_flush>=0); - - if (until_flush) return; - int ok= trigger_flush_ok(0); - assert(ok); -} - -static int inputfile_is_done(InputFile *ipf) { - if (!ipf) return 0; - if (ipf->inprogress) return 0; /* new article in the meantime */ - if (ipf->rd) return 0; /* not had EOF */ - return 1; -} - -static void notice_processed(InputFile *ipf, int completed, - const char *what, const char *spec) { - if (!ipf) return; /* allows preterminate to be lazy */ - -#define RCI_NOTHING(x) /* nothing */ -#define RCI_TRIPLE_FMT(x) " " #x "=" RCI_TRIPLE_FMT_BASE -#define RCI_TRIPLE_VALS(x) , RCI_TRIPLE_VALS_BASE(ipf->counts, [RC_##x]) - -#define CNT(art,rc) (ipf->counts[art_##art][RC_##rc]) - - char *inprog= completed - ? xasprintf("%s","") /* GCC produces a stupid warning for printf("") ! */ - : xasprintf(" inprogress=%ld", ipf->inprogress); - char *autodefer= ipf->autodefer >= 0 - ? xasprintf(" autodeferred=%ld", ipf->autodefer) - : xasprintf("%s",""); - - info("%s %s%s read=%d (+bl=%d,+err=%d)%s%s" - " missing=%d offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)" - RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) - , - completed?"completed":"processed", what, spec, - ipf->readcount_ok, ipf->readcount_blank, ipf->readcount_err, - inprog, autodefer, ipf->count_nooffer_missing, - CNT(Unchecked,sent) + CNT(Unsolicited,sent) - , CNT(Unchecked,sent), CNT(Unsolicited,sent), - CNT(Wanted,accepted) + CNT(Unsolicited,accepted) - , CNT(Wanted,accepted), CNT(Unsolicited,accepted) - RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_VALS) - ); - - free(inprog); - free(autodefer); - -#undef CNT -} - -static void statemc_check_backlog_done(void) { - InputFile *ipf= backlog_input_file; - if (!inputfile_is_done(ipf)) return; - - const char *slash= strrchr(ipf->path, '/'); - const char *leaf= slash ? slash+1 : ipf->path; - const char *under= strchr(slash, '_'); - const char *rest= under ? under+1 : leaf; - if (!strncmp(rest,"backlog",7)) rest += 7; - notice_processed(ipf,1,"backlog ",rest); - - close_input_file(ipf); - if (unlink(ipf->path)) { - if (errno != ENOENT) - syscrash("could not unlink processed backlog file %s", ipf->path); - warn("backlog file %s vanished while we were reading it" - " so we couldn't remove it (but it's done now, anyway)", - ipf->path); - } - free(ipf); - backlog_input_file= 0; - search_backlog_file(); - return; -} - -static void statemc_check_flushing_done(void) { - InputFile *ipf= flushing_input_file; - if (!inputfile_is_done(ipf)) return; - - assert(sms==sm_SEPARATED || sms==sm_DROPPING); - - notice_processed(ipf,1,"feedfile",""); - - close_defer(); - - xunlink(path_flushing, "old flushing file"); - - close_input_file(flushing_input_file); - free(flushing_input_file); - flushing_input_file= 0; - - if (sms==sm_SEPARATED) { - notice("flush complete"); - SMS(NORMAL, spontaneous_flush_periods, "flush complete"); - } else if (sms==sm_DROPPING) { - SMS(DROPPED, max_separated_periods, "old flush complete"); - search_backlog_file(); - notice("feed dropped, but will continue until backlog is finished"); - } -} - -static void *statemc_check_input_done(oop_source *lp, struct timeval now, - void *u) { - /* main input file may be idle but if so that's because - * we haven't got to it yet, but that doesn't mean it's really done */ - statemc_check_flushing_done(); - statemc_check_backlog_done(); - return OOP_CONTINUE; -} - -static void queue_check_input_done(void) { - loop->on_time(loop, OOP_TIME_NOW, statemc_check_input_done, 0); -} - -static void statemc_setstate(StateMachineState newsms, int periods, - const char *forlog, const char *why) { - sms= newsms; - until_flush= periods; - - const char *xtra= ""; - switch (sms) { - case sm_FLUSHING: - case sm_FLUSHFAILED: - if (!main_input_file) xtra= "-ABSENT"; - break; - case sm_SEPARATED: - case sm_DROPPING: - xtra= flushing_input_file->rd ? "-1" : "-2"; - break; - default:; - } - - if (periods) { - info("state %s%s[%d] %s",forlog,xtra,periods,why); - } else { - info("state %s%s %s",forlog,xtra,why); - } -} - -/*---------- defer and backlog files ----------*/ - -static void open_defer(void) { - struct stat stab; - - if (defer) return; - - defer= fopen(path_defer, "a+"); - if (!defer) sysdie("could not open defer file %s", path_defer); - - /* truncate away any half-written records */ - - xfstat_isreg(fileno(defer), &stab, path_defer, "newly opened defer file"); - - if (stab.st_size > LONG_MAX) - crash("defer file %s size is far too large", path_defer); - - if (!stab.st_size) - return; - - long orgsize= stab.st_size; - long truncto= stab.st_size; - for (;;) { - if (!truncto) break; /* was only (if anything) one half-truncated record */ - if (fseek(defer, truncto-1, SEEK_SET) < 0) - syscrash("seek in defer file %s while truncating partial", path_defer); - - int r= getc(defer); - if (r==EOF) { - if (ferror(defer)) - syscrash("failed read from defer file %s", path_defer); - else - crash("defer file %s shrank while we were checking it!", path_defer); - } - if (r=='\n') break; - truncto--; - } - - if (stab.st_size != truncto) { - warn("truncating half-record at end of defer file %s -" - " shrinking by %ld bytes from %ld to %ld", - path_defer, orgsize - truncto, orgsize, truncto); - - if (fflush(defer)) - sysdie("could not flush defer file %s", path_defer); - if (ftruncate(fileno(defer), truncto)) - syscrash("could not truncate defer file %s", path_defer); - - } else { - info("continuing existing defer file %s (%ld bytes)", - path_defer, orgsize); - } - if (fseek(defer, truncto, SEEK_SET)) - syscrash("could not seek to new end of defer file %s", path_defer); -} - -static void close_defer(void) { - if (!defer) - return; - - struct stat stab; - xfstat_isreg(fileno(defer), &stab, path_defer, "defer file"); - - if (fclose(defer)) sysdie("could not close defer file %s", path_defer); - defer= 0; - - time_t now= xtime(); - - char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile, - (unsigned long)now, - (unsigned long)stab.st_ino); - if (link(path_defer, backlog)) - sysdie("could not install defer file %s as backlog file %s", - path_defer, backlog); - if (unlink(path_defer)) - syscrash("could not unlink old defer link %s to backlog file %s", - path_defer, backlog); - - free(backlog); - - if (until_backlog_nextscan < 0 || - until_backlog_nextscan > backlog_retry_minperiods + 1) - until_backlog_nextscan= backlog_retry_minperiods + 1; -} - -static void poll_backlog_file(void) { - if (until_backlog_nextscan < 0) return; - if (until_backlog_nextscan-- > 0) return; - search_backlog_file(); -} - -static void search_backlog_file(void) { - /* returns non-0 iff there are any backlog files */ - - glob_t gl; - int r, i; - struct stat stab; - const char *oldest_path=0; - time_t oldest_mtime=0, now; - - if (backlog_input_file) return; - - try_again: - - r= glob(globpat_backlog, GLOB_ERR|GLOB_MARK|GLOB_NOSORT, 0, &gl); - - switch (r) { - case GLOB_ABORTED: - sysdie("failed to expand backlog pattern %s", globpat_backlog); - case GLOB_NOSPACE: - die("out of memory expanding backlog pattern %s", globpat_backlog); - case 0: - for (i=0; i= 0 && - until_backlog_nextscan > backlog_spontrescan_periods) - until_backlog_nextscan= backlog_spontrescan_periods; - - dbg("backlog scan: young age=%f deficiency=%ld nextscan=%d oldest=%s", - age, age_deficiency, until_backlog_nextscan, oldest_path); - - xfree: - globfree(&gl); - return; -} - -/*---------- shutdown and signal handling ----------*/ - -static void preterminate(void) { - if (in_child) return; - notice_processed(main_input_file,0,"feedfile",""); - notice_processed(flushing_input_file,0,"flushing",""); - if (backlog_input_file) - notice_processed(backlog_input_file,0, "backlog file ", - backlog_input_file->path); -} - -static int signal_self_pipe[2]; -static sig_atomic_t terminate_sig_flag; - -static void raise_default(int signo) { - xsigsetdefault(signo); - raise(signo); - abort(); -} - -static void *sigarrived_event(oop_source *lp, int fd, oop_event e, void *u) { - assert(fd=signal_self_pipe[0]); - char buf[PIPE_BUF]; - int r= read(signal_self_pipe[0], buf, sizeof(buf)); - if (r<0 && !isewouldblock(errno)) - syscrash("failed to read signal self pipe"); - if (r==0) crash("eof on signal self pipe"); - if (terminate_sig_flag) { - preterminate(); - notice("terminating (%s)", strsignal(terminate_sig_flag)); - raise_default(terminate_sig_flag); - } - return OOP_CONTINUE; -} - -static void sigarrived_handler(int signum) { - static char x; - switch (signum) { - case SIGTERM: - case SIGINT: - if (!terminate_sig_flag) terminate_sig_flag= signum; - break; - default: - abort(); - } - write(signal_self_pipe[1],&x,1); -} - -static void init_signals(void) { - if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) - syscrash("could not ignore SIGPIPE"); - - if (pipe(signal_self_pipe)) sysdie("create self-pipe for signals"); - - xsetnonblock(signal_self_pipe[0],1); - xsetnonblock(signal_self_pipe[1],1); - - struct sigaction sa; - memset(&sa,0,sizeof(sa)); - sa.sa_handler= sigarrived_handler; - sa.sa_flags= SA_RESTART; - xsigaction(SIGTERM,&sa); - xsigaction(SIGINT,&sa); - - on_fd_read_except(signal_self_pipe[0], sigarrived_event); -} - -/*========== flushing the feed ==========*/ - -static pid_t inndcomm_child; -static int inndcomm_sentinel_fd; - -static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { - assert(inndcomm_child); - assert(fd == inndcomm_sentinel_fd); - int status= xwaitpid(&inndcomm_child, "inndcomm"); - inndcomm_child= 0; - - cancel_fd_read_except(fd); - xclose_perhaps(&fd, "inndcomm sentinel pipe",0); - inndcomm_sentinel_fd= 0; - - assert(!flushing_input_file); - - if (WIFEXITED(status)) { - switch (WEXITSTATUS(status)) { - - case INNDCOMMCHILD_ESTATUS_FAIL: - goto failed; - - case INNDCOMMCHILD_ESTATUS_NONESUCH: - notice("feed has been dropped by innd, finishing up"); - flushing_input_file= main_input_file; - tailing_make_readable(flushing_input_file); - /* we probably previously returned EAGAIN from our fake read method - * when in fact we were at EOF, so signal another readable event - * so we actually see the EOF */ - - main_input_file= 0; - - if (flushing_input_file) { - SMS(DROPPING, max_separated_periods, - "feed dropped by innd, but must finish last flush"); - } else { - close_defer(); - SMS(DROPPED, 0, "feed dropped by innd"); - search_backlog_file(); - } - return OOP_CONTINUE; - - case 0: - /* as above */ - flushing_input_file= main_input_file; - tailing_make_readable(flushing_input_file); - - main_input_file= open_input_file(feedfile); - if (!main_input_file) - crash("flush succeeded but feedfile %s does not exist!" - " (this probably means feedfile does not correspond" - " to site %s in newsfeeds)", feedfile, sitename); - - if (flushing_input_file) { - SMS(SEPARATED, max_separated_periods, "flush complete"); - } else { - close_defer(); - SMS(NORMAL, spontaneous_flush_periods, "recovery flush complete"); - } - return OOP_CONTINUE; - - default: - goto unexpected_exitstatus; - - } - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { - warn("flush timed out trying to talk to innd"); - goto failed; - } else { - unexpected_exitstatus: - report_child_status("inndcomm child", status); - } - - failed: - SMS(FLUSHFAILED, flushfail_retry_periods, "flush failed, will retry"); - return OOP_CONTINUE; -} - -static void inndcommfail(const char *what) { - syswarn("error communicating with innd: %s failed: %s", what, ICCfailure); - exit(INNDCOMMCHILD_ESTATUS_FAIL); -} - -void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ - int pipefds[2]; - - notice("flushing %s",why); - - assert(sms==sm_NORMAL || sms==sm_FLUSHFAILED); - assert(!inndcomm_child); - assert(!inndcomm_sentinel_fd); - - if (pipe(pipefds)) sysdie("create pipe for inndcomm child sentinel"); - - inndcomm_child= xfork("inndcomm child"); - - if (!inndcomm_child) { - const char *flushargv[2]= { sitename, 0 }; - char *reply; - int r; - - xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0); - /* parent spots the autoclose of pipefds[1] when we die or exit */ - - if (simulate_flush>=0) { - warn("SIMULATING flush child status %d", simulate_flush); - if (simulate_flush>128) raise(simulate_flush-128); - else exit(simulate_flush); - } - - alarm(inndcomm_flush_timeout); - r= ICCopen(); if (r) inndcommfail("connect"); - r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit"); - if (!r) exit(0); /* yay! */ - - if (!strcmp(reply, "1 No such site")) exit(INNDCOMMCHILD_ESTATUS_NONESUCH); - syswarn("innd ctlinnd flush failed: innd said %s", reply); - exit(INNDCOMMCHILD_ESTATUS_FAIL); - } +struct Conn { + ISNODE(Conn); + int fd; /* may be 0, meaning closed (during construction/destruction) */ + oop_read *rd; /* likewise */ + int oopwriting; /* since on_fd is not idempotent */ + int max_queue, stream; + const char *quitting; + int since_activity; /* periods */ + ArticleList waiting; /* not yet told peer */ + ArticleList priority; /* peer says send it now */ + ArticleList sent; /* offered/transmitted - in xmit or waiting reply */ + struct iovec xmit[CONNIOVS]; + XmitDetails xmitd[CONNIOVS]; + int xmitu; +}; - simulate_flush= -1; - xclose(pipefds[1], "inndcomm sentinel child's end",0); - inndcomm_sentinel_fd= pipefds[0]; - assert(inndcomm_sentinel_fd); - on_fd_read_except(inndcomm_sentinel_fd, inndcomm_event); +/*----- general operational variables -----*/ - SMS(FLUSHING, 0, why); -} +/* main initialises */ +oop_source *loop; +ConnList conns; +char *path_lock, *path_flushing, *path_defer, *path_dump; +char *globpat_backlog; +pid_t self_pid; +int *lowvol_perperiod; +int lowvol_circptr; +int lowvol_total; /* does not include current period */ /*========== main program ==========*/ @@ -3258,7 +74,7 @@ static void postfork_stdio(FILE *f, const char *what, const char *what2) { if (fclose(f)) syscrash("(in child) close %s%s", what, what2?what2:0); } -static void postfork(void) { +void postfork(void) { in_child= 1; xsigsetdefault(SIGTERM); @@ -3310,11 +126,6 @@ static void every(int interval, int fixed_rate, void (*f)(void)) { every_schedule(e, now); } -static void filepoll(void) { - tailing_make_readable(main_input_file); - tailing_make_readable(flushing_input_file); -} - static char *dbg_report_ipf(InputFile *ipf) { if (!ipf) return xasprintf("none"); @@ -3360,145 +171,6 @@ static void period(void) { } -/*========== dumping state ==========*/ - -static void dump_article_list(FILE *f, const CliCommand *c, - const ArticleList *al) { - fprintf(f, " count=%d\n", al->count); - if (!c->xval) return; - - int i; Article *art; - for (i=0, art=LIST_HEAD(*al); art; i++, art=LIST_NEXT(art)) { - fprintf(f," #%05d %-11s", i, artstate_names[art->state]); - DUMPV("%p", art->,ipf); - DUMPV("%d", art->,missing); - DUMPV("%lu", (unsigned long)art->,offset); - DUMPV("%d", art->,blanklen); - DUMPV("%d", art->,midlen); - fprintf(f, " %s %s\n", TokenToText(art->token), art->messageid); - } -} - -static void dump_input_file(FILE *f, const CliCommand *c, - InputFile *ipf, const char *wh) { - char *dipf= dbg_report_ipf(ipf); - fprintf(f,"input %s %s", wh, dipf); - free(dipf); - - if (ipf) { - DUMPV("%d", ipf->,readcount_ok); - DUMPV("%d", ipf->,readcount_blank); - DUMPV("%d", ipf->,readcount_err); - DUMPV("%d", ipf->,count_nooffer_missing); - } - fprintf(f,"\n"); - if (ipf) { - ArtState state; const char *const *statename; - for (state=0, statename=artstate_names; *statename; state++,statename++) { -#define RC_DUMP_FMT(x) " " #x "=%d" -#define RC_DUMP_VAL(x) ,ipf->counts[state][RC_##x] - fprintf(f,"input %s counts %-11s" - RESULT_COUNTS(RC_DUMP_FMT,RC_DUMP_FMT) "\n", - wh, *statename - RESULT_COUNTS(RC_DUMP_VAL,RC_DUMP_VAL)); - } - fprintf(f,"input %s queue", wh); - dump_article_list(f,c,&ipf->queue); - } -} - -CCMD(dump) { - int i; - fprintf(cc->out, "dumping state to %s\n", path_dump); - FILE *f= fopen(path_dump, "w"); - if (!f) { fprintf(cc->out, "failed: open: %s\n", strerror(errno)); return; } - - fprintf(f,"general"); - DUMPV("%s", sms_names,[sms]); - DUMPV("%d", ,until_flush); - DUMPV("%ld", (long),self_pid); - DUMPV("%p", , defer); - DUMPV("%d", , until_connect); - DUMPV("%d", , until_backlog_nextscan); - DUMPV("%d", , simulate_flush); - fprintf(f,"\nnocheck"); - DUMPV("%#.10f", , accept_proportion); - DUMPV("%d", , nocheck); - DUMPV("%d", , nocheck_reported); - fprintf(f,"\n"); - - fprintf(f,"special"); - DUMPV("%ld", (long),connecting_child); - DUMPV("%d", , connecting_fdpass_sock); - DUMPV("%d", , cli_master); - fprintf(f,"\n"); - - fprintf(f,"lowvol"); - DUMPV("%d", , lowvol_circptr); - DUMPV("%d", , lowvol_total); - fprintf(f,":"); - for (i=0; ifd); - DUMPV("%p",conn->,rd); DUMPV("%d",conn->,max_queue); - DUMPV("%d",conn->,stream); DUMPV("\"%s\"",conn->,quitting); - DUMPV("%d",conn->,since_activity); - fprintf(f,"\n"); - - fprintf(f,"C%d waiting", conn->fd); dump_article_list(f,c,&conn->waiting); - fprintf(f,"C%d priority",conn->fd); dump_article_list(f,c,&conn->priority); - fprintf(f,"C%d sent", conn->fd); dump_article_list(f,c,&conn->sent); - - fprintf(f,"C%d xmit xmitu=%d\n", conn->fd, conn->xmitu); - for (i=0; ixmitu; i++) { - const struct iovec *iv= &conn->xmit[i]; - const XmitDetails *xd= &conn->xmitd[i]; - char *dinfo; - switch (xd->kind) { - case xk_Const: dinfo= xasprintf("Const"); break; - case xk_Artdata: dinfo= xasprintf("A%p", xd->info.sm_art); break; - default: - abort(); - } - fprintf(f," #%03d %-11s l=%d %s\n", i, dinfo, iv->iov_len, - sanitise(iv->iov_base, iv->iov_len)); - free(dinfo); - } - } - - fprintf(f,"paths"); - DUMPV("%s", , feedfile); - DUMPV("%s", , path_cli); - DUMPV("%s", , path_lock); - DUMPV("%s", , path_flushing); - DUMPV("%s", , path_defer); - DUMPV("%s", , path_dump); - DUMPV("%s", , globpat_backlog); - fprintf(f,"\n"); - - if (!!ferror(f) + !!fclose(f)) { - fprintf(cc->out, "failed: write: %s\n", strerror(errno)); - return; - } -} - /*========== option parsing ==========*/ static void vbadusage(const char *fmt, va_list al) NORET_PRINTF(1,0); @@ -3551,7 +223,7 @@ static void parse_options(const Option *options, char ***argvp) { if (a=='-') { arg++; char *equals= strchr(arg,'='); - int len= equals ? (equals - arg) : strlen(arg); + unsigned len= equals ? (size_t)(equals - arg) : strlen(arg); for (o=options; o->shrt || o->lng; o++) if (strlen(o->lng) == len && !memcmp(o->lng,arg,len)) goto found_long; @@ -3835,7 +507,6 @@ int main(int argc, char **argv) { LIST_INIT(conns); if (interactive < 1) { - int i; for (i=3; i<255; i++) /* do this now before we open syslog, etc. */ close(i);