1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
27 #include "sd-daemon.h"
32 #include "time-util.h"
37 #define EPOLL_QUEUE_MAX 512U
38 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
40 typedef enum EventSourceType {
51 struct sd_event_source {
56 sd_event_handler_t prepare;
58 EventSourceType type:4;
64 unsigned pending_index;
65 unsigned prepare_index;
66 unsigned pending_iteration;
67 unsigned prepare_iteration;
71 sd_event_io_handler_t callback;
78 sd_event_time_handler_t callback;
79 usec_t next, accuracy;
80 unsigned earliest_index;
81 unsigned latest_index;
84 sd_event_signal_handler_t callback;
85 struct signalfd_siginfo siginfo;
89 sd_event_child_handler_t callback;
95 sd_event_handler_t callback;
98 sd_event_handler_t callback;
116 /* For both clocks we maintain two priority queues each, one
117 * ordered for the earliest times the events may be
118 * dispatched, and one ordered by the latest times they must
119 * have been dispatched. The range between the top entries in
120 * the two prioqs is the time window we can freely schedule
122 Prioq *monotonic_earliest;
123 Prioq *monotonic_latest;
124 Prioq *realtime_earliest;
125 Prioq *realtime_latest;
127 usec_t realtime_next, monotonic_next;
131 sd_event_source **signal_sources;
133 Hashmap *child_sources;
134 unsigned n_enabled_child_sources;
141 dual_timestamp timestamp;
144 bool exit_requested:1;
145 bool need_process_child:1;
151 sd_event **default_event_ptr;
153 usec_t watchdog_last, watchdog_period;
/* Comparator for e->pending: enabled sources first, then ascending
 * priority value, then oldest pending iteration, then pointer order
 * for stability. */
158 static int pending_prioq_compare(const void *a, const void *b) {
159 const sd_event_source *x = a, *y = b;
164 /* Enabled ones first */
165 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
167 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
170 /* Lower priority values first */
171 if (x->priority < y->priority)
173 if (x->priority > y->priority)
176 /* Older entries first */
177 if (x->pending_iteration < y->pending_iteration)
179 if (x->pending_iteration > y->pending_iteration)
182 /* Stability for the rest */
/* Comparator for e->prepare: sorts by prepare_iteration first so the
 * prepare pass can stop as soon as it peeks a source already prepared
 * in the current loop iteration. */
191 static int prepare_prioq_compare(const void *a, const void *b) {
192 const sd_event_source *x = a, *y = b;
197 /* Move most recently prepared ones last, so that we can stop
198 * preparing as soon as we hit one that has already been
199 * prepared in the current iteration */
200 if (x->prepare_iteration < y->prepare_iteration)
202 if (x->prepare_iteration > y->prepare_iteration)
205 /* Enabled ones first */
206 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
208 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
211 /* Lower priority values first */
212 if (x->priority < y->priority)
214 if (x->priority > y->priority)
217 /* Stability for the rest */
/* Comparator for the per-clock "earliest" queues: orders timer
 * sources by the earliest time (time.next) they may be dispatched,
 * with disabled and already-pending sources pushed to the end. */
226 static int earliest_time_prioq_compare(const void *a, const void *b) {
227 const sd_event_source *x = a, *y = b;
229 assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
230 assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
232 /* Enabled ones first */
233 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
235 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
238 /* Move the pending ones to the end */
239 if (!x->pending && y->pending)
241 if (x->pending && !y->pending)
245 if (x->time.next < y->time.next)
247 if (x->time.next > y->time.next)
250 /* Stability for the rest */
/* Comparator for the per-clock "latest" queues: orders timer sources
 * by the latest acceptable dispatch time (time.next + accuracy).
 * NOTE(review): time.next + accuracy can wrap for values near
 * (usec_t) -1 — confirm callers bound these. */
259 static int latest_time_prioq_compare(const void *a, const void *b) {
260 const sd_event_source *x = a, *y = b;
262 assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
263 (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
265 /* Enabled ones first */
266 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
268 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
271 /* Move the pending ones to the end */
272 if (!x->pending && y->pending)
274 if (x->pending && !y->pending)
278 if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
280 if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
283 /* Stability for the rest */
/* Comparator for e->exit: enabled sources first, then ascending
 * priority value, then pointer order for stability. */
292 static int exit_prioq_compare(const void *a, const void *b) {
293 const sd_event_source *x = a, *y = b;
295 assert(x->type == SOURCE_EXIT);
296 assert(y->type == SOURCE_EXIT);
298 /* Enabled ones first */
299 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
301 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
304 /* Lower priority values first */
305 if (x->priority < y->priority)
307 if (x->priority > y->priority)
310 /* Stability for the rest */
/* Destroys an event loop object: closes every fd it owns, frees all
 * priority queues and the signal/child source tables. May only be
 * called once all attached sources are gone (n_sources == 0). */
319 static void event_free(sd_event *e) {
321 assert(e->n_sources == 0);
/* Clear the stashed default-event pointer so it no longer dangles
 * after this loop is gone. */
323 if (e->default_event_ptr)
324 *(e->default_event_ptr) = NULL;
326 if (e->epoll_fd >= 0)
327 close_nointr_nofail(e->epoll_fd)&#59;
329 if (e->signal_fd >= 0)
330 close_nointr_nofail(e->signal_fd);
332 if (e->realtime_fd >= 0)
333 close_nointr_nofail(e->realtime_fd);
335 if (e->monotonic_fd >= 0)
336 close_nointr_nofail(e->monotonic_fd);
338 if (e->watchdog_fd >= 0)
339 close_nointr_nofail(e->watchdog_fd);
/* prioq_free()/hashmap_free()/free() all accept NULL, so no guards
 * are needed here. */
341 prioq_free(e->pending);
342 prioq_free(e->prepare);
343 prioq_free(e->monotonic_earliest);
344 prioq_free(e->monotonic_latest);
345 prioq_free(e->realtime_earliest);
346 prioq_free(e->realtime_latest);
349 free(e->signal_sources);
351 hashmap_free(e->child_sources);
355 _public_ int sd_event_new(sd_event** ret) {
359 assert_return(ret, -EINVAL);
361 e = new0(sd_event, 1);
366 e->signal_fd = e->realtime_fd = e->monotonic_fd = e->watchdog_fd = e->epoll_fd = -1;
367 e->realtime_next = e->monotonic_next = (usec_t) -1;
368 e->original_pid = getpid();
370 assert_se(sigemptyset(&e->sigset) == 0);
372 e->pending = prioq_new(pending_prioq_compare);
378 e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
379 if (e->epoll_fd < 0) {
392 _public_ sd_event* sd_event_ref(sd_event *e) {
393 assert_return(e, NULL);
395 assert(e->n_ref >= 1);
401 _public_ sd_event* sd_event_unref(sd_event *e) {
406 assert(e->n_ref >= 1);
415 static bool event_pid_changed(sd_event *e) {
418 /* We don't support people creating am event loop and keeping
419 * it around over a fork(). Let's complain. */
421 return e->original_pid != getpid();
424 static int source_io_unregister(sd_event_source *s) {
428 assert(s->type == SOURCE_IO);
430 if (!s->io.registered)
433 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
437 s->io.registered = false;
441 static int source_io_register(
446 struct epoll_event ev = {};
450 assert(s->type == SOURCE_IO);
451 assert(enabled != SD_EVENT_OFF);
456 if (enabled == SD_EVENT_ONESHOT)
457 ev.events |= EPOLLONESHOT;
459 if (s->io.registered)
460 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
462 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
467 s->io.registered = true;
472 static void source_free(sd_event_source *s) {
476 assert(s->event->n_sources > 0);
482 source_io_unregister(s);
486 case SOURCE_MONOTONIC:
487 prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
488 prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
491 case SOURCE_REALTIME:
492 prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
493 prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
497 if (s->signal.sig > 0) {
498 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
499 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
501 if (s->event->signal_sources)
502 s->event->signal_sources[s->signal.sig] = NULL;
508 if (s->child.pid > 0) {
509 if (s->enabled != SD_EVENT_OFF) {
510 assert(s->event->n_enabled_child_sources > 0);
511 s->event->n_enabled_child_sources--;
514 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
515 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
517 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
527 prioq_remove(s->event->exit, s, &s->exit.prioq_index);
530 case SOURCE_WATCHDOG:
531 assert_not_reached("Wut? I shouldn't exist.");
535 prioq_remove(s->event->pending, s, &s->pending_index);
538 prioq_remove(s->event->prepare, s, &s->prepare_index);
540 s->event->n_sources--;
541 sd_event_unref(s->event);
/* Marks a source pending (b=true: insert into e->pending, stamping
 * the current loop iteration) or not pending (b=false: remove from
 * e->pending). Timer sources are additionally reshuffled in their
 * per-clock queues, because those comparators sort on 'pending'. */
547 static int source_set_pending(sd_event_source *s, bool b) {
551 assert(s->type != SOURCE_EXIT);
559 s->pending_iteration = s->event->iteration;
561 r = prioq_put(s->event->pending, s, &s->pending_index);
567 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
569 if (s->type == SOURCE_REALTIME) {
570 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
571 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
572 } else if (s->type == SOURCE_MONOTONIC) {
573 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
574 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
580 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
585 s = new0(sd_event_source, 1);
590 s->event = sd_event_ref(e);
592 s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
599 _public_ int sd_event_add_io(
603 sd_event_io_handler_t callback,
605 sd_event_source **ret) {
610 assert_return(e, -EINVAL);
611 assert_return(fd >= 0, -EINVAL);
612 assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
613 assert_return(callback, -EINVAL);
614 assert_return(ret, -EINVAL);
615 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
616 assert_return(!event_pid_changed(e), -ECHILD);
618 s = source_new(e, SOURCE_IO);
623 s->io.events = events;
624 s->io.callback = callback;
625 s->userdata = userdata;
626 s->enabled = SD_EVENT_ON;
628 r = source_io_register(s, s->enabled, events);
/* Lazily creates the timerfd for the given clock id, registers it
 * with the epoll instance, and initializes the boot-ID-derived
 * wakeup perturbation used to coalesce timer wakeups system-wide. */
638 static int event_setup_timer_fd(
640 EventSourceType type,
644 struct epoll_event ev = {};
651 if (_likely_(*timer_fd >= 0))
654 fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
/* Stash the source type in the epoll payload so the dispatcher can
 * tell which timer fired. */
659 ev.data.ptr = INT_TO_PTR(type);
661 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
/* epoll registration failed: don't leak the freshly created fd. */
663 close_nointr_nofail(fd);
667 /* When we sleep for longer, we try to realign the wakeup to
668 the same time within each minute/second/250ms, so that
669 events all across the system can be coalesced into a single
670 CPU wakeup. However, let's take some system-specific
671 randomness for this value, so that in a network of systems
672 with synced clocks timer events are distributed a
673 bit. Here, we calculate a perturbation usec offset from the
676 if (sd_id128_get_boot(&bootid) >= 0)
677 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_MINUTE;
683 static int event_add_time_internal(
685 EventSourceType type,
692 sd_event_time_handler_t callback,
694 sd_event_source **ret) {
699 assert_return(e, -EINVAL);
700 assert_return(callback, -EINVAL);
701 assert_return(ret, -EINVAL);
702 assert_return(usec != (uint64_t) -1, -EINVAL);
703 assert_return(accuracy != (uint64_t) -1, -EINVAL);
704 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
705 assert_return(!event_pid_changed(e), -ECHILD);
712 *earliest = prioq_new(earliest_time_prioq_compare);
718 *latest = prioq_new(latest_time_prioq_compare);
724 r = event_setup_timer_fd(e, type, timer_fd, id);
729 s = source_new(e, type);
734 s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
735 s->time.callback = callback;
736 s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
737 s->userdata = userdata;
738 s->enabled = SD_EVENT_ONESHOT;
740 r = prioq_put(*earliest, s, &s->time.earliest_index);
744 r = prioq_put(*latest, s, &s->time.latest_index);
/* Adds a CLOCK_MONOTONIC timer event source; thin wrapper around
 * event_add_time_internal() using the monotonic timerfd and the two
 * monotonic (earliest/latest) priority queues. */
756 _public_ int sd_event_add_monotonic(sd_event *e,
759 sd_event_time_handler_t callback,
761 sd_event_source **ret) {
763 return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
766 _public_ int sd_event_add_realtime(sd_event *e,
769 sd_event_time_handler_t callback,
771 sd_event_source **ret) {
773 return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
/* (Re)creates or updates the signalfd to match e->sigset and, on
 * first creation, registers it with the epoll instance; on epoll
 * registration failure the fresh fd is closed again. */
776 static int event_update_signal_fd(sd_event *e) {
777 struct epoll_event ev = {};
/* signalfd() with fd == -1 allocates a new fd; with an existing fd
 * it only updates the mask — so epoll registration is needed only
 * the first time. */
783 add_to_epoll = e->signal_fd < 0;
785 r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
795 ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
797 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
799 close_nointr_nofail(e->signal_fd);
808 _public_ int sd_event_add_signal(
811 sd_event_signal_handler_t callback,
813 sd_event_source **ret) {
818 assert_return(e, -EINVAL);
819 assert_return(sig > 0, -EINVAL);
820 assert_return(sig < _NSIG, -EINVAL);
821 assert_return(callback, -EINVAL);
822 assert_return(ret, -EINVAL);
823 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
824 assert_return(!event_pid_changed(e), -ECHILD);
826 if (!e->signal_sources) {
827 e->signal_sources = new0(sd_event_source*, _NSIG);
828 if (!e->signal_sources)
830 } else if (e->signal_sources[sig])
833 s = source_new(e, SOURCE_SIGNAL);
838 s->signal.callback = callback;
839 s->userdata = userdata;
840 s->enabled = SD_EVENT_ON;
842 e->signal_sources[sig] = s;
843 assert_se(sigaddset(&e->sigset, sig) == 0);
845 if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
846 r = event_update_signal_fd(e);
857 _public_ int sd_event_add_child(
861 sd_event_child_handler_t callback,
863 sd_event_source **ret) {
868 assert_return(e, -EINVAL);
869 assert_return(pid > 1, -EINVAL);
870 assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
871 assert_return(options != 0, -EINVAL);
872 assert_return(callback, -EINVAL);
873 assert_return(ret, -EINVAL);
874 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
875 assert_return(!event_pid_changed(e), -ECHILD);
877 r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
881 if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
884 s = source_new(e, SOURCE_CHILD);
889 s->child.options = options;
890 s->child.callback = callback;
891 s->userdata = userdata;
892 s->enabled = SD_EVENT_ONESHOT;
894 r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
900 e->n_enabled_child_sources ++;
902 assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
904 if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
905 r = event_update_signal_fd(e);
912 e->need_process_child = true;
918 _public_ int sd_event_add_defer(
920 sd_event_handler_t callback,
922 sd_event_source **ret) {
927 assert_return(e, -EINVAL);
928 assert_return(callback, -EINVAL);
929 assert_return(ret, -EINVAL);
930 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
931 assert_return(!event_pid_changed(e), -ECHILD);
933 s = source_new(e, SOURCE_DEFER);
937 s->defer.callback = callback;
938 s->userdata = userdata;
939 s->enabled = SD_EVENT_ONESHOT;
941 r = source_set_pending(s, true);
951 _public_ int sd_event_add_exit(
953 sd_event_handler_t callback,
955 sd_event_source **ret) {
960 assert_return(e, -EINVAL);
961 assert_return(callback, -EINVAL);
962 assert_return(ret, -EINVAL);
963 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
964 assert_return(!event_pid_changed(e), -ECHILD);
967 e->exit = prioq_new(exit_prioq_compare);
972 s = source_new(e, SOURCE_EXIT);
976 s->exit.callback = callback;
977 s->userdata = userdata;
978 s->exit.prioq_index = PRIOQ_IDX_NULL;
979 s->enabled = SD_EVENT_ONESHOT;
981 r = prioq_put(s->event->exit, s, &s->exit.prioq_index);
991 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
992 assert_return(s, NULL);
994 assert(s->n_ref >= 1);
1000 _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {
1005 assert(s->n_ref >= 1);
1008 if (s->n_ref <= 0) {
1009 /* Here's a special hack: when we are called from a
1010 * dispatch handler we won't free the event source
1011 * immediately, but we will detach the fd from the
1012 * epoll. This way it is safe for the caller to unref
1013 * the event source and immediately close the fd, but
1014 * we still retain a valid event source object after
1017 if (s->dispatching) {
1018 if (s->type == SOURCE_IO)
1019 source_io_unregister(s);
1027 _public_ sd_event *sd_event_source_get_event(sd_event_source *s) {
1028 assert_return(s, NULL);
1033 _public_ int sd_event_source_get_pending(sd_event_source *s) {
1034 assert_return(s, -EINVAL);
1035 assert_return(s->type != SOURCE_EXIT, -EDOM);
1036 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1037 assert_return(!event_pid_changed(s->event), -ECHILD);
1042 _public_ int sd_event_source_get_io_fd(sd_event_source *s) {
1043 assert_return(s, -EINVAL);
1044 assert_return(s->type == SOURCE_IO, -EDOM);
1045 assert_return(!event_pid_changed(s->event), -ECHILD);
1050 _public_ int sd_event_source_set_io_fd(sd_event_source *s, int fd) {
1053 assert_return(s, -EINVAL);
1054 assert_return(fd >= 0, -EINVAL);
1055 assert_return(s->type == SOURCE_IO, -EDOM);
1056 assert_return(!event_pid_changed(s->event), -ECHILD);
1061 if (s->enabled == SD_EVENT_OFF) {
1063 s->io.registered = false;
1067 saved_fd = s->io.fd;
1068 assert(s->io.registered);
1071 s->io.registered = false;
1073 r = source_io_register(s, s->enabled, s->io.events);
1075 s->io.fd = saved_fd;
1076 s->io.registered = true;
1080 epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, saved_fd, NULL);
1086 _public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
1087 assert_return(s, -EINVAL);
1088 assert_return(events, -EINVAL);
1089 assert_return(s->type == SOURCE_IO, -EDOM);
1090 assert_return(!event_pid_changed(s->event), -ECHILD);
1092 *events = s->io.events;
1096 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1099 assert_return(s, -EINVAL);
1100 assert_return(s->type == SOURCE_IO, -EDOM);
1101 assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
1102 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1103 assert_return(!event_pid_changed(s->event), -ECHILD);
1105 if (s->io.events == events)
1108 if (s->enabled != SD_EVENT_OFF) {
1109 r = source_io_register(s, s->enabled, events);
1114 s->io.events = events;
1115 source_set_pending(s, false);
1120 _public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1121 assert_return(s, -EINVAL);
1122 assert_return(revents, -EINVAL);
1123 assert_return(s->type == SOURCE_IO, -EDOM);
1124 assert_return(s->pending, -ENODATA);
1125 assert_return(!event_pid_changed(s->event), -ECHILD);
1127 *revents = s->io.revents;
1131 _public_ int sd_event_source_get_signal(sd_event_source *s) {
1132 assert_return(s, -EINVAL);
1133 assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1134 assert_return(!event_pid_changed(s->event), -ECHILD);
1136 return s->signal.sig;
1139 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1140 assert_return(s, -EINVAL);
1141 assert_return(!event_pid_changed(s->event), -ECHILD);
1146 _public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
1147 assert_return(s, -EINVAL);
1148 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1149 assert_return(!event_pid_changed(s->event), -ECHILD);
1151 if (s->priority == priority)
1154 s->priority = priority;
1157 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1160 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1162 if (s->type == SOURCE_EXIT)
1163 prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
1168 _public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1169 assert_return(s, -EINVAL);
1170 assert_return(m, -EINVAL);
1171 assert_return(!event_pid_changed(s->event), -ECHILD);
1177 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
1180 assert_return(s, -EINVAL);
1181 assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1182 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1183 assert_return(!event_pid_changed(s->event), -ECHILD);
1185 if (s->enabled == m)
1188 if (m == SD_EVENT_OFF) {
1193 r = source_io_unregister(s);
1200 case SOURCE_MONOTONIC:
1202 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1203 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1206 case SOURCE_REALTIME:
1208 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1209 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1214 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1215 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1216 event_update_signal_fd(s->event);
1224 assert(s->event->n_enabled_child_sources > 0);
1225 s->event->n_enabled_child_sources--;
1227 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1228 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1229 event_update_signal_fd(s->event);
1236 prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
1243 case SOURCE_WATCHDOG:
1244 assert_not_reached("Wut? I shouldn't exist.");
1251 r = source_io_register(s, m, s->io.events);
1258 case SOURCE_MONOTONIC:
1260 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1261 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1264 case SOURCE_REALTIME:
1266 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1267 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1273 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1274 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1275 event_update_signal_fd(s->event);
1282 if (s->enabled == SD_EVENT_OFF) {
1283 s->event->n_enabled_child_sources++;
1285 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1286 assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1287 event_update_signal_fd(s->event);
1294 prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
1301 case SOURCE_WATCHDOG:
1302 assert_not_reached("Wut? I shouldn't exist.");
1307 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1310 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1315 _public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1316 assert_return(s, -EINVAL);
1317 assert_return(usec, -EINVAL);
1318 assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1319 assert_return(!event_pid_changed(s->event), -ECHILD);
1321 *usec = s->time.next;
1325 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1326 assert_return(s, -EINVAL);
1327 assert_return(usec != (uint64_t) -1, -EINVAL);
1328 assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1329 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1330 assert_return(!event_pid_changed(s->event), -ECHILD);
1332 s->time.next = usec;
1334 source_set_pending(s, false);
1336 if (s->type == SOURCE_REALTIME) {
1337 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1338 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1340 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1341 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1347 _public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1348 assert_return(s, -EINVAL);
1349 assert_return(usec, -EINVAL);
1350 assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1351 assert_return(!event_pid_changed(s->event), -ECHILD);
1353 *usec = s->time.accuracy;
1357 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1358 assert_return(s, -EINVAL);
1359 assert_return(usec != (uint64_t) -1, -EINVAL);
1360 assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1361 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1362 assert_return(!event_pid_changed(s->event), -ECHILD);
1365 usec = DEFAULT_ACCURACY_USEC;
1367 s->time.accuracy = usec;
1369 source_set_pending(s, false);
1371 if (s->type == SOURCE_REALTIME)
1372 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1374 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1379 _public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1380 assert_return(s, -EINVAL);
1381 assert_return(pid, -EINVAL);
1382 assert_return(s->type == SOURCE_CHILD, -EDOM);
1383 assert_return(!event_pid_changed(s->event), -ECHILD);
1385 *pid = s->child.pid;
1389 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_event_handler_t callback) {
1392 assert_return(s, -EINVAL);
1393 assert_return(s->type != SOURCE_EXIT, -EDOM);
1394 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1395 assert_return(!event_pid_changed(s->event), -ECHILD);
1397 if (s->prepare == callback)
1400 if (callback && s->prepare) {
1401 s->prepare = callback;
1405 r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1409 s->prepare = callback;
1412 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1416 prioq_remove(s->event->prepare, s, &s->prepare_index);
1421 _public_ void* sd_event_source_get_userdata(sd_event_source *s) {
1422 assert_return(s, NULL);
1427 _public_ void *sd_event_source_set_userdata(sd_event_source *s, void *userdata) {
1430 assert_return(s, NULL);
1433 s->userdata = userdata;
/* Picks a wakeup time within [a, b], preferring the boot-ID-derived
 * e->perturb offset inside successively finer grids (1 min, 10 s,
 * 1 s, 250 ms) so that wakeups across the whole system coalesce. */
1438 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1450 Find a good time to wake up again between times a and b. We
1451 have two goals here:
1453 a) We want to wake up as seldom as possible, hence prefer
1454 later times over earlier times.
1456 b) But if we have to wake up, then let's make sure to
1457 dispatch as much as possible on the entire system.
1459 We implement this by waking up everywhere at the same time
1460 within any given minute if we can, synchronised via the
1461 perturbation value determined from the boot ID. If we can't,
1462 then we try to find the same spot in every 10s, then 1s and
1463 then 250ms step. Otherwise, we pick the last possible time
/* Try the per-minute slot first. */
1467 c = (b / USEC_PER_MINUTE) * USEC_PER_MINUTE + e->perturb;
1469 if (_unlikely_(c < USEC_PER_MINUTE))
1472 c -= USEC_PER_MINUTE;
/* Fall back to the 10-second grid. */
1478 c = (b / (USEC_PER_SEC*10)) * (USEC_PER_SEC*10) + (e->perturb % (USEC_PER_SEC*10));
1480 if (_unlikely_(c < USEC_PER_SEC*10))
1483 c -= USEC_PER_SEC*10;
/* Fall back to the 1-second grid. */
1489 c = (b / USEC_PER_SEC) * USEC_PER_SEC + (e->perturb % USEC_PER_SEC);
1491 if (_unlikely_(c < USEC_PER_SEC))
/* Fall back to the 250-millisecond grid. */
1500 c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1502 if (_unlikely_(c < USEC_PER_MSEC*250))
1505 c -= USEC_PER_MSEC*250;
/* Programs the given timerfd from the earliest/latest prioqs: picks
 * a wakeup point via sleep_between() inside the window allowed by
 * the next source's accuracy, or disarms when nothing is armable. */
1514 static int event_arm_timer(
1521 struct itimerspec its = {};
1522 sd_event_source *a, *b;
1529 a = prioq_peek(earliest);
/* Nothing enabled at the head: disarm the timerfd (if it was
 * armed) and forget the cached next-elapse value. */
1530 if (!a || a->enabled == SD_EVENT_OFF) {
1535 if (*next == (usec_t) -1)
1539 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1543 *next = (usec_t) -1;
1548 b = prioq_peek(latest);
1549 assert_se(b && b->enabled != SD_EVENT_OFF);
1551 t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1555 assert_se(timer_fd >= 0);
1558 /* We don't want to disarm here, just mean some time looooong ago. */
1559 its.it_value.tv_sec = 0;
1560 its.it_value.tv_nsec = 1;
1562 timespec_store(&its.it_value, t);
1564 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1572 static int process_io(sd_event *e, sd_event_source *s, uint32_t revents) {
1575 assert(s->type == SOURCE_IO);
1577 /* If the event source was already pending, we just OR in the
1578 * new revents, otherwise we reset the value. The ORing is
1579 * necessary to handle EPOLLONESHOT events properly where
1580 * readability might happen independently of writability, and
1581 * we need to keep track of both */
1584 s->io.revents |= revents;
1586 s->io.revents = revents;
1588 return source_set_pending(s, true);
1591 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1598 assert_return(events == EPOLLIN, -EIO);
1600 ss = read(fd, &x, sizeof(x));
1602 if (errno == EAGAIN || errno == EINTR)
1608 if (_unlikely_(ss != sizeof(x)))
1612 *next = (usec_t) -1;
1617 static int process_timer(
1629 s = prioq_peek(earliest);
1632 s->enabled == SD_EVENT_OFF ||
1636 r = source_set_pending(s, true);
1640 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1641 prioq_reshuffle(latest, s, &s->time.latest_index);
1647 static int process_child(sd_event *e) {
1654 e->need_process_child = false;
1657 So, this is ugly. We iteratively invoke waitid() with P_PID
1658 + WNOHANG for each PID we wait for, instead of using
1659 P_ALL. This is because we only want to get child
1660 information of very specific child processes, and not all
1661 of them. We might not have processed the SIGCHLD even of a
1662 previous invocation and we don't want to maintain a
1663 unbounded *per-child* event queue, hence we really don't
1664 want anything flushed out of the kernel's queue that we
1665 don't care about. Since this is O(n) this means that if you
1666 have a lot of processes you probably want to handle SIGCHLD
1669 We do not reap the children here (by using WNOWAIT), this
1670 is only done after the event source is dispatched so that
1671 the callback still sees the process as a zombie.
1674 HASHMAP_FOREACH(s, e->child_sources, i) {
1675 assert(s->type == SOURCE_CHILD);
1680 if (s->enabled == SD_EVENT_OFF)
1683 zero(s->child.siginfo);
1684 r = waitid(P_PID, s->child.pid, &s->child.siginfo,
1685 WNOHANG | (s->child.options & WEXITED ? WNOWAIT : 0) | s->child.options);
1689 if (s->child.siginfo.si_pid != 0) {
1691 s->child.siginfo.si_code == CLD_EXITED ||
1692 s->child.siginfo.si_code == CLD_KILLED ||
1693 s->child.siginfo.si_code == CLD_DUMPED;
1695 if (!zombie && (s->child.options & WEXITED)) {
1696 /* If the child isn't dead then let's
1697 * immediately remove the state change
1698 * from the queue, since there's no
1699 * benefit in leaving it queued */
1701 assert(s->child.options & (WSTOPPED|WCONTINUED));
1702 waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|(s->child.options & (WSTOPPED|WCONTINUED)));
1705 r = source_set_pending(s, true);
1714 static int process_signal(sd_event *e, uint32_t events) {
1715 bool read_one = false;
1719 assert(e->signal_sources);
1721 assert_return(events == EPOLLIN, -EIO);
1724 struct signalfd_siginfo si;
1728 ss = read(e->signal_fd, &si, sizeof(si));
1730 if (errno == EAGAIN || errno == EINTR)
1736 if (_unlikely_(ss != sizeof(si)))
1741 s = e->signal_sources[si.ssi_signo];
1742 if (si.ssi_signo == SIGCHLD) {
1743 r = process_child(e);
1752 s->signal.siginfo = si;
1753 r = source_set_pending(s, true);
/* Invokes the user callback for a pending source. Clears the pending
 * flag first (except DEFER/EXIT, which stay "ready"), auto-disables
 * ONESHOT sources, and disables any source whose callback fails. The
 * dispatching flag guards the unref-during-dispatch special case. */
1761 static int source_dispatch(sd_event_source *s) {
1765 assert(s->pending || s->type == SOURCE_EXIT);
1767 if (s->type != SOURCE_DEFER && s->type != SOURCE_EXIT) {
1768 r = source_set_pending(s, false);
/* ONESHOT sources fire once: turn them off before the callback so
 * re-enabling from inside the callback works as expected. */
1773 if (s->enabled == SD_EVENT_ONESHOT) {
1774 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1779 s->dispatching = true;
1784 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1787 case SOURCE_MONOTONIC:
1788 r = s->time.callback(s, s->time.next, s->userdata);
1791 case SOURCE_REALTIME:
1792 r = s->time.callback(s, s->time.next, s->userdata);
1796 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1799 case SOURCE_CHILD: {
1802 zombie = s->child.siginfo.si_code == CLD_EXITED ||
1803 s->child.siginfo.si_code == CLD_KILLED ||
1804 s->child.siginfo.si_code == CLD_DUMPED;
1806 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1808 /* Now, reap the PID for good. */
1810 waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|WEXITED);
1816 r = s->defer.callback(s, s->userdata);
1820 r = s->exit.callback(s, s->userdata);
1823 case SOURCE_WATCHDOG:
1824 assert_not_reached("Wut? I shouldn't exist.");
1827 s->dispatching = false;
/* A failing callback disables the source rather than killing the
 * whole loop. */
1830 log_debug("Event source %p returned error, disabling: %s", s, strerror(-r));
1835 sd_event_source_set_enabled(s, SD_EVENT_OFF);
/* Run the prepare callbacks of all sources queued on e->prepare, at most
 * once per loop iteration each (tracked via prepare_iteration). A failing
 * prepare callback disables its source instead of failing the loop.
 * NOTE(review): the surrounding loop/return lines are elided here. */
1840 static int event_prepare(sd_event *e) {
1848                 s = prioq_peek(e->prepare);
/* Stop when the queue is exhausted, or the top entry already ran this
 * iteration (the prioq is ordered so that is true for all the rest),
 * or the source is disabled. */
1849                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
/* Stamp the source and reshuffle so the next peek yields a fresh one. */
1852                 s->prepare_iteration = e->iteration;
1853                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
/* Same re-entrancy guard as in source_dispatch(). */
1859                 s->dispatching = true;
1860                 r = s->prepare(s, s->userdata);
1861                 s->dispatching = false;
1864                         log_debug("Prepare callback of event source %p returned error, disabling: %s", s, strerror(-r));
1869                         sd_event_source_set_enabled(s, SD_EVENT_OFF);
/* Dispatch the highest-priority enabled EXIT source, entering the EXITING
 * state for its callback's benefit. When no enabled exit source remains,
 * the loop transitions to FINISHED.
 * NOTE(review): return statements are elided in this excerpt. */
1875 static int dispatch_exit(sd_event *e) {
1881         p = prioq_peek(e->exit);
/* No exit work left → the event loop is done for good. */
1882         if (!p || p->enabled == SD_EVENT_OFF) {
1883                 e->state = SD_EVENT_FINISHED;
/* Expose the EXITING state while the exit callback runs, then drop back
 * to PASSIVE so sd_event_run() may be called again. */
1889         e->state = SD_EVENT_EXITING;
1891         r = source_dispatch(p);
1893         e->state = SD_EVENT_PASSIVE;
/* Peek the next pending source to dispatch; a disabled source at the top
 * of the queue means nothing is dispatchable (the prioq comparator
 * presumably sorts disabled entries last — confirm upstream). */
1899 static sd_event_source* event_next_pending(sd_event *e) {
1904         p = prioq_peek(e->pending);
1908         if (p->enabled == SD_EVENT_OFF)
/* Re-arm the watchdog timerfd to fire somewhere between 1/2 and 3/4 of
 * the watchdog period after the last ping, letting sleep_between() pick a
 * point that coalesces with other wakeups. */
1914 static int arm_watchdog(sd_event *e) {
1915         struct itimerspec its = {};
1920         assert(e->watchdog_fd >= 0);
1922         t = sleep_between(e,
1923                           e->watchdog_last + (e->watchdog_period / 2),
1924                           e->watchdog_last + (e->watchdog_period * 3 / 4));
1926         timespec_store(&its.it_value, t);
/* Absolute one-shot expiry on CLOCK_MONOTONIC (fd created with that clock
 * in sd_event_set_watchdog()). */
1928         r = timerfd_settime(e->watchdog_fd, TFD_TIMER_ABSTIME, &its, NULL);
/* Send the service-manager keep-alive ("WATCHDOG=1") if at least a quarter
 * of the watchdog period elapsed since the last ping, then re-arm the
 * watchdog timer. */
1935 static int process_watchdog(sd_event *e) {
1941         /* Don't notify watchdog too often */
1942         if (e->watchdog_last + e->watchdog_period / 4 > e->timestamp.monotonic)
/* Best-effort notification; sd_notify() errors are deliberately ignored. */
1945         sd_notify(false, "WATCHDOG=1");
1946         e->watchdog_last = e->timestamp.monotonic;
1948         return arm_watchdog(e);
/* Run one iteration of the event loop: prepare callbacks, arm the two
 * timerfds, epoll_wait() (skipping the wait when work is already pending),
 * process timer/signal/io/child events, then dispatch one pending source.
 * timeout is in usec; (uint64_t) -1 means wait forever.
 * NOTE(review): error-checking and return lines are elided in this
 * excerpt; comments cover only the visible statements. */
1951 _public_ int sd_event_run(sd_event *e, uint64_t timeout) {
1952         struct epoll_event *ev_queue;
1953         unsigned ev_queue_max;
1957         assert_return(e, -EINVAL);
1958         assert_return(!event_pid_changed(e), -ECHILD);
1959         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
/* The loop must not be re-entered from a callback. */
1960         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
/* An exit request short-circuits the whole iteration into exit dispatch. */
1962         if (e->exit_requested)
1963                 return dispatch_exit(e);
1967         e->state = SD_EVENT_RUNNING;
1969         r = event_prepare(e);
/* Arm both clocks from their earliest/latest prioq pairs (see struct
 * sd_event: the window between the two is the free scheduling range). */
1973         r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1977         r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
/* If something is already dispatchable (or SIGCHLD work is queued), don't
 * block in epoll — presumably the elided code forces a zero timeout. */
1981         if (event_next_pending(e) || e->need_process_child)
/* Size the on-stack epoll buffer to the number of sources, clamped to
 * [1, EPOLL_QUEUE_MAX] so huge loops don't blow the stack. */
1983         ev_queue_max = CLAMP(e->n_sources, 1U, EPOLL_QUEUE_MAX);
1984         ev_queue = newa(struct epoll_event, ev_queue_max);
/* usec → ms, rounding up so we never wake before the requested deadline. */
1986         m = epoll_wait(e->epoll_fd, ev_queue, ev_queue_max,
1987                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
/* EAGAIN/EINTR are benign wakeups, not errors. */
1989                 r = errno == EAGAIN || errno == EINTR ? 1 : -errno;
/* Single timestamp per iteration; exposed via sd_event_get_now_*(). */
1993         dual_timestamp_get(&e->timestamp);
1995         for (i = 0; i < m; i++) {
/* Internal pseudo-sources are tagged with INT_TO_PTR(type) in data.ptr;
 * anything else is a real IO source pointer. */
1997                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1998                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
1999                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
2000                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
2001                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
2002                         r = process_signal(e, ev_queue[i].events);
2003                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_WATCHDOG))
2004                         r = flush_timer(e, e->watchdog_fd, ev_queue[i].events, NULL);
2006                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
2012         r = process_watchdog(e);
/* Queue any timer sources whose deadline has passed, per clock. */
2016         r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
2020         r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
2024         if (e->need_process_child) {
2025                 r = process_child(e);
/* Dispatch exactly one pending source per iteration, then return to
 * PASSIVE so callers can interleave their own work. */
2030         p = event_next_pending(e);
2036         r = source_dispatch(p);
2039         e->state = SD_EVENT_PASSIVE;
/* Convenience wrapper: run sd_event_run() with an infinite timeout until
 * the loop reaches FINISHED (i.e. an exit was requested and dispatched). */
2045 _public_ int sd_event_loop(sd_event *e) {
2048         assert_return(e, -EINVAL);
2049         assert_return(!event_pid_changed(e), -ECHILD);
2050         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
2054         while (e->state != SD_EVENT_FINISHED) {
2055                 r = sd_event_run(e, (uint64_t) -1);
/* Return the loop's current state (PASSIVE/RUNNING/EXITING/FINISHED);
 * the return statement is elided in this excerpt. */
2067 _public_ int sd_event_get_state(sd_event *e) {
2068         assert_return(e, -EINVAL);
2069         assert_return(!event_pid_changed(e), -ECHILD);
/* Fetch the exit code passed to sd_event_exit(). Only valid once an exit
 * has actually been requested; the error return for the "no exit
 * requested" case is elided in this excerpt. */
2074 _public_ int sd_event_get_exit_code(sd_event *e, int *code) {
2075         assert_return(e, -EINVAL);
2076         assert_return(code, -EINVAL);
2077         assert_return(!event_pid_changed(e), -ECHILD);
2079         if (!e->exit_requested)
2082         *code = e->exit_code;
/* Request loop termination with the given code. Takes effect on the next
 * sd_event_run() iteration, which will dispatch EXIT sources (see
 * dispatch_exit()) instead of running normally. */
2086 _public_ int sd_event_exit(sd_event *e, int code) {
2087         assert_return(e, -EINVAL);
2088         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
2089         assert_return(!event_pid_changed(e), -ECHILD);
2091         e->exit_requested = true;
2092         e->exit_code = code;
/* Return the CLOCK_REALTIME timestamp cached at the start of the current
 * loop iteration (set in sd_event_run() via dual_timestamp_get()); fails
 * with -ENODATA before the first iteration. */
2097 _public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
2098         assert_return(e, -EINVAL);
2099         assert_return(usec, -EINVAL);
2100         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
2101         assert_return(!event_pid_changed(e), -ECHILD);
2103         *usec = e->timestamp.realtime;
/* Return the CLOCK_MONOTONIC timestamp cached at the start of the current
 * loop iteration; mirrors sd_event_get_now_realtime(). */
2107 _public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
2108         assert_return(e, -EINVAL);
2109         assert_return(usec, -EINVAL);
2110         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
2111         assert_return(!event_pid_changed(e), -ECHILD);
2113         *usec = e->timestamp.monotonic;
/* Return a per-thread singleton event loop, creating it lazily on first
 * use. With ret == NULL this only reports (via the boolean-ish return on
 * the visible line) whether the default loop already exists.
 * NOTE(review): the unref/error paths are elided in this excerpt. */
2117 _public_ int sd_event_default(sd_event **ret) {
/* One default loop per thread, hence thread_local. */
2119         static thread_local sd_event *default_event = NULL;
2124                 return !!default_event;
/* Existing default: hand out a new reference. */
2126         if (default_event) {
2127                 *ret = sd_event_ref(default_event);
2131         r = sd_event_new(&e);
/* Remember where the singleton pointer lives so teardown can clear it —
 * presumably done in the free path; confirm upstream. */
2135         e->default_event_ptr = &default_event;
/* Report the thread id this loop is bound to; the actual lookup/return
 * lines are elided in this excerpt. */
2143 _public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
2144         assert_return(e, -EINVAL);
2145         assert_return(tid, -EINVAL);
2146         assert_return(!event_pid_changed(e), -ECHILD);
/* Enable/disable service-manager watchdog integration. When enabling,
 * reads WATCHDOG_USEC from the environment, sends an immediate ping,
 * creates a CLOCK_MONOTONIC timerfd and registers it with epoll under the
 * SOURCE_WATCHDOG pseudo-tag; when disabling, tears the fd down again.
 * NOTE(review): several error-handling/return lines are elided in this
 * excerpt; comments cover only the visible statements. */
2156 _public_ int sd_event_set_watchdog(sd_event *e, int b) {
2159         assert_return(e, -EINVAL);
2160         assert_return(!event_pid_changed(e), -ECHILD);
/* No-op if the requested state matches the current one. */
2162         if (e->watchdog == !!b)
2166                 struct epoll_event ev = {};
/* The service manager communicates the watchdog period via env var. */
2169                 env = getenv("WATCHDOG_USEC");
2173                 r = safe_atou64(env, &e->watchdog_period);
/* A zero period is unusable — refuse it. */
2176                 if (e->watchdog_period <= 0)
2179                 /* Issue first ping immediately */
2180                 sd_notify(false, "WATCHDOG=1");
2181                 e->watchdog_last = now(CLOCK_MONOTONIC);
/* Must match arm_watchdog(), which arms with TFD_TIMER_ABSTIME on
 * CLOCK_MONOTONIC. */
2183                 e->watchdog_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
2184                 if (e->watchdog_fd < 0)
2187                 r = arm_watchdog(e);
/* Tag the fd like the other internal pseudo-sources so sd_event_run()
 * can recognize it in the epoll results. */
2191                 ev.events = EPOLLIN;
2192                 ev.data.ptr = INT_TO_PTR(SOURCE_WATCHDOG);
2194                 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->watchdog_fd, &ev);
/* Disable path: deregister and close the timerfd if we had one. */
2201                 if (e->watchdog_fd >= 0) {
2202                         epoll_ctl(e->epoll_fd, EPOLL_CTL_DEL, e->watchdog_fd, NULL);
2203                         close_nointr_nofail(e->watchdog_fd);
2204                         e->watchdog_fd = -1;
/* Error-cleanup tail (presumably a fail: label in the elided code):
 * drop the fd so a failed enable leaves the loop watchdog-free. */
2212         close_nointr_nofail(e->watchdog_fd);
2213         e->watchdog_fd = -1;
2217 _public_ int sd_event_get_watchdog(sd_event *e) {
2218 assert_return(e, -EINVAL);
2219 assert_return(!event_pid_changed(e), -ECHILD);