+ xclose(socks[0], "(in child) parent's connection fdpass socket",0);
+
+ alarm(connection_setup_timeout);
+ if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
+ int l= strlen(buf);
+ int stripped=0;
+ while (l>0) {
+ unsigned char c= buf[l-1];
+ if (!isspace(c)) break;
+ if (c=='\n' || c=='\r') stripped=1;
+ --l;
+ }
+ if (!buf[0]) {
+ sysfatal("connect: connection attempt failed");
+ } else {
+ buf[l]= 0;
+ fatal("connect: %s: %s", stripped ? "rejected" : "failed",
+ sanitise(buf,-1));
+ }
+ }
+ if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
+ sysfatal("connect: authentication failed");
+ if (try_stream) {
+ if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
+ fflush(cn_to))
+ sysfatal("connect: could not send MODE STREAM");
+ buf[sizeof(buf)-1]= 0;
+ if (!fgets(buf, sizeof(buf)-1, cn_from)) {
+ if (ferror(cn_from))
+ sysfatal("connect: could not read response to MODE STREAM");
+ else
+ fatal("connect: connection close in response to MODE STREAM");
+ }
+ int l= strlen(buf);
+ assert(l>=1);
+ if (buf[l-1]!='\n')
+ fatal("connect: response to MODE STREAM is too long: %.100s...",
+ sanitise(buf,-1));
+ l--; if (l>0 && buf[l-1]=='\r') l--;
+ buf[l]= 0;
+ char *ep;
+ int rcode= strtoul(buf,&ep,10);
+ if (ep != &buf[3])
+ fatal("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
+
+ switch (rcode) {
+ case 203:
+ exitstatus= CONNCHILD_ESTATUS_STREAM;
+ break;
+ case 480:
+ case 500:
+ break;
+ default:
+ warn("connect: unexpected response to MODE STREAM: %.50s",
+ sanitise(buf,-1));
+ exitstatus= 2;
+ break;
+ }
+ }
+ int fd= fileno(cn_from);
+
+ PREP_DECL_MSG_CMSG(msg);
+ struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level= SOL_SOCKET;
+ cmsg->cmsg_type= SCM_RIGHTS;
+ cmsg->cmsg_len= CMSG_LEN(sizeof(fd));
+ memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
+
+ msg.msg_controllen= cmsg->cmsg_len;
+ r= sendmsg(socks[1], &msg, 0);
+ if (r<0) sysdie("sendmsg failed for new connection");
+ if (r!=1) die("sendmsg for new connection gave wrong result %d",r);
+
+ _exit(exitstatus);
+ }
+
+ xclose(socks[1], "connecting fdpass child's socket",0);
+ connecting_fdpass_sock= socks[0];
+ xsetnonblock(connecting_fdpass_sock, 1);
+ on_fd_read_except(connecting_fdpass_sock, connchild_event);
+}
+
+/*---------- assigning articles to conns, and transmitting ----------*/
+
+static Article *dequeue_from(int peek, InputFile *ipf) {
+ if (!ipf) return 0;
+ if (peek) return LIST_HEAD(ipf->queue);
+ else return LIST_REMHEAD(ipf->queue);
+}
+
+static Article *dequeue(int peek) {
+ Article *art;
+ art= dequeue_from(peek, flushing_input_file); if (art) return art;
+ art= dequeue_from(peek, backlog_input_file); if (art) return art;
+ art= dequeue_from(peek, main_input_file); if (art) return art;
+ return 0;
+}
+
+static void check_assign_articles(void) {
+ for (;;) {
+ if (!dequeue(1))
+ break;
+
+ Conn *walk, *use=0;
+ int spare=0, inqueue=0;
+
+ /* Find a connection to offer this article. We prefer a busy
+ * connection to an idle one, provided it's not full. We take the
+ * first (oldest) and since that's stable, it will mean we fill up
+ * connections in order. That way if we have too many
+ * connections, the spare ones will go away eventually.
+ */
+ FOR_CONN(walk) {
+ if (walk->quitting) continue;
+ inqueue= walk->sent.count + walk->priority.count
+ + walk->waiting.count;
+ spare= walk->max_queue - inqueue;
+ assert(inqueue <= max_queue_per_conn);
+ assert(spare >= 0);
+ if (inqueue==0) /*idle*/ { if (!use) use= walk; }
+ else if (spare>0) /*working*/ { use= walk; break; }
+ }
+ if (use) {
+ if (!inqueue) use->since_activity= 0; /* reset idle counter */
+ while (spare>0) {
+ Article *art= dequeue(0);
+ if (!art) break;
+ LIST_ADDTAIL(use->waiting, art);
+ spare--;
+ }
+ conn_maybe_write(use);
+ } else if (allow_connect_start()) {
+ until_connect= reconnect_delay_periods;
+ connect_start();
+ break;
+ } else {
+ break;
+ }
+ }
+}
+
+static void *conn_writeable(oop_source *l, int fd, oop_event ev, void *u) {
+ conn_maybe_write(u);
+ return OOP_CONTINUE;
+}
+
+static void conn_maybe_write(Conn *conn) {
+ for (;;) {
+ conn_make_some_xmits(conn);
+ if (!conn->xmitu) {
+ loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+ return;
+ }
+
+ void *rp= conn_write_some_xmits(conn);
+ if (rp==OOP_CONTINUE) {
+ loop->on_fd(loop, conn->fd, OOP_WRITE, conn_writeable, conn);
+ return;
+ } else if (rp==OOP_HALT) {
+ return;
+ } else if (!rp) {
+ /* transmitted everything */
+ } else {
+ abort();
+ }
+ }
+}
+
+/*---------- expiry and deferral ----------*/
+
+static void article_defer(Article *art /* not on a queue */, int whichcount) {
+ open_defer();
+ if (fprintf(defer, "%s %s\n", TokenToText(art->token), art->messageid) <0
+ || fflush(defer))
+ sysfatal("write to defer file %s",path_defer);
+ article_done(art, whichcount);
+}
+
+static int article_check_expired(Article *art /* must be queued, not conn */) {
+ ARTHANDLE *artdata= SMretrieve(art->token, RETR_STAT);
+ if (artdata) { SMfreearticle(artdata); return 0; }
+
+ LIST_REMOVE(art->ipf->queue, art);
+ art->missing= 1;
+ art->ipf->counts[art_Unchecked][RC_missing]++;
+ article_done(art,-1);
+ return 1;
+}
+
+static void inputfile_queue_check_expired(InputFile *ipf) {
+ if (!ipf) return;
+
+ for (;;) {
+ Article *art= LIST_HEAD(ipf->queue);
+ int exp= article_check_expired(art);
+ if (!exp) break;
+ }
+}
+
+static void article_autodefer(InputFile *ipf, Article *art) {
+ ipf->autodefer++;
+ article_defer(art,-1);
+}
+
+static int has_article_in(const ArticleList *al, InputFile *ipf) {
+ Article *art;
+ for (art=LIST_HEAD(*al); art; art=LIST_NEXT(art))
+ if (art->ipf == ipf) return 1;
+ return 0;
+}
+
+static void autodefer_input_file(InputFile *ipf) {
+ ipf->autodefer= 0;
+
+ Article *art;
+ while ((art= LIST_REMHEAD(ipf->queue)))
+ article_autodefer(ipf, art);
+
+ if (ipf->inprogress) {
+ Conn *walk;
+ FOR_CONN(walk) {
+ if (has_article_in(&walk->waiting, ipf) ||
+ has_article_in(&walk->priority, ipf) ||
+ has_article_in(&walk->sent, ipf))
+ walk->quitting= -1;
+ }
+ while (ipf->inprogress) {
+ FOR_CONN(walk)
+ if (walk->quitting < 0) goto found;
+ abort(); /* where are they ?? */
+
+ found:
+ connfail(walk, "connection is stuck or crawling,"
+ " and we need to finish flush");
+ }
+ }
+}
+
+/*========== article transmission ==========*/
+
+static XmitDetails *xmit_core(Conn *conn, const char *data, int len,
+ XmitKind kind) { /* caller must then fill in details */
+ struct iovec *v= &conn->xmit[conn->xmitu];
+ XmitDetails *d= &conn->xmitd[conn->xmitu++];
+ v->iov_base= (char*)data;
+ v->iov_len= len;
+ d->kind= kind;
+ return d;
+}
+
+static void xmit_noalloc(Conn *conn, const char *data, int len) {
+ xmit_core(conn,data,len, xk_Const);
+}
+#define XMIT_LITERAL(lit) (xmit_noalloc(conn, (lit), sizeof(lit)-1))
+
+static void xmit_artbody(Conn *conn, ARTHANDLE *ah /* consumed */) {
+ XmitDetails *d= xmit_core(conn, ah->data, ah->len, xk_Artdata);
+ d->info.sm_art= ah;
+}
+
+static void xmit_free(XmitDetails *d) {
+ switch (d->kind) {
+ case xk_Artdata: SMfreearticle(d->info.sm_art); break;
+ case xk_Const: break;
+ default: abort();
+ }
+}
+
+static void *conn_write_some_xmits(Conn *conn) {
+ /* return values:
+ * 0: nothing more to write, no need to call us again
+ * OOP_CONTINUE: more to write but fd not writeable
+ * OOP_HALT: disaster, have destroyed conn
+ */
+ for (;;) {
+ int count= conn->xmitu;
+ if (!count) return 0;
+
+ if (count > IOV_MAX) count= IOV_MAX;
+ ssize_t rs= writev(conn->fd, conn->xmit, count);
+ if (rs < 0) {
+ if (isewouldblock(errno)) return OOP_CONTINUE;
+ connfail(conn, "write failed: %s", strerror(errno));
+ return OOP_HALT;
+ }
+ assert(rs > 0);
+
+ int done;
+ for (done=0; rs && done<conn->xmitu; done++) {
+ struct iovec *vp= &conn->xmit[done];
+ XmitDetails *dp= &conn->xmitd[done];
+ if (rs > vp->iov_len) {
+ rs -= vp->iov_len;
+ xmit_free(dp);
+ } else {
+ vp->iov_base= (char*)vp->iov_base + rs;
+ vp->iov_len -= rs;
+ }
+ }
+ int newu= conn->xmitu - done;
+ memmove(conn->xmit, conn->xmit + done, newu * sizeof(*conn->xmit));
+ memmove(conn->xmitd, conn->xmitd + done, newu * sizeof(*conn->xmitd));
+ conn->xmitu= newu;
+ }
+}
+
+static void conn_make_some_xmits(Conn *conn) {
+ for (;;) {
+ if (conn->xmitu+5 > CONNIOVS)
+ break;
+
+ Article *art= LIST_REMHEAD(conn->priority);
+ if (!art) art= LIST_REMHEAD(conn->waiting);
+ if (!art) break;
+
+ if (art->state >= art_Wanted || (conn->stream && nocheck)) {
+ /* actually send it */
+
+ ARTHANDLE *artdata= SMretrieve(art->token, RETR_ALL);
+
+ art->state=
+ art->state == art_Unchecked ? art_Unsolicited :
+ art->state == art_Wanted ? art_Wanted :
+ (abort(),-1);
+
+ if (!artdata) art->missing= 1;
+ art->ipf->counts[art->state][ artdata ? RC_sent : RC_missing ]++;
+
+ if (conn->stream) {
+ if (artdata) {
+ XMIT_LITERAL("TAKETHIS ");
+ xmit_noalloc(conn, art->messageid, art->midlen);
+ XMIT_LITERAL("\r\n");
+ xmit_artbody(conn, artdata);
+ } else {
+ article_done(art, -1);
+ continue;
+ }
+ } else {
+ /* we got 235 from IHAVE */
+ if (artdata) {
+ xmit_artbody(conn, artdata);
+ } else {
+ XMIT_LITERAL(".\r\n");
+ }
+ }
+
+ LIST_ADDTAIL(conn->sent, art);
+
+ } else {
+ /* check it */
+
+ if (conn->stream)
+ XMIT_LITERAL("CHECK ");
+ else
+ XMIT_LITERAL("IHAVE ");
+ xmit_noalloc(conn, art->messageid, art->midlen);
+ XMIT_LITERAL("\r\n");
+
+ assert(art->state == art_Unchecked);
+ art->ipf->counts[art->state][RC_sent]++;
+ LIST_ADDTAIL(conn->sent, art);
+ }
+ }
+}
+
+/*========== handling responses from peer ==========*/
+
+static const oop_rd_style peer_rd_style= {
+ OOP_RD_DELIM_STRIP, '\n',
+ OOP_RD_NUL_FORBID,
+ OOP_RD_SHORTREC_FORBID
+};
+
+static void *peer_rd_err(oop_source *lp, oop_read *oread, oop_rd_event ev,
+ const char *errmsg, int errnoval,
+ const char *data, size_t recsz, void *conn_v) {
+ Conn *conn= conn_v;
+ connfail(conn, "error receiving from peer: %s", errmsg);
+ return OOP_CONTINUE;
+}
+
+static Article *article_reply_check(Conn *conn, const char *response,
+ int code_indicates_streaming,
+ int must_have_sent
+ /* 1:yes, -1:no, 0:dontcare */,
+ const char *sanitised_response) {
+ Article *art= LIST_HEAD(conn->sent);
+
+ if (!art) {
+ connfail(conn,
+ "peer gave unexpected response when no commands outstanding: %s",
+ sanitised_response);
+ return 0;
+ }
+
+ if (code_indicates_streaming) {
+ assert(!memchr(response, 0, 4)); /* ensured by peer_rd_ok */
+ if (!conn->stream) {
+ connfail(conn, "peer gave streaming response code "
+ " to IHAVE or subsequent body: %s", sanitised_response);
+ return 0;
+ }
+ const char *got_mid= response+4;
+ int got_midlen= strcspn(got_mid, " \n\r");
+ if (got_midlen<3 || got_mid[0]!='<' || got_mid[got_midlen-1]!='>') {
+ connfail(conn, "peer gave streaming response with syntactically invalid"
+ " messageid: %s", sanitised_response);
+ return 0;
+ }
+ if (got_midlen != art->midlen ||
+ memcmp(got_mid, art->messageid, got_midlen)) {
+ connfail(conn, "peer gave streaming response code to wrong article -"
+ " probable synchronisation problem; we offered: %s;"
+ " peer said: %s",
+ art->messageid, sanitised_response);
+ return 0;
+ }
+ } else {
+ if (conn->stream) {
+ connfail(conn, "peer gave non-streaming response code to"
+ " CHECK/TAKETHIS: %s", sanitised_response);
+ return 0;
+ }
+ }
+
+ if (must_have_sent>0 && art->state < art_Wanted) {
+ connfail(conn, "peer says article accepted but"
+ " we had not sent the body: %s", sanitised_response);
+ return 0;
+ }
+ if (must_have_sent<0 && art->state >= art_Wanted) {
+ connfail(conn, "peer says please sent the article but we just did: %s",
+ sanitised_response);
+ return 0;
+ }
+
+ Article *art_again= LIST_REMHEAD(conn->sent);
+ assert(art_again == art);
+ return art;
+}
+
+static void update_nocheck(int accepted) {
+ accept_proportion *= nocheck_decay;
+ accept_proportion += accepted * (1.0 - nocheck_decay);
+ int new_nocheck= accept_proportion >= nocheck_thresh;
+ if (new_nocheck && !nocheck_reported) {
+ notice("entering nocheck mode for the first time");
+ nocheck_reported= 1;
+ } else if (new_nocheck != nocheck) {
+ debug("nocheck mode %s", new_nocheck ? "start" : "stop");
+ }
+ nocheck= new_nocheck;
+}
+
+static void article_done(Article *art, int whichcount) {
+ if (!art->missing) art->ipf->counts[art->state][whichcount]++;
+
+ if (whichcount == RC_accepted) update_nocheck(1);
+ else if (whichcount == RC_unwanted) update_nocheck(0);
+
+ InputFile *ipf= art->ipf;
+
+ while (art->blanklen) {
+ static const char spaces[]=
+ " "
+ " "
+ " "
+ " "
+ " "
+ " "
+ " "
+ " "
+ " ";
+ int w= art->blanklen; if (w >= sizeof(spaces)) w= sizeof(spaces)-1;
+ int r= pwrite(ipf->fd, spaces, w, art->offset);
+ if (r==-1) {
+ if (errno==EINTR) continue;
+ sysdie("failed to blank entry for %s (length %d at offset %lu) in %s",
+ art->messageid, art->blanklen,
+ (unsigned long)art->offset, ipf->path);
+ }
+ assert(r>=0 && r<=w);
+ art->blanklen -= w;
+ art->offset += w;
+ }
+
+ ipf->inprogress--;
+ assert(ipf->inprogress >= 0);
+ free(art);
+
+ if (!ipf->inprogress && ipf != main_input_file)
+ queue_check_input_done();
+}
+
+static void *peer_rd_ok(oop_source *lp, oop_read *oread, oop_rd_event ev,
+ const char *errmsg, int errnoval,
+ const char *data, size_t recsz, void *conn_v) {
+ Conn *conn= conn_v;
+
+ if (ev == OOP_RD_EOF) {
+ connfail(conn, "unexpected EOF from peer");
+ return OOP_CONTINUE;
+ }
+ assert(ev == OOP_RD_OK);
+
+ char *sani= sanitise(data,-1);
+
+ char *ep;
+ unsigned long code= strtoul(data, &ep, 10);
+ if (ep != data+3 || *ep != ' ' || data[0]=='0') {
+ connfail(conn, "badly formatted response from peer: %s", sani);
+ return OOP_CONTINUE;
+ }
+
+ int conn_busy=
+ conn->waiting.count ||
+ conn->priority.count ||
+ conn->sent.count ||
+ conn->xmitu;
+
+ if (conn->quitting) {
+ if (code!=205 && code!=503) {
+ connfail(conn, "peer gave unexpected response to QUIT: %s", sani);
+ } else {
+ notice("C%d idle connection closed by us", conn->fd);
+ assert(!conn_busy);
+ LIST_REMOVE(conns,conn);
+ conn_dispose(conn);
+ }
+ return OOP_CONTINUE;
+ }
+
+ conn->since_activity= 0;
+ Article *art;
+
+#define GET_ARTICLE(musthavesent) do{ \
+ art= article_reply_check(conn, data, code_streaming, musthavesent, sani); \
+ if (!art) return OOP_CONTINUE; /* reply_check has failed the conn */ \
+ }while(0)
+
+#define ARTICLE_DEALTWITH(streaming,musthavesent,how) do{ \
+ code_streaming= (streaming); \
+ GET_ARTICLE(musthavesent); \
+ article_done(art, RC_##how); \
+ goto dealtwith; \
+ }while(0)
+
+#define PEERBADMSG(m) do { \
+ connfail(conn, m ": %s", sani); return OOP_CONTINUE; \
+ }while(0)
+
+ int code_streaming= 0;
+
+ switch (code) {
+
+ case 400: PEERBADMSG("peer stopped accepting articles");
+ default: PEERBADMSG("peer sent unexpected message");
+
+ case 503:
+ if (conn_busy) PEERBADMSG("peer timed us out");
+ notice("C%d idle connection closed by peer", conn->fd);
+ LIST_REMOVE(conns,conn);
+ conn_dispose(conn);
+ return OOP_CONTINUE;
+
+ case 435: ARTICLE_DEALTWITH(0,0,unwanted); /* IHAVE says they have it */
+ case 438: ARTICLE_DEALTWITH(1,0,unwanted); /* CHECK/TAKETHIS: they have it */
+
+ case 235: ARTICLE_DEALTWITH(0,1,accepted); /* IHAVE says thanks */
+ case 239: ARTICLE_DEALTWITH(1,1,accepted); /* TAKETHIS says thanks */
+
+ case 437: ARTICLE_DEALTWITH(0,0,rejected); /* IHAVE says rejected */
+ case 439: ARTICLE_DEALTWITH(1,0,rejected); /* TAKETHIS says rejected */
+
+ case 238: /* CHECK says send it */
+ code_streaming= 1;
+ case 335: /* IHAVE says send it */
+ GET_ARTICLE(-1);
+ assert(art->state == art_Unchecked);
+ art->ipf->counts[art->state][RC_accepted]++;
+ art->state= art_Wanted;
+ LIST_ADDTAIL(conn->priority, art);
+ break;
+
+ case 431: /* CHECK or TAKETHIS says try later */
+ code_streaming= 1;
+ case 436: /* IHAVE says try later */
+ GET_ARTICLE(0);
+ article_defer(art, RC_deferred);
+ break;
+
+ }
+dealtwith:
+
+ conn_maybe_write(conn);
+ check_assign_articles();
+ return OOP_CONTINUE;
+}
+
+
+/*========== monitoring of input files ==========*/
+
+static void feedfile_eof(InputFile *ipf) {