From ece47a280c15bca137b918d8fa9695e7c38c25df Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Sun, 29 Nov 2009 17:07:13 +0000 Subject: [PATCH] WIP before new iov queue --- backends/innduct.c | 375 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 315 insertions(+), 60 deletions(-) diff --git a/backends/innduct.c b/backends/innduct.c index 9be77ee..610504e 100644 --- a/backends/innduct.c +++ b/backends/innduct.c @@ -1,73 +1,300 @@ /* + * Four files full of + * token article + * + * site.name_ductlock lock taken out by innduct + * F site.name written by innd + * D site.name_duct moved aside by innduct, for flush + * site.name_deferwork 431'd articles, still being written + * site.name_defergo_ 431'd articles, ready for innxmit + * site.name_deferlock lock taken out by innxmit wrapper + * + * + * + * OVERALL STATES: + * + * START + * ,-->--. | + * | | stat D + * | | / | + * | | ENOENT/ |exists + * | V <----------' | + * | Normal stat F + * | F: innd writing, duct reading /|\ + * | D: ENOENT / | \ + * | | / | \ + * | | duct decides time to flush same / | | + * | | duct makes hardlink as D / | | + * | | / | | + * | V <---------' | | + * | Hardlinked | | + * | F == D: innd writing, duct reading | | + * ^ | | | + * | | duct unlinks F / | + * | V ENOENT / | + * | Moved <------------' | + * | F: ENOENT | + * | D: innd writing, duct reading | + * | | | + * | | duct flushes feed | + * | | (others can too, harmlessly) | + * | V | + * | Separated <-----------------' + * | F: innd writing different to D + * | D: duct reading + * | | + * | V duct completes processing of D + * | | duct unlinks D + * | | + * `--<--' + * */ -static int max_connections, articles_per_connect_attempt; +static int max_connections, max_queue_per_conn; static int connection_setup_timeout, port, try_stream; static const char *remote_host; +#define ISNODE(T) T *next, *back; +#define LIST(T) struct { T *head, *tail, *tailpred; int count; } + +#define NODE(n) ((struct node*)&(n)->head) + +#define LIST_ADDHEAD(l,n) \ + (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++) +#define LIST_ADDTAIL(l,n) \ + (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++) + +#define LIST_REMHEAD(l) \ + ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0) +#define LIST_REMTAIL(l) \ + ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0) +#define LIST_REMOVE(l,n) \ + (list_remove(NODE((n))), (void)(l).count--) +#define LIST_INSERT(l,n,pred) \ + (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++) + struct Article { + char *mid; + int midlen; + int nocheck; /* also used when CHECK says yes please */ }; +#define CONNBUFSZ 16384 + +#define CN "<%d> " + typedef struct Conn Conn; struct Conn { - Conn *next, *back; - int fd; - Article *queue; + ISNODE(Conn); + int fd, max_queue, stream; + LIST(Article) queue; + Article *tosend; /* points into queue */ + char circ_buf[CONNBUFSZ]; + unsigned circ_read, circ_write; }; -#define CHILD_ESTATUS_STREAM 1 -#define CHILD_ESTATUS_NOSTREAM 2 +#define CHILD_ESTATUS_STREAM 4 +#define CHILD_ESTATUS_NOSTREAM 5 static int since_connect_attempt; static int nconns; -static struct { Conn *head, *tail } idle, working, full; +static LIST(Conn) idle, working, full; + +static LIST(Article) *queue; + +static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } } + + +/*========== making new connections ==========*/ -static Conn *connecting; static int connecting_sockets[2]= {-1,-1}; static pid_t connecting_child; -static Article *currentart; +static void report_child_status(const char *what, int status) { + if (WIFEXITED(status)) { + int es= WEXITSTATUS(status); + if (es) + warn("%s: child died with error exit status %d",es); + } else if (WIFSIGNALED(status)) { + int sig= WTERMSIG(status); + const char *sigstr= strsignal(sig); + const char *coredump= WCOREDUMP(status) ? " (core dumped)" : ""; + if (sigstr) + warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump); + else + warn("%s: child died due to unknown fatal signal %d%s", + what, sig, coredump); + } else { + warn("%s: child died with unknown wait status %d", status); + } +} -static void start_connecting() { - r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets); - if (r) { syswarn("cannot create socketpair for connect child"); goto x; } +static void connect_attempt_discard(void) { + if (connecting_sockets[0]) { + cancel_fd(loop, connecting_sockets[0], OOP_READ); + cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION); + } + perhaps_close(&connecting_sockets[0]); + perhaps_close(&connecting_sockets[1]); + + if (connecting_child) { + int status; + r= kill(connecting_child, SIGKILL); + if (r) sysdie("cannot kill connect child"); + + pid_t got= waitpid(connecting_child, &status, WNOHANG); + if (got==-1) sysdie("cannot reap connect child"); + + if (!(WIFEXITED(status) || + (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) { + report_child_status("connect" + } + connecting_child= 0; + } +} + +#define PREP_DECL_MSG_CMSG(msg) \ + struct msghdr msg; \ + memset(&msg,0,sizeof(msg)); \ + char msg##cbuf[CMSG_SPACE(sizeof(fd))]; \ + msg.msg_control= msg##cbuf; \ + msg.msg_controllen= sizeof(msg##cbuf); + +static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) { + Conn *conn= 0; + + conn= xcalloc(sizeof(*conn)); + DECL_MSG_CMSG(msg); + struct cmsghdr *h= 0; + ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT); + if (rs >= 0) h= CMSG_FIRSTHDR(&msg); + if (!h) { + int status; + pid_t got= waitpid(connecting_child, &status, WNOHANG); + if (got != -1) { + assert(got==connecting_child); + connecting_child= 0; + if (WIFEXITED(status) && + (WEXITSTATUS(status) != 0 + WEXITSTATUS(status) != CHILD_ESTATUS_STREAM && + WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) { + /* child already reported the problem */ + } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) { + warn("connect: connection attempt timed out"); + } else if (!WIFEXITED(status)) { + report_child_status("connect", status); + /* that's probably the root cause then */ + } + } else { + /* child is still running apparently, report the socket problem */ + if (rs < 0) + syswarn("connect: read from child socket failed"); + else if (e == OOP_EXCEPTIONN) + warn("connect: unexpected exception on child socket"); + else + warn("connect: unexpected EOF on child socket"); + } + goto x; + } + +#define CHK(field, val) \ + if (h->cmsg_##field != val) { \ + die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \ + goto x; \ + } + CHK(level, SOL_SOCKET); + CHK(type, SCM_RIGHTS); + CHK(len, CMSG_LEN(sizeof(conn-b>fd))); +#undef CHK + + if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; } + + memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd)); + + pid_t got= waitpid(connecting_child, &status, 0); + if (got==-1) sysdie("connect: real wait for child"); + assert(got == connecting_child); + connecting_child= 0; + + if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; } + int es= WEXITSTATUS(status); + switch (es) { + case CHILD_ESTATUS_STREAM: conn->stream= 1; break; + case CHILD_ESTATUS_NOSTREAM: conn->stream= 0; break; + default: + die("connect: child gave unexpected exit status %d", es); + } + + set nonblocking; + + /* Phew! */ + LIST_ADDHEAD(idle, conn); + notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain"); + connect_attempt_discard(); + process_queue(); + return 0; + + x: + if (conn) { + perhaps_close(&conn->fd); + free(conn); + } + connect_attempt_discard(); +} + +static void connect_start() { + assert(!connecting_sockets[0]); + assert(!connecting_sockets[1]); + assert(!connecting_child); + + notice("starting connection attempt"); + + r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets); + if (r) { syswarn("connect: cannot create socketpair for child"); goto x; } + connecting_child= fork(); - if (connecting_child==-1) { syswarn("cannot fork for connect"); goto x; } - + if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; } + if (!connecting_child) { FILE *cn_from, *cn_to; char buf[NNTP_STRLEN+100]; int exitstatus= CHILD_ESTATUS_NOSTREAM; + put sigpipe back; + close unwanted fds; + + r= close(connecting_sockets[0]); + if (r) sysdie("connect: close parent socket in child"); + alarm(connection_setup_timeout); if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) { if (buf[0]) { sanitise_inplace(buf); - die("%s: connection rejected: %s", remote_host, buf); + die("connect: rejected: %s", buf); } else { - sysdie("%s: connection attempt failed", remote_host); + sysdie("connect: connection attempt failed"); } } if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0) - sysdie("%s: authentication failed", remote_host); + sysdie("connect: authentication failed"); if (try_stream) { if (fputs("MODE STREAM\r\n", cn_to) || fflush(cn_to)) - sysdie("%s: could not send MODE STREAM", remote_host); + sysdie("connect: could not send MODE STREAM"); buf[sizeof(buf)-1]= 0; if (!fgets(buf, sizeof(buf)-1, cn_from)) { if (ferror(cn_from)) - sysdie("%s: could not read response to MODE STREAM", remote_host); + sysdie("connect: could not read response to MODE STREAM"); else - die("%s: connection close in response to MODE STREAM", remote_host); + die("connect: connection close in response to MODE STREAM"); } int l= strlen(buf); assert(l>=1); if (buf[-1]!='\n') { sanitise_inplace(buf); - die("%s: response to MODE STREAM is too long: %.100s...", + die("connect: response to MODE STREAM is too long: %.100s...", remote_host, buf); } l--; if (l>0 && buf[1-]=='\r') l--; @@ -76,7 +303,7 @@ static void start_connecting() { int rcode= strtoul(buf,&ep,10); if (ep != buf[3]) { sanitise_inplace(buf); - die("%s: bad response to MODE STREAM: %.50s", remote_host, buf); + die("connect: bad response to MODE STREAM: %.50s", buf); } switch (rcode) { case 203: @@ -87,70 +314,98 @@ static void start_connecting() { break; default: sanitise_inplace(buf); - warn("%s: bad response to MODE STREAM: %.50s", remote_host, buf); + warn("connect: unexpected response to MODE STREAM: %.50s", buf); exitstatus= 2; break; } } int fd= fileno(cn_from); - char cmsgbuf[CMSG_SPACE(sizeof(fd))]; - struct msghdr msg; - memset(&msg,0,sizeof(msg)); - msg.msg_control= cmsgbuf; - msg.msg_controllen= sizeof(cmsgbuf); - + PREP_DECL_MSG_CMSG(msg); struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg); cmsg->cmsg_level= SOL_SOCKET; cmsg->cmsg_type= SCM_RIGHTS; cmsg->cmsg_len= CMSG_LEN(sizeof(fd)); memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd)); - + msg.msg_controllen= cmsg->cmsg_len; - r= sendmsg(childs_socket, &msg, 0); - if (r) sysdie("%s: sendmsg failed for new connection", remote_host); + r= sendmsg(connecting_sockets[1], &msg, 0); + if (r) sysdie("sendmsg failed for new connection"); _exit(exitstatus); } - on_fd(loop, + r= close(connecting_sockets[1]); connecting_sockets[1]= 0; + if (r) sysdie("connect: close child socket in parent"); - struct cmsghdr *cmsg; - - /* NNTPconnect inexplicably duplicates the fd but we don't care - * about that as we're going to exit shortly */ - - - - - warn(" - - syswarn(" - int e= errno; - sanitise(buf); - syswarn(" + loop->on_fd(loop, connecting_sockets[0], OOP_READ, connchild_event, 0); + loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0); + return OOP_CONTINUE; x: - kill_connection_attempt(); + connect_attempt_discard(); } -static void kill_connection_attempt() { - fixme; - connecting_sockets[0]= connecting_sockets[1]= -1; - connecting_child= 0; -} +/*========== overall control of article flow ==========*/ -static void process_any_article() { - if (!currentart) +static void process_queue() { + if (!queue.count) return; - + if (working.head) { transmit(working.head); } else if (idle.head) { transmit(idle.head); - } else if (nconns < maxconns && !connecting_child && - since_connect_attempt >= connect_attempt_limiter) { - since_connect_attempt= 0; + } else if (nconns < maxconns && queue.count >= max_queue_per_conn && + !connecting_child && !connect_delay) { + connect_delay= reconnect_delay_periods; connect_start(); - } + } +} + +/*========== article transmission ==========*/ + +static void *conn_writeable() { + for (;;) { + int circ_used= circ_write - circ_read; + if (circ_used < 0) circ_used += CONNBUFSZ; + writeable_moredata(conn, CONNBUFSZ-1 - circ_used); + + if (conn->circ_read == conn->circ_write) + return OOP_CONTINUE; + + struct iovec iov[2]; + int niov= 1; + iov[0].iov_base= conn->circ_buf + conn->circ_read; + if (conn->circ_read > conn->circ_write) { /* wrapped */ + iov[0].iov_len= CONNBUFSZ - conn->circ_read; + iov[1].iov_base= conn->circ_buf; + iov[1].iov_len= conn->circ_write; + if (niov[1].iov_len) niov= 2; + } else { + iov[0].iov_len= conn->circ_write - conn->circ_read; + } + ssize_t rs= writev(conn->fd, &iov, niov); + if (rs < 0) { + if (errno == EAGAIN) return OOP_CONTINUE; + syswarn(CN "write failed", conn->fd); + conn_failed(conn); + return OOP_CONTINUE; + } + assert(rs > 0); + + conn->circ_read += rs; + if (conn->circ_read > CONNBUFSZ) + conn->circ_read -= CONNBUFSZ; + } } + + + +static void transmit(Conn *conn) { + assert(conn->queue.count < max_queue); + + +main { + ignore sigpipe; +}; -- 2.30.2