1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
31 #include "time-util.h"
/* Capacity of the epoll_event array that sd_event_run() hands to
 * epoll_wait(), i.e. the most events taken per loop iteration. */
35 #define EPOLL_QUEUE_MAX 64
/* Kind of an event source. Values used in this excerpt: SOURCE_IO,
 * SOURCE_MONOTONIC, SOURCE_REALTIME, SOURCE_SIGNAL, SOURCE_CHILD and
 * SOURCE_DEFER. The timer and signal values are also stored as epoll user
 * data to recognize the timerfds and the signalfd. */
37 typedef enum EventSourceType {
/* A single event source attached to an sd_event loop. */
46 struct sd_event_source {
/* Optional callback run by event_prepare() before the loop polls. */
51 sd_prepare_handler_t prepare;
53 EventSourceType type:4;
/* Mute mode: SD_EVENT_MUTED, SD_EVENT_UNMUTED or SD_EVENT_ONESHOT. */
54 sd_event_mute_t mute:3;
/* Positions in the loop's pending and prepare priority queues. */
58 unsigned pending_index;
59 unsigned prepare_index;
/* Loop iteration in which the source last became pending / was last
 * prepared; used by pending_prioq_compare() and prepare_prioq_compare(). */
60 unsigned pending_iteration;
61 unsigned prepare_iteration;
/* Type-specific callbacks for I/O, timer, signal, child and defer sources. */
65 sd_io_handler_t callback;
72 sd_time_handler_t callback;
77 sd_signal_handler_t callback;
/* Last siginfo read from the signalfd for this signal source. */
78 struct signalfd_siginfo siginfo;
82 sd_child_handler_t callback;
88 sd_defer_handler_t callback;
/* Per-signal source table with _NSIG slots, allocated on first use by
 * sd_event_add_signal(); may still be NULL. */
107 sd_event_source **signal_sources;
/* Child sources keyed by pid, and how many of them are not muted. */
109 Hashmap *child_sources;
110 unsigned n_unmuted_child_sources;
/* Loop iteration in which process_child() last scanned the children. */
113 unsigned processed_children;
/* Deadlines currently programmed into the timerfds; (usec_t) -1 until the
 * first arming. */
115 usec_t realtime_next, monotonic_next;
/* Ordering of the pending queue: unmuted sources first, then lower priority
 * values, then sources that became pending in an earlier iteration, with the
 * remaining ties broken for stability. */
120 static int pending_prioq_compare(const void *a, const void *b) {
121 const sd_event_source *x = a, *y = b;
126 /* Unmuted ones first */
127 if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
129 if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
132 /* Lower priority values first */
133 if (x->priority < y->priority)
135 if (x->priority > y->priority)
138 /* Older entries first */
139 if (x->pending_iteration < y->pending_iteration)
141 if (x->pending_iteration > y->pending_iteration)
144 /* Stability for the rest */
/* Ordering of the prepare queue: sources prepared longest ago first, then
 * unmuted before muted, then lower priority values. event_prepare() relies
 * on the first criterion to stop early. */
153 static int prepare_prioq_compare(const void *a, const void *b) {
154 const sd_event_source *x = a, *y = b;
159 /* Move most recently prepared ones last, so that we can stop
160 * preparing as soon as we hit one that has already been
161 * prepared in the current iteration */
162 if (x->prepare_iteration < y->prepare_iteration)
164 if (x->prepare_iteration > y->prepare_iteration)
167 /* Unmuted ones first */
168 if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
170 if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
173 /* Lower priority values first */
174 if (x->priority < y->priority)
176 if (x->priority > y->priority)
179 /* Stability for the rest */
/* Ordering of the per-clock timer queues: unmuted first, non-pending before
 * pending, then earliest deadline (time.next). The queue head is therefore
 * the next timer event_arm_timer() should program. */
188 static int time_prioq_compare(const void *a, const void *b) {
189 const sd_event_source *x = a, *y = b;
191 assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
192 assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
194 /* Unmuted ones first */
195 if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
197 if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
200 /* Move the pending ones to the end */
201 if (!x->pending && y->pending)
203 if (x->pending && !y->pending)
/* Earlier deadlines first */
207 if (x->time.next < y->time.next)
209 if (x->time.next > y->time.next)
212 /* Stability for the rest */
/* Destroys an event loop object. Closes whichever of the epoll, signal and
 * timer fds are open, then frees the priority queues, the signal-source table
 * and the child-source hashmap. */
221 static void event_free(sd_event *e) {
224 if (e->epoll_fd >= 0)
225 close_nointr_nofail(e->epoll_fd);
227 if (e->signal_fd >= 0)
228 close_nointr_nofail(e->signal_fd);
230 if (e->realtime_fd >= 0)
231 close_nointr_nofail(e->realtime_fd);
233 if (e->monotonic_fd >= 0)
234 close_nointr_nofail(e->monotonic_fd);
236 prioq_free(e->pending);
237 prioq_free(e->prepare);
238 prioq_free(e->monotonic);
239 prioq_free(e->realtime);
241 free(e->signal_sources);
243 hashmap_free(e->child_sources);
/* Allocates a new event loop holding one reference; the result is presumably
 * stored in *ret on success (that line is outside this excerpt). All fds start
 * at -1 (not created yet), the cached timer deadlines at (usec_t) -1, and the
 * signal mask empty. The epoll fd is created eagerly with EPOLL_CLOEXEC. */
247 int sd_event_new(sd_event** ret) {
254 e = new0(sd_event, 1);
258 e->n_ref = REFCNT_INIT;
259 e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
260 e->realtime_next = e->monotonic_next = (usec_t) -1;
262 assert_se(sigemptyset(&e->sigset) == 0);
264 e->pending = prioq_new(pending_prioq_compare);
270 e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
271 if (e->epoll_fd < 0) {
/* Takes an additional reference on e; the caller must already hold one. */
284 sd_event* sd_event_ref(sd_event *e) {
288 assert_se(REFCNT_INC(e->n_ref) >= 2);
/* Drops a reference on e; once the count reaches zero the loop is presumably
 * released via event_free() (that line is outside this excerpt). */
293 sd_event* sd_event_unref(sd_event *e) {
297 if (REFCNT_DEC(e->n_ref) <= 0)
/* Removes an I/O source's fd from the epoll set if it is currently
 * registered, and records that it no longer is. */
303 static int source_io_unregister(sd_event_source *s) {
307 assert(s->type == SOURCE_IO);
309 if (!s->io.registered)
312 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
316 s->io.registered = false;
/* Adds an I/O source's fd to the epoll set, or modifies it if already
 * registered. m is the source's mute mode (never SD_EVENT_MUTED here);
 * SD_EVENT_ONESHOT adds EPOLLONESHOT so the kernel disarms the fd after one
 * event. events is the epoll event mask to watch. */
320 static int source_io_register(sd_event_source *s, sd_event_mute_t m, uint32_t events) {
321 struct epoll_event ev = {};
325 assert(s->type == SOURCE_IO);
326 assert(m != SD_EVENT_MUTED);
331 if (m == SD_EVENT_ONESHOT)
332 ev.events |= EPOLLONESHOT;
334 if (s->io.registered)
335 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
337 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
342 s->io.registered = true;
/* Tears down a source. Per type it unregisters an I/O fd from epoll, removes
 * a timer from its clock's queue, or undoes signal/child bookkeeping (sigset
 * bits, the signal table slot, the child hashmap entry and the unmuted-child
 * count). It then removes the source from the pending and prepare queues and
 * drops its reference on the event loop. */
347 static void source_free(sd_event_source *s) {
355 source_io_unregister(s);
359 case SOURCE_MONOTONIC:
360 prioq_remove(s->event->monotonic, s, &s->time.prioq_index);
363 case SOURCE_REALTIME:
364 prioq_remove(s->event->realtime, s, &s->time.prioq_index);
368 if (s->signal.sig > 0) {
/* Unmuted child sources still need SIGCHLD in the mask. */
369 if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0)
370 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
372 if (s->event->signal_sources)
373 s->event->signal_sources[s->signal.sig] = NULL;
379 if (s->child.pid > 0) {
380 if (s->mute != SD_EVENT_MUTED) {
381 assert(s->event->n_unmuted_child_sources > 0);
382 s->event->n_unmuted_child_sources--;
/* Drop SIGCHLD only when no unmuted child source remains and no explicit
 * SIGCHLD signal source exists; this mirrors the n_unmuted_child_sources
 * check in the signal branch above. */
385 if (s->event->n_unmuted_child_sources == 0 && (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]))
386 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
388 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
395 prioq_remove(s->event->pending, s, &s->pending_index);
398 prioq_remove(s->event->prepare, s, &s->prepare_index);
400 sd_event_unref(s->event);
/* Puts the source into (b true) or takes it out of (b false) the loop's
 * pending queue. When queued, pending_iteration is stamped with the current
 * loop iteration so pending_prioq_compare() can favor older entries. */
406 static int source_set_pending(sd_event_source *s, bool b) {
417 s->pending_iteration = s->event->iteration;
419 r = prioq_put(s->event->pending, s, &s->pending_index);
425 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
/* Allocates a zero-initialized source of the given type that holds a
 * reference on e. It starts unmuted and is in neither the pending nor the
 * prepare queue. */
430 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
435 s = new0(sd_event_source, 1);
439 s->n_ref = REFCNT_INIT;
440 s->event = sd_event_ref(e);
442 s->mute = SD_EVENT_UNMUTED;
443 s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
/* Tail of the I/O-source constructor; its name line is outside this excerpt
 * (presumably sd_event_add_io, confirm). It rejects event bits outside
 * EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP, creates a
 * SOURCE_IO source and registers its fd with epoll using the source's
 * initial mute mode. */
452 sd_io_handler_t callback,
454 sd_event_source **ret) {
463 if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
470 s = source_new(e, SOURCE_IO);
475 s->io.events = events;
476 s->io.callback = callback;
477 s->userdata = userdata;
479 r = source_io_register(s, s->mute, events);
/* Lazily creates the timerfd for clock id (non-blocking, close-on-exec) and
 * adds it to epoll. The epoll user data is the source type, which lets
 * sd_event_run() tell timer wakeups apart from I/O sources. It does nothing
 * further when *timer_fd is already open. */
489 static int event_setup_timer_fd(
491 EventSourceType type,
495 struct epoll_event ev = {};
501 if (_likely_(*timer_fd >= 0))
504 fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
509 ev.data.ptr = INT_TO_PTR(type);
511 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
/* Don't leak the new timerfd if it cannot be added to epoll. */
513 close_nointr_nofail(fd);
/* Shared implementation of sd_event_add_monotonic()/sd_event_add_realtime().
 * It allocates the clock's timer queue on first use, makes sure the clock's
 * timerfd exists, then creates the source and inserts it into the queue. */
521 static int event_add_time_internal(
523 EventSourceType type,
528 sd_time_handler_t callback,
530 sd_event_source **ret) {
546 *prioq = prioq_new(time_prioq_compare);
552 r = event_setup_timer_fd(e, type, timer_fd, id);
557 s = source_new(e, type);
562 s->time.callback = callback;
563 s->time.prioq_index = PRIOQ_IDX_NULL;
564 s->userdata = userdata;
566 r = prioq_put(*prioq, s, &s->time.prioq_index);
/* Adds a CLOCK_MONOTONIC timer source. usec is forwarded to
 * event_add_time_internal(); it presumably becomes time.next, which
 * event_arm_timer() programs as an absolute time (TFD_TIMER_ABSTIME). */
576 int sd_event_add_monotonic(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
577 return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic, usec, callback, userdata, ret);
/* Same as sd_event_add_monotonic(), but on CLOCK_REALTIME. */
580 int sd_event_add_realtime(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
581 return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime, usec, callback, userdata, ret);
/* Synchronizes the signalfd with e->sigset. signalfd() either creates a new
 * fd (signal_fd < 0) or updates the mask of the existing one. A newly created
 * fd is added to epoll tagged SOURCE_SIGNAL, and closed again if that fails. */
584 static int event_update_signal_fd(sd_event *e) {
585 struct epoll_event ev = {};
591 add_to_epoll = e->signal_fd < 0;
593 r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
603 ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
605 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
607 close_nointr_nofail(e->signal_fd);
/* Registers a handler for signal sig. The per-signal table (_NSIG slots) is
 * allocated on first use, and a second source for an already-taken signal is
 * refused (the exact error is outside this excerpt). */
616 int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) {
631 if (!e->signal_sources) {
632 e->signal_sources = new0(sd_event_source*, _NSIG);
633 if (!e->signal_sources)
635 } else if (e->signal_sources[sig])
638 s = source_new(e, SOURCE_SIGNAL);
643 s->signal.callback = callback;
644 s->userdata = userdata;
646 e->signal_sources[sig] = s;
647 assert_se(sigaddset(&e->sigset, sig) == 0);
/* While unmuted child sources exist, SIGCHLD is already in the signalfd mask. */
649 if (sig != SIGCHLD || e->n_unmuted_child_sources == 0) {
650 r = event_update_signal_fd(e);
/* Watches child process pid for the state changes selected by options
 * (WEXITED, WSTOPPED, WCONTINUED). Sources are keyed by pid, so a pid can be
 * watched only once. The new source counts as unmuted and SIGCHLD is added to
 * the signal mask. */
661 int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) {
669 if (options & ~(WEXITED|WSTOPPED|WCONTINUED))
676 r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
680 if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
683 s = source_new(e, SOURCE_CHILD);
688 s->child.options = options;
689 s->child.callback = callback;
690 s->userdata = userdata;
692 r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
698 e->n_unmuted_child_sources ++;
700 assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
/* An explicit SIGCHLD signal source already put SIGCHLD into the signalfd;
 * note signal_sources may not be allocated at all. */
702 if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
703 r = event_update_signal_fd(e);
/* Adds a deferred source, which is marked pending immediately so it is
 * dispatched on the next loop iteration. */
714 int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) {
723 s = source_new(e, SOURCE_DEFER);
727 s->defer.callback = callback;
728 s->userdata = userdata;
730 r = source_set_pending(s, true);
/* Takes an additional reference on the source; the caller must hold one. */
740 sd_event_source* sd_event_source_ref(sd_event_source *s) {
744 assert_se(REFCNT_INC(s->n_ref) >= 2);
/* Drops a reference on the source; presumably freed via source_free() when
 * the count reaches zero (that line is outside this excerpt). */
749 sd_event_source* sd_event_source_unref(sd_event_source *s) {
753 if (REFCNT_DEC(s->n_ref) <= 0)
/* Reports whether the source is pending (body outside this excerpt). */
759 int sd_event_source_get_pending(sd_event_source *s) {
/* Returns the fd of an I/O source; other source types are rejected. */
766 int sd_event_source_get_io_fd(sd_event_source *s) {
769 if (s->type != SOURCE_IO)
/* Stores the watched epoll event mask of an I/O source in *events. */
775 int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
778 if (s->type != SOURCE_IO)
783 *events = s->io.events;
/* Changes the epoll event mask of an I/O source. The mask is validated
 * against the bits accepted at creation. Unless the source is muted, its fd
 * is re-registered (EPOLL_CTL_MOD when already registered) under the source's
 * current mute mode before the new mask is stored. */
787 int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
/* Compare the type itself; negating it first would compare a 0/1 value
 * against SOURCE_IO and mis-classify sources. */
792 if (s->type != SOURCE_IO)
794 if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
797 if (s->io.events == events)
800 if (s->mute != SD_EVENT_MUTED) {
/* The second argument is the mute mode (it decides EPOLLONESHOT), not an
 * event mask. */
801 r = source_io_register(s, s->mute, events);
806 s->io.events = events;
/* Stores in *revents the epoll events that process_io() recorded for this
 * I/O source. */
811 int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
814 if (s->type != SOURCE_IO)
821 *revents = s->io.revents;
/* Returns the signal number of a signal source. */
825 int sd_event_source_get_signal(sd_event_source *s) {
828 if (s->type != SOURCE_SIGNAL)
831 return s->signal.sig;
/* Reads the source's priority (body outside this excerpt). */
834 int sd_event_source_get_priority(sd_event_source *s, int *priority) {
/* Sets the source's priority. The pending and prepare queues order by
 * priority, so the source is reshuffled within them. */
841 int sd_event_source_set_priority(sd_event_source *s, int priority) {
845 if (s->priority == priority)
848 s->priority = priority;
851 assert_se(prioq_reshuffle(s->event->pending, s, &s->pending_index) == 0);
854 assert_se(prioq_reshuffle(s->event->prepare, s, &s->prepare_index) == 0);
/* Reads the source's mute mode (body outside this excerpt). */
859 int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m) {
/* Changes a source's mute mode (SD_EVENT_MUTED, SD_EVENT_UNMUTED or
 * SD_EVENT_ONESHOT).
 * Muting:   unregisters I/O fds, reshuffles timer queues, and drops
 *           signal/SIGCHLD bits from the signalfd mask.
 * Unmuting: re-registers I/O fds with the new mode and restores those bits.
 * Finally the pending and prepare queues are reshuffled, since both order by
 * mute state. */
869 int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) {
/* Reject unknown modes; ONESHOT must be compared against m. Negating the
 * constant made this whole condition always false. */
874 if (m != SD_EVENT_MUTED && m != SD_EVENT_UNMUTED && m != SD_EVENT_ONESHOT)
880 if (m == SD_EVENT_MUTED) {
885 r = source_io_unregister(s);
892 case SOURCE_MONOTONIC:
894 prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
897 case SOURCE_REALTIME:
899 prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
/* Unmuted child sources still need SIGCHLD. */
904 if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) {
905 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
906 event_update_signal_fd(s->event);
914 assert(s->event->n_unmuted_child_sources > 0);
915 s->event->n_unmuted_child_sources--;
/* Keep SIGCHLD while other unmuted child sources or an explicit SIGCHLD
 * signal source still depend on it. */
917 if (s->event->n_unmuted_child_sources == 0 && (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])) {
918 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
919 event_update_signal_fd(s->event);
933 r = source_io_register(s, m, s->io.events);
940 case SOURCE_MONOTONIC:
942 prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
945 case SOURCE_REALTIME:
947 prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
953 if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) {
954 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
955 event_update_signal_fd(s->event);
/* Count the child source as unmuted only on a real MUTED -> unmuted change. */
962 if (s->mute == SD_EVENT_MUTED) {
963 s->event->n_unmuted_child_sources++;
965 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
966 assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
967 event_update_signal_fd(s->event);
979 prioq_reshuffle(s->event->pending, s, &s->pending_index);
982 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
/* Stores a timer source's deadline (time.next) in *usec. */
987 int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
992 if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
995 *usec = s->time.next;
/* Changes a timer source's deadline and reorders it within its clock's queue.
 * event_arm_timer() compares the queue head against the cached deadline, so
 * the timerfd is re-armed on the next sd_event_run() if needed. */
999 int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1002 if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1005 if (s->time.next == usec)
1008 s->time.next = usec;
1010 if (s->type == SOURCE_REALTIME)
1011 prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
1013 prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
/* Installs, replaces or clears the source's prepare callback. Replacing an
 * existing callback only swaps the pointer. Otherwise the source is presumably
 * inserted into (new callback) or removed from (NULL callback) the lazily
 * allocated prepare queue; the branch lines are outside this excerpt. */
1018 int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1024 if (s->prepare == callback)
1027 if (callback && s->prepare) {
1028 s->prepare = callback;
1032 r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1036 s->prepare = callback;
1039 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1043 prioq_remove(s->event->prepare, s, &s->prepare_index);
/* Returns the userdata pointer supplied when the source was created. */
1048 void* sd_event_source_get_userdata(sd_event_source *s) {
/* Programs timer_fd with the earliest deadline in prioq as an absolute time
 * (TFD_TIMER_ABSTIME). It skips the syscall when that deadline is already
 * armed (*next) and caches the newly armed deadline in *next. An empty queue
 * or a muted head is handled separately (lines outside this excerpt). */
1055 static int event_arm_timer(
1061 struct itimerspec its = {};
1068 s = prioq_peek(prioq);
1069 if (!s || s->mute == SD_EVENT_MUTED)
1072 if (*next == s->time.next)
1075 assert_se(timer_fd >= 0);
1077 if (s->time.next == 0) {
1078 /* We don't want to disarm here, we just mean some time looooong ago. */
1079 its.it_value.tv_sec = 0;
1080 its.it_value.tv_nsec = 1;
1082 timespec_store(&its.it_value, s->time.next);
1084 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1088 *next = s->time.next;
/* Handles an epoll event for an I/O source: records the reported events in
 * io.revents and marks the source pending. */
1092 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1095 assert(s->type == SOURCE_IO);
1097 s->io.revents = events;
1100 If this is a oneshot event source, then we added it to the
1101 epoll with EPOLLONESHOT, hence we know it's not registered
1102 anymore. We can save a syscall here...
1105 if (s->mute == SD_EVENT_ONESHOT)
1106 s->io.registered = false;
1108 return source_set_pending(s, true);
/* Drains the expiration counter from a timerfd so it stops polling readable.
 * Events other than plain EPOLLIN are rejected, EAGAIN/EINTR are special-cased,
 * and a short read is treated as an error (exact return values outside this
 * excerpt). */
1111 static int flush_timer(sd_event *e, int fd, uint32_t events) {
1117 if (events != EPOLLIN)
1120 ss = read(fd, &x, sizeof(x));
1122 if (errno == EAGAIN || errno == EINTR)
1128 if (ss != sizeof(x))
/* Marks due timer sources of one clock pending, where n is that clock's
 * current time. It repeatedly peeks at the queue head and stops at a muted
 * source (the other stop conditions are outside this excerpt). Each source
 * made pending is reshuffled, and time_prioq_compare() moves pending sources
 * to the end. */
1134 static int process_timer(sd_event *e, usec_t n, Prioq *prioq) {
1141 s = prioq_peek(prioq);
1144 s->mute == SD_EVENT_MUTED ||
1148 r = source_set_pending(s, true);
1152 r = prioq_reshuffle(prioq, s, &s->time.prioq_index);
/* Polls every unmuted child source with waitid(P_PID, WNOHANG) and marks
 * those reporting a state change pending. si_pid is zeroed before each call
 * because, with WNOHANG and no waitable child, waitid() may leave siginfo
 * untouched; a nonzero si_pid afterwards means an event was reported. The
 * iteration is recorded in processed_children. */
1160 static int process_child(sd_event *e) {
1168 So, this is ugly. We iteratively invoke waitid() with P_PID
1169 + WNOHANG for each PID we wait for, instead of using
1170 P_ALL. This is because we only want to get child
1171 information of very specific child processes, and not all
1172 of them. We might not have processed the SIGCHLD event of a
1173 previous invocation and we don't want to maintain a
1174 unbounded *per-child* event queue, hence we really don't
1175 want anything flushed out of the kernel's queue that we
1176 don't care about. Since this is O(n) this means that if you
1177 have a lot of processes you probably want to handle SIGCHLD
1181 HASHMAP_FOREACH(s, e->child_sources, i) {
1182 assert(s->type == SOURCE_CHILD);
1187 if (s->mute == SD_EVENT_MUTED)
1190 zero(s->child.siginfo);
1191 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1195 if (s->child.siginfo.si_pid != 0) {
1196 r = source_set_pending(s, true);
1202 e->processed_children = e->iteration;
/* Drains the signalfd. For SIGCHLD, child sources are scanned first; the
 * signal is forwarded to a SIGCHLD signal source only if process_child()
 * returned 0 and such a source exists. Other signals go to their registered
 * source, which gets a copy of the siginfo and is marked pending. */
1206 static int process_signal(sd_event *e, uint32_t events) {
1207 struct signalfd_siginfo si;
1208 bool read_one = false;
1212 if (events != EPOLLIN)
1218 ss = read(e->signal_fd, &si, sizeof(si));
1220 if (errno == EAGAIN || errno == EINTR)
1226 if (ss != sizeof(si))
1231 if (si.ssi_signo == SIGCHLD) {
1232 r = process_child(e);
/* The signal table may still be NULL when only child sources are registered
 * (sd_event_add_child() does not allocate it), so check before indexing. */
1235 if (r > 0 || !e->signal_sources || !e->signal_sources[si.ssi_signo])
1238 s = e->signal_sources[si.ssi_signo];
1243 s->signal.siginfo = si;
1244 r = source_set_pending(s, true);
/* Dispatches one source. It takes the source out of the pending queue, mutes
 * a oneshot source before its callback runs, then invokes the type-specific
 * callback with the state collected for it. */
1253 static int source_dispatch(sd_event_source *s) {
1259 r = source_set_pending(s, false);
1263 if (s->mute == SD_EVENT_ONESHOT) {
1264 r = sd_event_source_set_mute(s, SD_EVENT_MUTED);
1272 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1275 case SOURCE_MONOTONIC:
1276 r = s->time.callback(s, s->time.next, s->userdata);
1279 case SOURCE_REALTIME:
1280 r = s->time.callback(s, s->time.next, s->userdata);
1284 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1288 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1292 r = s->defer.callback(s, s->userdata);
/* Runs the prepare callbacks of unmuted sources not yet prepared in this
 * iteration. Each source is stamped and reshuffled to the back of the prepare
 * queue before its callback runs, so the loop stops at the first source
 * already prepared this iteration (or at a muted one). */
1299 static int event_prepare(sd_event *e) {
1307 s = prioq_peek(e->prepare);
1308 if (!s || s->prepare_iteration == e->iteration || s->mute == SD_EVENT_MUTED)
1311 s->prepare_iteration = e->iteration;
1312 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1317 r = s->prepare(s, s->userdata);
/* Runs a single loop iteration. Visible steps:
 *  1. run prepare callbacks and arm the monotonic and realtime timerfds;
 *  2. wait in epoll_wait() and turn the results into pending sources;
 *  3. mark due timers pending, and scan children on the first iteration;
 *  4. dispatch the first unmuted source in the pending queue.
 * timeout is in microseconds ((uint64_t) -1 waits indefinitely) and is
 * rounded up to whole milliseconds for epoll_wait(). The wait is presumably
 * made non-blocking when something is already dispatchable; the lines that
 * adjust timeout are outside this excerpt. */
1326 int sd_event_run(sd_event *e, uint64_t timeout) {
1327 struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1339 r = event_prepare(e);
1343 r = event_arm_timer(e, e->monotonic_fd, e->monotonic, &e->monotonic_next);
1347 r = event_arm_timer(e, e->realtime_fd, e->realtime, &e->realtime_next);
1351 if (e->iteration == 1 && !hashmap_isempty(e->child_sources))
1352 /* On the first iteration, there might be already some
1353 * zombies for us to care for, hence, don't wait */
1356 p = prioq_peek(e->pending);
1357 if (p && p->mute != SD_EVENT_MUTED)
/* NOTE(review): millisecond values above INT_MAX do not fit the (int) cast
 * below, and timeout + USEC_PER_MSEC - 1 wraps near UINT64_MAX; consider
 * clamping. */
1361 m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1365 dual_timestamp_get(&n);
/* Timer and signal fds carry their source type as epoll data (see
 * event_setup_timer_fd() and event_update_signal_fd()); any other data
 * pointer is an I/O source. */
1367 for (i = 0; i < m; i++) {
1369 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1370 r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
1371 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1372 r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
1373 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1374 r = process_signal(e, ev_queue[i].events);
1376 r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1382 r = process_timer(e, n.monotonic, e->monotonic);
1386 r = process_timer(e, n.realtime, e->realtime);
1390 if (e->iteration == 1 && e->processed_children != 1) {
1391 /* On the first iteration, make sure we really process
1392 * all children which might already be zombies. */
1393 r = process_child(e);
1398 p = prioq_peek(e->pending);
1399 if (!p || p->mute == SD_EVENT_MUTED)
1402 return source_dispatch(p);
/* Runs loop iterations with an infinite timeout ((uint64_t) -1); the
 * termination condition is outside this excerpt. */
1405 int sd_event_loop(sd_event *e) {
1412 r = sd_event_run(e, (uint64_t) -1);
/* Quit-state query (body outside this excerpt). */
1420 int sd_event_quit(sd_event *e) {
/* Quit request (body outside this excerpt). */
1427 int sd_event_request_quit(sd_event *e) {
1435 sd_event *sd_event_get(sd_event_source *s) {