From: Ian Jackson Date: Tue, 23 Mar 2010 22:35:43 +0000 (+0000) Subject: Reorganisation and a bit of new code X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?p=inn-innduct.git;a=commitdiff_plain;h=68e3b7e4d8578bb5caa22b913fdf56f962eedd6f Reorganisation and a bit of new code --- diff --git a/backends/innduct.c b/backends/innduct.c index ba4c6bd..2bc5075 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -134,8 +134,17 @@ * */ + +/*----- general definitions, probably best not changed -----*/ + #define PERIOD_SECONDS 60 +#define CONNCHILD_ESTATUS_STREAM 4 +#define CONNCHILD_ESTATUS_NOSTREAM 5 + + +/*----- configuration options -----*/ + static char *feedfile; static int max_connections, max_queue_per_conn; static int connection_setup_timeout, port, try_stream; @@ -146,6 +155,9 @@ static double nocheck_thresh= 0.95; static double nocheck_decay= 1-1/100; static int nocheck, nocheck_reported; + +/*----- doubly linked lists -----*/ + #define ISNODE(T) T *next, *back; #define LIST(T) struct { T *head, *tail, *tailpred; int count; } @@ -166,6 +178,8 @@ static int nocheck, nocheck_reported; (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++) +/*----- statistics -----*/ + #define RESULT_COUNTS \ RC(offered) \ RC(sent) \ @@ -184,21 +198,10 @@ typedef struct { int articles[2 /* checked */][RCI_max]; } Counts; -struct Article { - int midlen; - int checked, sentbody; - InputFile *ipf; - TOKEN token; - off_t offset; - int blanklen; - char messageid[1]; -}; -#define CONNIOVS 128 +/*----- transmission buffers -----*/ -#define CN "<%d> " - -typedef struct Conn Conn; +#define CONNIOVS 128 typedef enum { xk_Malloc, xk_Const, xk_Artdata; @@ -212,6 +215,44 @@ typedef struct { } info; } XmitDetails; + +/*----- core operational data structure types -----*/ + +struct Article { + int midlen; + int checked, sentbody; + InputFile *ipf; + TOKEN token; + off_t offset; + int blanklen; + char messageid[1]; +}; + +typedef struct { + /* This is an instance of struct oop_readable */ + struct oop_readable readable; /* first */ + oop_readable_call *readable_callback; + void *readable_callback_user; + + int fd; + const char *path; /* ptr copy of path_ or feedfile */ + struct Filemon_Perfile *filemon; + + oop_read *rd; + long inprogress; /* no. of articles read but not processed */ + off_t offset; +} InputFile; + +typedef enum { + sm_WAITING, + sm_NORMAL, + sm_FLUSHING, + sm_FLUSHFAIL, + sm_DROPPING, + sm_SEPARATED, + sm_FINISHING; +} StateMachineState; + struct Conn { ISNODE(Conn); int fd, max_queue, stream; @@ -222,24 +263,36 @@ struct Conn { int xmitu; }; + +/*----- operational variables -----*/ + +static int since_connect_attempt; +static int nconns; +static LIST(Conn) idle, working, full; +static LIST(Article) *queue; + +static char *path_ductlock, *path_duct, *path_ductdefer; + +static StateMachineState sms; static FILE *defer; +static InputFile *main_input_file, *old_input_file; +static int waiting_periods_sofar; + + +/*----- function predeclarations -----*/ + +static void conn_check_work(Conn *conn); static int filemon_init(void); static void filemon_setfile(int mainfeed_fd, const char *mainfeed_path); static void filemon_callback(void); -#define CHILD_ESTATUS_STREAM 4 -#define CHILD_ESTATUS_NOSTREAM 5 - -static int since_connect_attempt; -static int nconns; -static LIST(Conn) idle, working, full; - -static LIST(Article) *queue; +/*========== utility functions etc. ==========*/ static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } } + /*========== making new connections ==========*/ static int connecting_sockets[2]= {-1,-1}; @@ -312,8 +365,8 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { connecting_child= 0; if (WIFEXITED(status) && (WEXITSTATUS(status) != 0 - WEXITSTATUS(status) != CHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) { + WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)) { /* child already reported the problem */ } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) { warn("connect: connection attempt timed out"); @@ -355,8 +408,8 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; } int es= WEXITSTATUS(status); switch (es) { - case CHILD_ESTATUS_STREAM: conn->stream= 1; break; - case CHILD_ESTATUS_NOSTREAM: conn->stream= 0; break; + case CONNCHILD_ESTATUS_STREAM: conn->stream= 1; break; + case CONNCHILD_ESTATUS_NOSTREAM: conn->stream= 0; break; default: die("connect: child gave unexpected exit status %d", es); } @@ -394,10 +447,9 @@ static void connect_start() { if (!connecting_child) { FILE *cn_from, *cn_to; char buf[NNTP_STRLEN+100]; - int exitstatus= CHILD_ESTATUS_NOSTREAM; + int exitstatus= CONNCHILD_ESTATUS_NOSTREAM; - put sigpipe back; - close unwanted fds; + postfork(); r= close(connecting_sockets[0]); if (r) sysdie("connect: close parent socket in child"); @@ -441,7 +493,7 @@ static void connect_start() { } switch (rcode) { case 203: - exitstatus= CHILD_ESTATUS_STREAM; + exitstatus= CONNCHILD_ESTATUS_STREAM; break; case 480: case 500: @@ -480,9 +532,8 @@ static void connect_start() { connect_attempt_discard(); } -/*========== overall control of article flow ==========*/ -static void conn_check_work(Conn *conn); +/*========== overall control of article flow ==========*/ static void check_master_queue(void) { try reading current feed file; @@ -564,8 +615,8 @@ static void conn_check_work(Conn *conn) { } } -/*========== article transmission ==========*/ +/*========== article transmission ==========*/ static XmitDetails *xmit_core(Conn *conn, const char *data, int len, XmitKind kind) { /* caller must then fill in details */ @@ -610,8 +661,7 @@ static void *conn_write_some_xmits(Conn *conn) { ssize_t rs= writev(conn->fd, conn->xmit, count); if (rs < 0) { if (errno == EAGAIN) return OOP_CONTINUE; - syswarn(CN "write failed", conn->fd); - conn_failed(conn); + connfail(conn, "write failed: %s", strerror(errno)); return OOP_HALT; } assert(rs > 0); @@ -684,7 +734,8 @@ static void conn_make_some_xmits(Conn *conn) { } } -/*========== responses from peer ==========*/ + +/*========== handling responses from peer ==========*/ static const oop_rd_style peer_rd_style= { OOP_RD_DELIM_STRIP, '\n', @@ -698,45 +749,43 @@ static Article *article_reply_check(Connection *conn, const char *response, Article *art= LIST_REMHEAD(conn->sent); if (!art) { - warn("peer gave unexpected response when no commands outstanding: %s", - sanitised_response); - goto failed; + connfail(conn, + "peer gave unexpected response when no commands outstanding: %s", + sanitised_response); + return 0; } if (code_indicates_streaming) { assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */ if (!conn->stream) { - warn("peer gave streaming response code " - " to IHAVE or subsequent body: %s", sanitised_response); - goto failed; + connfail("peer gave streaming response code " + " to IHAVE or subsequent body: %s", sanitised_response); + return 0; } const char *got_mid= response+4; int got_midlen= strcspn(got_mid, " \n\r"); if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') { - warn("peer gave streaming response with syntactically invalid" - " messageid: %s", sanitised_response); - goto failed; + connfail("peer gave streaming response with syntactically invalid" + " messageid: %s", sanitised_response); + return 0; } if (got_midlen != art->midlen || memcmp(got_mid, art->messageid, got_midlen)) { - peer("peer gave streaming response code to wrong article -" - " probable synchronisation problem; we offered: %s; peer said: %s", - art->messageid, sanitised_response); - goto failed; + connfail("peer gave streaming response code to wrong article -" + " probable synchronisation problem; we offered: %s;" + " peer said: %s", + art->messageid, sanitised_response); + return 0; } } else { if (conn->stream) { - warn("peer gave non-streaming response code to CHECK/TAKETHIS: %s", - sanitised_response); - goto failed; + connfail("peer gave non-streaming response code to CHECK/TAKETHIS: %s", + sanitised_response); + return 0; } } return art; - - failed: - conn_failed(conn); - return 0; } static void update_nocheck(int accepted) { @@ -780,15 +829,22 @@ static void article_done(Connection *conn, Article *art, int whichcount) { free(art); } +static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *conn_v) { + Conn *conn= conn_v; + connfail(conn, "error receiving from peer: %s", errmsg); + return OOP_CONTINUE; +} + static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, const char *errmsg, int errnoval, const char *data, size_t recsz, void *conn_v) { Conn *conn= conn_v; if (ev == OOP_RD_EOF) { - warn("unexpected EOF from peer"); - conn_failed(conn); - return; + connfail(conn, "unexpected EOF from peer"); + return OOP_CONTINUE; } assert(ev == OOP_RD_OK); @@ -807,16 +863,14 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, sprintf(q,"\\x%02x",c); q += 4; } - warn("badly formatted response from peer: %s", sanibuf); - conn_failed(conn); - return; + connfail(conn, "badly formatted response from peer: %s", sanibuf); + return OOP_CONTINUE; } if (conn->quitting) { if (code!=205) { - warn("peer gave failure response to QUIT: %s", sani); - conn_failed(conn); - return; + connfail(conn, "peer gave failure response to QUIT: %s", sani); + return OOP_CONTINUE; } conn close ok; return; @@ -833,10 +887,16 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, GET_ARTICLE; \ article_done(conn, art, RC_##how); break; +#define PEERBADMSG(m) connfail(conn, m ": %s", sani); return OOP_CONTINUE + int code_streaming= 0; switch (code) { + case 400: PEERBADMSG("peer stopped accepting articles"); + case 503: PEERBADMSG("peer timed us out"); + default: PEERBADMSG("peer sent unexpected message"); + case 435: ARTICLE_DEALTWITH(0,unwanted); /* IHAVE says they have it */ case 438: ARTICLE_DEALTWITH(1,unwanted); /* CHECK/TAKETHIS: they have it */ @@ -853,8 +913,8 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, count_checkedwanted++; LIST_ADDTAIL(conn->queue); if (art->checked) { - warn("peer gave %d response to article body",code); - goto failed; + connfail("peer gave %d response to article body: %s",code, sani); + return OOP_CONTINUE; } art->checked= 1; break; @@ -869,20 +929,14 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev, article_done(conn, art, RC_deferred); break; - case 400: warn("peer has stopped accepting articles: %s", sani); goto failed; - case 503: warn("peer timed us out: %s", sani); goto failed; - default: warn("peer sent unexpected message: %s", sani); goto failed; - - failed: - conn_failed(conn); - return OOP_CONTINUE;; } check_check_work(conn); return OOP_CONTINUE; } -/*========== monitoring of input file ==========*/ + +/*========== monitoring of input files ==========*/ static void feedfile_eof(InputFile *ipf) { assert(ipf != main_input_file); /* promised by tailing_try_read */ @@ -893,41 +947,6 @@ static void feedfile_eof(InputFile *ipf) { inputfile_tailing_start(main_input_file); } -static void statmc_finishdone(void) { - time_t now; - struct stat stab; - - assert(sms == sm_FINISHING); - - r= fstat(fileno(defer), &stab); - if (r) sysdie("check defer file %s", path_defer); - - if (fclose(defer)) sysdie("could not close defer file %s", path_defer); - defer= 0; - - now= time(0); - if (now==-1) sysdie("could not get current time for backlog filename"); - - char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile, - (unsigned long)now, - (unsigned long)stab.st_ino); - if (link(path_defer, path_backlog)) - sysdie("could not install defer file %s as backlog file %s", - path_defer, backlog); - if (unlink(path_defer)) - sysdie("could not unlink old defer link %s to backlog file %s", - path_defer, backlog); - open_defer(); - - close_input_file(old_input_file); - old_input_file= 0; - - if (unlink(path_duct)) - sysdie("could not unlink old duct file %s", path_duct); - - sms= sm_NORMAL; -} - static InputFile *open_input_file(const char *path) { int fd= open(path, O_RDONLY); if (fd<0) { @@ -958,6 +977,7 @@ static void close_input_file(InputFile *ipf) { free(ipf); } + /*---------- dealing with articles read in the input file ----------*/ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, @@ -1009,7 +1029,8 @@ typedef void *feedfile_got_article(oop_source *lp, oop_read *rd, check_master_queue(); } -/*---------- tailing input file ----------*/ + +/*========== tailing input file ==========*/ static void filemon_start(InputFile *ipf) { assert(!ipf->filemon); @@ -1188,6 +1209,7 @@ static void inputfile_tailing_stop(InputFile *ipf) { assert(!ipf->filemon); /* we shouldn't be monitoring it now */ } + /*========== interaction with innd ==========*/ /* See official state diagram at top of file. We implement @@ -1271,37 +1293,6 @@ static void inputfile_tailing_stop(InputFile *ipf) { * */ -static char *path_ductlock, *path_duct, *path_ductdefer; - -typedef struct { - /* This is an instance of struct oop_readable */ - struct oop_readable readable; /* first */ - oop_readable_call *readable_callback; - void *readable_callback_user; - - int fd; - const char *path; /* ptr copy of path_ or feedfile */ - struct Filemon_Perfile *filemon; - - oop_read *rd; - long inprogress; /* no. of articles read but not processed */ - off_t offset; -} InputFile; - -typedef enum { - sm_WAITING, - sm_NORMAL, - sm_FLUSHING, - sm_FLUSHFAIL, - sm_DROPPING, - sm_SEPARATED, - sm_FINISHING; -} StateMachineState; - -static InputFile *main_input_file, *old_input_file; -static StateMachineState sms; -static int waiting_periods_sofar; - static void open_defer(void) { struct stat stab; @@ -1455,12 +1446,79 @@ static void startup_set_input_file(InputFile *f) { inputfile_tailing_start(f); } +static void statmc_finishdone(void) { + time_t now; + struct stat stab; + + assert(sms == sm_FINISHING); + + r= fstat(fileno(defer), &stab); + if (r) sysdie("check defer file %s", path_defer); + + if (fclose(defer)) sysdie("could not close defer file %s", path_defer); + defer= 0; + + now= time(0); + if (now==-1) sysdie("could not get current time for backlog filename"); + + char *backlog= xasprintf("%s_backlog_%lu.%lu", feedfile, + (unsigned long)now, + (unsigned long)stab.st_ino); + if (link(path_defer, path_backlog)) + sysdie("could not install defer file %s as backlog file %s", + path_defer, backlog); + if (unlink(path_defer)) + sysdie("could not unlink old defer link %s to backlog file %s", + path_defer, backlog); + open_defer(); + + close_input_file(old_input_file); + old_input_file= 0; + + if (unlink(path_duct)) + sysdie("could not unlink old duct file %s", path_duct); + + sms= sm_NORMAL; +} + /*========== flushing the feed ==========*/ /*========== main program ==========*/ +static void postfork_inputfile(InputFile *ipf) { + if (!ipf) return; + assert(ipf->fd >= 0); + close(ipf->fd); + ipf->fd= -1; +} + +static void postfork_conns(Connection *conn) { + while (conn) { + close(conn->fd); + conn= conn->next; + } +} + +static void postfork_stdio(FILE *f) { + /* we have no stdio streams that are buffered long-term */ + if (f) fclose(f); +} + +static void postfork(const char *what) { + if (signal(SIGPIPE, SIG_DFL) == SIG_ERR) + sysdie("%s child: failed to reset SIGPIPE"); + + postfork_inputfile(main_input_file); + postfork_inputfile(old_input_file); + postfork_conns(idle.head); + postfork_conns(working.head); + postfork_conns(full.head); + postfork_stdio(defer); +} + + #define EVERY(what, interval, body) \ static const struct timeval what##_timeout = { 5, 0 }; \ static void what##_schedule(void); \