X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ian/git?a=blobdiff_plain;f=backends%2Finnduct.c;h=427792089e6ca7dab1c96e51941a6cfab9aa00d4;hb=58574c7dd1bb38e5e4ba4a6c4bd459e9e9a8e625;hp=091b1985a3688435a31d261ffb381d1666325438;hpb=01c8d7ea6ab315c5ac205e8f4096dca83dfe0b41;p=inn-innduct.git diff --git a/backends/innduct.c b/backends/innduct.c index 091b198..4277920 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,12 +1,13 @@ /* * todo - * - actually do something with readable on control master + * - inotify not working ? + * - some per-conn info thing for control * - option for realsockdir * - option for filepoll * - option for no inotify * - manpage: document control master stuff - * - manpage: innconf is used for communicating with innd - * - debug this: + * + * debugging rune: * build-lfs/backends/innduct --no-daemon -f `pwd`/fee sit dom */ @@ -190,6 +191,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #include #include #include +#include #include #include @@ -218,7 +220,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. int count; \ } T##List -#define NODE(n) (assert((void*)&(n)->list_node == &(n)), &(n)->list_node) +#define NODE(n) (assert((void*)&(n)->list_node == (n)), &(n)->list_node) #define LIST_CHECKCANHAVENODE(l,n) \ ((void)((n) == ((l).u.for_type))) /* just for the type check */ @@ -244,7 +246,7 @@ perl -ne 'print if m/-8\<-/..m/-\>8-/; print "\f" if m/-\^L-/' backends/innduct. #define LIST_REMHEAD(l) LIST_REMSOMEHOW((l),list_remhead) #define LIST_REMTAIL(l) LIST_REMSOMEHOW((l),list_remtail) -#define LIST_INIT(l) (list_new(&(l).u.li)) +#define LIST_INIT(l) ((l).count=0, list_new(&(l).u.li)) #define LIST_HEAD(l) ((typeof((l).u.for_type))(list_head((struct list*)&(l)))) #define LIST_NEXT(n) ((typeof(n))list_succ(NODE((n)))) #define LIST_BACK(n) ((typeof(n))list_pred(NODE((n)))) @@ -290,6 +292,8 @@ static void statemc_setstate(StateMachineState newsms, int periods, static void statemc_start_flush(const char *why); /* Normal => Flushing */ static void spawn_inndcomm_flush(const char *why); /* Moved => Flushing */ +static void article_done(Conn *conn, Article *art, int whichcount); + static void check_assign_articles(void); static void queue_check_input_done(void); @@ -358,7 +362,7 @@ typedef enum { /* in queue in conn->sent */ art_Unchecked, /* not checked, not sent checking */ art_Wanted, /* checked, wanted sent body as requested */ art_Unsolicited, /* - sent body without check */ - art_MaxState + art_MaxState, } ArtState; #define RESULT_COUNTS(RCS,RCN) \ @@ -367,9 +371,10 @@ typedef enum { /* in queue in conn->sent */ RCN(unwanted) \ RCN(rejected) \ RCN(deferred) \ + RCN(missing) \ RCN(connretry) -#define RCI_TRIPLE_FMT_BASE "%d(id%d+bd%d+nc%d)" +#define RCI_TRIPLE_FMT_BASE "%d (id=%d,bod=%d,nc=%d)" #define RCI_TRIPLE_VALS_BASE(counts,x) \ counts[art_Unchecked] x \ + counts[art_Wanted] x \ @@ -426,7 +431,7 @@ struct InputFile { struct Article { ISNODE(Article); ArtState state; - int midlen; + int midlen, missing; InputFile *ipf; TOKEN token; off_t offset; @@ -631,8 +636,9 @@ static int xwaitpid(pid_t *pid, const char *what) { int r= kill(*pid, SIGKILL); if (r) sysdie("cannot kill %s child", what); - pid_t got= waitpid(*pid, &status, WNOHANG); + pid_t got= waitpid(*pid, &status, 0); if (got==-1) sysdie("cannot reap %s child", what); + if (got==0) die("cannot reap %s child", what); *pid= 0; @@ -1037,6 +1043,7 @@ static void *conn_exception(oop_source *lp, int fd, static void vconnfail(Conn *conn, const char *fmt, va_list al) { int requeue[art_MaxState]; + memset(requeue,0,sizeof(requeue)); Article *art; while ((art= LIST_REMHEAD(conn->priority))) LIST_ADDTAIL(queue, art); @@ -1122,10 +1129,7 @@ static int connecting_fdpass_sock; static void connect_attempt_discard(void) { if (connecting_child) { - int r= kill(connecting_child, SIGTERM); - if (r) syswarn("failed to kill connecting child"); int status= xwaitpid(&connecting_child, "connect"); - if (!(WIFEXITED(status) || (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) report_child_status("connect", status); @@ -1137,9 +1141,15 @@ static void connect_attempt_discard(void) { } #define PREP_DECL_MSG_CMSG(msg) \ + char msgbyte= 0; \ + struct iovec msgiov; \ + msgiov.iov_base= &msgbyte; \ + msgiov.iov_len= 1; \ struct msghdr msg; \ memset(&msg,0,sizeof(msg)); \ - char msg##cbuf[CMSG_SPACE(sizeof(fd))]; \ + char msg##cbuf[CMSG_SPACE(sizeof(int))]; \ + msg.msg_iov= &msgiov; \ + msg.msg_iovlen= 1; \ msg.msg_control= msg##cbuf; \ msg.msg_controllen= sizeof(msg##cbuf); @@ -1148,42 +1158,45 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { assert(fd == connecting_fdpass_sock); + PREP_DECL_MSG_CMSG(msg); + + ssize_t rs= recvmsg(fd, &msg, 0); + if (rs<0) { + if (isewouldblock(errno)) return OOP_CONTINUE; + syswarn("failed to read socket from connecting child"); + goto x; + } + conn= xmalloc(sizeof(*conn)); memset(conn,0,sizeof(*conn)); + LIST_INIT(conn->waiting); + LIST_INIT(conn->priority); + LIST_INIT(conn->sent); - PREP_DECL_MSG_CMSG(msg); struct cmsghdr *h= 0; - ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT); if (rs >= 0) h= CMSG_FIRSTHDR(&msg); if (!h) { - int status; - pid_t got= waitpid(connecting_child, &status, WNOHANG); - if (got != -1) { - assert(got==connecting_child); - connecting_child= 0; - if (WIFEXITED(status)) { - if (WEXITSTATUS(status) != 0 && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && - WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) - /* child already reported the problem */; - else - warn("connect: connection child exited code %d but no cmsg", + int status= xwaitpid(&connecting_child, "connect child (broken)"); + + if (WIFEXITED(status)) { + if (WEXITSTATUS(status) != 0 && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM) + /* child already reported the problem */; + else { + if (e == OOP_EXCEPTION) + warn("connect: connection child exited code %d but" + " unexpected exception on fdpass socket", WEXITSTATUS(status)); - } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { - warn("connect: connection attempt timed out"); - } else { - report_child_status("connect", status); + else + warn("connect: connection child exited code %d but" + " no cmsg (rs=%d)", + WEXITSTATUS(status), (int)rs); } + } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) { + warn("connect: connection attempt timed out"); } else { - /* child is still running apparently, report the socket problem */ - if (rs < 0) - syswarn("connect: read from fdpass socket failed"); - else if (e == OOP_EXCEPTION) - warn("connect: unexpected exception on fdpass socket"); - else if (!rs) - warn("connect: unexpected EOF on fdpass socket"); - else - fatal("connect: unexpected lack of cmsg from child"); + report_child_status("connect", status); } goto x; } @@ -1219,9 +1232,6 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { } /* Phew! */ - LIST_INIT(conn->waiting); - LIST_INIT(conn->priority); - LIST_INIT(conn->sent); conn->max_queue= conn->stream ? max_queue_per_conn : 1; loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn); @@ -1237,7 +1247,7 @@ static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { connect_attempt_discard(); check_assign_articles(); - return 0; + return OOP_CONTINUE; x: conn_dispose(conn); @@ -1255,7 +1265,7 @@ static void connect_start(void) { assert(!connecting_child); assert(!connecting_fdpass_sock); - notice("starting connection attempt"); + info("starting connection attempt"); int socks[2]; int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks); @@ -1272,13 +1282,26 @@ static void connect_start(void) { alarm(connection_setup_timeout); if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) { - if (buf[0]) fatal("connect: rejected: %s", sanitise(buf)); - else sysfatal("connect: connection attempt failed"); + int l= strlen(buf); + int stripped=0; + while (l>0) { + unsigned char c= buf[l-1]; + if (!isspace(c)) break; + if (c=='\n' || c=='\r') stripped=1; + --l; + } + if (!buf[0]) { + sysfatal("connect: connection attempt failed"); + } else { + buf[l]= 0; + fatal("connect: %s: %s", stripped ? "rejected" : "failed", + sanitise(buf)); + } } if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0) sysfatal("connect: authentication failed"); if (try_stream) { - if (fputs("MODE STREAM\r\n", cn_to) || + if (fputs("MODE STREAM\r\n", cn_to)==EOF || fflush(cn_to)) sysfatal("connect: could not send MODE STREAM"); buf[sizeof(buf)-1]= 0; @@ -1290,7 +1313,7 @@ static void connect_start(void) { } int l= strlen(buf); assert(l>=1); - if (buf[-1]!='\n') + if (buf[l-1]!='\n') fatal("connect: response to MODE STREAM is too long: %.100s...", sanitise(buf)); l--; if (l>0 && buf[l-1]=='\r') l--; @@ -1325,13 +1348,15 @@ static void connect_start(void) { msg.msg_controllen= cmsg->cmsg_len; r= sendmsg(socks[1], &msg, 0); - if (r) sysdie("sendmsg failed for new connection"); + if (r<0) sysdie("sendmsg failed for new connection"); + if (r!=1) die("sendmsg for new connection gave wrong result %d",r); _exit(exitstatus); } xclose(socks[1], "connecting fdpass child's socket",0); connecting_fdpass_sock= socks[0]; + xsetnonblock(connecting_fdpass_sock, 1); on_fd_read_except(connecting_fdpass_sock, connchild_event); } @@ -1365,6 +1390,7 @@ static void check_assign_articles(void) { if (!inqueue) use->since_activity= 0; /* reset idle counter */ while (spare>0) { Article *art= LIST_REMHEAD(queue); + if (!art) break; LIST_ADDTAIL(use->waiting, art); spare--; } @@ -1489,12 +1515,23 @@ static void conn_make_some_xmits(Conn *conn) { ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL); + art->state= + art->state == art_Unchecked ? art_Unsolicited : + art->state == art_Wanted ? art_Wanted : + (abort(),-1); + + if (!artdata) art->missing= 1; + art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++; + if (conn->stream) { if (artdata) { XMIT_LITERAL("TAKETHIS "); xmit_noalloc(conn, art->messageid, art->midlen); XMIT_LITERAL("\r\n"); xmit_artbody(conn, artdata); + } else { + article_done(conn, art, -1); + continue; } } else { /* we got 235 from IHAVE */ @@ -1505,20 +1542,15 @@ static void conn_make_some_xmits(Conn *conn) { } } - art->state= - art->state == art_Unchecked ? art_Unsolicited : - art->state == art_Wanted ? art_Wanted : - (abort(),-1); - art->ipf->counts[art->state][RC_sent]++; LIST_ADDTAIL(conn->sent, art); } else { /* check it */ if (conn->stream) - XMIT_LITERAL("IHAVE "); - else XMIT_LITERAL("CHECK "); + else + XMIT_LITERAL("IHAVE "); xmit_noalloc(conn, art->messageid, art->midlen); XMIT_LITERAL("\r\n"); @@ -1620,7 +1652,8 @@ static void update_nocheck(int accepted) { } static void article_done(Conn *conn, Article *art, int whichcount) { - art->ipf->counts[art->state][whichcount]++; + if (!art->missing) art->ipf->counts[art->state][whichcount]++; + if (whichcount == RC_accepted) update_nocheck(1); else if (whichcount == RC_unwanted) update_nocheck(0); @@ -1628,6 +1661,11 @@ static void article_done(Conn *conn, Article *art, int whichcount) { while (art->blanklen) { static const char spaces[]= + " " + " " + " " + " " + " " " " " " " " @@ -1692,7 +1730,7 @@ static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev, Article *art; #define GET_ARTICLE(musthavesent) \ - art= article_reply_check(conn, data, musthavesent, code_streaming, sani); \ + art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \ if (art) ; else return OOP_CONTINUE /* reply_check has failed the conn */ #define ARTICLE_DEALTWITH(streaming,musthavesent,how) \ @@ -1766,7 +1804,7 @@ static void feedfile_eof(InputFile *ipf) { } static InputFile *open_input_file(const char *path) { - int fd= open(path, O_RDONLY); + int fd= open(path, O_RDWR); if (fd<0) { if (errno==ENOENT) return 0; sysfatal("unable to open input file %s", path); @@ -1871,7 +1909,7 @@ static void *feedfile_got_article(oop_source *lp, oop_read *rd, art->midlen= midlen; art->ipf= ipf; ipf->inprogress++; art->token= TextToToken(tokentextbuf); - art->offset= ipf->offset; + art->offset= old_offset; art->blanklen= recsz; strcpy(art->messageid, space+1); LIST_ADDTAIL(queue, art); @@ -1949,10 +1987,10 @@ static ssize_t tailing_try_read(struct oop_readable *rable, void *buffer, /*---------- filemon implemented with inotify ----------*/ -#if defined(HAVE_INOTIFY) && !defined(HAVE_FILEMON) +#if defined(HAVE_SYS_INOTIFY_H) && !defined(HAVE_FILEMON) #define HAVE_FILEMON -#include +#include static int filemon_inotify_fd; static int filemon_inotify_wdmax; @@ -1968,7 +2006,7 @@ static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { if (wd >= filemon_inotify_wdmax) { int newmax= wd+2; - filemon_inotify_wd= xrealloc(filemon_inotify_wd2ipf, + filemon_inotify_wd2ipf= xrealloc(filemon_inotify_wd2ipf, sizeof(*filemon_inotify_wd2ipf) * newmax); memset(filemon_inotify_wd2ipf + filemon_inotify_wdmax, 0, sizeof(*filemon_inotify_wd2ipf) * (newmax - filemon_inotify_wdmax)); @@ -1987,7 +2025,7 @@ static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { int wd= pf->wd; debug("filemon inotify stopfile %p wd=%d", ipf, wd); - int r= inotify_rm_watch(filemon_inotify_fd, filemon_inotify_wd); + int r= inotify_rm_watch(filemon_inotify_fd, wd); if (r) sysdie("inotify_rm_watch"); filemon_inotify_wd2ipf[wd]= 0; } @@ -2006,7 +2044,7 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, die("inotify read %d bytes wanted struct of %d", r, (int)sizeof(iev)); } InputFile *ipf= filemon_inotify_wd2ipf[iev.wd]; - debug("filemon inotify readable read %p wd=%p", iev.wd, ipf); + debug("filemon inotify readable read %d wd=%p", iev.wd, ipf); filemon_callback(ipf); } return OOP_CONTINUE; @@ -2015,11 +2053,11 @@ static void *filemon_inotify_readable(oop_source *lp, int fd, static int filemon_method_init(void) { filemon_inotify_fd= inotify_init(); if (filemon_inotify_fd<0) { - syswarn("could not initialise inotify: inotify_init failed"); + syswarn("filemon/inotify: inotify_init failed"); return 0; } - set nonblock; - loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable); + xsetnonblock(filemon_inotify_fd, 1); + loop->on_fd(loop, filemon_inotify_fd, OOP_READ, filemon_inotify_readable, 0); debug("filemon inotify init filemon_inotify_fd=%d", filemon_inotify_fd); return 1; @@ -2033,7 +2071,10 @@ static int filemon_method_init(void) { struct Filemon_Perfile { int dummy; }; -static int filemon_method_init(void) { return 0; } +static int filemon_method_init(void) { + warn("filemon/dummy: no filemon method compiled in"); + return 0; +} static void filemon_method_startfile(InputFile *ipf, Filemon_Perfile *pf) { } static void filemon_method_stopfile(InputFile *ipf, Filemon_Perfile *pf) { } @@ -2362,8 +2403,8 @@ static void notice_processed(InputFile *ipf, const char *what, #define CNT(art,rc) (ipf->counts[art_##art][RC_##rc]) - info("processed %s%s read=%d(+%dbl,+%derr)" - " offered=%d(ch%d,nc%d) accepted=%d(ch%d+nc%d)" + info("processed %s%s read=%d (+bl=%d,+err=%d)" + " offered=%d (ch=%d,nc=%d) accepted=%d (ch=%d,nc=%d)" RESULT_COUNTS(RCI_NOTHING, RCI_TRIPLE_FMT) , what, spec, @@ -2387,7 +2428,7 @@ static void statemc_check_backlog_done(void) { const char *under= strchr(slash, '_'); const char *rest= under ? under+1 : leaf; if (!strncmp(rest,"backlog",7)) rest += 7; - notice_processed(ipf,"backlog:",rest); + notice_processed(ipf,"backlog ",rest); close_input_file(ipf); if (unlink(ipf->path)) { @@ -2409,7 +2450,7 @@ static void statemc_check_flushing_done(void) { assert(sms==sm_SEPARATED || sms==sm_DROPPING); - notice_processed(ipf,"feedfile",0); + notice_processed(ipf,"feedfile",""); close_defer(); @@ -2421,7 +2462,7 @@ static void statemc_check_flushing_done(void) { if (sms==sm_SEPARATED) { notice("flush complete"); - SMS(NORMAL, 0, "flush complete"); + SMS(NORMAL, spontaneous_flush_periods, "flush complete"); } else if (sms==sm_DROPPING) { SMS(DROPPED, 0, "old flush complete"); search_backlog_file(); @@ -2859,7 +2900,7 @@ static char *debug_report_ipf(InputFile *ipf) { return xasprintf("%p/%s:ip=%ld,off=%ld,fd=%d%s", ipf, path, ipf->inprogress, (long)ipf->offset, - ipf->fd, ipf->rd ? "+" : ""); + ipf->fd, ipf->rd ? "" : ",!rd"); } static void period(void) { @@ -2869,7 +2910,7 @@ static void period(void) { debug("PERIOD" " sms=%s[%d] conns=%d queue=%d until_connect=%d" - " input_files main:%s old:%s flushing:%s" + " input_files main:%s flushing:%s backlog:%s" " children connecting=%ld inndcomm=%ld" , sms_names[sms], sm_period_counter, @@ -3062,7 +3103,7 @@ static const Option innduct_options[]= { {'q',"quiet-multiple", 0, &quiet_multiple, op_setint, 1 }, {0,"no-daemon", 0, &become_daemon, op_setint, 0 }, {0,"no-streaming", 0, &try_stream, op_setint, 0 }, -{0,"inndconf", "F", &inndconffile, op_string }, +{'C',"inndconf", "F", &inndconffile, op_string }, {'P',"port", "PORT", &port, op_integer }, {0,"help", 0, 0, help }, @@ -3127,6 +3168,9 @@ int main(int argc, char **argv) { /* defaults */ + int r= innconf_read(inndconffile); + if (!r) badusage("could not read inn.conf (more info on stderr)"); + if (!remote_host) remote_host= sitename; if (nocheck_thresh < 0 || nocheck_thresh > 100) @@ -3149,7 +3193,6 @@ int main(int argc, char **argv) { max_bad_data_ratio *= 0.01; if (!feedfile) { - innconf_read(inndconffile); feedfile= xasprintf("%s/%s",innconf->pathoutgoing,sitename); } else if (!feedfile[0]) { badusage("feed filename must be nonempty"); @@ -3218,7 +3261,7 @@ int main(int argc, char **argv) { control_init(); if (!filemon_method_init()) { - warn("no file monitoring available, polling"); + warn("filemon: no file monitoring available, polling"); every(5,0,filepoll); } @@ -3228,7 +3271,7 @@ int main(int argc, char **argv) { /* let's go */ - void *r= oop_sys_run(sysloop); - assert(r == OOP_ERROR); + void *run= oop_sys_run(sysloop); + assert(run == OOP_ERROR); sysdie("event loop failed"); }