X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=blobdiff_plain;f=src%2Flibsystemd-bus%2Fsd-event.c;h=727528bb3cc84e839e3c180d1812497165c159ec;hb=e7d43b3cc30764138c90eaaf95d3d8f49e448890;hp=b3964325a0607e7228451ad94e38e6a42606138e;hpb=6203e07a83214a55bb1f88508fcda2005c601dea;p=elogind.git diff --git a/src/libsystemd-bus/sd-event.c b/src/libsystemd-bus/sd-event.c index b3964325a..727528bb3 100644 --- a/src/libsystemd-bus/sd-event.c +++ b/src/libsystemd-bus/sd-event.c @@ -34,7 +34,7 @@ #include "sd-event.h" -#define EPOLL_QUEUE_MAX 64 +#define EPOLL_QUEUE_MAX 512U #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC) typedef enum EventSourceType { @@ -58,6 +58,7 @@ struct sd_event_source { EventSourceType type:4; int enabled:3; bool pending:1; + bool dispatching:1; int priority; unsigned pending_index; @@ -150,6 +151,8 @@ struct sd_event { sd_event **default_event_ptr; usec_t watchdog_last, watchdog_period; + + unsigned n_sources; }; static int pending_prioq_compare(const void *a, const void *b) { @@ -315,6 +318,7 @@ static int exit_prioq_compare(const void *a, const void *b) { static void event_free(sd_event *e) { assert(e); + assert(e->n_sources == 0); if (e->default_event_ptr) *(e->default_event_ptr) = NULL; @@ -469,6 +473,8 @@ static void source_free(sd_event_source *s) { assert(s); if (s->event) { + assert(s->event->n_sources > 0); + switch (s->type) { case SOURCE_IO: @@ -520,6 +526,9 @@ static void source_free(sd_event_source *s) { case SOURCE_EXIT: prioq_remove(s->event->exit, s, &s->exit.prioq_index); break; + + case SOURCE_WATCHDOG: + assert_not_reached("Wut? I shouldn't exist."); } if (s->pending) @@ -528,6 +537,7 @@ static void source_free(sd_event_source *s) { if (s->prepare) prioq_remove(s->event->prepare, s, &s->prepare_index); + s->event->n_sources--; sd_event_unref(s->event); } @@ -581,6 +591,8 @@ static sd_event_source *source_new(sd_event *e, EventSourceType type) { s->type = type; s->pending_index = s->prepare_index = PRIOQ_IDX_NULL; + e->n_sources ++; + return s; } @@ -993,8 +1005,21 @@ _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) { assert(s->n_ref >= 1); s->n_ref--; - if (s->n_ref <= 0) - source_free(s); + if (s->n_ref <= 0) { + /* Here's a special hack: when we are called from a + * dispatch handler we won't free the event source + * immediately, but we will detach the fd from the + * epoll. This way it is safe for the caller to unref + * the event source and immediately close the fd, but + * we still retain a valid event source object after + * the callback. */ + + if (s->dispatching) { + if (s->type == SOURCE_IO) + source_io_unregister(s); + } else + source_free(s); + } return NULL; } @@ -1022,6 +1047,42 @@ _public_ int sd_event_source_get_io_fd(sd_event_source *s) { return s->io.fd; } +_public_ int sd_event_source_set_io_fd(sd_event_source *s, int fd) { + int r; + + assert_return(s, -EINVAL); + assert_return(fd >= 0, -EINVAL); + assert_return(s->type == SOURCE_IO, -EDOM); + assert_return(!event_pid_changed(s->event), -ECHILD); + + if (s->io.fd == fd) + return 0; + + if (s->enabled == SD_EVENT_OFF) { + s->io.fd = fd; + s->io.registered = false; + } else { + int saved_fd; + + saved_fd = s->io.fd; + assert(s->io.registered); + + s->io.fd = fd; + s->io.registered = false; + + r = source_io_register(s, s->enabled, s->io.events); + if (r < 0) { + s->io.fd = saved_fd; + s->io.registered = true; + return r; + } + + epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, saved_fd, NULL); + } + + return 0; +} + _public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) { assert_return(s, -EINVAL); assert_return(events, -EINVAL); @@ -1178,6 +1239,9 @@ _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) { case SOURCE_DEFER: s->enabled = m; break; + + case SOURCE_WATCHDOG: + assert_not_reached("Wut? I shouldn't exist."); } } else { @@ -1233,6 +1297,9 @@ _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) { case SOURCE_DEFER: s->enabled = m; break; + + case SOURCE_WATCHDOG: + assert_not_reached("Wut? I shouldn't exist."); } } @@ -1357,6 +1424,17 @@ _public_ void* sd_event_source_get_userdata(sd_event_source *s) { return s->userdata; } +_public_ void *sd_event_source_set_userdata(sd_event_source *s, void *userdata) { + void *ret; + + assert_return(s, NULL); + + ret = s->userdata; + s->userdata = userdata; + + return ret; +} + static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) { usec_t c; assert(e); @@ -1491,12 +1569,21 @@ static int event_arm_timer( return 0; } -static int process_io(sd_event *e, sd_event_source *s, uint32_t events) { +static int process_io(sd_event *e, sd_event_source *s, uint32_t revents) { assert(e); assert(s); assert(s->type == SOURCE_IO); - s->io.revents = events; + /* If the event source was already pending, we just OR in the + * new revents, otherwise we reset the value. The ORing is + * necessary to handle EPOLLONESHOT events properly where + * readability might happen independently of writability, and + * we need to keep track of both */ + + if (s->pending) + s->io.revents |= revents; + else + s->io.revents = revents; return source_set_pending(s, true); } @@ -1518,7 +1605,7 @@ static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) { return -errno; } - if (ss != sizeof(x)) + if (_unlikely_(ss != sizeof(x))) return -EIO; if (next) @@ -1646,7 +1733,7 @@ static int process_signal(sd_event *e, uint32_t events) { return -errno; } - if (ss != sizeof(si)) + if (_unlikely_(ss != sizeof(si))) return -EIO; read_one = true; @@ -1689,7 +1776,7 @@ static int source_dispatch(sd_event_source *s) { return r; } - sd_event_source_ref(s); + s->dispatching = true; switch (s->type) { @@ -1732,14 +1819,21 @@ static int source_dispatch(sd_event_source *s) { case SOURCE_EXIT: r = s->exit.callback(s, s->userdata); break; + + case SOURCE_WATCHDOG: + assert_not_reached("Wut? I shouldn't exist."); } - if (r < 0) { + s->dispatching = false; + + if (r < 0) log_debug("Event source %p returned error, disabling: %s", s, strerror(-r)); + + if (s->n_ref == 0) + source_free(s); + else if (r < 0) sd_event_source_set_enabled(s, SD_EVENT_OFF); - } - sd_event_source_unref(s); return 1; } @@ -1761,10 +1855,18 @@ static int event_prepare(sd_event *e) { return r; assert(s->prepare); + + s->dispatching = true; r = s->prepare(s, s->userdata); + s->dispatching = false; + if (r < 0) - return r; + log_debug("Prepare callback of event source %p returned error, disabling: %s", s, strerror(-r)); + if (s->n_ref == 0) + source_free(s); + else if (r < 0) + sd_event_source_set_enabled(s, SD_EVENT_OFF); } return 0; @@ -1847,7 +1949,8 @@ static int process_watchdog(sd_event *e) { } _public_ int sd_event_run(sd_event *e, uint64_t timeout) { - struct epoll_event ev_queue[EPOLL_QUEUE_MAX]; + struct epoll_event *ev_queue; + unsigned ev_queue_max; sd_event_source *p; int r, i, m; @@ -1877,8 +1980,10 @@ _public_ int sd_event_run(sd_event *e, uint64_t timeout) { if (event_next_pending(e) || e->need_process_child) timeout = 0; + ev_queue_max = CLAMP(e->n_sources, 1U, EPOLL_QUEUE_MAX); + ev_queue = newa(struct epoll_event, ev_queue_max); - m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, + m = epoll_wait(e->epoll_fd, ev_queue, ev_queue_max, timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC)); if (m < 0) { r = errno == EAGAIN || errno == EINTR ? 0 : -errno; @@ -2011,7 +2116,7 @@ _public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) { _public_ int sd_event_default(sd_event **ret) { - static __thread sd_event *default_event = NULL; + static thread_local sd_event *default_event = NULL; sd_event *e; int r; @@ -2052,6 +2157,7 @@ _public_ int sd_event_set_watchdog(sd_event *e, int b) { int r; assert_return(e, -EINVAL); + assert_return(!event_pid_changed(e), -ECHILD); if (e->watchdog == !!b) return e->watchdog; @@ -2107,3 +2213,10 @@ fail: e->watchdog_fd = -1; return r; } + +_public_ int sd_event_get_watchdog(sd_event *e) { + assert_return(e, -EINVAL); + assert_return(!event_pid_changed(e), -ECHILD); + + return e->watchdog; +}