/*
+ * Four files full of
+ * token article
+ *
+ * site.name_ductlock lock taken out by innduct
+ * F site.name written by innd
+ * D site.name_duct moved aside by innduct, for flush
+ * site.name_deferwork 431'd articles, still being written
+ * site.name_defergo_<inum> 431'd articles, ready for innxmit
+ * site.name_deferlock lock taken out by innxmit wrapper
+ *
+ *
+ *
+ * OVERALL STATES:
+ *
+ * START
+ * ,-->--. |
+ * | | stat D
+ * | | / |
+ * | | ENOENT/ |exists
+ * | V <----------' |
+ * | Normal stat F
+ * | F: innd writing, duct reading /|\
+ * | D: ENOENT / | \
+ * | | / | \
+ * | | duct decides time to flush same / | |
+ * | | duct makes hardlink as D / | |
+ * | | / | |
+ * | V <---------' | |
+ * | Hardlinked | |
+ * | F == D: innd writing, duct reading | |
+ * ^ | | |
+ * | | duct unlinks F / |
+ * | V ENOENT / |
+ * | Moved <------------' |
+ * | F: ENOENT |
+ * | D: innd writing, duct reading |
+ * | | |
+ * | | duct flushes feed |
+ * | | (others can too, harmlessly) |
+ * | V |
+ * | Separated <-----------------'
+ * | F: innd writing different to D
+ * | D: duct reading
+ * | |
+ * | V duct completes processing of D
+ * | | duct unlinks D
+ * | |
+ * `--<--'
+ *
*/
-static int max_connections, articles_per_connect_attempt;
+static int max_connections, max_queue_per_conn;
static int connection_setup_timeout, port, try_stream;
static const char *remote_host;
+#define ISNODE(T) T *next, *back;
+#define LIST(T) struct { T *head, *tail, *tailpred; int count; }
+
+#define NODE(n) ((struct node*)&(n)->head)
+
+#define LIST_ADDHEAD(l,n) \
+ (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++)
+#define LIST_ADDTAIL(l,n) \
+ (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++)
+
+#define LIST_REMHEAD(l) \
+ ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0)
+#define LIST_REMTAIL(l) \
+ ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0)
+#define LIST_REMOVE(l,n) \
+ (list_remove(NODE((n))), (void)(l).count--)
+#define LIST_INSERT(l,n,pred) \
+ (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++)
+
struct Article {
+ char *mid;
+ int midlen;
+ int nocheck; /* also used when CHECK says yes please */
};
+#define CONNBUFSZ 16384
+
+#define CN "<%d> "
+
typedef struct Conn Conn;
struct Conn {
- Conn *next, *back;
- int fd;
- Article *queue;
+ ISNODE(Conn);
+ int fd, max_queue, stream;
+ LIST(Article) queue;
+ Article *tosend; /* points into queue */
+ char circ_buf[CONNBUFSZ];
+ unsigned circ_read, circ_write;
};
-#define CHILD_ESTATUS_STREAM 1
-#define CHILD_ESTATUS_NOSTREAM 2
+#define CHILD_ESTATUS_STREAM 4
+#define CHILD_ESTATUS_NOSTREAM 5
static int since_connect_attempt;
static int nconns;
-static struct { Conn *head, *tail } idle, working, full;
+static LIST(Conn) idle, working, full;
+
+static LIST(Article) *queue;
+
+static void perhaps_close(int *fd) { if (*fd) { close(*fd); fd=0; } }
+
+
+/*========== making new connections ==========*/
-static Conn *connecting;
static int connecting_sockets[2]= {-1,-1};
static pid_t connecting_child;
-static Article *currentart;
+static void report_child_status(const char *what, int status) {
+ if (WIFEXITED(status)) {
+ int es= WEXITSTATUS(status);
+ if (es)
+ warn("%s: child died with error exit status %d",es);
+ } else if (WIFSIGNALED(status)) {
+ int sig= WTERMSIG(status);
+ const char *sigstr= strsignal(sig);
+ const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
+ if (sigstr)
+ warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
+ else
+ warn("%s: child died due to unknown fatal signal %d%s",
+ what, sig, coredump);
+ } else {
+ warn("%s: child died with unknown wait status %d", status);
+ }
+}
-static void start_connecting() {
- r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
- if (r) { syswarn("cannot create socketpair for connect child"); goto x; }
+static void connect_attempt_discard(void) {
+ if (connecting_sockets[0]) {
+ cancel_fd(loop, connecting_sockets[0], OOP_READ);
+ cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION);
+ }
+ perhaps_close(&connecting_sockets[0]);
+ perhaps_close(&connecting_sockets[1]);
+
+ if (connecting_child) {
+ int status;
+ r= kill(connecting_child, SIGKILL);
+ if (r) sysdie("cannot kill connect child");
+
+ pid_t got= waitpid(connecting_child, &status, WNOHANG);
+ if (got==-1) sysdie("cannot reap connect child");
+
+ if (!(WIFEXITED(status) ||
+ (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) {
+ report_child_status("connect"
+ }
+ connecting_child= 0;
+ }
+}
+
+#define PREP_DECL_MSG_CMSG(msg) \
+ struct msghdr msg; \
+ memset(&msg,0,sizeof(msg)); \
+ char msg##cbuf[CMSG_SPACE(sizeof(fd))]; \
+ msg.msg_control= msg##cbuf; \
+ msg.msg_controllen= sizeof(msg##cbuf);
+
+static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
+ Conn *conn= 0;
+
+ conn= xcalloc(sizeof(*conn));
+ DECL_MSG_CMSG(msg);
+ struct cmsghdr *h= 0;
+ ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
+ if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
+ if (!h) {
+ int status;
+ pid_t got= waitpid(connecting_child, &status, WNOHANG);
+ if (got != -1) {
+ assert(got==connecting_child);
+ connecting_child= 0;
+ if (WIFEXITED(status) &&
+ (WEXITSTATUS(status) != 0
+ WEXITSTATUS(status) != CHILD_ESTATUS_STREAM &&
+ WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) {
+ /* child already reported the problem */
+ } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) {
+ warn("connect: connection attempt timed out");
+ } else if (!WIFEXITED(status)) {
+ report_child_status("connect", status);
+ /* that's probably the root cause then */
+ }
+ } else {
+ /* child is still running apparently, report the socket problem */
+ if (rs < 0)
+ syswarn("connect: read from child socket failed");
+ else if (e == OOP_EXCEPTIONN)
+ warn("connect: unexpected exception on child socket");
+ else
+ warn("connect: unexpected EOF on child socket");
+ }
+ goto x;
+ }
+
+#define CHK(field, val) \
+ if (h->cmsg_##field != val) { \
+ die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
+ goto x; \
+ }
+ CHK(level, SOL_SOCKET);
+ CHK(type, SCM_RIGHTS);
+ CHK(len, CMSG_LEN(sizeof(conn-b>fd)));
+#undef CHK
+
+ if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; }
+
+ memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
+
+ pid_t got= waitpid(connecting_child, &status, 0);
+ if (got==-1) sysdie("connect: real wait for child");
+ assert(got == connecting_child);
+ connecting_child= 0;
+
+ if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
+ int es= WEXITSTATUS(status);
+ switch (es) {
+ case CHILD_ESTATUS_STREAM: conn->stream= 1; break;
+ case CHILD_ESTATUS_NOSTREAM: conn->stream= 0; break;
+ default:
+ die("connect: child gave unexpected exit status %d", es);
+ }
+
+ set nonblocking;
+
+ /* Phew! */
+ LIST_ADDHEAD(idle, conn);
+ notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
+ connect_attempt_discard();
+ process_queue();
+ return 0;
+
+ x:
+ if (conn) {
+ perhaps_close(&conn->fd);
+ free(conn);
+ }
+ connect_attempt_discard();
+}
+
+static void connect_start() {
+ assert(!connecting_sockets[0]);
+ assert(!connecting_sockets[1]);
+ assert(!connecting_child);
+
+ notice("starting connection attempt");
+
+ r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
+ if (r) { syswarn("connect: cannot create socketpair for child"); goto x; }
+
connecting_child= fork();
- if (connecting_child==-1) { syswarn("cannot fork for connect"); goto x; }
-
+ if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; }
+
if (!connecting_child) {
FILE *cn_from, *cn_to;
char buf[NNTP_STRLEN+100];
int exitstatus= CHILD_ESTATUS_NOSTREAM;
+ put sigpipe back;
+ close unwanted fds;
+
+ r= close(connecting_sockets[0]);
+ if (r) sysdie("connect: close parent socket in child");
+
alarm(connection_setup_timeout);
if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) {
if (buf[0]) {
sanitise_inplace(buf);
- die("%s: connection rejected: %s", remote_host, buf);
+ die("connect: rejected: %s", buf);
} else {
- sysdie("%s: connection attempt failed", remote_host);
+ sysdie("connect: connection attempt failed");
}
}
if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0)
- sysdie("%s: authentication failed", remote_host);
+ sysdie("connect: authentication failed");
if (try_stream) {
if (fputs("MODE STREAM\r\n", cn_to) ||
fflush(cn_to))
- sysdie("%s: could not send MODE STREAM", remote_host);
+ sysdie("connect: could not send MODE STREAM");
buf[sizeof(buf)-1]= 0;
if (!fgets(buf, sizeof(buf)-1, cn_from)) {
if (ferror(cn_from))
- sysdie("%s: could not read response to MODE STREAM", remote_host);
+ sysdie("connect: could not read response to MODE STREAM");
else
- die("%s: connection close in response to MODE STREAM", remote_host);
+ die("connect: connection close in response to MODE STREAM");
}
int l= strlen(buf);
assert(l>=1);
if (buf[-1]!='\n') {
sanitise_inplace(buf);
- die("%s: response to MODE STREAM is too long: %.100s...",
+ die("connect: response to MODE STREAM is too long: %.100s...",
remote_host, buf);
}
l--; if (l>0 && buf[1-]=='\r') l--;
int rcode= strtoul(buf,&ep,10);
if (ep != buf[3]) {
sanitise_inplace(buf);
- die("%s: bad response to MODE STREAM: %.50s", remote_host, buf);
+ die("connect: bad response to MODE STREAM: %.50s", buf);
}
switch (rcode) {
case 203:
break;
default:
sanitise_inplace(buf);
- warn("%s: bad response to MODE STREAM: %.50s", remote_host, buf);
+ warn("connect: unexpected response to MODE STREAM: %.50s", buf);
exitstatus= 2;
break;
}
}
int fd= fileno(cn_from);
- char cmsgbuf[CMSG_SPACE(sizeof(fd))];
- struct msghdr msg;
- memset(&msg,0,sizeof(msg));
- msg.msg_control= cmsgbuf;
- msg.msg_controllen= sizeof(cmsgbuf);
-
+ PREP_DECL_MSG_CMSG(msg);
struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
cmsg->cmsg_level= SOL_SOCKET;
cmsg->cmsg_type= SCM_RIGHTS;
cmsg->cmsg_len= CMSG_LEN(sizeof(fd));
memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
-
+
msg.msg_controllen= cmsg->cmsg_len;
- r= sendmsg(childs_socket, &msg, 0);
- if (r) sysdie("%s: sendmsg failed for new connection", remote_host);
+ r= sendmsg(connecting_sockets[1], &msg, 0);
+ if (r) sysdie("sendmsg failed for new connection");
_exit(exitstatus);
}
- on_fd(loop,
+ r= close(connecting_sockets[1]); connecting_sockets[1]= 0;
+ if (r) sysdie("connect: close child socket in parent");
- struct cmsghdr *cmsg;
-
- /* NNTPconnect inexplicably duplicates the fd but we don't care
- * about that as we're going to exit shortly */
-
-
-
-
- warn("
-
- syswarn("
- int e= errno;
- sanitise(buf);
- syswarn("
+ loop->on_fd(loop, connecting_sockets[0], OOP_READ, connchild_event, 0);
+ loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0);
+ return OOP_CONTINUE;
x:
- kill_connection_attempt();
+ connect_attempt_discard();
}
-static void kill_connection_attempt() {
- fixme;
- connecting_sockets[0]= connecting_sockets[1]= -1;
- connecting_child= 0;
-}
+/*========== overall control of article flow ==========*/
-static void process_any_article() {
- if (!currentart)
+static void process_queue() {
+ if (!queue.count)
return;
-
+
if (working.head) {
transmit(working.head);
} else if (idle.head) {
transmit(idle.head);
- } else if (nconns < maxconns && !connecting_child &&
- since_connect_attempt >= connect_attempt_limiter) {
- since_connect_attempt= 0;
+ } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
+ !connecting_child && !connect_delay) {
+ connect_delay= reconnect_delay_periods;
connect_start();
- }
+ }
+}
+
+/*========== article transmission ==========*/
+
+static void *conn_writeable() {
+ for (;;) {
+ int circ_used= circ_write - circ_read;
+ if (circ_used < 0) circ_used += CONNBUFSZ;
+ writeable_moredata(conn, CONNBUFSZ-1 - circ_used);
+
+ if (conn->circ_read == conn->circ_write)
+ return OOP_CONTINUE;
+
+ struct iovec iov[2];
+ int niov= 1;
+ iov[0].iov_base= conn->circ_buf + conn->circ_read;
+ if (conn->circ_read > conn->circ_write) { /* wrapped */
+ iov[0].iov_len= CONNBUFSZ - conn->circ_read;
+ iov[1].iov_base= conn->circ_buf;
+ iov[1].iov_len= conn->circ_write;
+ if (niov[1].iov_len) niov= 2;
+ } else {
+ iov[0].iov_len= conn->circ_write - conn->circ_read;
+ }
+ ssize_t rs= writev(conn->fd, &iov, niov);
+ if (rs < 0) {
+ if (errno == EAGAIN) return OOP_CONTINUE;
+ syswarn(CN "write failed", conn->fd);
+ conn_failed(conn);
+ return OOP_CONTINUE;
+ }
+ assert(rs > 0);
+
+ conn->circ_read += rs;
+ if (conn->circ_read > CONNBUFSZ)
+ conn->circ_read -= CONNBUFSZ;
+ }
}
+
+
+
+static void transmit(Conn *conn) {
+ assert(conn->queue.count < max_queue);
+
+
+main {
+ ignore sigpipe;
+};