/** @brief Cancel a timeout
* @param ev Event loop
- * @param handle Handle returned from ev_timeout()
+ * @param handle Handle returned from ev_timeout(), or 0
* @return 0 on success, non-0 on error
+ *
+ * If @p handle is 0 then this is a no-op.
*/
int ev_timeout_cancel(ev_source *ev,
ev_timeout_handle handle) {
struct timeout *t = handle, *p, **pp;
+ if(!t)
+ return 0;
for(pp = &ev->timeouts; (p = *pp) && p != t; pp = &p->next)
;
if(p) {
ev_error_callback *callback;
void *u;
ev_source *ev;
+
+ /** @brief Maximum amount of time between successful writes, 0 = don't care */
+ int timebound;
+ /** @brief Maximum amount of data to buffer, 0 = don't care */
+ int spacebound;
+ /** @brief Synthesized error code */
+ int syntherror;
+ /** @brief Timeout handle for @p timebound (or 0) */
+ ev_timeout_handle timeout;
+
+ const char *what;
};
+/** @brief Deferred shutdown callback for a writer
+ * @param ev Event loop
+ * @param now Unused
+ * @param u Writer, supplied as the timeout's user data
+ * @return 0 if the writer was already closed, else the callback's return value
+ *
+ * Invokes the writer's callback with @c syntherror as the error code (which
+ * might be 0).  Before doing so it cancels the time bound timeout (if any),
+ * clears the stored handle and marks the writer closed by setting its file
+ * descriptor to -1; the callback is handed the original descriptor.
+ */
+static int writer_shutdown(ev_source *ev,
+                           const attribute((unused)) struct timeval *now,
+                           void *u) {
+  ev_writer *const writer = u;
+  const int closing_fd = writer->fd;
+
+  if(closing_fd == -1)
+    return 0;                           /* already closed */
+  ev_timeout_cancel(ev, writer->timeout);
+  writer->timeout = 0;
+  writer->fd = -1;
+  return writer->callback(ev, closing_fd, writer->syntherror, writer->u);
+}
+
+/** @brief Called when a writer's @p timebound expires
+ * @param ev Event loop
+ * @param now Unused
+ * @param u Writer
+ * @return 0 if the writer was already closed, else the callback's return value
+ *
+ * Abandons the writer: logs the reason, stops watching the file descriptor
+ * for writability, marks the writer closed (fd set to -1) and reports
+ * @c ETIMEDOUT to the writer's callback together with the original
+ * descriptor.
+ */
+static int writer_timebound_exceeded(ev_source *ev,
+                                     const struct timeval attribute((unused)) *now,
+                                     void *u) {
+  ev_writer *const w = u;
+  int fd;
+
+  /* This timeout has now fired, so drop the stored handle: otherwise it would
+   * be mistaken for a pending timeout by writer_set_timebound() and handed to
+   * ev_timeout_cancel() later on. */
+  w->timeout = 0;
+  if(w->fd == -1)
+    return 0;                           /* already closed */
+  error(0, "abandoning writer %s because no writes within %ds",
+        w->what, w->timebound);
+  fd = w->fd;
+  /* Deregister the descriptor, exactly as the write-error path in
+   * writer_callback() does.  Once w->fd is -1 nothing else can cancel it via
+   * the writer, so it would otherwise stay in the event loop. */
+  ev_fd_cancel(ev, ev_write, fd);
+  w->fd = -1;
+  return w->callback(ev, fd, ETIMEDOUT, w->u);
+}
+
+/** @brief Arm the time bound timeout unless one is already recorded
+ * @param w Writer
+ *
+ * Does nothing if the writer has no time bound (@c timebound is 0) or if
+ * @c w->timeout is already nonzero.  Otherwise asks the event loop to call
+ * writer_timebound_exceeded() at the current time plus @c timebound seconds,
+ * passing @c &w->timeout to receive the handle.  The return value of
+ * ev_timeout() is not checked.
+ */
+static void writer_set_timebound(ev_writer *w) {
+  struct timeval deadline;
+
+  if(!w->timebound || w->timeout)
+    return;
+  xgettimeofday(&deadline, 0);
+  deadline.tv_sec += w->timebound;
+  ev_timeout(w->ev, &w->timeout, &deadline, writer_timebound_exceeded, w);
+}
+
/** @brief Called when a writer's file descriptor is writable */
static int writer_callback(ev_source *ev, int fd, void *u) {
- ev_writer *w = u;
+ ev_writer *const w = u;
int n;
+ if(w->fd == -1)
+ return 0;
n = write(fd, w->b.start, w->b.end - w->b.start);
D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d",
fd, (long)(w->b.end - w->b.start), n, errno));
if(n >= 0) {
w->b.start += n;
+ ev_timeout_cancel(ev, w->timeout);
+ w->timeout = 0;
if(w->b.start == w->b.end) {
if(w->eof) {
ev_fd_cancel(ev, ev_write, fd);
+ w->fd = -1;
return w->callback(ev, fd, 0, w->u);
} else
ev_fd_disable(ev, ev_write, fd);
- }
+ } else
+ writer_set_timebound(w);
} else {
switch(errno) {
case EINTR:
break;
default:
ev_fd_cancel(ev, ev_write, fd);
+ w->fd = -1;
return w->callback(ev, fd, errno, w->u);
}
}
*/
static int ev_writer_write(struct sink *sk, const void *s, int n) {
ev_writer *w = (ev_writer *)sk; /* the sink pointer is cast back to its writer */
-
+
+ if(!n)
+ return 0; /* avoid silliness */
buffer_space(&w->b, n); /* presumably makes room for n more bytes -- confirm against buffer_space() */
if(w->b.start == w->b.end) /* buffer was empty before this write; writer_callback() disables ev_write when it drains */
ev_fd_enable(w->ev, ev_write, w->fd);
memcpy(w->b.end, s, n); /* append the caller's data to the pending bytes */
w->b.end += n;
+ if(w->spacebound && w->b.end - w->b.start > w->spacebound) { /* a spacebound of 0 disables this check */
+ /* Buffer contents have exceeded the space bound. We assume that the
+ * remote client has gone away and TCP hasn't noticed yet, or that it's got
+ * hopelessly stuck. */
+ error(0, "abandoning writer %s because buffer has reached %td bytes",
+ w->what, w->b.end - w->b.start);
+ w->syntherror = EPIPE; /* error code that writer_shutdown() will pass to the callback */
+ ev_fd_cancel(w->ev, ev_write, w->fd); /* stop watching the fd; w->fd itself is left for writer_shutdown() to clear */
+ return ev_timeout(w->ev, 0, 0, writer_shutdown, w); /* the callback runs later from a timeout, not from inside this call */
+ }
+ writer_set_timebound(w); /* data is now pending, so make sure the time bound timeout is armed */
return 0;
}
w->callback = callback;
w->u = u;
w->ev = ev;
+ w->timebound = 10 * 60;
+ w->spacebound = 512 * 1024;
+ w->what = what;
if(ev_fd(ev, ev_write, fd, writer_callback, w, what))
return 0;
ev_fd_disable(ev, ev_write, fd);
return w;
}
+/** @brief Query or change a writer's time bound
+ * @param w Writer
+ * @param new_time_bound New bound in seconds, or negative for no change
+ * @return Time bound in force after the call
+ *
+ * A negative @p new_time_bound just reads back the current setting.  Any
+ * other value replaces the setting and is returned.  Only the stored bound is
+ * changed here.
+ *
+ * The time bound is the number of seconds allowed between writes.  If it takes
+ * longer than this to flush a buffer then the peer will be assumed to be dead
+ * and an error will be synthesized.  0 means "don't care".  The default time
+ * bound is 10 minutes.
+ *
+ * Note that this value does not take into account kernel buffering and
+ * timeouts.
+ */
+int ev_writer_time_bound(ev_writer *w,
+                         int new_time_bound) {
+  if(new_time_bound < 0)
+    return w->timebound;
+  w->timebound = new_time_bound;
+  return new_time_bound;
+}
+
+/** @brief Query or change a writer's space bound
+ * @param w Writer
+ * @param new_space_bound New bound in bytes, or negative for no change
+ * @return Space bound in force after the call
+ *
+ * A negative @p new_space_bound just reads back the current setting.  Any
+ * other value replaces the setting and is returned.
+ *
+ * The space bound is the number of bytes allowed in the buffer.  If the
+ * buffer exceeds this size an error will be synthesized.  0 means "don't
+ * care".  The default space bound is 512Kbyte.
+ *
+ * Note that this value does not take into account kernel buffering.
+ */
+int ev_writer_space_bound(ev_writer *w,
+                          int new_space_bound) {
+  if(new_space_bound < 0)
+    return w->spacebound;
+  w->spacebound = new_space_bound;
+  return new_space_bound;
+}
+
/** @brief Return the sink associated with a writer
* @param w Writer
* @return Pointer to sink
* descriptor as and when it is writable.
*/
struct sink *ev_writer_sink(ev_writer *w) {
+ /* A null writer is a caller bug: report it via fatal(), naming this
+ * function, rather than handing back an address derived from a null
+ * pointer. */
+ if(!w)
+ fatal(0, "ev_writer_sink called with null writer");
return &w->s;
}
-/** @brief Shutdown callback
- *
- * See ev_writer_close().
- */
-static int writer_shutdown(ev_source *ev,
- const attribute((unused)) struct timeval *now,
- void *u) {
- ev_writer *w = u;
-
- return w->callback(ev, w->fd, 0, w->u);
-}
-
/** @brief Close a writer
* @param w Writer to close
* @return 0 on success, non-0 on error
w->eof = 1;
if(w->b.start == w->b.end) {
/* we're already finished */
+ w->syntherror = 0; /* no error */
ev_fd_cancel(w->ev, ev_write, w->fd);
return ev_timeout(w->ev, 0, 0, writer_shutdown, w);
}
* writer is therefore obsolete.
*/
int ev_writer_cancel(ev_writer *w) {
+ ev_source *const ev = w->ev;
D(("cancel writer fd %d", w->fd));
+ ev_timeout_cancel(ev, w->timeout); /* drop any pending time bound timeout; a handle of 0 is a no-op */
+ w->timeout = 0; /* forget the handle so it is not reused */
return ev_fd_cancel(w->ev, ev_write, w->fd); /* stop watching the fd; w->fd is left unchanged here */
}