X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=f28db2edfc6330259bbfe25225186f32b12c7dc9;hb=0eaa69043c94debeb488b4f5ecadc6f4470db200;hp=e9da15b0ff8f209a62e906538f43dc3cb55e763c;hpb=a381013aaaf1b621d6e3a20302fb074b5f8a7667;p=innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index e9da15b..f28db2e 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,7 +1,22 @@ /* + * bugs + * + + [740] info: processed feedfile(null) read=4(+0bl,+6err) offered=5(ch5,nc0) accepted=0(ch0+nc0) unwanted=0(0id+0bd+0nc) rejected=0(0id+0bd+0nc) deferred=0(0id+0bd+0nc) missing=2(0id+2bd+0nc) connretry=0(0id+0bd+0nc) + + also unwanted should be nonzero I think + + [740] warning: corrupted file: /home/ian/things/Innfeed/inn2-2.4.5/fee, offset 349: line partially blanked: in ` @050000002D130000006A0000000000000000@ 8-/; print "\f" if m/-\^L-/' backends/innduct. #include #include #include +#include #include #include @@ -201,6 +217,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define INNDCOMMCHILD_ESTATUS_NONESUCH 27 #define MAX_LINE_FEEDFILE (NNTP_MSGID_MAXLEN + sizeof(TOKEN)*2 + 10) +#define MAX_CONTROL_COMMAND 1000 #define VA va_list al; va_start(al,fmt) #define PRINTF(f,a) __attribute__((__format__(printf,f,a))) @@ -215,7 +232,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. int count; \ } T##List -#define NODE(n) (assert((void*)&(n)->list_node == &(n)), &(n)->list_node) +#define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node) #define LIST_CHECKCANHAVENODE(l,n) \ ((void)((n) == ((l).u.for_type))) /* just for the type check */ @@ -241,7 +258,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) -#define LIST_INIT(l) (list_new(&(l).u.li)) +#define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li)) #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l)))) #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) @@ -287,6 +304,8 @@ static void statemc_setstate(StateMachineState newsms, int periods, static void statemc_start_flush(const char *why); /* Normal => Flushing */ static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ +static void article_done(Conn *conn, Article *art, int whichcount); + static void check_assign_articles(void); static void queue_check_input_done(void); @@ -294,6 +313,7 @@ static void statemc_check_flushing_done(void); static void statemc_check_backlog_done(void); static void postfork(void); +static void period(void); static void open_defer(void); static void close_defer(void); @@ -354,7 +374,7 @@ typedef enum { /* in queue in conn->sent */ art_Unchecked, /* not checked, not sent checking */ art_Wanted, /* checked, wanted sent body as requested */ art_Unsolicited, /* - sent body without check */ - art_MaxState + art_MaxState, } ArtState; #define RESULT_COUNTS(RCS,RCN) \ @@ -363,9 +383,10 @@ typedef enum { /* in queue in conn->sent */ RCN(unwanted) \ RCN(rejected) \ RCN(deferred) \ + RCN(missing) \ RCN(connretry) -#define RCI_TRIPLE_FMT_BASE "%d(id%d+bd%d+nc%d)" +#define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)" #define RCI_TRIPLE_VALS_BASE(counts,x) \ counts[art_Unchecked] x \ + counts[art_Wanted] x \ @@ -422,7 +443,7 @@ struct InputFile { struct Article { ISNODE(Article); ArtState state; - int midlen; + int midlen, missing; InputFile *ipf; TOKEN token; off_t offset; @@ -480,14 +501,14 @@ static FILE *defer; static InputFile *main_input_file, *flushing_input_file, *backlog_input_file; static int sm_period_counter; -/* control_init initialises */ -static int control_master; - /* initialisation to 0 is good */ static int until_connect, until_backlog_nextscan; static double accept_proportion; static int nocheck, nocheck_reported; +/* for simulation, debugging, etc. */ +int simulate_flush= -1; + /*========== logging ==========*/ static void logcore(int sysloglevel, const char *fmt, ...) PRINTF(2,3); @@ -568,7 +589,7 @@ static char *xasprintf(const char *fmt, ...) { } static int close_perhaps(int *fd) { - if (!*fd) return 0; + if (*fd <= 0) return 0; int r= close(*fd); *fd=0; return r; @@ -578,7 +599,7 @@ static void xclose(int fd, const char *what, const char *what2) { if (r) sysdie("close %s%s",what,what2?what2:""); } static void xclose_perhaps(int *fd, const char *what, const char *what2) { - if (!*fd) return; + if (*fd <= 0) return; xclose(*fd,what,what2); *fd=0; } @@ -627,8 +648,9 @@ static int xwaitpid(pid_t *pid, const char *what) { int r= kill(*pid, SIGKILL); if (r) sysdie("cannot kill %s child", what); - pid_t got= waitpid(*pid, &status, WNOHANG); + pid_t got= waitpid(*pid, &status, 0); if (got==-1) sysdie("cannot reap %s child", what); + if (got==0) die("cannot reap %s child", what); *pid= 0; @@ -646,6 +668,16 @@ static time_t xtime(void) { return now; } +static void xgettimeofday(struct timeval *tv_r) { + int r= gettimeofday(tv_r,0); + if (r) sysdie("gettimeofday(2) failed"); +} + +static void xsetnonblock(int fd, int nonblocking) { + int errnoval= oop_fd_nonblock(fd, nonblocking); + if (errnoval) { errno= errnoval; sysdie("setnonblocking"); } +} + static void check_isreg(const struct stat *stab, const char *path, const char *what) { if (!S_ISREG(stab->st_mode)) @@ -707,6 +739,190 @@ static int isewouldblock(int errnoval) { /*========== command and control connections ==========*/ +static int control_master; + +typedef struct ControlConn ControlConn; +struct ControlConn { + void (*destroy)(ControlConn*); + int fd; + oop_read *rd; + FILE *out; + union { + struct sockaddr sa; + struct sockaddr_un un; + } sa; + socklen_t salen; +}; + +static const oop_rd_style control_rd_style= { + OOP_RD_DELIM_STRIP, '\n', + OOP_RD_NUL_FORBID, + OOP_RD_SHORTREC_FORBID +}; + +static void control_destroy(ControlConn *cc) { + cc->destroy(cc); +} + +static void control_checkouterr(ControlConn *cc /* may destroy*/) { + if (ferror(cc->out) | fflush(cc->out)) { + info("CTRL%d write error %s", cc->fd, strerror(errno)); + control_destroy(cc); + } +} + +static void control_prompt(ControlConn *cc /* may destroy*/) { + fprintf(cc->out, "%s| ", sitename); + control_checkouterr(cc); +} + +typedef struct ControlCommand ControlCommand; +struct ControlCommand { + const char *cmd; + void (*f)(ControlConn *cc, const ControlCommand *ccmd, + const char *arg, size_t argsz); + void *xdata; + int xval; +}; + +static const ControlCommand control_commands[]; + +#define CCMD(wh) \ + static void ccmd_##wh(ControlConn *cc, const ControlCommand *c, \ + const char *arg, size_t argsz) + +CCMD(help) { + fputs("commands:\n", cc->out); + const ControlCommand *ccmd; + for (ccmd=control_commands; ccmd->cmd; ccmd++) + fprintf(cc->out, " %s\n", ccmd->cmd); +} + +CCMD(period) { period(); } +CCMD(setintarg) { *(int*)c->xdata= atoi(arg); } +CCMD(setint) { *(int*)c->xdata= c->xval; } + +static const ControlCommand control_commands[]= { + { "h", ccmd_help }, + { "p", ccmd_period }, + { "pretend flush", ccmd_setintarg, &simulate_flush }, + { "poke sm", ccmd_setint, &sm_period_counter, 1 }, + { "poke conn", ccmd_setint, &until_connect, 0 }, + { "poke blscan", ccmd_setint, &until_backlog_nextscan, 0 }, + { "wedge blscan", ccmd_setint, &until_backlog_nextscan, -1 }, + { 0 } +}; + +static void *control_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *cc_v) { + ControlConn *cc= cc_v; + + if (!data) { + info("CTRL%d closed", cc->fd); + cc->destroy(cc); + return OOP_CONTINUE; + } + + if (recsz == 0) goto prompt; + + const ControlCommand *ccmd; + for (ccmd=control_commands; ccmd->cmd; ccmd++) { + int l= strlen(ccmd->cmd); + if (recsz < l) continue; + if (recsz > l && data[l] != ' ') continue; + if (memcmp(data, ccmd->cmd, l)) continue; + + int argl= (int)recsz - (l+1); + ccmd->f(cc, ccmd, argl>=0 ? data+l+1 : 0, argl); + goto prompt; + } + + fputs("unknown command; h for help\n", cc->out); + + prompt: + control_prompt(cc); + return OOP_CONTINUE; +} + +static void *control_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev, + const char *errmsg, int errnoval, + const char *data, size_t recsz, void *cc_v) { + ControlConn *cc= cc_v; + + info("CTRL%d read error %s", cc->fd, errmsg); + cc->destroy(cc); + return OOP_CONTINUE; +} + +static int control_conn_startup(ControlConn *cc /* may destroy*/, + const char *how) { + cc->rd= oop_rd_new_fd(loop, cc->fd, 0,0); + if (!cc->rd) { warn("oop_rd_new_fd control failed"); return -1; } + + int er= oop_rd_read(cc->rd, &control_rd_style, MAX_CONTROL_COMMAND, + control_rd_ok, cc, + control_rd_err, cc); + if (er) { errno= er; syswarn("oop_rd_read control failed"); return -1; } + + info("CTRL%d %s ready", cc->fd, how); + control_prompt(cc); + return 0; +} + +static void control_stdio_destroy(ControlConn *cc) { + if (cc->rd) { + oop_rd_cancel(cc->rd); + errno= oop_rd_delete_tidy(cc->rd); + if (errno) syswarn("oop_rd_delete tidy failed (no-nonblock stdin?)"); + } + free(cc); +} + +static void control_stdio(void) { + ControlConn *cc= xmalloc(sizeof(*cc)); + memset(cc,0,sizeof(*cc)); + cc->destroy= control_stdio_destroy; + + cc->fd= 0; + cc->out= stdout; + int r= control_conn_startup(cc,"stdio"); + if (r) cc->destroy(cc); +} + +static void control_accepted_destroy(ControlConn *cc) { + if (cc->rd) { + oop_rd_cancel(cc->rd); + oop_rd_delete_kill(cc->rd); + } + if (cc->out) { fclose(cc->out); cc->fd=0; } + close_perhaps(&cc->fd); + free(cc); +} + +static void *control_master_readable(oop_source *lp, int master, + oop_event ev, void *u) { + ControlConn *cc= xmalloc(sizeof(*cc)); + memset(cc,0,sizeof(*cc)); + cc->destroy= control_accepted_destroy; + + cc->salen= sizeof(cc->sa); + cc->fd= accept(master, &cc->sa.sa, &cc->salen); + if (cc->fd<0) { syswarn("error accepting control connection"); goto x; } + + cc->out= fdopen(cc->fd, "w"); + if (!cc->out) { syswarn("error fdopening accepted control conn"); goto x; } + + int r= control_conn_startup(cc, "accepted"); + if (r) goto x; + + return OOP_CONTINUE; + + x: + cc->destroy(cc); + return OOP_CONTINUE; +} + #define NOCONTROL(...) do{ \ syswarn("no control socket, because failed to " __VA_ARGS__); \ goto nocontrol; \ @@ -787,7 +1003,9 @@ static void control_init(void) { r= listen(control_master, 5); if (r) NOCONTROL("listen"); - //loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0); + xsetnonblock(control_master, 1); + + loop->on_fd(loop, control_master, OOP_READ, control_master_readable, 0); info("control socket ok, real path %s", sa.un.sun_path); return; @@ -837,6 +1055,7 @@ static void *conn_exception(oop_source *lp, int fd, static void vconnfail(Conn *conn, const char *fmt, va_list al) { int requeue[art_MaxState]; + memset(requeue,0,sizeof(requeue)); Article *art; while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); @@ -922,10 +1141,7 @@ static int connecting_fdpass_sock; static void connect_attempt_discard(void) { if (connecting_child) { - int r= kill(connecting_child, SIGTERM); - if (r) syswarn("failed to kill connecting child"); int status= xwaitpid(&connecting_child, "connect"); - if (!(WIFEXITED(status) || (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) report_child_status("connect", status); @@ -937,9 +1153,15 @@ static void connect_attempt_discard(void) { } #define PREP_DECL_MSG_CMSG(msg) \ + char msgbyte= 0; \ + struct iovec msgiov; \ + msgiov.iov_base= &msgbyte; \ + msgiov.iov_len= 1; \ struct msghdr msg; \ memset(&msg,0,sizeof(msg)); \ - char msg##cbuf[CMSG_SPACE(sizeof(fd))]; \ + char msg##cbuf[CMSG_SPACE(sizeof(int))]; \ + msg.msg_iov= &msgiov; \ + msg.msg_iovlen= 1; \ msg.msg_control= msg##cbuf; \ msg.msg_controllen= sizeof(msg##cbuf); @@ -948,42 +1170,45 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { assert(fd == connecting_fdpass_sock); + PREP_DECL_MSG_CMSG(msg); + + ssize_t rs= recvmsg(fd, &msg, 0); + if (rs<0) { + if (isewouldblock(errno)) return OOP_CONTINUE; + syswarn("failed to read socket from connecting child"); + goto x; + } + conn= xmalloc(sizeof(*conn)); memset(conn,0,sizeof(*conn)); + LIST_INIT(conn->waiting); + LIST_INIT(conn->priority); + LIST_INIT(conn->sent); - PREP_DECL_MSG_CMSG(msg); struct cmsghdr *h= 0; - ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT); if (rs >= 0) h= CMSG_FIRSTHDR(&msg); if (!h) { - int status; - pid_t got= waitpid(connecting_child, &status, WNOHANG); - if (got != -1) { - assert(got==connecting_child); - connecting_child= 0; - if (WIFEXITED(status)) { - if (WEXITSTATUS(status) != 0 && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) - /* child already reported the problem */; - else - warn("connect: connection child exited code %d but no cmsg", + int status= xwaitpid(&connecting_child, "connect child (broken)"); + + if (WIFEXITED(status)) { + if (WEXITSTATUS(status) != 0 && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) + /* child already reported the problem */; + else { + if (e == OOP_EXCEPTION) + warn("connect: connection child exited code %d but" + " unexpected exception on fdpass socket", WEXITSTATUS(status)); - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { - warn("connect: connection attempt timed out"); - } else { - report_child_status("connect", status); + else + warn("connect: connection child exited code %d but" + " no cmsg (rs=%d)", + WEXITSTATUS(status), (int)rs); } + } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { + warn("connect: connection attempt timed out"); } else { - /* child is still running apparently, report the socket problem */ - if (rs < 0) - syswarn("connect: read from fdpass socket failed"); - else if (e == OOP_EXCEPTION) - warn("connect: unexpected exception on fdpass socket"); - else if (!rs) - warn("connect: unexpected EOF on fdpass socket"); - else - fatal("connect: unexpected lack of cmsg from child"); + report_child_status("connect", status); } goto x; } @@ -1019,14 +1244,11 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { } /* Phew! */ - LIST_INIT(conn->waiting); - LIST_INIT(conn->priority); - LIST_INIT(conn->sent); conn->max_queue= conn->stream ? max_queue_per_conn : 1; loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */ - if (!conn->fd) sysdie("oop_rd_new_fd (fd=%d)",conn->fd); + if (!conn->fd) die("oop_rd_new_fd conn failed (fd=%d)",conn->fd); int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN, &peer_rd_ok, conn, &peer_rd_err, conn); @@ -1037,7 +1259,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { connect_attempt_discard(); check_assign_articles(); - return 0; + return OOP_CONTINUE; x: conn_dispose(conn); @@ -1055,7 +1277,7 @@ static void connect_start(void) { assert(!connecting_child); assert(!connecting_fdpass_sock); - notice("starting connection attempt"); + info("starting connection attempt"); int socks[2]; int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks); @@ -1072,13 +1294,26 @@ static void connect_start(void) { alarm(connection_setup_timeout); if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) { - if (buf[0]) fatal("connect: rejected: %s", sanitise(buf)); - else sysfatal("connect: connection attempt failed"); + int l= strlen(buf); + int stripped=0; + while (l>0) { + unsigned char c= buf[l-1]; + if (!isspace(c)) break; + if (c=='\n' || c=='\r') stripped=1; + --l; + } + if (!buf[0]) { + sysfatal("connect: connection attempt failed"); + } else { + buf[l]= 0; + fatal("connect: %s: %s", stripped ? "rejected" : "failed", + sanitise(buf)); + } } if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0) sysfatal("connect: authentication failed"); if (try_stream) { - if (fputs("MODE STREAM\r\n", cn_to) || + if (fputs("MODE STREAM\r\n", cn_to)==EOF || fflush(cn_to)) sysfatal("connect: could not send MODE STREAM"); buf[sizeof(buf)-1]= 0; @@ -1090,7 +1325,7 @@ static void connect_start(void) { } int l= strlen(buf); assert(l>=1); - if (buf[-1]!='\n') + if (buf[l-1]!='\n') fatal("connect: response to MODE STREAM is too long: %.100s...", sanitise(buf)); l--; if (l>0 && buf[l-1]=='\r') l--; @@ -1125,13 +1360,15 @@ static void connect_start(void) { msg.msg_controllen= cmsg->cmsg_len; r= sendmsg(socks[1], &msg, 0); - if (r) sysdie("sendmsg failed for new connection"); + if (r<0) sysdie("sendmsg failed for new connection"); + if (r!=1) die("sendmsg for new connection gave wrong result %d",r); _exit(exitstatus); } xclose(socks[1], "connecting fdpass child's socket",0); connecting_fdpass_sock= socks[0]; + xsetnonblock(connecting_fdpass_sock, 1); on_fd_read_except(connecting_fdpass_sock, connchild_event); } @@ -1165,6 +1402,7 @@ static void check_assign_articles(void) { if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { Article *art= LIST_REMHEAD(queue); + if (!art) break; LIST_ADDTAIL(use->waiting, art); spare--; } @@ -1289,12 +1527,25 @@ static void conn_make_some_xmits(Conn *conn) { ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL); + art->state= + art->state == art_Unchecked ? art_Unsolicited : + art->state == art_Wanted ? art_Wanted : + (abort(),-1); + + if (!artdata) art->missing= 1; +fprintf(stderr,"INC %d %d?%d:%d\n",(int)art->state, + !!artdata, (int)RC_sent, (int)RC_missing); + art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++; + if (conn->stream) { if (artdata) { XMIT_LITERAL("TAKETHIS "); xmit_noalloc(conn, art->messageid, art->midlen); XMIT_LITERAL("\r\n"); xmit_artbody(conn, artdata); + } else { + article_done(conn, art, -1); + continue; } } else { /* we got 235 from IHAVE */ @@ -1305,24 +1556,20 @@ static void conn_make_some_xmits(Conn *conn) { } } - art->state= - art->state == art_Unchecked ? art_Unsolicited : - art->state == art_Wanted ? art_Wanted : - (abort(),-1); - art->ipf->counts[art->state][RC_sent]++; LIST_ADDTAIL(conn->sent, art); } else { /* check it */ if (conn->stream) - XMIT_LITERAL("IHAVE "); - else XMIT_LITERAL("CHECK "); + else + XMIT_LITERAL("IHAVE "); xmit_noalloc(conn, art->messageid, art->midlen); XMIT_LITERAL("\r\n"); assert(art->state == art_Unchecked); +fprintf(stderr,"INC %d %d\n",(int)art->state,(int)RC_sent); art->ipf->counts[art->state][RC_sent]++; LIST_ADDTAIL(conn->sent, art); } @@ -1420,7 +1667,9 @@ static void update_nocheck(int accepted) { } static void article_done(Conn *conn, Article *art, int whichcount) { - art->ipf->counts[art->state][whichcount]++; +fprintf(stderr,"INC %d %d\n",(int)art->state,whichcount); + if (!art->missing) art->ipf->counts[art->state][whichcount]++; + if (whichcount == RC_accepted) update_nocheck(1); else if (whichcount == RC_unwanted) update_nocheck(0); @@ -1428,6 +1677,11 @@ static void article_done(Conn *conn, Article *art, int whichcount) { while (art->blanklen) { static const char spaces[]= + " " + " " + " " + " " + " " " " " " " " @@ -1492,7 +1746,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, Article *art; #define GET_ARTICLE(musthavesent) \ - art= article_reply_check(conn, data, musthavesent, code_streaming, sani); \ + art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \ if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */ #define ARTICLE_DEALTWITH(streaming,musthavesent,how) \ @@ -1524,6 +1778,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, case 335: /* IHAVE says send it */ GET_ARTICLE(-1); assert(art->state == art_Unchecked); +fprintf(stderr,"INC %d %d\n",(int)art->state,(int)RC_accepted); art->ipf->counts[art->state][RC_accepted]++; art->state= art_Wanted; LIST_ADDTAIL(conn->priority, art); @@ -1566,7 +1821,7 @@ static void feedfile_eof(InputFile *ipf) { } static InputFile *open_input_file(const char *path) { - int fd= open(path, O_RDONLY); + int fd= open(path, O_RDWR); if (fd<0) { if (errno==ENOENT) return 0; sysfatal("unable to open input file %s", path); @@ -1595,7 +1850,7 @@ static void close_input_file(InputFile *ipf) { /* does not free */ static void *feedfile_got_bad_data(InputFile *ipf, off_t offset, const char *data, const char *how) { - warn("corrupted file: %s, offset %lu: %s: %s", + warn("corrupted file: %s, offset %lu: %s: in %s", ipf->path, (unsigned long)offset, how, sanitise(data)); ipf->readcount_err++; if (ipf->readcount_err > max_bad_data_initial + @@ -1742,6 +1997,7 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, abort(); } } + tailing_queue_readable(ipf); return r; } } @@ -1856,7 +2112,8 @@ static void filemon_stop(InputFile *ipf) { } static void filemon_callback(InputFile *ipf) { - ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); + if (ipf && ipf->readable_callback) /* so filepoll() can be naive */ + ipf->readable_callback(loop, &ipf->readable, ipf->readable_callback_user); } /*---------- interface to start and stop an input file ----------*/ @@ -2102,7 +2359,7 @@ static void statemc_init(void) { InputFile *file_f= open_input_file(feedfile); if (!file_f) die("feed file vanished during startup"); startup_set_input_file(file_f); - SMS(NORMAL, flushfail_retry_periods, "normal startup"); + SMS(NORMAL, spontaneous_flush_periods, "normal startup"); } } } @@ -2160,8 +2417,8 @@ static void notice_processed(InputFile *ipf, const char *what, #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc]) - info("processed %s%s read=%d(+%dbl,+%derr)" - " offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)" + info("processed %s%s read=%d (+bl=%d,+err=%d)" + " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)" RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) , what, spec, @@ -2185,7 +2442,7 @@ static void statemc_check_backlog_done(void) { const char *under= strchr(slash, '_'); const char *rest= under ? under+1 : leaf; if (!strncmp(rest,"backlog",7)) rest += 7; - notice_processed(ipf,"backlog:",rest); + notice_processed(ipf,"backlog ",rest); close_input_file(ipf); if (unlink(ipf->path)) { @@ -2207,7 +2464,7 @@ static void statemc_check_flushing_done(void) { assert(sms==sm_SEPARATED || sms==sm_DROPPING); - notice_processed(ipf,"feedfile",0); + notice_processed(ipf,"feedfile",""); close_defer(); @@ -2219,7 +2476,7 @@ static void statemc_check_flushing_done(void) { if (sms==sm_SEPARATED) { notice("flush complete"); - SMS(NORMAL, 0, "flush complete"); + SMS(NORMAL, spontaneous_flush_periods, "flush complete"); } else if (sms==sm_DROPPING) { SMS(DROPPED, 0, "old flush complete"); search_backlog_file(); @@ -2258,9 +2515,9 @@ static void statemc_setstate(StateMachineState newsms, int periods, } if (periods) { - info("%s%s[%d] %s",forlog,xtra,periods,why); + info("state %s%s[%d] %s",forlog,xtra,periods,why); } else { - info("%s%s %s",forlog,xtra,why); + info("state %s%s %s",forlog,xtra,why); } } @@ -2409,7 +2666,12 @@ static void search_backlog_file(void) { if (sms==sm_DROPPED) { notice("feed dropped and our work is complete"); - xunlink(path_lock, "lockfile for old feed"); + + int r= unlink(path_control); + if (r && errno!=ENOENT) + syswarn("failed to remove control symlink for old feed"); + + xunlink(path_lock, "lockfile for old feed"); exit(4); } until_backlog_nextscan= backlog_spontrescan_periods; @@ -2458,8 +2720,11 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { assert(inndcomm_child); assert(fd == inndcomm_sentinel_fd); int status= xwaitpid(&inndcomm_child, "inndcomm"); + inndcomm_child= 0; + cancel_fd_read_except(fd); xclose_perhaps(&fd, "inndcomm sentinel pipe",0); + inndcomm_sentinel_fd= 0; assert(!flushing_input_file); @@ -2470,7 +2735,7 @@ static void *inndcomm_event(oop_source *lp, int fd, oop_event e, void *u) { goto failed; case INNDCOMMCHILD_ESTATUS_NONESUCH: - warn("feed has been dropped by innd, finishing up"); + notice("feed has been dropped by innd, finishing up"); flushing_input_file= main_input_file; tailing_queue_readable(flushing_input_file); /* we probably previously returned EAGAIN from our fake read method @@ -2548,6 +2813,12 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ xclose(pipefds[0], "(in child) inndcomm sentinel parent's end",0); /* parent spots the autoclose of pipefds[1] when we die or exit */ + if (simulate_flush>=0) { + warn("SIMULATING flush child status %d", simulate_flush); + if (simulate_flush>128) raise(simulate_flush-128); + else exit(simulate_flush); + } + alarm(inndcomm_flush_timeout); r= ICCopen(); if (r) inndcommfail("connect"); r= ICCcommand('f',flushargv,&reply); if (r<0) inndcommfail("transmit"); @@ -2558,6 +2829,8 @@ void spawn_inndcomm_flush(const char *why) { /* Moved => Flushing */ exit(INNDCOMMCHILD_ESTATUS_FAIL); } + simulate_flush= -1; + xclose(pipefds[1], "inndcomm sentinel child's end",0); inndcomm_sentinel_fd= pipefds[0]; assert(inndcomm_sentinel_fd); @@ -2593,22 +2866,44 @@ static void postfork(void) { postfork_stdio(defer, "defer file ", path_defer); } -#define EVERY(what, interval_sec, interval_usec, body) \ - static struct timeval what##_timeout = { interval_sec, interval_usec }; \ - static void what##_schedule(void); \ - static void *what##_timedout(oop_source *lp, struct timeval tv, void *u) { \ - body; \ - what##_schedule(); \ - return OOP_CONTINUE; \ - } \ - static void what##_schedule(void) { \ - loop->on_time(loop, what##_timeout, what##_timedout, 0); \ - } +typedef struct Every Every; +struct Every { + struct timeval interval; + int fixed_rate; + void (*f)(void); +}; + +static void every_schedule(Every *e, struct timeval base); -EVERY(filepoll, 5,0, ({ - if (main_input_file && main_input_file->readable_callback) - filemon_callback(main_input_file); -})); +static void *every_happens(oop_source *lp, struct timeval base, void *e_v) { + Every *e= e_v; + e->f(); + if (!e->fixed_rate) xgettimeofday(&base); + every_schedule(e, base); + return OOP_CONTINUE; +} + +static void every_schedule(Every *e, struct timeval base) { + struct timeval when; + timeradd(&base, &e->interval, &when); + loop->on_time(loop, when, every_happens, e); +} + +static void every(int interval, int fixed_rate, void (*f)(void)) { + Every *e= xmalloc(sizeof(*e)); + e->interval.tv_sec= interval; + e->interval.tv_usec= 0; + e->fixed_rate= fixed_rate; + e->f= f; + struct timeval now; + xgettimeofday(&now); + every_schedule(e, now); +} + +static void filepoll(void) { + filemon_callback(main_input_file); + filemon_callback(flushing_input_file); +} static char *debug_report_ipf(InputFile *ipf) { if (!ipf) return xasprintf("none"); @@ -2619,17 +2914,17 @@ static char *debug_report_ipf(InputFile *ipf) { return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s", ipf, path, ipf->inprogress, (long)ipf->offset, - ipf->fd, ipf->rd ? "+" : ""); + ipf->fd, ipf->rd ? "" : ",!rd"); } -EVERY(period, -1,0, ({ +static void period(void) { char *dipf_main= debug_report_ipf(main_input_file); char *dipf_flushing= debug_report_ipf(flushing_input_file); char *dipf_backlog= debug_report_ipf(backlog_input_file); debug("PERIOD" " sms=%s[%d] conns=%d queue=%d until_connect=%d" - " input_files main:%s old:%s flushing:%s" + " input_files main:%s flushing:%s backlog:%s" " children connecting=%ld inndcomm=%ld" , sms_names[sms], sm_period_counter, @@ -2649,7 +2944,7 @@ EVERY(period, -1,0, ({ statemc_period_poll(); check_assign_articles(); check_idle_conns(); -})); +} /*========== option parsing ==========*/ @@ -2822,7 +3117,7 @@ static const Option innduct_options[]= { {'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 }, {0,"no-daemon", 0, &become_daemon, op_setint, 0 }, {0,"no-streaming", 0, &try_stream, op_setint, 0 }, -{0,"inndconf", "F", &inndconffile, op_string }, +{'C',"inndconf", "F", &inndconffile, op_string }, {'P',"port", "PORT", &port, op_integer }, {0,"help", 0, 0, help }, @@ -2887,6 +3182,9 @@ int main(int argc, char **argv) { /* defaults */ + int r= innconf_read(inndconffile); + if (!r) badusage("could not read inn.conf (more info on stderr)"); + if (!remote_host) remote_host= sitename; if (nocheck_thresh < 0 || nocheck_thresh > 100) @@ -2909,7 +3207,6 @@ int main(int argc, char **argv) { max_bad_data_ratio *= 0.01; if (!feedfile) { - innconf_read(inndconffile); feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename); } else if (!feedfile[0]) { badusage("feed filename must be nonempty"); @@ -2972,21 +3269,23 @@ int main(int argc, char **argv) { notice("starting"); + if (!become_daemon) + control_stdio(); + control_init(); if (!filemon_method_init()) { warn("no file monitoring available, polling"); - filepoll_schedule(); + every(5,0,filepoll); } - period_timeout.tv_sec= period_seconds; - period_schedule(); + every(period_seconds,1,period); statemc_init(); /* let's go */ - void *r= oop_sys_run(sysloop); - assert(r == OOP_ERROR); + void *run= oop_sys_run(sysloop); + assert(run == OOP_ERROR); sysdie("event loop failed"); }