chiark / gitweb /
wip split into multiple files and make compile
[innduct.git] / conn.c
diff --git a/conn.c b/conn.c
new file mode 100644 (file)
index 0000000..301d025
--- /dev/null
+++ b/conn.c
@@ -0,0 +1,406 @@
+/*========== management of connections ==========*/
+
+static void reconnect_blocking_event(void) {
+  until_connect= reconnect_delay_periods;
+}
+
+void conn_closefd(Conn *conn, const char *msgprefix) {
+  int r= close_perhaps(&conn->fd);
+  if (r) info("C%d %serror closing socket: %s",
+             conn->fd, msgprefix, strerror(errno));
+}
+
+static int conn_busy(Conn *conn) {
+  return
+    conn->waiting.count ||
+    conn->priority.count ||
+    conn->sent.count ||
+    conn->xmitu;
+}
+
+static void conn_dispose(Conn *conn) {
+  if (!conn) return;
+  if (conn->rd) {
+    oop_rd_cancel(conn->rd);
+    oop_rd_delete_kill(conn->rd);
+    conn->rd= 0;
+  }
+  if (conn->fd) {
+    loop->cancel_fd(loop, conn->fd, OOP_WRITE);
+    loop->cancel_fd(loop, conn->fd, OOP_EXCEPTION);
+  }
+  conn_closefd(conn,"");
+  free(conn);
+}
+
+static void *conn_exception(oop_source *lp, int fd,
+                           oop_event ev, void *conn_v) {
+  Conn *conn= conn_v;
+  unsigned char ch;
+  assert(fd == conn->fd);
+  assert(ev == OOP_EXCEPTION);
+  int r= read(conn->fd, &ch, 1);
+  if (r<0) connfail(conn,"read failed: %s",strerror(errno));
+  else connfail(conn,"exceptional condition on socket (peer sent urgent"
+               " data? read(,&ch,1)=%d,ch='\\x%02x')",r,ch);
+  return OOP_CONTINUE;
+}  
+
+static void vconnfail(Conn *conn, const char *fmt, va_list al) {
+  int requeue[art_MaxState];
+  memset(requeue,0,sizeof(requeue));
+
+  Article *art;
+  
+  while ((art= LIST_REMHEAD(conn->priority)))
+    LIST_ADDTAIL(art->ipf->queue, art);
+
+  while ((art= LIST_REMHEAD(conn->waiting)))
+    LIST_ADDTAIL(art->ipf->queue, art);
+
+  while ((art= LIST_REMHEAD(conn->sent))) {
+    requeue[art->state]++;
+    if (art->state==art_Unsolicited) art->state= art_Unchecked;
+    LIST_ADDTAIL(art->ipf->queue,art);
+    check_reading_pause_resume(art->ipf);
+  }
+
+  int i;
+  XmitDetails *d;
+  for (i=0, d=conn->xmitd; i<conn->xmitu; i++, d++)
+    xmit_free(d);
+
+  LIST_REMOVE(conns,conn);
+
+  char *m= xvasprintf(fmt,al);
+  warn("C%d (now %d) connection failed requeueing " RCI_TRIPLE_FMT_BASE ": %s",
+       conn->fd, conns.count, RCI_TRIPLE_VALS_BASE(requeue, /*nothing*/), m);
+  free(m);
+
+  reconnect_blocking_event();
+  conn_dispose(conn);
+  check_assign_articles();
+}
+
+static void connfail(Conn *conn, const char *fmt, ...) {
+  va_list al;
+  va_start(al,fmt);
+  vconnfail(conn,fmt,al);
+  va_end(al);
+}
+
+static void conn_idle_close(Conn *conn, const char *why) {
+  static const char quitcmd[]= "QUIT\r\n";
+  int todo= sizeof(quitcmd)-1;
+  const char *p= quitcmd;
+  for (;;) {
+    int r= write(conn->fd, p, todo);
+    if (r<0) {
+      if (isewouldblock(errno))
+       connfail(conn, "blocked writing QUIT to idle connection");
+      else
+       connfail(conn, "failed to write QUIT to idle connection: %s",
+                strerror(errno));
+      break;
+    }
+    assert(r<=todo);
+    todo -= r;
+    if (!todo) {
+      conn->quitting= why;
+      conn->since_activity= 0;
+      dbg("C%d is idle (%s), quitting", conn->fd, why);
+      break;
+    }
+  }
+}
+
+/*
+ * For our last connection, we also shut it down if we have had
+ * less than K in the last L
+ */
+static void check_idle_conns(void) {
+  Conn *conn;
+
+  int volthisperiod= lowvol_perperiod[lowvol_circptr];
+  lowvol_circptr++;
+  lowvol_circptr %= lowvol_periods;
+  lowvol_total += volthisperiod;
+  lowvol_total -= lowvol_perperiod[lowvol_circptr];
+  lowvol_perperiod[lowvol_circptr]= 0;
+
+  FOR_CONN(conn)
+    conn->since_activity++;
+
+ search_again:
+  FOR_CONN(conn) {
+    if (conn->since_activity <= need_activity_periods) continue;
+
+    /* We need to shut this down */
+    if (conn->quitting)
+      connfail(conn,"timed out waiting for response to QUIT (%s)",
+              conn->quitting);
+    else if (conn->sent.count)
+      connfail(conn,"timed out waiting for responses");
+    else if (conn->waiting.count || conn->priority.count)
+      connfail(conn,"BUG IN INNDUCT conn has queue but nothing sent");
+    else if (conn->xmitu)
+      connfail(conn,"peer has been sending responses"
+              " before receiving our commands!");
+    else
+      conn_idle_close(conn, "no activity");
+    
+    goto search_again;
+  }
+
+  conn= LIST_HEAD(conns);
+  if (!volthisperiod &&
+      conns.count==1 &&
+      lowvol_total < lowvol_thresh &&
+      !conn_busy(conn))
+    conn_idle_close(conn, "low volume");
+}  
+
+/*---------- making new connections ----------*/
+
+static pid_t connecting_child;
+int connecting_fdpass_sock;
+
+static void connect_attempt_discard(void) {
+  if (connecting_child) {
+    int status= xwaitpid(&connecting_child, "connect");
+    if (!(WIFEXITED(status) ||
+         (WIFSIGNALED(status) && WTERMSIG(status) == SIGKILL)))
+      report_child_status("connect", status);
+  }
+  if (connecting_fdpass_sock) {
+    cancel_fd_read_except(connecting_fdpass_sock);
+    xclose_perhaps(&connecting_fdpass_sock, "connecting fdpass socket",0);
+  }
+}
+
+#define PREP_DECL_MSG_CMSG(msg)                        \
+  char msgbyte= 0;                             \
+  struct iovec msgiov;                         \
+  msgiov.iov_base= &msgbyte;                   \
+  msgiov.iov_len= 1;                           \
+  struct msghdr msg;                           \
+  memset(&msg,0,sizeof(msg));                  \
+  char msg##cbuf[CMSG_SPACE(sizeof(int))];     \
+  msg.msg_iov= &msgiov;                                \
+  msg.msg_iovlen= 1;                           \
+  msg.msg_control= msg##cbuf;                  \
+  msg.msg_controllen= sizeof(msg##cbuf);
+
+static void *connchild_event(oop_source *lp, int fd, oop_event e, void *u) {
+  Conn *conn= 0;
+
+  assert(fd == connecting_fdpass_sock);
+
+  PREP_DECL_MSG_CMSG(msg);
+  
+  ssize_t rs= recvmsg(fd, &msg, 0);
+  if (rs<0) {
+    if (isewouldblock(errno)) return OOP_CONTINUE;
+    syswarn("failed to read socket from connecting child");
+    goto x;
+  }
+
+  NEW(conn);
+  LIST_INIT(conn->waiting);
+  LIST_INIT(conn->priority);
+  LIST_INIT(conn->sent);
+
+  struct cmsghdr *h= 0;
+  if (rs >= 0) h= CMSG_FIRSTHDR(&msg);
+  if (!h) {
+    int status= xwaitpid(&connecting_child, "connect child (broken)");
+
+    if (WIFEXITED(status)) {
+      if (WEXITSTATUS(status) != 0 &&
+         WEXITSTATUS(status) != CONNCHILD_ESTATUS_STREAM &&
+         WEXITSTATUS(status) != CONNCHILD_ESTATUS_NOSTREAM)
+       /* child already reported the problem */;
+      else {
+       if (e == OOP_EXCEPTION)
+         warn("connect: connection child exited code %d but"
+              " unexpected exception on fdpass socket",
+              WEXITSTATUS(status));
+       else
+         warn("connect: connection child exited code %d but"
+              " no cmsg (rs=%d)",
+              WEXITSTATUS(status), (int)rs);
+      }
+    } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGALRM) {
+      warn("connect: connection attempt timed out");
+    } else {
+      report_child_status("connect", status);
+    }
+    goto x;
+  }
+
+#define CHK(field, val)                                                           \
+  if (h->cmsg_##field != val) {                                                   \
+    crash("connect: child sent cmsg with cmsg_" #field "=%d, expected %d", \
+         h->cmsg_##field, val);                                           \
+    goto x;                                                               \
+  }
+  CHK(level, SOL_SOCKET);
+  CHK(type,  SCM_RIGHTS);
+  CHK(len,   CMSG_LEN(sizeof(conn->fd)));
+#undef CHK
+
+  if (CMSG_NXTHDR(&msg,h)) crash("connect: child sent many cmsgs");
+
+  memcpy(&conn->fd, CMSG_DATA(h), sizeof(conn->fd));
+
+  int status;
+  pid_t got= waitpid(connecting_child, &status, 0);
+  if (got==-1) syscrash("connect: real wait for child");
+  assert(got == connecting_child);
+  connecting_child= 0;
+
+  if (!WIFEXITED(status)) { report_child_status("connect",status); goto x; }
+  int es= WEXITSTATUS(status);
+  switch (es) {
+  case CONNCHILD_ESTATUS_STREAM:    conn->stream= 1;   break;
+  case CONNCHILD_ESTATUS_NOSTREAM:  conn->stream= 0;   break;
+  default:
+    die("connect: child gave unexpected exit status %d", es);
+  }
+
+  /* Phew! */
+  conn->max_queue= conn->stream ? max_queue_per_conn : 1;
+
+  loop->on_fd(loop, conn->fd, OOP_EXCEPTION, conn_exception, conn);
+  conn->rd= oop_rd_new_fd(loop,conn->fd, 0, 0); /* sets nonblocking, too */
+  if (!conn->fd) crash("oop_rd_new_fd conn failed (fd=%d)",conn->fd);
+  int r= oop_rd_read(conn->rd, &peer_rd_style, NNTP_STRLEN,
+                    &peer_rd_ok, conn,
+                    &peer_rd_err, conn);
+  if (r) syscrash("oop_rd_read for peer (fd=%d)",conn->fd);
+
+  LIST_ADDHEAD(conns, conn);
+  notice("C%d (now %d) connected %s",
+        conn->fd, conns.count, conn->stream ? "streaming" : "plain");
+
+  connect_attempt_discard();
+  check_assign_articles();
+  return OOP_CONTINUE;
+
+ x:
+  conn_dispose(conn);
+  connect_attempt_discard();
+  reconnect_blocking_event();
+  return OOP_CONTINUE;
+}
+
+static int allow_connect_start(void) {
+  return conns.count < max_connections
+    && !connecting_child
+    && !until_connect;
+}
+
+static void connect_start(void) {
+  assert(!connecting_child);
+  assert(!connecting_fdpass_sock);
+
+  info("starting connection attempt");
+  int ok_until_connect= until_connect;
+  reconnect_blocking_event();
+
+  int socks[2];
+  int r= socketpair(AF_UNIX, SOCK_STREAM, 0, socks);
+  if (r) { syswarn("connect: cannot create socketpair for child"); return; }
+
+  connecting_child= xfork("connection");
+
+  if (!connecting_child) {
+    FILE *cn_from, *cn_to;
+    char buf[NNTP_STRLEN+100];
+    int exitstatus= CONNCHILD_ESTATUS_NOSTREAM;
+
+    xclose(socks[0], "(in child) parent's connection fdpass socket",0);
+
+    alarm(connection_setup_timeout);
+    if (NNTPconnect((char*)remote_host, port, &cn_from, &cn_to, buf) < 0) {
+      int l= strlen(buf);
+      int stripped=0;
+      while (l>0) {
+       unsigned char c= buf[l-1];
+       if (!isspace(c)) break;
+       if (c=='\n' || c=='\r') stripped=1;
+       --l;
+      }
+      if (!buf[0]) {
+       sysdie("connect: connection attempt failed");
+      } else {
+       buf[l]= 0;
+       die("connect: %s: %s", stripped ? "rejected" : "failed",
+           sanitise(buf,-1));
+      }
+    }
+    if (NNTPsendpassword((char*)remote_host, cn_from, cn_to) < 0)
+      sysdie("connect: authentication failed");
+    if (try_stream) {
+      if (fputs("MODE STREAM\r\n", cn_to)==EOF ||
+         fflush(cn_to))
+       sysdie("connect: could not send MODE STREAM");
+      buf[sizeof(buf)-1]= 0;
+      if (!fgets(buf, sizeof(buf)-1, cn_from)) {
+       if (ferror(cn_from))
+         sysdie("connect: could not read response to MODE STREAM");
+       else
+         die("connect: connection close in response to MODE STREAM");
+      }
+      int l= strlen(buf);
+      assert(l>=1);
+      if (buf[l-1]!='\n')
+       die("connect: response to MODE STREAM is too long: %.100s...",
+           sanitise(buf,-1));
+      l--;  if (l>0 && buf[l-1]=='\r') l--;
+      buf[l]= 0;
+      char *ep;
+      int rcode= strtoul(buf,&ep,10);
+      if (ep != &buf[3])
+       die("connect: bad response to MODE STREAM: %.50s", sanitise(buf,-1));
+
+      switch (rcode) {
+      case 203:
+       exitstatus= CONNCHILD_ESTATUS_STREAM;
+       break;
+      case 480:
+      case 500:
+       break;
+      default:
+       warn("connect: unexpected response to MODE STREAM: %.50s",
+            sanitise(buf,-1));
+       exitstatus= 2;
+       break;
+      }
+    }
+    int fd= fileno(cn_from);
+
+    PREP_DECL_MSG_CMSG(msg);
+    struct cmsghdr *cmsg= CMSG_FIRSTHDR(&msg);
+    cmsg->cmsg_level= SOL_SOCKET;
+    cmsg->cmsg_type=  SCM_RIGHTS;
+    cmsg->cmsg_len=   CMSG_LEN(sizeof(fd));
+    memcpy(CMSG_DATA(cmsg), &fd, sizeof(fd));
+
+    msg.msg_controllen= cmsg->cmsg_len;
+    r= sendmsg(socks[1], &msg, 0);
+    if (r<0) syscrash("sendmsg failed for new connection");
+    if (r!=1) crash("sendmsg for new connection gave wrong result %d",r);
+
+    _exit(exitstatus);
+  }
+
+  xclose(socks[1], "connecting fdpass child's socket",0);
+  connecting_fdpass_sock= socks[0];
+  xsetnonblock(connecting_fdpass_sock, 1);
+  on_fd_read_except(connecting_fdpass_sock, connchild_event);
+
+  if (!conns.count)
+    until_connect= ok_until_connect;
+}
+