From fd3cf2326b89d2e672afa764978190ce30d409f5 Mon Sep 17 00:00:00 2001 Message-Id: From: Mark Wooding Date: Mon, 19 Feb 2001 19:11:09 +0000 Subject: [PATCH] Output buffering on admin connections. Organization: Straylight/Edgeware From: mdw --- admin.c | 269 +++++++++++++++++++++++++++++++++++++++++++++++++++----- tripe.h | 22 ++++- 2 files changed, 265 insertions(+), 26 deletions(-) diff --git a/admin.c b/admin.c index e5e9201c..4571b662 100644 --- a/admin.c +++ b/admin.c @@ -1,6 +1,6 @@ /* -*-c-*- * - * $Id: admin.c,v 1.5 2001/02/16 21:22:51 mdw Exp $ + * $Id: admin.c,v 1.6 2001/02/19 19:11:09 mdw Exp $ * * Admin interface for configuration * @@ -29,6 +29,9 @@ /*----- Revision history --------------------------------------------------* * * $Log: admin.c,v $ + * Revision 1.6 2001/02/19 19:11:09 mdw + * Output buffering on admin connections. + * * Revision 1.5 2001/02/16 21:22:51 mdw * Support for displaying statistics. Make client connections blocking, so * that things don't get dropped. (This might change again if I add @@ -89,6 +92,144 @@ static sig s_term, s_int, s_hup; #define T_RESOLVE SEC(30) +static void a_destroy(admin */*a*/); +static void a_lock(admin */*a*/); +static void a_unlock(admin */*a*/); + +/*----- Output functions --------------------------------------------------*/ + +/* --- @trywrite@ --- * + * + * Arguments: @admin *a@ = pointer to an admin block + * @const char *p@ = pointer to buffer to write + * @size_t sz@ = size of data to write + * + * Returns: The number of bytes written, or less than zero on error. + * + * Use: Attempts to write data to a client. + */ + +static ssize_t trywrite(admin *a, const char *p, size_t sz) +{ + ssize_t n, done = 0; + +again: + if (!sz) + return (done); + n = write(a->w.fd, p, sz); + if (n > 0) { + done += n; + p += n; + sz -= n; + goto again; + } + if (n < 0) { + if (errno == EINTR) + goto again; + if (errno != EAGAIN && errno != EWOULDBLOCK) { + a_destroy(a); + a_warn("disconnecting admin client due to write errors: %s", + strerror(errno)); + return (-1); + } + } + return (done); +} + +/* --- @dosend@ --- * + * + * Arguemnts: @admin *a@ = pointer to an admin block + * @const char *p@ = pointer to buffer to write + * @size_t sz@ = size of data to write + * + * Returns: --- + * + * Use: Sends data to an admin client. + */ + +static void dosend(admin *a, const char *p, size_t sz) +{ + ssize_t n; + obuf *o; + + if (a->f & AF_DEAD) + return; + + /* --- Try to send the data immediately --- */ + + if (!a->o_head) { + if ((n = trywrite(a, p, sz)) < 0) + return; + p += n; + sz -= n; + if (!sz) + return; + } + + /* --- Fill buffers with the data until it's all gone --- */ + + o = a->o_tail; + if (!o) + sel_addfile(&a->w); + else if (o->p_in < o->buf + OBUFSZ) + goto noalloc; + + do { + o = xmalloc(sizeof(obuf)); + o->next = 0; + o->p_in = o->p_out = o->buf; + if (a->o_tail) + a->o_tail->next = o; + else + a->o_head = o; + a->o_tail = o; + + noalloc: + n = o->buf + OBUFSZ - o->p_in; + if (n > sz) + n = sz; + memcpy(o->p_in, p, n); + o->p_in += n; + p += n; + sz -= n; + } while (sz); +} + +/* --- @a_flush@ --- * + * + * Arguments: @int fd@ = file descriptor + * @unsigned mode@ = what's happening + * @void *v@ = pointer to my admin block + * + * Returns: --- + * + * Use: Flushes buffers when a client is ready to read again. + */ + +static void a_flush(int fd, unsigned mode, void *v) +{ + admin *a = v; + obuf *o, *oo; + ssize_t n; + + o = a->o_head; + while (o) { + if ((n = trywrite(a, o->p_out, o->p_in - o->p_out)) < 0) + return; + o->p_out += n; + if (o->p_in < o->p_out) + break; + oo = o; + o = o->next; + xfree(oo); + } + a->o_head = o; + if (!o) { + a->o_tail = 0; + sel_rmfile(&a->w); + } +} + /*----- Utility functions -------------------------------------------------*/ /* --- @a_write@ --- * @@ -109,7 +250,7 @@ static void a_write(admin *a, const char *fmt, ...) va_start(ap, fmt); dstr_vputf(&d, fmt, ap); va_end(ap); - write(a->fd, d.buf, d.len); + dosend(a, d.buf, d.len); dstr_destroy(&d); } @@ -126,7 +267,7 @@ static void a_write(admin *a, const char *fmt, ...) void a_warn(const char *fmt, ...) { va_list ap; - admin *a; + admin *a, *aa; dstr d = DSTR_INIT; if (flags & F_INIT) @@ -138,8 +279,10 @@ void a_warn(const char *fmt, ...) moan("%s", d.buf); else { dstr_putc(&d, '\n'); - for (a = admins; a; a = a->next) - write(a->fd, d.buf, d.len); + for (a = admins; a; a = aa) { + aa = a->next; + dosend(a, d.buf, d.len); + } } dstr_destroy(&d); } @@ -159,13 +302,15 @@ void a_warn(const char *fmt, ...) static void a_trace(const char *p, size_t sz, void *v) { dstr d = DSTR_INIT; - admin *a; + admin *a, *aa; dstr_puts(&d, "TRACE "); dstr_putm(&d, p, sz); dstr_putc(&d, '\n'); - for (a = admins; a; a = a->next) - write(a->fd, d.buf, d.len); + for (a = admins; a; a = aa) { + aa = a->next; + dosend(a, d.buf, d.len); + } dstr_destroy(&d); } #endif @@ -243,6 +388,8 @@ static void a_sighup(int sig, void *v) static void a_resolve(struct hostent *h, void *v) { admin *a = v; + + a_lock(a); T( trace(T_ADMIN, "admin: %u resolved", a->seq); ) TIMER; sel_rmtimer(&a->t); @@ -261,6 +408,7 @@ static void a_resolve(struct hostent *h, void *v) xfree(a->paddr); a->pname = 0; selbuf_enable(&a->b); + a_unlock(a); } /* --- @a_timer@ --- * @@ -276,6 +424,8 @@ static void a_resolve(struct hostent *h, void *v) static void a_timer(struct timeval *tv, void *v) { admin *a = v; + + a_lock(a); T( trace(T_ADMIN, "admin: %u resolver timeout", a->seq); ) bres_abort(&a->r); a_write(a, "FAIL timeout resolving `%s'\n", a->paddr); @@ -283,6 +433,7 @@ static void a_timer(struct timeval *tv, void *v) xfree(a->paddr); a->pname = 0; selbuf_enable(&a->b); + a_unlock(a); } /* --- @acmd_add@ --- * @@ -410,8 +561,6 @@ static void acmd_port(admin *a, unsigned ac, char *av[]) a_write(a, "INFO %u\nOK\n", p_port()); } -static void a_destroy(admin */*a*/); - static void acmd_daemon(admin *a, unsigned ac, char *av[]) { if (flags & F_DAEMON) @@ -546,39 +695,105 @@ static void acmd_help(admin *a, unsigned ac, char *av[]) /*----- Connection handling -----------------------------------------------*/ -/* --- @a_destroy@ --- * +/* --- @a_lock@ --- * * * Arguments: @admin *a@ = pointer to an admin block * * Returns: --- * - * Use: Destroys an admin block. + * Use: Locks an admin block so that it won't be destroyed + * immediately. */ -static void a_destroy(admin *a) +static void a_lock(admin *a) { assert(!(a->f & AF_LOCK)); a->f |= AF_LOCK; } + +/* --- @a_unlock@ --- * + * + * Arguments: @admin *a@ = pointer to an admin block + * + * Returns: --- + * + * Use: Unlocks an admin block, allowing its destruction. This is + * also the second half of @a_destroy@. + */ + +static void a_unlock(admin *a) { - T( trace(T_ADMIN, "admin: destroying connection %u", a->seq); ) + assert(a->f & AF_LOCK); + if (!(a->f & AF_DEAD)) { + a->f &= ~AF_LOCK; + return; + } + + T( trace(T_ADMIN, "admin: completing destruction of connection %u", + a->seq); ) + selbuf_destroy(&a->b); - if (a->b.reader.fd != a->fd) - close(a->b.reader.fd); - close(a->fd); if (a->pname) { xfree(a->pname); xfree(a->paddr); bres_abort(&a->r); sel_rmtimer(&a->t); } + if (a->b.reader.fd != a->w.fd) + close(a->b.reader.fd); + close(a->w.fd); + + if (a_stdin == a) + a_stdin = 0; if (a->next) a->next->prev = a->prev; if (a->prev) a->prev->next = a->next; else admins = a->next; - if (a_stdin == a) - a_stdin = 0; DESTROY(a); } +/* --- @a_destroy@ --- * + * + * Arguments: @admin *a@ = pointer to an admin block + * + * Returns: --- + * + * Use: Destroys an admin block. This requires a certain amount of + * care. + */ + +static void a_destroy(admin *a) +{ + /* --- Don't multiply destroy admin blocks --- */ + + if (a->f & AF_DEAD) + return; + + /* --- Make sure nobody expects it to work --- */ + + a->f |= AF_DEAD; + T( trace(T_ADMIN, "admin: destroying connection %u", a->seq); ) + + /* --- Free the output buffers --- */ + + if (a->o_head) { + obuf *o, *oo; + sel_rmfile(&a->w); + for (o = a->o_head; o; o = oo) { + oo = o->next; + xfree(o); + } + a->o_head = 0; + } + + /* --- If the block is locked, that's all we can manage --- */ + + if (a->f & AF_LOCK) { + T( trace(T_ADMIN, "admin: deferring destruction..."); ) + return; + } + a->f |= AF_LOCK; + a_unlock(a); +} + /* --- @a_line@ --- * * * Arguments: @char *p@ = pointer to the line read @@ -597,6 +812,8 @@ static void a_line(char *p, void *vp) size_t ac; TIMER; + if (a->f & AF_DEAD) + return; if (!p) { a_destroy(a); return; @@ -610,8 +827,11 @@ static void a_line(char *p, void *vp) ac--; if (c->argmin > ac || ac > c->argmax) a_write(a, "FAIL syntax: %s\n", c->help); - else + else { + a_lock(a); c->func(a, ac, av + 1); + a_unlock(a); + } return; } } @@ -634,13 +854,16 @@ void a_create(int fd_in, int fd_out) a->seq = seq++; ) T( trace(T_ADMIN, "admin: accepted connection %u", a->seq); ) a->pname = 0; + a->f = 0; if (fd_in == STDIN_FILENO) a_stdin = a; - fdflags(fd_in, O_NONBLOCK, 0, FD_CLOEXEC, FD_CLOEXEC); + fdflags(fd_in, O_NONBLOCK, O_NONBLOCK, FD_CLOEXEC, FD_CLOEXEC); if (fd_out != fd_in) - fdflags(fd_out, O_NONBLOCK, 0, FD_CLOEXEC, FD_CLOEXEC); - a->fd = fd_out; + fdflags(fd_out, O_NONBLOCK, O_NONBLOCK, FD_CLOEXEC, FD_CLOEXEC); selbuf_init(&a->b, &sel, fd_in, a_line, a); + sel_initfile(&sel, &a->w, fd_out, SEL_WRITE, a_flush, a); + a->o_head = 0; + a->o_tail = 0; a->next = admins; a->prev = 0; if (admins) diff --git a/tripe.h b/tripe.h index 9f7fd5d7..7f5e48c1 100644 --- a/tripe.h +++ b/tripe.h @@ -1,6 +1,6 @@ /* -*-c-*- * - * $Id: tripe.h,v 1.5 2001/02/16 21:41:43 mdw Exp $ + * $Id: tripe.h,v 1.6 2001/02/19 19:11:09 mdw Exp $ * * Main header file for TrIPE * @@ -29,6 +29,9 @@ /*----- Revision history --------------------------------------------------* * * $Log: tripe.h,v $ + * Revision 1.6 2001/02/19 19:11:09 mdw + * Output buffering on admin connections. + * * Revision 1.5 2001/02/16 21:41:43 mdw * Major changes. See source files for details. * @@ -405,21 +408,34 @@ typedef struct peer { /* --- Admin structure --- */ +#define OBUFSZ 16384u + +typedef struct obuf { + struct obuf *next; /* Next buffer in list */ + char *p_in, *p_out; /* Pointers into the buffer */ + char buf[OBUFSZ]; /* The actual buffer */ +} obuf; + typedef struct admin { struct admin *next, *prev; /* Links to next and previous */ - selbuf b; /* Line buffer for commands */ - int fd; /* File descriptor for output */ + unsigned f; /* Various useful flags */ #ifndef NTRACE unsigned seq; /* Sequence number for tracing */ #endif char *pname; /* Peer name to create */ char *paddr; /* Address string to resolve */ + obuf *o_head, *o_tail; /* Output buffer list */ + selbuf b; /* Line buffer for commands */ + sel_file w; /* Selector for write buffering */ bres_client r; /* Background resolver task */ sel_timer t; /* Timer for resolver */ addr peer; /* Address to set */ size_t sasz; /* Size of the address */ } admin; +#define AF_DEAD 1u /* Destroy this admin block */ +#define AF_LOCK 2u /* Don't destroy it yet */ + /*----- Global variables --------------------------------------------------*/ extern sel_state sel; /* Global I/O event state */ -- [mdw]