#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <sys/wait.h>
+#include <pthread.h>
#include "sd-id128.h"
#include "sd-daemon.h"
#include "sd-event.h"
-#define EPOLL_QUEUE_MAX 64
+#define EPOLL_QUEUE_MAX 512U
#define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
typedef enum EventSourceType {
sd_event **default_event_ptr;
usec_t watchdog_last, watchdog_period;
+
+ unsigned n_sources;
};
static int pending_prioq_compare(const void *a, const void *b) {
static void event_free(sd_event *e) {
assert(e);
+ assert(e->n_sources == 0);
if (e->default_event_ptr)
*(e->default_event_ptr) = NULL;
assert(s);
if (s->event) {
+ assert(s->event->n_sources > 0);
+
switch (s->type) {
case SOURCE_IO:
case SOURCE_EXIT:
prioq_remove(s->event->exit, s, &s->exit.prioq_index);
break;
+
+ case SOURCE_WATCHDOG:
+ assert_not_reached("Wut? I shouldn't exist.");
}
if (s->pending)
if (s->prepare)
prioq_remove(s->event->prepare, s, &s->prepare_index);
+ s->event->n_sources--;
sd_event_unref(s->event);
}
s->type = type;
s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
+ e->n_sources ++;
+
return s;
}
sd_event_source **ret) {
sd_event_source *s;
+ sigset_t ss;
int r;
assert_return(e, -EINVAL);
assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
assert_return(!event_pid_changed(e), -ECHILD);
+ r = pthread_sigmask(SIG_SETMASK, NULL, &ss);
+ if (r < 0)
+ return -errno;
+
+ if (!sigismember(&ss, sig))
+ return -EBUSY;
+
if (!e->signal_sources) {
e->signal_sources = new0(sd_event_source*, _NSIG);
if (!e->signal_sources)
return s->io.fd;
}
+_public_ int sd_event_source_set_io_fd(sd_event_source *s, int fd) {
+ int r;
+
+ assert_return(s, -EINVAL);
+ assert_return(fd >= 0, -EINVAL);
+ assert_return(s->type == SOURCE_IO, -EDOM);
+ assert_return(!event_pid_changed(s->event), -ECHILD);
+
+ if (s->io.fd == fd)
+ return 0;
+
+ if (s->enabled == SD_EVENT_OFF) {
+ s->io.fd = fd;
+ s->io.registered = false;
+ } else {
+ int saved_fd;
+
+ saved_fd = s->io.fd;
+ assert(s->io.registered);
+
+ s->io.fd = fd;
+ s->io.registered = false;
+
+ r = source_io_register(s, s->enabled, s->io.events);
+ if (r < 0) {
+ s->io.fd = saved_fd;
+ s->io.registered = true;
+ return r;
+ }
+
+ epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, saved_fd, NULL);
+ }
+
+ return 0;
+}
+
_public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
assert_return(s, -EINVAL);
assert_return(events, -EINVAL);
case SOURCE_DEFER:
s->enabled = m;
break;
+
+ case SOURCE_WATCHDOG:
+ assert_not_reached("Wut? I shouldn't exist.");
}
} else {
case SOURCE_DEFER:
s->enabled = m;
break;
+
+ case SOURCE_WATCHDOG:
+ assert_not_reached("Wut? I shouldn't exist.");
}
}
return 0;
}
-static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
+static int process_io(sd_event *e, sd_event_source *s, uint32_t revents) {
assert(e);
assert(s);
assert(s->type == SOURCE_IO);
- s->io.revents = events;
+ /* If the event source was already pending, we just OR in the
+ * new revents, otherwise we reset the value. The ORing is
+ * necessary to handle EPOLLONESHOT events properly where
+ * readability might happen independently of writability, and
+ * we need to keep track of both */
+
+ if (s->pending)
+ s->io.revents |= revents;
+ else
+ s->io.revents = revents;
return source_set_pending(s, true);
}
return -errno;
}
- if (ss != sizeof(x))
+ if (_unlikely_(ss != sizeof(x)))
return -EIO;
if (next)
return -errno;
}
- if (ss != sizeof(si))
+ if (_unlikely_(ss != sizeof(si)))
return -EIO;
read_one = true;
case SOURCE_EXIT:
r = s->exit.callback(s, s->userdata);
break;
+
+ case SOURCE_WATCHDOG:
+ assert_not_reached("Wut? I shouldn't exist.");
}
s->dispatching = false;
}
_public_ int sd_event_run(sd_event *e, uint64_t timeout) {
- struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
+ struct epoll_event *ev_queue;
+ unsigned ev_queue_max;
sd_event_source *p;
int r, i, m;
if (event_next_pending(e) || e->need_process_child)
timeout = 0;
+ ev_queue_max = CLAMP(e->n_sources, 1U, EPOLL_QUEUE_MAX);
+ ev_queue = newa(struct epoll_event, ev_queue_max);
- m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
+ m = epoll_wait(e->epoll_fd, ev_queue, ev_queue_max,
timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
if (m < 0) {
- r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
+ r = errno == EAGAIN || errno == EINTR ? 1 : -errno;
goto finish;
}
p = event_next_pending(e);
if (!p) {
- r = 0;
+ r = 1;
goto finish;
}
_public_ int sd_event_default(sd_event **ret) {
- static __thread sd_event *default_event = NULL;
+ static thread_local sd_event *default_event = NULL;
sd_event *e;
int r;
if (b) {
struct epoll_event ev = {};
- const char *env;
-
- env = getenv("WATCHDOG_USEC");
- if (!env)
- return false;
- r = safe_atou64(env, &e->watchdog_period);
- if (r < 0)
+ r = sd_watchdog_enabled(false, &e->watchdog_period);
+ if (r <= 0)
return r;
- if (e->watchdog_period <= 0)
- return -EIO;
/* Issue first ping immediately */
sd_notify(false, "WATCHDOG=1");