5 * site.name_ductlock lock taken out by innduct
6 * F site.name written by innd
7 * D site.name_duct moved aside by innduct, for flush
8 * site.name_deferwork 431'd articles, still being written
9 * site.name_defergo_<inum> 431'd articles, ready for innxmit
10 * site.name_deferlock lock taken out by innxmit wrapper
23 * | F: innd writing, duct reading /|\
26 * | | duct decides time to flush same / | |
27 * | | duct makes hardlink as D / | |
31 * | F == D: innd writing, duct reading | |
33 * | | duct unlinks F / |
35 * | Moved <------------' |
37 * | D: innd writing, duct reading |
39 * | | duct flushes feed |
40 * | | (others can too, harmlessly) |
42 * | Separated <-----------------'
43 * | F: innd writing different to D
46 * | V duct completes processing of D
/* File-scope settings.  max_queue_per_conn is compared against the master
 * queue length in check_master_queue(). */
53 static int max_connections, max_queue_per_conn;
/* connection_setup_timeout is handed to alarm() in the connect child;
 * port and remote_host are handed to NNTPconnect(). */
54 static int connection_setup_timeout, port, try_stream;
55 static const char *remote_host;
/* List helpers.  Each LIST_* wrapper calls the matching list_* primitive
 * with the required casts and keeps the `count` member in step.  The list
 * argument `l` is expanded more than once, so it must have no side effects. */
/* Link members (`next`, `back`) for an element of type T. */
57 #define ISNODE(T) T *next, *back;
/* Anonymous list-header struct for elements of type T, plus element count. */
58 #define LIST(T) struct { T *head, *tail, *tailpred; int count; }
/* Casts the address of (n)->head to struct node*.
 * NOTE(review): ISNODE() declares `next`/`back`, not `head` -- confirm which
 * member an element is supposed to be linked through. */
60 #define NODE(n) ((struct node*)&(n)->head)
/* Add element n at the head of list l and increment l.count. */
62 #define LIST_ADDHEAD(l,n) \
63 (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++)
/* Add element n at the tail of list l and increment l.count. */
64 #define LIST_ADDTAIL(l,n) \
65 (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++)
/* Yields 0 when l.count is zero; otherwise decrements l.count and yields
 * the result of list_remhead() as a void*. */
67 #define LIST_REMHEAD(l) \
68 ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0)
/* As LIST_REMHEAD but using list_remtail(). */
69 #define LIST_REMTAIL(l) \
70 ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0)
/* Unlink element n via list_remove() and decrement l.count; l itself is not
 * passed to list_remove(). */
71 #define LIST_REMOVE(l,n) \
72 (list_remove(NODE((n))), (void)(l).count--)
/* Call list_insert() with l, n and pred, then increment l.count. */
73 #define LIST_INSERT(l,n,pred) \
74 (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++)
/* Size used in the circular-buffer arithmetic on conn->circ_read and
 * conn->circ_write. */
82 #define CONNBUFSZ 16384
/* Lets `Conn` be used as a type name for struct Conn. */
86 typedef struct Conn Conn;
89 Malloc, Const, Artdata;
100 int fd, max_queue, stream;
101 LIST(Article) queue; /* not yet told peer, or said TAKETHIS */
102 LIST(Article) sent; /* offered, in xmit, or transmitted waiting reply */
103 Article send; /* partially transmitted */
110 #define CHILD_ESTATUS_STREAM 4
111 #define CHILD_ESTATUS_NOSTREAM 5
113 static int since_connect_attempt;
115 static LIST(Conn) idle, working, full;
117 static LIST(Article) *queue;
/*
 * Close the descriptor stored in *fd, if there is one, and mark the slot
 * as unused.
 *
 * This file uses 0 as the "no descriptor" marker (callers test
 * `if (connecting_sockets[0])` and assert `!connecting_sockets[0]`), so the
 * slot is always left holding 0.  Values <= 0 are never passed to close().
 */
static void perhaps_close(int *fd) {
  if (*fd > 0) {
    close(*fd);
  }
  /* Clear the caller's slot, not the local pointer. */
  *fd= 0;
}
122 /*========== making new connections ==========*/
/* State of the single in-progress connection attempt.  Zero means "none":
 * connect_start() asserts that all three are zero before it begins and
 * stores 0 after closing its copy of the child's socket, and
 * connect_attempt_discard() tests them for non-zero. */
static int connecting_sockets[2]= {0,0};
static pid_t connecting_child;
/*
 * Log, via warn(), how a child process ended.
 *
 *   what    message prefix identifying the child
 *   status  wait status as filled in by waitpid()
 *
 * An exit with status 0 is not reported.  Every message starts with `what`,
 * so it must be passed to each warn() call that uses "%s:".
 */
static void report_child_status(const char *what, int status) {
  if (WIFEXITED(status)) {
    int es= WEXITSTATUS(status);
    if (es) {
      warn("%s: child died with error exit status %d", what, es);
    }
  } else if (WIFSIGNALED(status)) {
    int sig= WTERMSIG(status);
    const char *sigstr= strsignal(sig);
    const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
    if (sigstr) {
      warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
    } else {
      warn("%s: child died due to unknown fatal signal %d%s",
           what, sig, coredump);
    }
  } else {
    warn("%s: child died with unknown wait status %d", what, status);
  }
}
/* Abandon the current connection attempt: stop watching the parent's socket,
 * close both sockets of the pair, and kill and reap the connect child if
 * there is one. */
146 static void connect_attempt_discard(void) {
147 if (connecting_sockets[0]) {
/* NOTE(review): check_conn_work() spells this loop->cancel_fd(loop, ...);
 * confirm which form is intended. */
148 cancel_fd(loop, connecting_sockets[0], OOP_READ);
149 cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION);
151 perhaps_close(&connecting_sockets[0]);
152 perhaps_close(&connecting_sockets[1]);
154 if (connecting_child) {
156 r= kill(connecting_child, SIGKILL);
157 if (r) sysdie("cannot kill connect child");
/* NOTE(review): with WNOHANG, waitpid() may return 0 if the child has not
 * yet died from the SIGKILL, leaving `status` unset; connchild_event() uses
 * a blocking waitpid(..., 0) for its final wait -- confirm intent. */
159 pid_t got= waitpid(connecting_child, &status, WNOHANG);
160 if (got==-1) sysdie("cannot reap connect child");
/* Report only outcomes other than a normal exit or death by SIGKILL. */
162 if (!(WIFEXITED(status) ||
163 (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) {
164 report_child_status("connect"
/* Prepare `msg` to carry one control message: zero it, declare a buffer
 * named msg##cbuf of CMSG_SPACE(sizeof(fd)) bytes, and point msg_control /
 * msg_controllen at that buffer.  The expansion refers to a variable named
 * `fd`, which must be in scope where the macro is used, and it contains a
 * declaration, so it can only be used at block scope. */
170 #define PREP_DECL_MSG_CMSG(msg) \
172 memset(&msg,0,sizeof(msg)); \
173 char msg##cbuf[CMSG_SPACE(sizeof(fd))]; \
174 msg.msg_control= msg##cbuf; \
175 msg.msg_controllen= sizeof(msg##cbuf);
/* Event handler for the parent's end of the connect-child socketpair.
 * Reads a message with recvmsg(); on success takes the new connection's fd
 * from the SCM_RIGHTS control message, waits for the child, sets
 * conn->stream from its exit status and adds conn to the head of `idle`.
 * Otherwise it reports what went wrong.  Both outcomes end by calling
 * connect_attempt_discard(). */
177 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
180 conn= xcalloc(sizeof(*conn));
183 struct cmsghdr *h= 0;
184 ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
185 if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
/* No control message: see whether the child has already died, since that
 * would explain it. */
188 pid_t got= waitpid(connecting_child, &status, WNOHANG);
190 assert(got==connecting_child);
/* NOTE(review): there is no operator between `WEXITSTATUS(status) != 0` and
 * the following operand; presumably `&&` is missing. */
192 if (WIFEXITED(status) &&
193 (WEXITSTATUS(status) != 0
194 WEXITSTATUS(status) != CHILD_ESTATUS_STREAM &&
195 WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) {
196 /* child already reported the problem */
/* NOTE(review): the POSIX name for the alarm() signal is SIGALRM. */
197 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) {
198 warn("connect: connection attempt timed out");
199 } else if (!WIFEXITED(status)) {
200 report_child_status("connect", status);
201 /* that's probably the root cause then */
204 /* child is still running apparently, report the socket problem */
206 syswarn("connect: read from child socket failed");
/* NOTE(review): connect_start() registers this handler for OOP_EXCEPTION;
 * `OOP_EXCEPTIONN` looks like a typo. */
207 else if (e == OOP_EXCEPTIONN)
208 warn("connect: unexpected exception on child socket");
210 warn("connect: unexpected EOF on child socket");
/* CHK verifies one cmsghdr field against its expected value.
 * NOTE(review): the die() format has two %d conversions but no arguments
 * are supplied; presumably h->cmsg_##field and val were intended. */
215 #define CHK(field, val) \
216 if (h->cmsg_##field != val) { \
217 die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
220 CHK(level, SOL_SOCKET);
221 CHK(type, SCM_RIGHTS);
/* NOTE(review): `conn-b>fd` looks like a typo for `conn->fd`. */
222 CHK(len, CMSG_LEN(sizeof(conn-b>fd)));
/* NOTE(review): `CMSG_NXTHDR,&msg,h` is a comma expression whose value is
 * `h`, not a macro call; presumably CMSG_NXTHDR(&msg,h) was intended. */
225 if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; }
/* Copy the received descriptor out of the control message. */
227 memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
/* Blocking wait: the child's exit status says whether streaming is on. */
229 pid_t got= waitpid(connecting_child, &status, 0);
230 if (got==-1) sysdie("connect: real wait for child");
231 assert(got == connecting_child);
234 if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
235 int es= WEXITSTATUS(status);
237 case CHILD_ESTATUS_STREAM: conn->stream= 1; break;
238 case CHILD_ESTATUS_NOSTREAM: conn->stream= 0; break;
240 die("connect: child gave unexpected exit status %d", es);
/* Success: the new connection starts out idle. */
246 LIST_ADDHEAD(idle, conn);
247 notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
248 connect_attempt_discard();
/* Failure: drop the half-built conn's descriptor and the attempt. */
254 perhaps_close(&conn->fd);
257 connect_attempt_discard();
/* Begin a connection attempt.  Creates a socketpair and forks.  The child
 * sets an alarm, connects with NNTPconnect(), authenticates with
 * NNTPsendpassword(), tries MODE STREAM, and sends the connection's fd to
 * the parent as an SCM_RIGHTS control message.  The parent closes its copy
 * of the child's socket and registers connchild_event() for read and
 * exception events on its own end.
 * NOTE(review): error paths jump to label `x`; presumably that is where the
 * final connect_attempt_discard() call sits -- confirm.
 * NOTE(review): other definitions here use `(void)` for an empty parameter
 * list. */
260 static void connect_start() {
/* Only one attempt may be in progress at a time. */
261 assert(!connecting_sockets[0]);
262 assert(!connecting_sockets[1]);
263 assert(!connecting_child);
265 notice("starting connection attempt");
267 r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
268 if (r) { syswarn("connect: cannot create socketpair for child"); goto x; }
270 connecting_child= fork();
271 if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; }
/* Child process from here. */
273 if (!connecting_child) {
274 FILE *cn_from, *cn_to;
275 char buf[NNTP_STRLEN+100];
276 int exitstatus= CHILD_ESTATUS_NOSTREAM;
281 r= close(connecting_sockets[0]);
282 if (r) sysdie("connect: close parent socket in child");
/* connchild_event() reports a child killed by the alarm signal as a
 * timed-out connection attempt. */
284 alarm(connection_setup_timeout);
285 if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) {
287 sanitise_inplace(buf);
288 die("connect: rejected: %s", buf);
290 sysdie("connect: connection attempt failed");
293 if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0)
294 sysdie("connect: authentication failed");
/* NOTE(review): fputs() returns a non-negative value on success and EOF on
 * failure, so testing its result for truth does not identify failure. */
296 if (fputs("MODE STREAM\r\n", cn_to) ||
298 sysdie("connect: could not send MODE STREAM");
299 buf[sizeof(buf)-1]= 0;
300 if (!fgets(buf, sizeof(buf)-1, cn_from)) {
302 sysdie("connect: could not read response to MODE STREAM");
304 die("connect: connection close in response to MODE STREAM");
309 sanitise_inplace(buf);
310 die("connect: response to MODE STREAM is too long: %.100s...",
/* NOTE(review): `buf[1-]` is not valid C; presumably `buf[l-1]`. */
313 l--; if (l>0 && buf[1-]=='\r') l--;
/* NOTE(review): strtoul() returns unsigned long; it is stored in an int. */
316 int rcode= strtoul(buf,&ep,10);
318 sanitise_inplace(buf);
319 die("connect: bad response to MODE STREAM: %.50s", buf);
323 exitstatus= CHILD_ESTATUS_STREAM;
329 sanitise_inplace(buf);
330 warn("connect: unexpected response to MODE STREAM: %.50s", buf);
/* Pass the connection's descriptor to the parent as SCM_RIGHTS data. */
335 int fd= fileno(cn_from);
337 PREP_DECL_MSG_CMSG(msg);
338 struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
339 cmsg->cmsg_level= SOL_SOCKET;
340 cmsg->cmsg_type= SCM_RIGHTS;
341 cmsg->cmsg_len= CMSG_LEN(sizeof(fd));
342 memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
344 msg.msg_controllen= cmsg->cmsg_len;
/* NOTE(review): sendmsg() returns the number of bytes sent on success and
 * -1 on error, so `if (r)` also fires on any successful non-empty send;
 * confirm the intended test. */
345 r= sendmsg(connecting_sockets[1], &msg, 0);
346 if (r) sysdie("sendmsg failed for new connection");
/* Parent: drop our copy of the child's end and mark the slot unused. */
351 r= close(connecting_sockets[1]); connecting_sockets[1]= 0;
352 if (r) sysdie("connect: close child socket in parent");
354 loop->on_fd(loop, connecting_sockets[0], OOP_READ, connchild_event, 0);
355 loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0);
359 connect_attempt_discard();
362 /*========== overall control of article flow ==========*/
/* Decide what to do with the master queue: hand an article to a connection
 * from `working`, else to one from `idle`, else (when the queue is long
 * enough, no connect child exists and no delay is pending) set
 * connect_delay to reconnect_delay_periods. */
364 static void check_master_queue(void) {
369 conn_assign_one_article(&working);
370 } else if (idle.head) {
371 conn_assign_one_article(&idle);
/* NOTE(review): `nconns` and `maxconns` are not declared in the visible part
 * of the file, which has a setting named max_connections -- confirm. */
372 } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
373 !connecting_child && !connect_delay) {
374 connect_delay= reconnect_delay_periods;
/* Number of articles conn is holding: those on its `sent` list, plus one if
 * `send` is set, plus those on its `queue` list.
 * NOTE(review): `!!conn->send` needs `send` to be a scalar (e.g. a pointer);
 * the struct member visible in this file is declared `Article send;` --
 * confirm which is intended. */
379 static int conn_total_queued_articles(Conn *conn) {
380 return conn->sent.count + !!conn->send + conn->queue.count;
/* Take the connection at the head of *connlist, move the head article of
 * the master queue onto the tail of that connection's own queue, refile the
 * connection on the list chosen by conn_determine_right_list(), and then
 * call check_conn_work() on it.
 * NOTE(review): each LIST(Conn) expansion is a separate anonymous struct
 * type, so this parameter type is formally distinct from the type of
 * `idle`/`working`/`full` -- confirm how callers are meant to pass them. */
383 static void conn_assign_one_article(LIST(Conn) *connlist) {
384 Conn *conn= connlist->head;
386 LIST_REMOVE(*connlist, conn);
387 Article *art= LIST_REMHEAD(queue);
388 LIST_ADDTAIL(conn->queue, art);
/* NOTE(review): no LIST_ADD macro is defined alongside LIST_ADDHEAD and
 * LIST_ADDTAIL -- confirm which one is meant. */
389 LIST_ADD(*conn_determine_right_list(conn), conn);
391 check_conn_work(conn);
394 static LIST(Conn) *conn_determine_right_list(Conn *conn) {
395 int inqueue= conn_total_queued_articles(conn);
396 assert(inqueue <= max_queue);
397 if (inqueue == 0) return &idle;
398 if (inqueue == conn->max_queue) return &full;
/* Build transmit entries for conn with conn_make_some_xmits(), try to write
 * them with conn_write_some_xmits(), and adjust the OOP_WRITE registration
 * for conn->fd according to the result. */
402 static void check_conn_work(Conn *conn) {
405 conn_make_some_xmits(conn);
407 void *rp= conn_write_some_xmits(conn);
409 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
411 } else if (rp==OOP_CONTINUE) {
/* NOTE(review): `OOP_WRITE;)` is malformed; the on_fd() calls in
 * connect_start() also pass a handler and a user-data argument. */
412 loop->on_fd(loop, conn->fd, OOP_WRITE;)
413 else if (rp==OOP_HALT) {
424 /*========== article transmission ==========*/
/* NOTE(review): unfinished placeholder.  `unlink from readable;` is
 * pseudo-code rather than C, and `conn` is used although the function
 * declares no parameters. */
426 static void *conn_writeable() {
430 perhaps_transmit_on(conn);
432 unlink from readable;
/* Write pending transmit entries (conn->xmit[0..xmitu)) to conn->fd with a
 * single writev() of at most IOV_MAX entries, then shift the entries that
 * were not completed down to the start of conn->xmit and conn->xmitd.
 * The return values are listed below. */
438 static void *conn_write_some_xmits(Conn *conn) {
440 * 0: nothing more to write, no need to call us again
441 * OOP_CONTINUE: more to write but fd not writeable
442 * OOP_HALT: disaster, have destroyed conn
445 int count= conn->xmitu;
446 if (!count) return 0;
448 if (count > IOV_MAX) count= IOV_MAX;
449 ssize_t rs= writev(conn->fd, conn->xmit, count);
451 if (errno == EAGAIN) return OOP_CONTINUE;
452 syswarn(CN "write failed", conn->fd);
/* Walk the entries covered by the rs bytes that were written.
 * NOTE(review): bare `xmitu` here is presumably conn->xmitu. */
458 for (done=0; rs && done<xmitu; done++) {
459 struct iovec *vp= &conn->xmit[done];
460 XmitDetails *dp= &conn->xmitd[done];
/* NOTE(review): compares a signed ssize_t with the unsigned iov_len. */
461 if (rs > vp->iov_len) {
/* Discard the first `done` entries, keeping the remainder in order. */
469 int newu= conn->xmitu - done;
470 memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
471 memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
/* Queue NNTP commands for transmission on conn: TAKETHIS for articles that
 * are already checked (or when conn->nocheck is set), otherwise IHAVE or
 * CHECK, each followed by the article's message-id and CRLF.
 * NOTE(review): this is a draft; `do something about this article text;`
 * is a placeholder rather than C. */
476 static void conn_make_some_xmits(Conn *conn) {
479 do something about this article text;
483 if (conn->xmitu+3 > conn->xmita)
484 /* no space for a CHECK even */
/* NOTE(review): this takes from the global `queue`, whereas the tests
 * further down look at conn->queue -- confirm which list is meant. */
487 Article *art= LIST_REMHEAD(queue);
490 if (art->checked || conn->nocheck) {
492 XMIT_LITERAL("TAKETHIS ");
493 xmit_noalloc(art->mid, art->midlen);
494 XMIT_LITERAL("\r\n");
496 /* we got 235 from IHAVE */
501 XMIT_LITERAL("IHAVE ");
503 XMIT_LITERAL("CHECK ");
504 xmit_noalloc(art->mid, art->midlen);
505 XMIT_LITERAL("\r\n");
506 LIST_ADDTAIL(conn->sent, art);
512 if (conn->queue.head) {
/* NOTE(review): LIST() has no `checked` member; presumably
 * conn->queue.head->checked.  Bare `xmita` below is presumably
 * conn->xmita, as used at the top of this function. */
513 if (conn->queue.checked || conn->nocheck) {
516 && conn->xmitu+3 <= xmita) {
519 if (conn->xmitu < xmita
/* Circular-buffer output: work out how much of conn->circ_buf is in use,
 * offer the free space to writeable_moredata(), then writev() the pending
 * bytes (two segments when the data wraps round the end of the buffer) and
 * advance circ_read.
 * NOTE(review): the enclosing function header is not visible here.  Bare
 * `circ_write` / `circ_read` are presumably the conn-> members used on the
 * following lines. */
528 int circ_used= circ_write - circ_read;
529 if (circ_used < 0) circ_used += CONNBUFSZ;
530 writeable_moredata(conn, CONNBUFSZ-1 - circ_used);
/* Equal read and write positions are tested for here. */
532 if (conn->circ_read == conn->circ_write)
537 iov[0].iov_base= conn->circ_buf + conn->circ_read;
538 if (conn->circ_read > conn->circ_write) { /* wrapped */
539 iov[0].iov_len= CONNBUFSZ - conn->circ_read;
540 iov[1].iov_base= conn->circ_buf;
541 iov[1].iov_len= conn->circ_write;
/* NOTE(review): `niov[1].iov_len` is presumably `iov[1].iov_len`. */
542 if (niov[1].iov_len) niov= 2;
544 iov[0].iov_len= conn->circ_write - conn->circ_read;
/* NOTE(review): if `iov` is an array, writev() expects `iov`, not `&iov`. */
546 ssize_t rs= writev(conn->fd, &iov, niov);
/* NOTE(review): in the wrapped case writev() can consume exactly
 * CONNBUFSZ - circ_read bytes, leaving circ_read == CONNBUFSZ, which the
 * `>` test below does not wrap back to 0; presumably `>=` is intended. */
552 conn->circ_read += rs;
553 if (conn->circ_read > CONNBUFSZ)
554 conn->circ_read -= CONNBUFSZ;