chiark / gitweb /
ev_reader and ev_writer now own the FDs you give them. This is
authorRichard Kettlewell <rjk@greenend.org.uk>
Sat, 10 Nov 2007 11:47:45 +0000 (11:47 +0000)
committerRichard Kettlewell <rjk@greenend.org.uk>
Sat, 10 Nov 2007 11:47:45 +0000 (11:47 +0000)
helpful because they are in a better position to know when to close
them.  They can be tied together to share one FD between the two.

It is still (currently) necessary to explicitly cancel a reader if the
corresponding writer fails.  This may be changed in the future.

Hopefuly this should make event loop bugs at least more tractable.

Currently there is still lots of logging cruft.  This should be
removed before release.

lib/event.c
lib/event.h
lib/logfd.c
server/server.c
server/trackdb.c

index c3936fa..0745695 100644 (file)
@@ -406,6 +406,8 @@ int ev_fd_disable(ev_source *ev, ev_fdmode mode, int fd) {
   D(("disabling mode %s fd %d", modenames[mode], fd));
   FD_CLR(fd, &ev->mode[mode].enabled);
   FD_CLR(fd, &ev->mode[mode].tripped);
+  /* Suppress any pending callbacks */
+  ev->escape = 1;
   return 0;
 }
 
@@ -886,31 +888,79 @@ static void buffer_space(struct buffer *b, size_t bytes) {
      (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top));
 }
 
-/* buffered writer ************************************************************/
+/* readers and writers *******************************************************/
 
 /** @brief State structure for a buffered writer */
 struct ev_writer {
+  /** @brief Sink used for writing to the buffer */
   struct sink s;
+
+  /** @brief Output buffer */
   struct buffer b;
+
+  /** @brief File descriptor to write to */
   int fd;
+
+  /** @brief Set if there'll be no more output */
   int eof;
+
+  /** @brief Error/termination callback */
   ev_error_callback *callback;
+
+  /** @brief Passed to @p callback */
   void *u;
+
+  /** @brief Parent event source */
   ev_source *ev;
 
   /** @brief Maximum amount of time between succesful writes, 0 = don't care */
   int timebound;
   /** @brief Maximum amount of data to buffer, 0 = don't care */
   int spacebound;
-  /** @brief Synthesized error code */
-  int syntherror;
+  /** @brief Error code to pass to @p callback (see writer_shutdown()) */
+  int error;
   /** @brief Timeout handle for @p timebound (or 0) */
   ev_timeout_handle timeout;
 
+  /** @brief Description of this writer */
   const char *what;
+
+  /** @brief Tied reader or 0 */
+  ev_reader *reader;
 };
 
-/** @brief Synthesized error callback
+/** @brief State structure for a buffered reader */
+struct ev_reader {
+  /** @brief Input buffer */
+  struct buffer b;
+  /** @brief File descriptor read from */
+  int fd;
+  /** @brief Called when new data is available */
+  ev_reader_callback *callback;
+  /** @brief Called on error and shutdown */
+  ev_error_callback *error_callback;
+  /** @brief Passed to @p callback and @p error_callback */
+  void *u;
+  /** @brief Parent event loop */
+  ev_source *ev;
+  /** @brief Set when EOF is detected */
+  int eof;
+  /** @brief Error code to pass to error callback */
+  int error;
+  /** @brief Tied writer or NULL */
+  ev_writer *writer;
+};
+
+/* buffered writer ************************************************************/
+
+/** @brief Shut down the writer
+ *
+ * This is called to shut down a writer.  The error callback is not called
+ * through any other path.  Also we do not cancel @p fd from anywhere else,
+ * though we might disable it.
+ *
+ * It has the signature of a timeout callback so that it can be called from a
+ * time=0 timeout.
  *
  * Calls @p callback with @p w->syntherr as the error code (which might be 0).
  */
@@ -918,31 +968,35 @@ static int writer_shutdown(ev_source *ev,
                           const attribute((unused)) struct timeval *now,
                           void *u) {
   ev_writer *w = u;
-  int fd;
 
   if(w->fd == -1)
-    return 0;                          /* already closed */
+    return 0;                          /* already shut down */
+  info("writer_shutdown fd=%d", w->fd);
   ev_timeout_cancel(ev, w->timeout);
+  ev_fd_cancel(ev, ev_write, w->fd);
   w->timeout = 0;
-  fd = w->fd;
+  if(w->reader) {
+    /* If there is a reader still around we just untie it */
+    w->reader->writer = 0;
+    shutdown(w->fd, SHUT_WR);          /* there'll be no more writes */
+  } else {
+    /* There's no reader so we are free to close the FD */
+    xclose(w->fd);
+  }
   w->fd = -1;
-  return w->callback(ev, fd, w->syntherror, w->u);
+  return w->callback(ev, w->error, w->u);
 }
 
 /** @brief Called when a writer's @p timebound expires */
 static int writer_timebound_exceeded(ev_source *ev,
-                                    const struct timeval attribute((unused)) *now,
+                                    const struct timeval *now,
                                     void *u) {
   ev_writer *const w = u;
-  int fd;
 
-  if(w->fd == -1)
-    return 0;                          /* already closed */
   error(0, "abandoning writer %s because no writes within %ds",
        w->what, w->timebound);
-  fd = w->fd;
-  w->fd = -1;
-  return w->callback(ev, fd, ETIMEDOUT, w->u);
+  w->error = ETIMEDOUT;
+  return writer_shutdown(ev, now, u);
 }
 
 /** @brief Set the time bound callback (if not set already) */
@@ -962,23 +1016,28 @@ static int writer_callback(ev_source *ev, int fd, void *u) {
   ev_writer *const w = u;
   int n;
 
-  if(w->fd == -1)
-    return 0;
   n = write(fd, w->b.start, w->b.end - w->b.start);
   D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d",
      fd, (long)(w->b.end - w->b.start), n, errno));
   if(n >= 0) {
+    /* Consume bytes from the buffer */
     w->b.start += n;
+    /* Suppress any outstanding timeout */
     ev_timeout_cancel(ev, w->timeout);
     w->timeout = 0;
     if(w->b.start == w->b.end) {
+      /* The buffer is empty */
       if(w->eof) {
-       ev_fd_cancel(ev, ev_write, fd);
-       w->fd = -1;
-       return w->callback(ev, fd, 0, w->u);
+       /* We're done, we can shut down this writer */
+       w->error = 0;
+       return writer_shutdown(ev, 0, w);
       } else
+       /* There might be more to come but we don't need writer_callback() to
+        * be called for the time being */
        ev_fd_disable(ev, ev_write, fd);
     } else
+      /* The buffer isn't empty, set a timeout so we give up if we don't manage
+       * to write some more within a reasonable time */
       writer_set_timebound(w);
   } else {
     switch(errno) {
@@ -986,9 +1045,8 @@ static int writer_callback(ev_source *ev, int fd, void *u) {
     case EAGAIN:
       break;
     default:
-      ev_fd_cancel(ev, ev_write, fd);
-      w->fd = -1;
-      return w->callback(ev, fd, errno, w->u);
+      w->error = errno;
+      return writer_shutdown(ev, 0, w);
     }
   }
   return 0;
@@ -1006,21 +1064,24 @@ static int ev_writer_write(struct sink *sk, const void *s, int n) {
 
   if(!n)
     return 0;                          /* avoid silliness */
-  buffer_space(&w->b, n);
-  if(w->b.start == w->b.end)
-    ev_fd_enable(w->ev, ev_write, w->fd);
-  memcpy(w->b.end, s, n);
-  w->b.end += n;
-  if(w->spacebound && w->b.end - w->b.start > w->spacebound) {
-    /* Buffer contents have exceeded the space bound.  We assume that the
+  if(w->spacebound && w->b.end - w->b.start + n > w->spacebound) {
+    /* The new buffer contents will exceed the space bound.  We assume that the
      * remote client has gone away and TCP hasn't noticed yet, or that it's got
      * hopelessly stuck. */
     error(0, "abandoning writer %s because buffer has reached %td bytes",
          w->what, w->b.end - w->b.start);
-    w->syntherror = EPIPE;
-    ev_fd_cancel(w->ev, ev_write, w->fd);
+    ev_fd_disable(w->ev, ev_write, w->fd);
+    w->error = EPIPE;
     return ev_timeout(w->ev, 0, 0, writer_shutdown, w);
   }
+  /* Make sure there is space */
+  buffer_space(&w->b, n);
+  /* If the buffer was formerly empty then we'll need to re-enable the FD */
+  if(w->b.start == w->b.end)
+    ev_fd_enable(w->ev, ev_write, w->fd);
+  memcpy(w->b.end, s, n);
+  w->b.end += n;
+  /* Arrange a timeout if there wasn't one set already */
   writer_set_timebound(w);
   return 0;
 }
@@ -1032,6 +1093,12 @@ static int ev_writer_write(struct sink *sk, const void *s, int n) {
  * @param u Passed to @p callback
  * @param what Text description
  * @return New writer or @c NULL
+ *
+ * Writers own their file descriptor and close it when they have finished with
+ * it.
+ *
+ * If you pass the same fd to a reader and writer, you must tie them together
+ * with ev_tie().
  */ 
 ev_writer *ev_writer_new(ev_source *ev,
                         int fd,
@@ -1051,6 +1118,7 @@ ev_writer *ev_writer_new(ev_source *ev,
   w->what = what;
   if(ev_fd(ev, ev_write, fd, writer_callback, w, what))
     return 0;
+  /* Buffer is initially empty so we don't want a callback */
   ev_fd_disable(ev, ev_write, fd);
   return w;
 }
@@ -1125,33 +1193,17 @@ struct sink *ev_writer_sink(ev_writer *w) {
  */
 int ev_writer_close(ev_writer *w) {
   D(("close writer fd %d", w->fd));
+  if(w->eof)
+    return 0;                          /* already closed */
   w->eof = 1;
   if(w->b.start == w->b.end) {
-    /* we're already finished */
-    w->syntherror = 0;                 /* no error */
-    ev_fd_cancel(w->ev, ev_write, w->fd);
+    /* We're already finished */
+    w->error = 0;                      /* no error */
     return ev_timeout(w->ev, 0, 0, writer_shutdown, w);
   }
   return 0;
 }
 
-/** @brief Cancel a writer discarding any buffered data
- * @param w Writer to close
- * @return 0 on success, non-0 on error
- *
- * This cancels a writer immediately.  Any unwritten buffered data is discarded
- * and the error callback is never called.  This is appropriate to call if (for
- * instance) the read half of a TCP connection is known to have failed and the
- * writer is therefore obsolete.
- */
-int ev_writer_cancel(ev_writer *w) {
-  ev_source *const ev = w->ev;
-  D(("cancel writer fd %d", w->fd));
-  ev_timeout_cancel(ev, w->timeout);
-  w->timeout = 0;
-  return ev_fd_cancel(w->ev, ev_write, w->fd);
-}
-
 /** @brief Attempt to flush a writer
  * @param w Writer to flush
  * @return 0 on success, non-0 on error
@@ -1165,16 +1217,39 @@ int ev_writer_flush(ev_writer *w) {
 
 /* buffered reader ************************************************************/
 
-/** @brief State structure for a buffered reader */
-struct ev_reader {
-  struct buffer b;
-  int fd;
-  ev_reader_callback *callback;
-  ev_error_callback *error_callback;
-  void *u;
-  ev_source *ev;
-  int eof;
-};
+/** @brief Shut down a reader*
+ *
+ * This is the only path through which we cancel and close the file descriptor.
+ * As with the writer case it is given timeout signature to allow it be
+ * deferred to the next iteration of the event loop.
+ *
+ * We only call @p error_callback if @p error is nonzero (unlike the writer
+ * case).
+ */
+static int reader_shutdown(ev_source *ev,
+                          const attribute((unused)) struct timeval *now,
+                          void *u) {
+  ev_reader *const r = u;
+
+  if(r->fd == -1)
+    return 0;                          /* already shut down */
+  info("reader_shutdown fd=%d", r->fd);
+  ev_fd_cancel(ev, ev_read, r->fd);
+  r->eof = 1;
+  if(r->writer) {
+    /* If there is a writer still around we just untie it */
+    r->writer->reader = 0;
+    shutdown(r->fd, SHUT_RD);          /* there'll be no more reads */
+  } else {
+    /* There's no writer so we are free to close the FD */
+    xclose(r->fd);
+  }
+  r->fd = -1;
+  if(r->error)
+    return r->error_callback(ev, r->error, r->u);
+  else
+    return 0;
+}
 
 /** @brief Called when a reader's @p fd is readable */
 static int reader_callback(ev_source *ev, int fd, void *u) {
@@ -1187,19 +1262,22 @@ static int reader_callback(ev_source *ev, int fd, void *u) {
      fd, (int)(r->b.top - r->b.end), n, errno));
   if(n > 0) {
     r->b.end += n;
-    return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 0, r->u);
+    return r->callback(ev, r, r->b.start, r->b.end - r->b.start, 0, r->u);
   } else if(n == 0) {
-    r->eof = 1;
-    ev_fd_cancel(ev, ev_read, fd);
-    return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 1, r->u);
+    /* No more read callbacks needed */
+    ev_fd_disable(r->ev, ev_read, r->fd);
+    ev_timeout(r->ev, 0, 0, reader_shutdown, r);
+    /* Pass the remaining data and an eof indicator to the user */
+    return r->callback(ev, r, r->b.start, r->b.end - r->b.start, 1, r->u);
   } else {
     switch(errno) {
     case EINTR:
     case EAGAIN:
       break;
     default:
-      ev_fd_cancel(ev, ev_read, fd);
-      return r->error_callback(ev, fd, errno, r->u);
+      /* Fatal error, kill the reader now */
+      r->error = errno;
+      return reader_shutdown(ev, 0, r);
     }
   }
   return 0;
@@ -1213,6 +1291,11 @@ static int reader_callback(ev_source *ev, int fd, void *u) {
  * @param u Passed to callbacks
  * @param what Text description
  * @return New reader or @c NULL
+ *
+ * Readers own their fd and close it when they are finished with it.
+ *
+ * If you pass the same fd to a reader and writer, you must tie them together
+ * with ev_tie().
  */
 ev_reader *ev_reader_new(ev_source *ev,
                         int fd,
@@ -1252,10 +1335,16 @@ void ev_reader_consume(ev_reader *r, size_t n) {
 /** @brief Cancel a reader
  * @param r Reader
  * @return 0 on success, non-0 on error
+ *
+ * No further callbacks will be made, and the FD will be closed (in a later
+ * iteration of the event loop).
  */
 int ev_reader_cancel(ev_reader *r) {
   D(("cancel reader fd %d", r->fd));
-  return ev_fd_cancel(r->ev, ev_read, r->fd);
+  if(r->fd == -1)
+    return 0;                          /* already thoroughly cancelled */
+  ev_fd_disable(r->ev, ev_read, r->fd);
+  return ev_timeout(r->ev, 0, 0, reader_shutdown, r);
 }
 
 /** @brief Temporarily disable a reader
@@ -1267,7 +1356,7 @@ int ev_reader_cancel(ev_reader *r) {
  */
 int ev_reader_disable(ev_reader *r) {
   D(("disable reader fd %d", r->fd));
-  return r->eof ? 0 : ev_fd_disable(r->ev, ev_read, r->fd);
+  return ev_fd_disable(r->ev, ev_read, r->fd);
 }
 
 /** @brief Called from ev_run() for ev_reader_incomplete() */
@@ -1277,8 +1366,13 @@ static int reader_continuation(ev_source attribute((unused)) *ev,
   ev_reader *r = u;
 
   D(("reader continuation callback fd %d", r->fd));
-  if(ev_fd_enable(r->ev, ev_read, r->fd)) return -1;
-  return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
+  /* If not at EOF turn the FD back on */
+  if(!r->eof)
+    if(ev_fd_enable(r->ev, ev_read, r->fd))
+      return -1;
+  /* We're already in a timeout callback so there's no reason we can't call the
+   * user callback directly (compare ev_reader_enable()). */
+  return r->callback(ev, r, r->b.start, r->b.end - r->b.start, r->eof, r->u);
 }
 
 /** @brief Arrange another callback
@@ -1301,7 +1395,7 @@ static int reader_enabled(ev_source *ev,
   ev_reader *r = u;
 
   D(("reader enabled callback fd %d", r->fd));
-  return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
+  return r->callback(ev, r, r->b.start, r->b.end - r->b.start, r->eof, r->u);
 }
 
 /** @brief Re-enable reading
@@ -1317,11 +1411,38 @@ static int reader_enabled(ev_source *ev,
  * re-enable.  You'll automatically get another callback directly from the
  * event loop (i.e. not from inside ev_reader_enable()) so you can handle the
  * next line (or whatever) if the whole thing has in fact already arrived.
+ *
+ * The difference between this process and calling ev_reader_incomplete() is
+ * ev_reader_incomplete() deals with the case where you can process now but
+ * would rather yield to other clients of the event loop, while using
+ * ev_reader_disable() and ev_reader_enable() deals with the case where you
+ * cannot process input yet because some other process is actually not
+ * complete.
  */
 int ev_reader_enable(ev_reader *r) {
   D(("enable reader fd %d", r->fd));
-  return ((r->eof ? 0 : ev_fd_enable(r->ev, ev_read, r->fd))
-         || ev_timeout(r->ev, 0, 0, reader_enabled, r)) ? -1 : 0;
+
+  /* First if we're not at EOF then we re-enable reading */
+  if(!r->eof)
+    if(ev_fd_enable(r->ev, ev_read, r->fd))
+      return -1;
+  /* Arrange another callback next time round the event loop */
+  return ev_timeout(r->ev, 0, 0, reader_enabled, r);
+}
+
+/** @brief Tie a reader and a writer together
+ * @param r Reader
+ * @param w Writer
+ * @return 0 on success, non-0 on error
+ *
+ * This function must be called if @p r and @p w share a file descritptor.
+ */
+int ev_tie(ev_reader *r, ev_writer *w) {
+  assert(r->writer == 0);
+  assert(w->reader == 0);
+  r->writer = w;
+  w->reader = r;
+  return 0;
 }
 
 /*
index eb669cd..34a6f4d 100644 (file)
@@ -162,12 +162,22 @@ int ev_listen_cancel(ev_source *ev,
 
 typedef struct ev_writer ev_writer;
 
+/** @brief Error callback for @ref ev_reader and @ref ev_writer
+ * @param ev Event loop
+ * @param errno_value Errno value (might be 0)
+ * @param u As passed to ev_writer_new() or ev_reader_new()
+ * @return 0 on success, non-0 on error
+ *
+ * This is called for a writer in the following situations:
+ * - on error, with @p errno_value != 0
+ * - when all buffered data has been written, with @p errno_value = 0
+ * - after called ev_writer_cancel(), with @p errno_value = 0
+ *
+ * It is called for a reader only on error, with @p errno_value != 0.
+ */
 typedef int ev_error_callback(ev_source *ev,
-                             int fd,
                              int errno_value,
                              void *u);
-/* called when an error occurs on a writer.  Called with @errno_value@
- * of 0 when finished. */
 
 ev_writer *ev_writer_new(ev_source *ev,
                         int fd,
@@ -198,15 +208,35 @@ struct sink *ev_writer_sink(ev_writer *w) attribute((const));
 
 typedef struct ev_reader ev_reader;
 
+/** @brief Called when data is available to read
+ * @param ev Event loop
+ * @param reader Reader
+ * @param fd File descriptor we read from
+ * @param ptr Pointer to first byte
+ * @param bytes Number of bytes available
+ * @param eof True if EOF has been detected
+ * @param u As passed to ev_reader_new()
+ * @return 0 on succes, non-0 on error
+ *
+ * This callback should call ev_reader_consume() to indicate how many bytes you
+ * actually used.  If you do not call it then it is assumed no bytes were
+ * consumed.
+ *
+ * If having consumed some number of bytes it is not possible to do any further
+ * processing until more data is available then the callback can just return.
+ * Note that this is not allowed if @p eof was set.
+ *
+ * If on the other hand it would be possible to do more processing immediately
+ * with the bytes available, but this is undesirable for some other reason,
+ * then ev_reader_incomplete() should be called.  This will arrange a further
+ * callback in the very near future even if no more bytes are read.
+ */
 typedef int ev_reader_callback(ev_source *ev,
                               ev_reader *reader,
-                              int fd,
                               void *ptr,
                               size_t bytes,
                               int eof,
                               void *u);
-/* Called when data is read or an error occurs.  @ptr@ and @bytes@
- * indicate the amount of data available. @eof@ will be 1 at eof. */
 
 ev_reader *ev_reader_new(ev_source *ev,
                         int fd,
@@ -218,9 +248,10 @@ ev_reader *ev_reader_new(ev_source *ev,
  * available. */
 
 void ev_reader_buffer(ev_reader *r, size_t nbytes);
-/* specify a buffer size *case */
+/* specify a buffer size */
 
-void ev_reader_consume(ev_reader *r, size_t nbytes);
+void ev_reader_consume(ev_reader *r
+                      , size_t nbytes);
 /* consume @nbytes@ bytes. */
 
 int ev_reader_cancel(ev_reader *r);
@@ -247,6 +278,8 @@ int ev_reader_enable(ev_reader *r);
  * has in fact already arrived.
  */
 
+int ev_tie(ev_reader *r, ev_writer *w);
+
 #endif /* EVENT_H */
 
 /*
index 0eab833..60e5331 100644 (file)
 #include "event.h"
 #include "log.h"
 
-struct logfd_state {
-  const char *tag;
-};
-
 /* called when bytes are available and at eof */
 static int logfd_readable(ev_source attribute((unused)) *ev,
                          ev_reader *reader,
-                         int fd,
                          void *ptr,
                          size_t bytes,
                          int eof,
@@ -57,23 +52,31 @@ static int logfd_readable(ev_source attribute((unused)) *ev,
     info("%s: %.*s", tag, (int)bytes, (char *)ptr);
     ev_reader_consume(reader, bytes);
   }
-  if(eof)
-    xclose(fd);
   return 0;
 }
 
 /* called when a read error occurs */
 static int logfd_error(ev_source attribute((unused)) *ev,
-                      int fd,
                       int errno_value,
                       void *u) {
   const char *tag = u;
   
   error(errno_value, "error reading log pipe from %s", tag);
-  xclose(fd);
   return 0;
 }
 
+/** @brief Create file descriptor for a subprocess to log to
+ * @param ev Event loop
+ * @param tag Tag for this log
+ * @return File descriptor
+ *
+ * Returns a file descriptor which a subprocess can log to.  The normal thing
+ * to do would be to dup2() this fd onto the subprocess's stderr (and to close
+ * it in the parent).
+ *
+ * Any lines written to this fd (i.e. by the subprocess) will be logged via
+ * info(), with @p tag included.
+ */
 int logfd(ev_source *ev, const char *tag) {
   int p[2];
 
index 1e4c6ca..4b71e4e 100644 (file)
@@ -96,7 +96,6 @@ struct conn {
 
 static int reader_callback(ev_source *ev,
                           ev_reader *reader,
-                          int fd,
                           void *ptr,
                           size_t bytes,
                           int eof,
@@ -105,50 +104,32 @@ static int reader_callback(ev_source *ev,
 static const char *noyes[] = { "no", "yes" };
 
 static int writer_error(ev_source attribute((unused)) *ev,
-                       int fd,
                        int errno_value,
                        void *u) {
   struct conn *c = u;
 
-  D(("server writer_error %d %d", fd, errno_value));
+  D(("server writer_error %d", errno_value));
   if(errno_value == 0) {
     /* writer is done */
-    c->w = 0;
-    if(c->r == 0) {
-      //D(("server writer_error closes %d", fd));
-      info("server writer_error closes %d because all done", fd); /* TODO */
-      xclose(fd);              /* reader is done too, close */
-    } else {
-      //D(("server writer_error shutdown %d SHUT_WR", fd));
-      info("server writer_error shutdown %d SHUT_WR", fd); /* TODO */
-      xshutdown(fd, SHUT_WR);  /* reader is not done yet */
-    }
+    error(errno_value, "S%x writer completed", c->tag);        /* TODO */
   } else {
     if(errno_value != EPIPE)
       error(errno_value, "S%x write error on socket", c->tag);
-    if(c->r)
-      ev_reader_cancel(c->r);
-    info("server writer_error closes %d because errno=%d", fd, errno_value); /* TODO */
-    xclose(fd);
+    ev_reader_cancel(c->r);
   }
+  ev_report(ev);
   return 0;
 }
 
 static int reader_error(ev_source attribute((unused)) *ev,
-                       int fd,
                        int errno_value,
                        void *u) {
   struct conn *c = u;
 
-  D(("server reader_error %d %d", fd, errno_value));
+  D(("server reader_error %d", errno_value));
   error(errno, "S%x read error on socket", c->tag);
-  if(c->w) {
-    ev_writer_cancel(c->w);
-    c->w = 0;
-  }
+  ev_writer_close(c->w);
   ev_report(ev);
-  info("reader_error closing fd %d", fd); /* TODO */
-  xclose(fd);
   return 0;
 }
 
@@ -726,7 +707,6 @@ static int c_volume(struct conn *c,
 /* we are logging, and some data is available to read */
 static int logging_reader_callback(ev_source *ev,
                                   ev_reader *reader,
-                                  int fd,
                                   void *ptr,
                                   size_t bytes,
                                   int eof,
@@ -743,7 +723,7 @@ static int logging_reader_callback(ev_source *ev,
   /* restore the reader callback */
   c->reader = reader_callback;
   /* ...and exit via it */
-  return c->reader(ev, reader, fd, ptr, bytes, eof, u);
+  return c->reader(ev, reader, ptr, bytes, eof, u);
 }
 
 static void logclient(const char *msg, void *user) {
@@ -1061,20 +1041,18 @@ static int command(struct conn *c, char *line) {
 /* redirect to the right reader callback for our current state */
 static int redirect_reader_callback(ev_source *ev,
                                    ev_reader *reader,
-                                   int fd,
                                    void *ptr,
                                    size_t bytes,
                                    int eof,
                                    void *u) {
   struct conn *c = u;
 
-  return c->reader(ev, reader, fd, ptr, bytes, eof, u);
+  return c->reader(ev, reader, ptr, bytes, eof, u);
 }
 
 /* the main command reader */
 static int reader_callback(ev_source attribute((unused)) *ev,
                           ev_reader *reader,
-                          int attribute((unused)) fd,
                           void *ptr,
                           size_t bytes,
                           int eof,
@@ -1128,6 +1106,7 @@ static int listen_callback(ev_source *ev,
                       "client writer");
   c->r = ev_reader_new(ev, fd, redirect_reader_callback, reader_error, c,
                       "client reader");
+  ev_tie(c->r, c->w);
   c->fd = fd;
   c->reader = reader_callback;
   c->l = l;
index 59f755f..34311eb 100644 (file)
@@ -1024,7 +1024,6 @@ static int stats_finished(ev_source attribute((unused)) *ev,
 
 static int stats_read(ev_source attribute((unused)) *ev,
                       ev_reader *reader,
-                      int attribute((unused)) fd,
                       void *ptr,
                       size_t bytes,
                       int eof,
@@ -1040,7 +1039,6 @@ static int stats_read(ev_source attribute((unused)) *ev,
 }
 
 static int stats_error(ev_source attribute((unused)) *ev,
-                       int attribute((unused)) fd,
                        int errno_value,
                        void *u) {
   struct stats_details *const d = u;