5 * site.name_ductlock lock taken out by innduct
6 * F site.name written by innd
7 * D site.name_duct moved aside by innduct, for flush
8 * site.name_deferwork 431'd articles, still being written
9 * site.name_defergo_<inum> 431'd articles, ready for innxmit
10 * site.name_deferlock lock taken out by innxmit wrapper
23 * | F: innd writing, duct reading /|\
26 * | | duct decides time to flush same / | |
27 * | | duct makes hardlink as D / | |
31 * | F == D: innd writing, duct reading | |
33 * | | duct unlinks F / |
35 * | Moved <------------' |
37 * | D: innd writing, duct reading |
39 * | | duct flushes feed |
40 * | | (others can too, harmlessly) |
42 * | Separated <-----------------'
43 * | F: innd writing different to D
46 * | V duct completes processing of D
/* Feed tunables.  In the visible code connection_setup_timeout is handed
 * to alarm() in the connect child, remote_host and port are passed to
 * NNTPconnect(), and max_queue_per_conn is compared with the master queue
 * length when deciding whether to start another connection.
 * NOTE(review): max_connections and try_stream are not referenced in the
 * visible code; check_master_queue tests `maxconns' instead - confirm
 * which name is intended. */
53 static int max_connections, max_queue_per_conn;
54 static int connection_setup_timeout, port, try_stream;
55 static const char *remote_host;
/* Intrusive list helpers.  ISNODE(T) supplies the next/back link members
 * for a struct; LIST(T) is a list header which also carries an element
 * count.  Each LIST_* wrapper casts to struct list / struct node, calls
 * the matching list_* primitive and keeps `count' in step with it. */
57 #define ISNODE(T) T *next, *back;
58 #define LIST(T) struct { T *head, *tail, *tailpred; int count; }
/* NOTE(review): NODE() takes the address of (n)->head, but ISNODE()
 * declares only next and back - confirm which member is meant. */
60 #define NODE(n) ((struct node*)&(n)->head)
62 #define LIST_ADDHEAD(l,n) \
63 (list_addhead((struct list*)&(l), NODE((n))), (void)(l).count++)
64 #define LIST_ADDTAIL(l,n) \
65 (list_addtail((struct list*)&(l), NODE((n))), (void)(l).count++)
/* LIST_REMHEAD / LIST_REMTAIL evaluate to 0 when count is zero; otherwise
 * they decrement count and yield the removed node as a void*. */
67 #define LIST_REMHEAD(l) \
68 ((l).count ? ((l).count--, (void*)list_remhead((struct list*)&(l))) : 0)
69 #define LIST_REMTAIL(l) \
70 ((l).count ? ((l).count--, (void*)list_remtail((struct list*)&(l))) : 0)
/* LIST_REMOVE passes only the node to list_remove(); nothing here checks
 * that n is actually on l before l's count is decremented. */
71 #define LIST_REMOVE(l,n) \
72 (list_remove(NODE((n))), (void)(l).count--)
73 #define LIST_INSERT(l,n,pred) \
74 (list_insert((struct list*)&(l), NODE((n)), NODE((pred))), (void)(l).count++)
79 int checked, sentbody;
80 fd and offset for blanking token or mid;
87 typedef struct Conn Conn;
90 Malloc, Const, Artdata;
103 int fd, max_queue, stream;
104 LIST(Article) queue; /* not yet told peer, or CHECK said send it */
105 LIST(Article) sent; /* offered/transmitted - in xmit or waiting reply */
106 struct iovec xmit[CONNIOVS];
107 XmitDetails xmitd[CONNIOVS];
/* Exit statuses which connchild_event maps onto conn->stream (1 for
 * STREAM, 0 for NOSTREAM); the connect child tracks one of them in its
 * exitstatus variable. */
112 #define CHILD_ESTATUS_STREAM 4
113 #define CHILD_ESTATUS_NOSTREAM 5
115 static int since_connect_attempt;
/* Every connection lives on one of these lists; conn_determine_right_list
 * picks idle for a connection holding no articles and full for one
 * holding max_queue of them. */
117 static LIST(Conn) idle, working, full;
/* NOTE(review): declared as a pointer to a list, but the visible uses
 * (queue.count, LIST_REMHEAD(queue)) treat it as a list object - confirm. */
119 static LIST(Article) *queue;
/* Closes the descriptor stored in *fd, if there is one, and then resets
 * *fd to 0, the value this file uses to mean "no descriptor".
 *
 *   fd   address of the descriptor variable; must not be null.
 *
 * Calling it again on the same variable is harmless.  The result of
 * close() is deliberately ignored: this is best-effort cleanup.
 * Negative values are treated as "nothing to close" and normalised to 0. */
static void perhaps_close(int *fd) {
  if (*fd > 0) close(*fd);
  *fd= 0;  /* reset the caller's variable, not our local pointer */
}
124 /*========== making new connections ==========*/
/* Socketpair over which the connect child hands back the connected fd:
 * index 0 is the parent's end, index 1 the child's.  0 means "not open",
 * which is what perhaps_close(), connect_attempt_discard() and the
 * assertions at the top of connect_start() all test for. */
static int connecting_sockets[2]= {0,0};
/* pid of the connect child, or 0 while no attempt is in progress. */
static pid_t connecting_child;
/* Logs a warning describing how a child process terminated.
 *
 *   what    prefix naming the child in the message (e.g. "connect")
 *   status  wait status as filled in by waitpid()
 *
 * A clean exit (status 0) produces no message.  Every message starts
 * with `what', so each warn() call passes it as the first argument. */
static void report_child_status(const char *what, int status) {
  if (WIFEXITED(status)) {
    int es= WEXITSTATUS(status);
    if (es)
      warn("%s: child died with error exit status %d", what, es);
  } else if (WIFSIGNALED(status)) {
    int sig= WTERMSIG(status);
    const char *sigstr= strsignal(sig);
    const char *coredump= WCOREDUMP(status) ? " (core dumped)" : "";
    if (sigstr)
      warn("%s: child died due to fatal signal %s%s", what, sigstr, coredump);
    else
      warn("%s: child died due to unknown fatal signal %d%s",
           what, sig, coredump);
  } else {
    /* neither exited nor signalled: report the raw status word */
    warn("%s: child died with unknown wait status %d", what, status);
  }
}
/* Abandons any connection attempt in progress: stops watching the parent
 * end of the socketpair, closes both ends and, if a connect child exists,
 * SIGKILLs it and collects its wait status. */
148 static void connect_attempt_discard(void) {
149 if (connecting_sockets[0]) {
/* NOTE(review): written here as cancel_fd(loop, ...) but as
 * loop->cancel_fd(loop, ...) in conn_check_work - confirm. */
150 cancel_fd(loop, connecting_sockets[0], OOP_READ);
151 cancel_fd(loop, connecting_sockets[0], OOP_EXCEPTION);
153 perhaps_close(&connecting_sockets[0]);
154 perhaps_close(&connecting_sockets[1]);
156 if (connecting_child) {
158 r= kill(connecting_child, SIGKILL);
159 if (r) sysdie("cannot kill connect child");
/* NOTE(review): with WNOHANG, waitpid() can return 0 and leave status
 * unset if the child has not terminated yet - confirm that case is
 * handled on a line not visible here. */
161 pid_t got= waitpid(connecting_child, &status, WNOHANG);
162 if (got==-1) sysdie("cannot reap connect child");
/* A normal exit, or death by our own SIGKILL, is expected and silent;
 * anything else is reported. */
164 if (!(WIFEXITED(status) ||
165 (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL))) {
166 report_child_status("connect"
/* Zeroes the message header `msg' and attaches to it a freshly declared
 * control buffer msg##cbuf, sized with CMSG_SPACE for a payload of
 * sizeof(fd) bytes.  An identifier `fd' must therefore be in scope
 * wherever the macro is expanded. */
172 #define PREP_DECL_MSG_CMSG(msg) \
174 memset(&msg,0,sizeof(msg)); \
175 char msg##cbuf[CMSG_SPACE(sizeof(fd))]; \
176 msg.msg_control= msg##cbuf; \
177 msg.msg_controllen= sizeof(msg##cbuf);
/* Event-loop callback for the parent's end of the connect socketpair.
 * It tries to receive the connected fd from the child as SCM_RIGHTS
 * ancillary data.  If a control message arrives it is validated, the fd
 * is copied into a new Conn, the child is waited for, conn->stream is
 * set from its exit status and the Conn goes on the idle list.  If no
 * control message arrives, the child's wait status or the socket
 * condition is used to choose a warning. */
179 static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
182 conn= xcalloc(sizeof(*conn));
185 struct cmsghdr *h= 0;
186 ssize_t rs= recvmsg(fd, &msg, MSG_DONTWAIT);
187 if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
190 pid_t got= waitpid(connecting_child, &status, WNOHANG);
192 assert(got==connecting_child);
/* NOTE(review): no operator joins the "!= 0" line to the comparison
 * on the following line; presumably && was intended - confirm. */
194 if (WIFEXITED(status) &&
195 (WEXITSTATUS(status) != 0
196 WEXITSTATUS(status) != CHILD_ESTATUS_STREAM &&
197 WEXITSTATUS(status) != CHILD_ESTATUS_NOSTREAM)) {
198 /* child already reported the problem */
/* NOTE(review): POSIX names the alarm signal SIGALRM; SIGALARM is
 * presumably a typo - confirm. */
199 } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALARM) {
200 warn("connect: connection attempt timed out");
201 } else if (!WIFEXITED(status)) {
202 report_child_status("connect", status);
203 /* that's probably the root cause then */
206 /* child is still running apparently, report the socket problem */
208 syswarn("connect: read from child socket failed");
/* NOTE(review): spelled OOP_EXCEPTION everywhere else in this file. */
209 else if (e == OOP_EXCEPTIONN)
210 warn("connect: unexpected exception on child socket");
212 warn("connect: unexpected EOF on child socket");
/* CHK compares one field of the received cmsghdr with its expected
 * value.  NOTE(review): the die() format has two %d conversions but no
 * arguments are passed for them. */
217 #define CHK(field, val) \
218 if (h->cmsg_##field != val) { \
219 die("connect: child sent cmsg with cmsg_" #field "=%d, expected %d"); \
222 CHK(level, SOL_SOCKET);
223 CHK(type, SCM_RIGHTS);
/* NOTE(review): "conn-b>fd" is presumably a typo for conn->fd. */
224 CHK(len, CMSG_LEN(sizeof(conn-b>fd)));
/* NOTE(review): "CMSG_NXTHDR,&msg,h" is a comma expression, not a call;
 * presumably CMSG_NXTHDR(&msg,h) was meant - confirm. */
227 if (CMSG_NXTHDR,&msg,h) { die("connect: child sent many cmsgs"); goto x; }
229 memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
/* Blocking wait (flags 0): the fd has arrived, so the child's exit
 * status is now needed to learn whether streaming is available. */
231 pid_t got= waitpid(connecting_child, &status, 0);
232 if (got==-1) sysdie("connect: real wait for child");
233 assert(got == connecting_child);
236 if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
237 int es= WEXITSTATUS(status);
239 case CHILD_ESTATUS_STREAM: conn->stream= 1; break;
240 case CHILD_ESTATUS_NOSTREAM: conn->stream= 0; break;
242 die("connect: child gave unexpected exit status %d", es);
248 LIST_ADDHEAD(idle, conn);
249 notice(CN "connected %s", conn->fd, conn->stream ? "streaming" : "plain");
/* NOTE(review): connect_attempt_discard() kills and waits for
 * connecting_child if it is nonzero; confirm it has been cleared on a
 * line not visible here now that the child has been reaped. */
250 connect_attempt_discard();
251 check_master_queue();
256 perhaps_close(&conn->fd);
259 connect_attempt_discard();
/* Starts one connection attempt.  Creates a socketpair and forks.
 * The child arms an alarm, connects with NNTPconnect(), authenticates
 * with NNTPsendpassword(), sends MODE STREAM and reads the reply, then
 * passes the connection's fd to the parent as SCM_RIGHTS ancillary data.
 * The parent closes the child's end of the socketpair and registers
 * connchild_event for read and exception events on its own end. */
262 static void connect_start() {
263 assert(!connecting_sockets[0]);
264 assert(!connecting_sockets[1]);
265 assert(!connecting_child);
267 notice("starting connection attempt");
269 r= socketpair(AF_UNIX, SOCK_STREAM, 0, connecting_sockets);
270 if (r) { syswarn("connect: cannot create socketpair for child"); goto x; }
272 connecting_child= fork();
273 if (connecting_child==-1) { syswarn("connect: cannot fork"); goto x; }
/* From here to the sendmsg() is the child's side. */
275 if (!connecting_child) {
276 FILE *cn_from, *cn_to;
277 char buf[NNTP_STRLEN+100];
278 int exitstatus= CHILD_ESTATUS_NOSTREAM;
283 r= close(connecting_sockets[0]);
284 if (r) sysdie("connect: close parent socket in child");
286 alarm(connection_setup_timeout);
287 if (NNTPconnect(remote_host, port, &cn_from, &cn_to, buf) < 0) {
289 sanitise_inplace(buf);
290 die("connect: rejected: %s", buf);
292 sysdie("connect: connection attempt failed");
295 if (NNTPsendpassword(remote_host, cn_from, cn_to) < 0)
296 sysdie("connect: authentication failed");
/* NOTE(review): fputs() returns a nonnegative value on success and EOF
 * on failure, so testing its result for truth can treat success as an
 * error; presumably "< 0" or "== EOF" was intended - confirm. */
298 if (fputs("MODE STREAM\r\n", cn_to) ||
300 sysdie("connect: could not send MODE STREAM");
301 buf[sizeof(buf)-1]= 0;
302 if (!fgets(buf, sizeof(buf)-1, cn_from)) {
304 sysdie("connect: could not read response to MODE STREAM");
306 die("connect: connection close in response to MODE STREAM");
311 sanitise_inplace(buf);
312 die("connect: response to MODE STREAM is too long: %.100s...",
/* NOTE(review): "buf[1-]" is not valid C; presumably buf[l-1] (strip a
 * trailing CR once the LF has been dropped) - confirm. */
315 l--; if (l>0 && buf[1-]=='\r') l--;
318 int rcode= strtoul(buf,&ep,10);
320 sanitise_inplace(buf);
321 die("connect: bad response to MODE STREAM: %.50s", buf);
325 exitstatus= CHILD_ESTATUS_STREAM;
331 sanitise_inplace(buf);
332 warn("connect: unexpected response to MODE STREAM: %.50s", buf);
/* Pass the connection's descriptor to the parent.  `fd' must be declared
 * before PREP_DECL_MSG_CMSG is expanded because the macro uses
 * sizeof(fd). */
337 int fd= fileno(cn_from);
339 PREP_DECL_MSG_CMSG(msg);
340 struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
341 cmsg->cmsg_level= SOL_SOCKET;
342 cmsg->cmsg_type= SCM_RIGHTS;
343 cmsg->cmsg_len= CMSG_LEN(sizeof(fd));
344 memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
346 msg.msg_controllen= cmsg->cmsg_len;
/* NOTE(review): sendmsg() returns the number of bytes sent on success and
 * -1 on error, so "if (r)" would also fire on a successful nonzero
 * return; presumably "r < 0" was intended - confirm. */
347 r= sendmsg(connecting_sockets[1], &msg, 0);
348 if (r) sysdie("sendmsg failed for new connection");
/* Parent's side: drop the child's end and watch our own. */
353 r= close(connecting_sockets[1]); connecting_sockets[1]= 0;
354 if (r) sysdie("connect: close child socket in parent");
356 loop->on_fd(loop, connecting_sockets[0], OOP_READ, connchild_event, 0);
357 loop->on_fd(loop, connecting_sockets[0], OOP_EXCEPTION, connchild_event, 0);
361 connect_attempt_discard();
364 /*========== overall control of article flow ==========*/
366 static void conn_check_work(Conn *conn);
/* Hands articles from the master queue to connections: a connection on
 * the working list is preferred, then one on the idle list; failing both,
 * a new connection attempt may be scheduled.  conn_check_work() is called
 * for the last connection that was given work. */
368 static void check_master_queue(void) {
372 Conn *last_assigned=0;
/* NOTE(review): conn_assign_one_article is defined after this function
 * and no prototype for it is visible before this point. */
375 conn_assign_one_article(&working, &last_assigned);
376 } else if (idle.head) {
377 conn_assign_one_article(&idle, &last_assigned);
/* NOTE(review): nconns and maxconns are not declared in the visible code
 * (a max_connections variable is) - confirm the intended names. */
378 } else if (nconns < maxconns && queue.count >= max_queue_per_conn &&
379 !connecting_child && !connect_delay) {
380 connect_delay= reconnect_delay_periods;
386 conn_check_work(last_assigned);
/* Takes the first connection on connlist, moves one article from the head
 * of the master queue to the tail of that connection's own queue, and
 * re-files the connection on the list chosen by
 * conn_determine_right_list().  *last_assigned is updated to this
 * connection.
 * NOTE(review): each expansion of LIST(Conn) declares a distinct
 * anonymous struct type, so this parameter type may not be compatible
 * with the type of &idle / &working passed by the caller - confirm. */
389 static void conn_assign_one_article(LIST(Conn) *connlist,
390 Conn **last_assigned) {
391 Conn *conn= connlist->head;
393 LIST_REMOVE(*connlist, conn);
394 Article *art= LIST_REMHEAD(queue);
395 LIST_ADDTAIL(conn->queue, art);
/* NOTE(review): LIST_ADD is not among the list macros visible in this
 * file (LIST_ADDHEAD / LIST_ADDTAIL are) - confirm. */
396 LIST_ADD(*conn_determine_right_list(conn), conn);
398 /* This slightly odd arrangement is so that we call conn_check_work
399 * once after filling the queue for a new connection in
400 * check_master_queue, rather than for each article. */
401 if (conn != *last_assigned && *last_assigned)
402 conn_check_work(*last_assigned);
403 *last_assigned= conn;
406 static int conn_total_queued_articles(Conn *conn) {
407 return conn->sent.count + conn->queue.count;
410 static LIST(Conn) *conn_determine_right_list(Conn *conn) {
411 int inqueue= conn_total_queued_articles(conn);
412 assert(inqueue <= max_queue);
413 if (inqueue == 0) return &idle;
414 if (inqueue == conn->max_queue) return &full;
418 static void *conn_writeable(oop_source *l, int fd, int ev, void *u) {
/* Builds transmit entries for the connection and tries to write them.
 * The result of conn_write_some_xmits() decides what happens next:
 * OOP_CONTINUE means more remains, so conn_writeable is registered for
 * OOP_WRITE on the connection's fd; OOP_HALT means the connection has
 * been destroyed; otherwise everything was transmitted. */
423 static void conn_check_work(Conn *conn) {
426 conn_make_some_xmits(conn);
428 loop->cancel_fd(loop, conn->fd, OOP_WRITE);
432 void *rp= conn_write_some_xmits(conn);
433 if (rp==OOP_CONTINUE) {
434 loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
436 } else if (rp==OOP_HALT) {
439 /* transmitted everything */
446 /*========== article transmission ==========*/
/* Writes as many of the connection's pending iovecs as the socket will
 * take with a single writev(), then shifts the entries not yet completed
 * down to the front of conn->xmit / conn->xmitd.
 * NOTE(review): the loop below tests a bare `xmitu' where every other
 * use is conn->xmitu, and compares the signed rs with vp->iov_len -
 * confirm both. */
448 static void *conn_write_some_xmits(Conn *conn) {
450 * 0: nothing more to write, no need to call us again
451 * OOP_CONTINUE: more to write but fd not writeable
452 * OOP_HALT: disaster, have destroyed conn
455 int count= conn->xmitu;
456 if (!count) return 0;
458 if (count > IOV_MAX) count= IOV_MAX;
459 ssize_t rs= writev(conn->fd, conn->xmit, count);
461 if (errno == EAGAIN) return OOP_CONTINUE;
462 syswarn(CN "write failed", conn->fd);
468 for (done=0; rs && done<xmitu; done++) {
469 struct iovec *vp= &conn->xmit[done];
470 XmitDetails *dp= &conn->xmitd[done];
471 if (rs > vp->iov_len) {
479 int newu= conn->xmitu - done;
480 memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
481 memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
/* Turns queued articles into transmit entries while fewer than
 * CONNIOVS-5 slots are in use.  An article that has been checked (or any
 * article when conn->nocheck is set) is sent in full; otherwise only an
 * IHAVE or CHECK offer naming its message-id is sent.  In both cases the
 * article is appended to conn->sent.
 * NOTE(review): articles are taken with LIST_REMHEAD(queue), i.e. from
 * the master queue, although conn_assign_one_article files them on
 * conn->queue - confirm which list is meant.  SMretrieve(somehow) is a
 * placeholder. */
486 static void conn_make_some_xmits(Conn *conn) {
488 if (conn->xmitu+5 > CONNIOVS)
491 Article *art= LIST_REMHEAD(queue);
494 if (art->checked || conn->nocheck) {
495 /* actually send it */
497 ARTHANDLE *artdata= SMretrieve(somehow);
501 XMIT_LITERAL("TAKETHIS ");
502 xmit_noalloc(art->mid, art->midlen);
503 XMIT_LITERAL("\r\n");
504 xmit_artbody(artdata);
507 /* we got 235 from IHAVE */
509 xmit_artbody(artdata);
511 XMIT_LITERAL(".\r\n");
515 LIST_ADDTAIL(conn->sent, art);
521 XMIT_LITERAL("IHAVE ");
523 XMIT_LITERAL("CHECK ");
524 xmit_noalloc(art->mid, art->midlen);
525 XMIT_LITERAL("\r\n");
527 LIST_ADDTAIL(conn->sent, art);
532 /*========== responses from peer ==========*/
534 static const oop_rd_style peer_rd_style= {
535 OOP_RD_DELIM_STRIP, '\n',
537 OOP_RD_SHORTREC_FORBID
/* Read callback for one line of response from the peer.  Warns on EOF.
 * Otherwise the line must start with a three-digit code that has no
 * leading zero and is followed by a space; a line that fails this check
 * is copied into sanibuf with unprintable bytes escaped and reported.
 * A well-formed code is then dispatched on: the article at the head of
 * conn->sent is accounted as unwanted, accepted, rejected or deferred,
 * or is moved back for transmission when the peer asks for it. */
540 static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_event ev,
541 const char *errmsg, int errnoval,
542 const char *data, size_t recsz, void *conn_v) {
545 if (ev == OOP_RD_EOF) {
546 warn("unexpected EOF from peer");
550 assert(ev == OOP_RD_OK);
553 unsigned long code= strtoul(data, &ep, 10);
554 if (ep != data+3 || *ep != ' ' || data[0]=='0') {
/* Leave room at the end of sanibuf for one escape plus the "..." marker. */
560 if (q > sanibuf+sizeof(sanibuf)-8) { strcpy(q,"..."); break; }
562 if (!c) { *q++= '\''; break; }
563 if (c>=' ' && c<=126 && c!='\\') { *q++= c; continue; }
/* NOTE(review): q is not visibly advanced past the escape written here,
 * and if c is a plain (signed) char a negative value would print more
 * than two hex digits - confirm against the lines not shown. */
564 sprintf(q,"\\x%02x",c);
567 warn("badly formatted response from peer: %s", sanibuf);
572 if (conn->quitting) {
574 warn("peer gave failure response to QUIT: %s", sani);
583 case 438: /* CHECK says they have it */
584 case 435: /* IHAVE says they have it */
585 ARTICLE_DEALTWITH(1,unwanted);
588 case 238: /* CHECK says send it */
589 case 335: /* IHAVE says send it */
590 count_checkedwanted++;
591 Article *art= LIST_REMHEAD(conn->sent);
/* NOTE(review): LIST_ADDTAIL takes (list, node); the node argument,
 * presumably art, is missing here. */
593 LIST_ADDTAIL(conn->queue);
596 case 235: /* IHAVE says thanks */
597 case 239: /* TAKETHIS says thanks */
598 ARTICLE_DEALTWITH(1,accepted);
601 case 439: /* TAKETHIS says rejected */
602 case 437: /* IHAVE says rejected */
603 ARTICLE_DEALTWITH(1,rejected);
606 case 431: /* CHECK or TAKETHIS says try later */
607 case 436: /* IHAVE says try later */
608 ARTICLE_DEALTWITH(0,deferred);
611 case 400: warn("peer has stopped accepting articles: %s", sani); goto failed;
612 case 503: warn("peer timed us out: %s", sani); goto failed;
613 default: warn("peer sent unexpected message: %s", sani);
616 return OOP_CONTINUE;;