chiark / gitweb /
Output buffering on admin connections.
authormdw <mdw>
Mon, 19 Feb 2001 19:11:09 +0000 (19:11 +0000)
committermdw <mdw>
Mon, 19 Feb 2001 19:11:09 +0000 (19:11 +0000)
admin.c
tripe.h

diff --git a/admin.c b/admin.c
index e5e9201c7e2f4dc5409cef4b53f64a2899038bec..4571b6628aed249c2cfaec5698b026f870defaee 100644 (file)
--- a/admin.c
+++ b/admin.c
@@ -1,6 +1,6 @@
 /* -*-c-*-
  *
- * $Id: admin.c,v 1.5 2001/02/16 21:22:51 mdw Exp $
+ * $Id: admin.c,v 1.6 2001/02/19 19:11:09 mdw Exp $
  *
  * Admin interface for configuration
  *
@@ -29,6 +29,9 @@
 /*----- Revision history --------------------------------------------------* 
  *
  * $Log: admin.c,v $
+ * Revision 1.6  2001/02/19 19:11:09  mdw
+ * Output buffering on admin connections.
+ *
  * Revision 1.5  2001/02/16 21:22:51  mdw
  * Support for displaying statistics.  Make client connections blocking, so
  * that things don't get dropped.  (This might change again if I add
@@ -89,6 +92,144 @@ static sig s_term, s_int, s_hup;
 
 #define T_RESOLVE SEC(30)
 
+static void a_destroy(admin */*a*/);
+static void a_lock(admin */*a*/);
+static void a_unlock(admin */*a*/);
+
+/*----- Output functions --------------------------------------------------*/
+
+/* --- @trywrite@ --- *
+ *
+ * Arguments:  @admin *a@ = pointer to an admin block
+ *             @const char *p@ = pointer to buffer to write
+ *             @size_t sz@ = size of data to write
+ *
+ * Returns:    The number of bytes written, or less than zero on error.
+ *
+ * Use:                Attempts to write data to a client.
+ */
+
+static ssize_t trywrite(admin *a, const char *p, size_t sz)
+{
+  ssize_t n, done = 0;
+
+again:
+  if (!sz)
+    return (done);
+  n = write(a->w.fd, p, sz);
+  if (n > 0) {
+    done += n;
+    p += n;
+    sz -= n;
+    goto again;
+  }
+  if (n < 0) {
+    if (errno == EINTR)
+      goto again;
+    if (errno != EAGAIN && errno != EWOULDBLOCK) {
+      a_destroy(a);
+      a_warn("disconnecting admin client due to write errors: %s",
+            strerror(errno));
+      return (-1);
+    }
+  }
+  return (done);
+}
+
+/* --- @dosend@ --- *
+ *
+ * Arguemnts:  @admin *a@ = pointer to an admin block
+ *             @const char *p@ = pointer to buffer to write
+ *             @size_t sz@ = size of data to write
+ *
+ * Returns:    ---
+ *
+ * Use:                Sends data to an admin client.
+ */
+
+static void dosend(admin *a, const char *p, size_t sz)
+{
+  ssize_t n;
+  obuf *o;
+
+  if (a->f & AF_DEAD)
+    return;
+
+  /* --- Try to send the data immediately --- */
+
+  if (!a->o_head) {
+    if ((n = trywrite(a, p, sz)) < 0)
+      return;
+    p += n;
+    sz -= n;
+    if (!sz)
+      return;
+  }
+       
+  /* --- Fill buffers with the data until it's all gone --- */
+
+  o = a->o_tail;
+  if (!o)
+    sel_addfile(&a->w);
+  else if (o->p_in < o->buf + OBUFSZ)
+    goto noalloc;
+
+  do {
+    o = xmalloc(sizeof(obuf));
+    o->next = 0;
+    o->p_in = o->p_out = o->buf;
+    if (a->o_tail)
+      a->o_tail->next = o;
+    else
+      a->o_head = o;
+    a->o_tail = o;
+
+  noalloc:
+    n = o->buf + OBUFSZ - o->p_in;
+    if (n > sz)
+      n = sz;
+    memcpy(o->p_in, p, n);
+    o->p_in += n;
+    p += n;
+    sz -= n;
+  } while (sz);
+}
+
+/* --- @a_flush@ --- *
+ *
+ * Arguments:  @int fd@ = file descriptor
+ *             @unsigned mode@ = what's happening
+ *             @void *v@ = pointer to my admin block
+ *
+ * Returns:    ---
+ *
+ * Use:                Flushes buffers when a client is ready to read again.
+ */
+
+static void a_flush(int fd, unsigned mode, void *v)
+{
+  admin *a = v;
+  obuf *o, *oo;
+  ssize_t n;
+
+  o = a->o_head;
+  while (o) {
+    if ((n = trywrite(a, o->p_out, o->p_in - o->p_out)) < 0)
+      return;
+    o->p_out += n;
+    if (o->p_in < o->p_out)
+      break;
+    oo = o;
+    o = o->next;
+    xfree(oo);
+  }
+  a->o_head = o;
+  if (!o) {
+    a->o_tail = 0;
+    sel_rmfile(&a->w);
+  }
+}
+
 /*----- Utility functions -------------------------------------------------*/
 
 /* --- @a_write@ --- *
@@ -109,7 +250,7 @@ static void a_write(admin *a, const char *fmt, ...)
   va_start(ap, fmt);
   dstr_vputf(&d, fmt, ap);
   va_end(ap);
-  write(a->fd, d.buf, d.len);
+  dosend(a, d.buf, d.len);
   dstr_destroy(&d);
 }
 
@@ -126,7 +267,7 @@ static void a_write(admin *a, const char *fmt, ...)
 void a_warn(const char *fmt, ...)
 {
   va_list ap;
-  admin *a;
+  admin *a, *aa;
   dstr d = DSTR_INIT;
 
   if (flags & F_INIT)
@@ -138,8 +279,10 @@ void a_warn(const char *fmt, ...)
     moan("%s", d.buf);
   else {
     dstr_putc(&d, '\n');
-    for (a = admins; a; a = a->next)
-      write(a->fd, d.buf, d.len);
+    for (a = admins; a; a = aa) {
+      aa = a->next;
+      dosend(a, d.buf, d.len);
+    }
   }
   dstr_destroy(&d);
 }
@@ -159,13 +302,15 @@ void a_warn(const char *fmt, ...)
 static void a_trace(const char *p, size_t sz, void *v)
 {
   dstr d = DSTR_INIT;
-  admin *a;
+  admin *a, *aa;
 
   dstr_puts(&d, "TRACE ");
   dstr_putm(&d, p, sz);
   dstr_putc(&d, '\n');
-  for (a = admins; a; a = a->next)
-    write(a->fd, d.buf, d.len);
+  for (a = admins; a; a = aa) {
+    aa = a->next;
+    dosend(a, d.buf, d.len);
+  }
   dstr_destroy(&d);  
 }
 #endif
@@ -243,6 +388,8 @@ static void a_sighup(int sig, void *v)
 static void a_resolve(struct hostent *h, void *v)
 {
   admin *a = v;
+
+  a_lock(a);
   T( trace(T_ADMIN, "admin: %u resolved", a->seq); )
   TIMER;
   sel_rmtimer(&a->t);
@@ -261,6 +408,7 @@ static void a_resolve(struct hostent *h, void *v)
   xfree(a->paddr);
   a->pname = 0;
   selbuf_enable(&a->b);
+  a_unlock(a);
 }
 
 /* --- @a_timer@ --- *
@@ -276,6 +424,8 @@ static void a_resolve(struct hostent *h, void *v)
 static void a_timer(struct timeval *tv, void *v)
 {
   admin *a = v;
+
+  a_lock(a);
   T( trace(T_ADMIN, "admin: %u resolver timeout", a->seq); )
   bres_abort(&a->r);
   a_write(a, "FAIL timeout resolving `%s'\n", a->paddr);
@@ -283,6 +433,7 @@ static void a_timer(struct timeval *tv, void *v)
   xfree(a->paddr);
   a->pname = 0;
   selbuf_enable(&a->b);
+  a_unlock(a);
 }
 
 /* --- @acmd_add@ --- *
@@ -410,8 +561,6 @@ static void acmd_port(admin *a, unsigned ac, char *av[])
   a_write(a, "INFO %u\nOK\n", p_port());
 }
 
-static void a_destroy(admin */*a*/);
-
 static void acmd_daemon(admin *a, unsigned ac, char *av[])
 {
   if (flags & F_DAEMON)
@@ -546,39 +695,105 @@ static void acmd_help(admin *a, unsigned ac, char *av[])
 
 /*----- Connection handling -----------------------------------------------*/
 
-/* --- @a_destroy@ --- *
+/* --- @a_lock@ --- *
  *
  * Arguments:  @admin *a@ = pointer to an admin block
  *
  * Returns:    ---
  *
- * Use:                Destroys an admin block.
+ * Use:                Locks an admin block so that it won't be destroyed
+ *             immediately.
  */
 
-static void a_destroy(admin *a)
+static void a_lock(admin *a) { assert(!(a->f & AF_LOCK)); a->f |= AF_LOCK; }
+
+/* --- @a_unlock@ --- *
+ *
+ * Arguments:  @admin *a@ = pointer to an admin block
+ *
+ * Returns:    ---
+ *
+ * Use:                Unlocks an admin block, allowing its destruction.  This is
+ *             also the second half of @a_destroy@.
+ */
+
+static void a_unlock(admin *a)
 {
-  T( trace(T_ADMIN, "admin: destroying connection %u", a->seq); )
+  assert(a->f & AF_LOCK);
+  if (!(a->f & AF_DEAD)) {
+    a->f &= ~AF_LOCK;
+    return;
+  }
+
+  T( trace(T_ADMIN, "admin: completing destruction of connection %u",
+          a->seq); )
+
   selbuf_destroy(&a->b);
-  if (a->b.reader.fd != a->fd)
-    close(a->b.reader.fd);
-  close(a->fd);
   if (a->pname) {
     xfree(a->pname);
     xfree(a->paddr);
     bres_abort(&a->r);
     sel_rmtimer(&a->t);
   }
+  if (a->b.reader.fd != a->w.fd)
+    close(a->b.reader.fd);
+  close(a->w.fd);
+
+  if (a_stdin == a)
+    a_stdin = 0;
   if (a->next)
     a->next->prev = a->prev;
   if (a->prev)
     a->prev->next = a->next;
   else
     admins = a->next;
-  if (a_stdin == a)
-    a_stdin = 0;
   DESTROY(a);
 }
 
+/* --- @a_destroy@ --- *
+ *
+ * Arguments:  @admin *a@ = pointer to an admin block
+ *
+ * Returns:    ---
+ *
+ * Use:                Destroys an admin block.  This requires a certain amount of
+ *             care.
+ */
+
+static void a_destroy(admin *a)
+{
+  /* --- Don't multiply destroy admin blocks --- */
+
+  if (a->f & AF_DEAD)
+    return;
+
+  /* --- Make sure nobody expects it to work --- */
+
+  a->f |= AF_DEAD;
+  T( trace(T_ADMIN, "admin: destroying connection %u", a->seq); )
+
+  /* --- Free the output buffers --- */
+
+  if (a->o_head) {
+    obuf *o, *oo;
+    sel_rmfile(&a->w);
+    for (o = a->o_head; o; o = oo) {
+      oo = o->next;
+      xfree(o);
+    }
+    a->o_head = 0;
+  }
+
+  /* --- If the block is locked, that's all we can manage --- */
+
+  if (a->f & AF_LOCK) {
+    T( trace(T_ADMIN, "admin: deferring destruction..."); )
+    return;
+  }
+  a->f |= AF_LOCK;
+  a_unlock(a);
+}
+
 /* --- @a_line@ --- *
  *
  * Arguments:  @char *p@ = pointer to the line read
@@ -597,6 +812,8 @@ static void a_line(char *p, void *vp)
   size_t ac;
 
   TIMER;
+  if (a->f & AF_DEAD)
+    return;
   if (!p) {
     a_destroy(a);
     return;
@@ -610,8 +827,11 @@ static void a_line(char *p, void *vp)
       ac--;
       if (c->argmin > ac || ac > c->argmax)
        a_write(a, "FAIL syntax: %s\n", c->help);
-      else
+      else {
+       a_lock(a);
        c->func(a, ac, av + 1);
+       a_unlock(a);
+      }
       return;
     }
   }
@@ -634,13 +854,16 @@ void a_create(int fd_in, int fd_out)
      a->seq = seq++; )
   T( trace(T_ADMIN, "admin: accepted connection %u", a->seq); )
   a->pname = 0;
+  a->f = 0;
   if (fd_in == STDIN_FILENO)
     a_stdin = a;
-  fdflags(fd_in, O_NONBLOCK, 0, FD_CLOEXEC, FD_CLOEXEC);
+  fdflags(fd_in, O_NONBLOCK, O_NONBLOCK, FD_CLOEXEC, FD_CLOEXEC);
   if (fd_out != fd_in)
-    fdflags(fd_out, O_NONBLOCK, 0, FD_CLOEXEC, FD_CLOEXEC);
-  a->fd = fd_out;
+    fdflags(fd_out, O_NONBLOCK, O_NONBLOCK, FD_CLOEXEC, FD_CLOEXEC);
   selbuf_init(&a->b, &sel, fd_in, a_line, a);
+  sel_initfile(&sel, &a->w, fd_out, SEL_WRITE, a_flush, a);
+  a->o_head = 0;
+  a->o_tail = 0;
   a->next = admins;
   a->prev = 0;
   if (admins)
diff --git a/tripe.h b/tripe.h
index 9f7fd5d719ff9678102e258b7c807847e13886f8..7f5e48c1bae6ea465983791e2af4f7f5ef7c7e96 100644 (file)
--- a/tripe.h
+++ b/tripe.h
@@ -1,6 +1,6 @@
 /* -*-c-*-
  *
- * $Id: tripe.h,v 1.5 2001/02/16 21:41:43 mdw Exp $
+ * $Id: tripe.h,v 1.6 2001/02/19 19:11:09 mdw Exp $
  *
  * Main header file for TrIPE
  *
@@ -29,6 +29,9 @@
 /*----- Revision history --------------------------------------------------* 
  *
  * $Log: tripe.h,v $
+ * Revision 1.6  2001/02/19 19:11:09  mdw
+ * Output buffering on admin connections.
+ *
  * Revision 1.5  2001/02/16 21:41:43  mdw
  * Major changes.  See source files for details.
  *
@@ -405,21 +408,34 @@ typedef struct peer {
 
 /* --- Admin structure --- */
 
+#define OBUFSZ 16384u
+
+typedef struct obuf {
+  struct obuf *next;                   /* Next buffer in list */
+  char *p_in, *p_out;                  /* Pointers into the buffer */
+  char buf[OBUFSZ];                    /* The actual buffer */
+} obuf;
+
 typedef struct admin {
   struct admin *next, *prev;           /* Links to next and previous */
-  selbuf b;                            /* Line buffer for commands */
-  int fd;                              /* File descriptor for output */
+  unsigned f;                          /* Various useful flags */
 #ifndef NTRACE
   unsigned seq;                                /* Sequence number for tracing */
 #endif
   char *pname;                         /* Peer name to create */
   char *paddr;                         /* Address string to resolve */
+  obuf *o_head, *o_tail;               /* Output buffer list */
+  selbuf b;                            /* Line buffer for commands */
+  sel_file w;                          /* Selector for write buffering */
   bres_client r;                       /* Background resolver task */
   sel_timer t;                         /* Timer for resolver */
   addr peer;                           /* Address to set */
   size_t sasz;                         /* Size of the address */
 } admin;
 
+#define AF_DEAD 1u                     /* Destroy this admin block */
+#define AF_LOCK 2u                     /* Don't destroy it yet */
+
 /*----- Global variables --------------------------------------------------*/
 
 extern sel_state sel;                  /* Global I/O event state */