1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
30 #include "time-util.h"
35 #define EPOLL_QUEUE_MAX 64
36 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
/* Tag discriminating what kind of event a source represents (io, timer,
 * signal, child, defer, quit); stored in a 4-bit bitfield below. */
typedef enum EventSourceType {

/* One event source registered with an sd_event loop. Only fragments of
 * the declaration are visible in this view; the callback members below
 * belong to different arms of a per-type union. */
struct sd_event_source {

        /* Optional callback run before polling (see sd_event_source_set_prepare()) */
        sd_prepare_handler_t prepare;

        EventSourceType type:4;

        /* Positions of this source inside the loop's pending/prepare prioqs */
        unsigned pending_index;
        unsigned prepare_index;
        /* Loop-iteration counters, used as FIFO tie-breakers in the comparators */
        unsigned pending_iteration;
        unsigned prepare_iteration;

        /* io arm */
        sd_io_handler_t callback;

        /* time arm: next wakeup, allowed latency, and prioq positions */
        sd_time_handler_t callback;
        usec_t next, accuracy;
        unsigned earliest_index;
        unsigned latest_index;

        /* signal arm: last received siginfo is stashed here for dispatch */
        sd_signal_handler_t callback;
        struct signalfd_siginfo siginfo;

        /* child arm */
        sd_child_handler_t callback;

        /* defer arm */
        sd_defer_handler_t callback;

        /* quit arm */
        sd_quit_handler_t callback;
        /* For both clocks we maintain two priority queues each, one
         * ordered for the earliest times the events may be
         * dispatched, and one ordered by the latest times they must
         * have been dispatched. The range between the top entries in
         * the two prioqs is the time window we can freely schedule
         * wakeups in. */
        Prioq *monotonic_earliest;
        Prioq *monotonic_latest;
        Prioq *realtime_earliest;
        Prioq *realtime_latest;

        /* Cached next-wakeup deadlines already programmed into the timerfds */
        usec_t realtime_next, monotonic_next;

        /* Indexed by signal number; _NSIG entries (see sd_event_add_signal()) */
        sd_event_source **signal_sources;

        /* pid -> sd_event_source map for SIGCHLD-driven child watching */
        Hashmap *child_sources;
        unsigned n_enabled_child_sources;

        bool quit_requested:1;
        bool need_process_child:1;
/* Comparator for the "pending" prioq: enabled sources first, then by
 * ascending priority value, then by the iteration in which they became
 * pending (older first), so dispatch order is stable and fair. */
static int pending_prioq_compare(const void *a, const void *b) {
        const sd_event_source *x = a, *y = b;

        /* Enabled ones first */
        if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
        if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)

        /* Lower priority values first */
        if (x->priority < y->priority)
        if (x->priority > y->priority)

        /* Older entries first */
        if (x->pending_iteration < y->pending_iteration)
        if (x->pending_iteration > y->pending_iteration)

        /* Stability for the rest */
/* Comparator for the "prepare" prioq. The iteration counter is the
 * primary key (unlike pending_prioq_compare) so that event_prepare()
 * can stop at the first source already prepared this iteration. */
static int prepare_prioq_compare(const void *a, const void *b) {
        const sd_event_source *x = a, *y = b;

        /* Move most recently prepared ones last, so that we can stop
         * preparing as soon as we hit one that has already been
         * prepared in the current iteration */
        if (x->prepare_iteration < y->prepare_iteration)
        if (x->prepare_iteration > y->prepare_iteration)

        /* Enabled ones first */
        if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
        if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)

        /* Lower priority values first */
        if (x->priority < y->priority)
        if (x->priority > y->priority)

        /* Stability for the rest */
/* Comparator for a clock's "earliest" prioq: sorts timer sources by the
 * earliest time they may fire (time.next); disabled and already-pending
 * sources sink to the end so prioq_peek() yields the next armable timer. */
static int earliest_time_prioq_compare(const void *a, const void *b) {
        const sd_event_source *x = a, *y = b;

        assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
        assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);

        /* Enabled ones first */
        if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
        if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)

        /* Move the pending ones to the end */
        if (!x->pending && y->pending)
        if (x->pending && !y->pending)

        /* Order by earliest permissible dispatch time */
        if (x->time.next < y->time.next)
        if (x->time.next > y->time.next)

        /* Stability for the rest */
/* Comparator for a clock's "latest" prioq: sorts by the deadline
 * (time.next + time.accuracy) by which a timer must have fired.
 * NOTE(review): next + accuracy can wrap for very large next values
 * (e.g. (usec_t) -1) — confirm callers clamp before insertion. */
static int latest_time_prioq_compare(const void *a, const void *b) {
        const sd_event_source *x = a, *y = b;

        /* Unlike the earliest comparator, both entries must be on the
         * same clock here */
        assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
               (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));

        /* Enabled ones first */
        if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
        if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)

        /* Move the pending ones to the end */
        if (!x->pending && y->pending)
        if (x->pending && !y->pending)

        /* Order by latest permissible dispatch time */
        if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
        if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)

        /* Stability for the rest */
/* Comparator for the quit-handler prioq: enabled first, then by
 * ascending priority, so dispatch_quit() peeks the right handler. */
static int quit_prioq_compare(const void *a, const void *b) {
        const sd_event_source *x = a, *y = b;

        assert(x->type == SOURCE_QUIT);
        assert(y->type == SOURCE_QUIT);

        /* Enabled ones first */
        if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
        if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)

        /* Lower priority values first */
        if (x->priority < y->priority)
        if (x->priority > y->priority)

        /* Stability for the rest */
/* Release every resource owned by the loop object: all four fds,
 * all priority queues, the signal-source table and the child map. */
static void event_free(sd_event *e) {

        if (e->epoll_fd >= 0)
                close_nointr_nofail(e->epoll_fd);

        if (e->signal_fd >= 0)
                close_nointr_nofail(e->signal_fd);

        if (e->realtime_fd >= 0)
                close_nointr_nofail(e->realtime_fd);

        if (e->monotonic_fd >= 0)
                close_nointr_nofail(e->monotonic_fd);

        prioq_free(e->pending);
        prioq_free(e->prepare);
        prioq_free(e->monotonic_earliest);
        prioq_free(e->monotonic_latest);
        prioq_free(e->realtime_earliest);
        prioq_free(e->realtime_latest);

        free(e->signal_sources);

        hashmap_free(e->child_sources);
/* Allocate and initialize a new event loop object, returned in *ret. */
int sd_event_new(sd_event** ret) {

        e = new0(sd_event, 1);

        /* -1 marks all fds as not-yet-created; timerfds and signalfd are
         * opened lazily when the first matching source is added */
        e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
        e->realtime_next = e->monotonic_next = (usec_t) -1;
        /* Remembered so event_pid_changed() can detect use across fork() */
        e->original_pid = getpid();

        assert_se(sigemptyset(&e->sigset) == 0);

        e->pending = prioq_new(pending_prioq_compare);

        e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
        if (e->epoll_fd < 0) {
/* Take an additional reference on the loop object. */
sd_event* sd_event_ref(sd_event *e) {

        assert(e->n_ref >= 1);
/* Drop a reference on the loop object; the last reference frees it. */
sd_event* sd_event_unref(sd_event *e) {

        assert(e->n_ref >= 1);
/* Returns true if the loop is being used from a different process than
 * the one that created it. */
static bool event_pid_changed(sd_event *e) {

        /* We don't support people creating an event loop and keeping
         * it around over a fork(). Let's complain. */

        return e->original_pid != getpid();
/* Remove an IO source's fd from the loop's epoll set, if registered. */
static int source_io_unregister(sd_event_source *s) {

        assert(s->type == SOURCE_IO);

        if (!s->io.registered)

        r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);

        s->io.registered = false;
/* Add (or update) an IO source's fd in the loop's epoll set. ONESHOT
 * sources get EPOLLONESHOT so the kernel disarms them after one event,
 * saving an explicit unregister syscall later (see process_io()). */
static int source_io_register(sd_event_source *s, int enabled, uint32_t events) {
        struct epoll_event ev = {};

        assert(s->type == SOURCE_IO);
        assert(enabled != SD_EVENT_OFF);

        if (enabled == SD_EVENT_ONESHOT)
                ev.events |= EPOLLONESHOT;

        /* MOD if already in the epoll set, ADD otherwise */
        if (s->io.registered)
                r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
                r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);

        s->io.registered = true;
/* Detach a source from every loop data structure it may sit in (epoll,
 * per-type prioqs/maps, signal mask) and release its loop reference.
 * Only part of the per-type switch is visible in this view. */
static void source_free(sd_event_source *s) {

        source_io_unregister(s);

        case SOURCE_MONOTONIC:
                prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
                prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);

        case SOURCE_REALTIME:
                prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
                prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);

        if (s->signal.sig > 0) {
                /* Keep SIGCHLD in the mask while enabled child sources still need it */
                if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
                        assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);

                if (s->event->signal_sources)
                        s->event->signal_sources[s->signal.sig] = NULL;

        if (s->child.pid > 0) {
                if (s->enabled != SD_EVENT_OFF) {
                        assert(s->event->n_enabled_child_sources > 0);
                        s->event->n_enabled_child_sources--;

                /* Drop SIGCHLD only if no explicit SIGCHLD source remains */
                if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
                        assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);

                hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));

        prioq_remove(s->event->quit, s, &s->quit.prioq_index);

        prioq_remove(s->event->pending, s, &s->pending_index);

        prioq_remove(s->event->prepare, s, &s->prepare_index);

        sd_event_unref(s->event);
/* Mark a source as pending (b=true: insert into the pending prioq,
 * stamping the current iteration) or not pending (b=false: remove). */
static int source_set_pending(sd_event_source *s, bool b) {

        /* Quit sources are dispatched via their own prioq, never "pending" */
        assert(s->type != SOURCE_QUIT);

        s->pending_iteration = s->event->iteration;

        r = prioq_put(s->event->pending, s, &s->pending_index);

        assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
/* Allocate a source of the given type, taking a reference on the loop. */
static sd_event_source *source_new(sd_event *e, EventSourceType type) {

        s = new0(sd_event_source, 1);

        s->event = sd_event_ref(e);

        /* Not in any prioq yet */
        s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
                /* (signature continues from an elided line above) */
                sd_io_handler_t callback,

                sd_event_source **ret) {

        /* Only the standard epoll event bits are accepted */
        if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))

        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        if (event_pid_changed(e))

        s = source_new(e, SOURCE_IO);

        s->io.events = events;
        s->io.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ON;

        /* Register with epoll right away; new IO sources start enabled */
        r = source_io_register(s, s->enabled, events);
/* Lazily create the timerfd for one clock and add it to the epoll set;
 * also derives the per-boot wakeup perturbation on first use. */
static int event_setup_timer_fd(

                EventSourceType type,

        struct epoll_event ev = {};

        /* Already created on a previous call */
        if (_likely_(*timer_fd >= 0))

        fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);

        /* The type tag lets sd_event_run() tell timer wakeups apart from
         * IO sources (whose data.ptr is the source pointer) */
        ev.data.ptr = INT_TO_PTR(type);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
                close_nointr_nofail(fd);

        /* When we sleep for longer, we try to realign the wakeup to
        the same time within each second, so that events all across
        the system can be coalesced into a single CPU
        wakeup. However, let's take some system-specific randomness
        for this value, so that in a network of systems with synced
        clocks timer events are distributed a bit. Here, we
        calculate a perturbation usec offset from the boot ID. */

        if (sd_id128_get_boot(&bootid) >= 0)
                e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
/* Shared implementation behind sd_event_add_monotonic() and
 * sd_event_add_realtime(): validates arguments, lazily sets up the
 * clock's prioqs and timerfd, then creates and queues the source. */
static int event_add_time_internal(

                EventSourceType type,

                sd_time_handler_t callback,

                sd_event_source **ret) {

        if (usec == (uint64_t) -1)
        if (accuracy == (uint64_t) -1)
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        if (event_pid_changed(e))

        /* Create the two per-clock queues on first use */
        *earliest = prioq_new(earliest_time_prioq_compare);

        *latest = prioq_new(latest_time_prioq_compare);

        r = event_setup_timer_fd(e, type, timer_fd, id);

        s = source_new(e, type);

        /* accuracy == 0 means "use the default coalescing window" */
        s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
        s->time.callback = callback;
        s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ONESHOT;

        r = prioq_put(*earliest, s, &s->time.earliest_index);

        r = prioq_put(*latest, s, &s->time.latest_index);
/* Register a CLOCK_MONOTONIC timer source firing at usec (+/- accuracy). */
int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
        return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
725 int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
726 return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
/* (Re)create the signalfd from the loop's current signal mask and make
 * sure it is part of the epoll set. */
static int event_update_signal_fd(sd_event *e) {
        struct epoll_event ev = {};

        /* signalfd(-1, ...) creates; signalfd(fd, ...) just updates the mask */
        add_to_epoll = e->signal_fd < 0;

        r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);

        /* Tag so sd_event_run() can recognize signal wakeups */
        ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
                close_nointr_nofail(e->signal_fd);
/* Register a handler for signal sig, delivered via the loop's signalfd.
 * Only one source per signal number is allowed. */
int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) {

        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        if (event_pid_changed(e))

        /* Lazily allocate the per-signal lookup table (_NSIG entries) */
        if (!e->signal_sources) {
                e->signal_sources = new0(sd_event_source*, _NSIG);
                if (!e->signal_sources)
        } else if (e->signal_sources[sig])

        s = source_new(e, SOURCE_SIGNAL);

        s->signal.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ON;

        e->signal_sources[sig] = s;
        assert_se(sigaddset(&e->sigset, sig) == 0);

        /* If child sources already forced SIGCHLD into the mask, the
         * signalfd is up to date and need not be rebuilt */
        if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
                r = event_update_signal_fd(e);
/* Register a watch for state changes of child process pid; options are
 * waitid() flags (WEXITED/WSTOPPED/WCONTINUED). Delivery rides on the
 * loop's SIGCHLD handling (see process_child()). */
int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) {

        if (options & ~(WEXITED|WSTOPPED|WCONTINUED))

        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        if (event_pid_changed(e))

        r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);

        /* Only one source per pid */
        if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))

        s = source_new(e, SOURCE_CHILD);

        s->child.options = options;
        s->child.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ONESHOT;

        r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);

        e->n_enabled_child_sources ++;

        assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);

        /* Rebuild the signalfd unless an explicit SIGCHLD source already
         * keeps it in the mask */
        if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
                r = event_update_signal_fd(e);

        /* The child may already have changed state; check on next run */
        e->need_process_child = true;
/* Register a deferred callback: dispatched on the next loop iteration,
 * without waiting for any external event. */
int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) {

        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        if (event_pid_changed(e))

        s = source_new(e, SOURCE_DEFER);

        s->defer.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ONESHOT;

        /* Defer sources are born pending */
        r = source_set_pending(s, true);
/* Register a callback to run when the loop is asked to quit
 * (see sd_event_request_quit() and dispatch_quit()). */
int sd_event_add_quit(sd_event *e, sd_quit_handler_t callback, void *userdata, sd_event_source **ret) {

        assert_return(e, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        /* Lazily allocate the quit prioq */
        e->quit = prioq_new(quit_prioq_compare);

        s = source_new(e, SOURCE_QUIT);

        s->quit.callback = callback;
        s->userdata = userdata;
        s->quit.prioq_index = PRIOQ_IDX_NULL;
        s->enabled = SD_EVENT_ONESHOT;

        r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
/* Take an additional reference on an event source. */
sd_event_source* sd_event_source_ref(sd_event_source *s) {
        assert_return(s, NULL);

        assert(s->n_ref >= 1);
/* Drop a reference on an event source; the last reference frees it. */
sd_event_source* sd_event_source_unref(sd_event_source *s) {
        assert_return(s, NULL);

        assert(s->n_ref >= 1);
/* Return the loop a source is attached to (no reference taken). */
sd_event *sd_event_get(sd_event_source *s) {
/* Query whether a source is currently queued for dispatch. */
int sd_event_source_get_pending(sd_event_source *s) {

        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        if (event_pid_changed(s->event))
/* Return the fd of an IO source; -EDOM-style rejection for other types. */
int sd_event_source_get_io_fd(sd_event_source *s) {

        if (s->type != SOURCE_IO)
        if (event_pid_changed(s->event))
/* Fetch the epoll event mask an IO source was registered with. */
int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {

        if (s->type != SOURCE_IO)

        if (event_pid_changed(s->event))

        *events = s->io.events;
996 int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1001 if (!s->type != SOURCE_IO)
1003 if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
1005 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1006 if (event_pid_changed(s->event))
1009 if (s->io.events == events)
1012 if (s->enabled != SD_EVENT_OFF) {
1013 r = source_io_register(s, s->io.events, events);
1018 s->io.events = events;
/* Fetch the epoll events last received for an IO source. */
int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {

        if (s->type != SOURCE_IO)

        if (event_pid_changed(s->event))

        *revents = s->io.revents;
/* Return the signal number a signal source watches. */
int sd_event_source_get_signal(sd_event_source *s) {

        if (s->type != SOURCE_SIGNAL)
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        if (event_pid_changed(s->event))

        return s->signal.sig;
/* Fetch a source's dispatch priority. */
int sd_event_source_get_priority(sd_event_source *s, int *priority) {

        if (event_pid_changed(s->event))
/* Change a source's dispatch priority and reshuffle the prioqs that
 * order by it (pending and prepare). */
int sd_event_source_set_priority(sd_event_source *s, int priority) {

        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        if (event_pid_changed(s->event))

        if (s->priority == priority)

        s->priority = priority;

        prioq_reshuffle(s->event->pending, s, &s->pending_index);

        prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
/* Fetch a source's enable state (SD_EVENT_ON/OFF/ONESHOT) into *m. */
int sd_event_source_get_enabled(sd_event_source *s, int *m) {

        if (event_pid_changed(s->event))
1093 int sd_event_source_set_enabled(sd_event_source *s, int m) {
1098 if (m != SD_EVENT_OFF && m != SD_EVENT_ON && !SD_EVENT_ONESHOT)
1100 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1101 if (event_pid_changed(s->event))
1104 if (s->enabled == m)
1107 if (m == SD_EVENT_OFF) {
1112 r = source_io_unregister(s);
1119 case SOURCE_MONOTONIC:
1121 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1122 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1125 case SOURCE_REALTIME:
1127 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1128 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1133 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1134 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1135 event_update_signal_fd(s->event);
1143 assert(s->event->n_enabled_child_sources > 0);
1144 s->event->n_enabled_child_sources--;
1146 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1147 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1148 event_update_signal_fd(s->event);
1163 r = source_io_register(s, m, s->io.events);
1170 case SOURCE_MONOTONIC:
1172 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1173 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1176 case SOURCE_REALTIME:
1178 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1179 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1185 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1186 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1187 event_update_signal_fd(s->event);
1194 if (s->enabled == SD_EVENT_OFF) {
1195 s->event->n_enabled_child_sources++;
1197 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1198 assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1199 event_update_signal_fd(s->event);
1212 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1215 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
/* Fetch the absolute trigger time of a timer source into *usec. */
int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {

        if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
        if (event_pid_changed(s->event))

        *usec = s->time.next;
/* Change the absolute trigger time of a timer source and reshuffle the
 * clock's earliest/latest prioqs accordingly. */
int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {

        if (usec == (uint64_t) -1)
        if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        if (event_pid_changed(s->event))

        if (s->time.next == usec)

        s->time.next = usec;

        if (s->type == SOURCE_REALTIME) {
                prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
                prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
/* Change a timer source's accuracy (coalescing window). Only the
 * "latest" prioq orders by next+accuracy, so only it needs reshuffling. */
int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {

        if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        if (event_pid_changed(s->event))

        /* 0 selects the built-in default window */
        usec = DEFAULT_ACCURACY_USEC;

        if (s->time.accuracy == usec)

        s->time.accuracy = usec;

        if (s->type == SOURCE_REALTIME)
                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
/* Fetch a timer source's accuracy (coalescing window) into *usec. */
int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {

        if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
        if (event_pid_changed(s->event))

        *usec = s->time.accuracy;
/* Install, replace or remove (callback == NULL) a prepare callback for a
 * source, keeping the prepare prioq in sync. */
int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {

        assert_return(s, -EINVAL);
        assert_return(s->type != SOURCE_QUIT, -EDOM);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->prepare == callback)

        /* Replacing one callback with another: no prioq change needed */
        if (callback && s->prepare) {
                s->prepare = callback;

        r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);

        s->prepare = callback;

        r = prioq_put(s->event->prepare, s, &s->prepare_index);

        prioq_remove(s->event->prepare, s, &s->prepare_index);
/* Return the userdata pointer supplied when the source was created. */
void* sd_event_source_get_userdata(sd_event_source *s) {
        assert_return(s, NULL);
/* Pick a wakeup time in [a, b], preferring system-wide coalescing. */
static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {

        /*
        Find a good time to wake up again between times a and b. We
        have two goals here:

        a) We want to wake up as seldom as possible, hence prefer
        later times over earlier times.

        b) But if we have to wake up, then let's make sure to
        dispatch as much as possible on the entire system.

        We implement this by waking up everywhere at the same time
        within any given second if we can, synchronised via the
        perturbation value determined from the boot ID. If we can't,
        then we try to find the same spot in every 250ms
        step. Otherwise, we pick the last possible time to wake up.
        */

        /* Snap to the per-boot perturbation offset within b's second */
        c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;

        if (_unlikely_(c < USEC_PER_SEC))

        /* Fall back to the same offset within each 250ms step */
        c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));

        if (_unlikely_(c < USEC_PER_MSEC*250))

        c -= USEC_PER_MSEC*250;
/* Program one clock's timerfd with the next wakeup derived from the top
 * entries of its earliest/latest prioqs via sleep_between(). */
static int event_arm_timer(

        struct itimerspec its = {};
        sd_event_source *a, *b;

        a = prioq_peek(earliest);
        if (!a || a->enabled == SD_EVENT_OFF)

        b = prioq_peek(latest);
        /* If any timer is armed, both queues must have an enabled head */
        assert_se(b && b->enabled != SD_EVENT_OFF);

        t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);

        assert_se(timer_fd >= 0);

        /* We don't want to disarm here, just mean some time looooong ago. */
        its.it_value.tv_sec = 0;
        its.it_value.tv_nsec = 1;

        timespec_store(&its.it_value, t);

        r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
/* Handle an epoll wakeup for an IO source: stash the received events
 * and queue the source as pending. */
static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {

        assert(s->type == SOURCE_IO);

        s->io.revents = events;

        /*
        If this is a oneshot event source, then we added it to the
        epoll with EPOLLONESHOT, hence we know it's not registered
        anymore. We can save a syscall here...
        */

        if (s->enabled == SD_EVENT_ONESHOT)
                s->io.registered = false;

        return source_set_pending(s, true);
/* Drain a timerfd's expiration counter so epoll stops reporting it. */
static int flush_timer(sd_event *e, int fd, uint32_t events) {

        /* A timerfd only ever signals readability */
        if (events != EPOLLIN)

        ss = read(fd, &x, sizeof(x));
                /* Spurious wakeup, nothing expired yet */
                if (errno == EAGAIN || errno == EINTR)

        if (ss != sizeof(x))
/* Mark every timer source whose trigger time has been reached (per the
 * earliest prioq) as pending, reshuffling the queues as we go. */
static int process_timer(sd_event *e, usec_t n, Prioq *earliest, Prioq *latest) {

        s = prioq_peek(earliest);
        /* Stop at the first disabled / not-yet-due entry */
        s->enabled == SD_EVENT_OFF ||

        r = source_set_pending(s, true);

        /* Pending sources sort to the end, exposing the next candidate */
        prioq_reshuffle(earliest, s, &s->time.earliest_index);
        prioq_reshuffle(latest, s, &s->time.latest_index);
/* Poll every watched child with waitid(WNOHANG) and queue the sources
 * whose child actually changed state. */
static int process_child(sd_event *e) {

        e->need_process_child = false;

        /*
        So, this is ugly. We iteratively invoke waitid() with P_PID
        + WNOHANG for each PID we wait for, instead of using
        P_ALL. This is because we only want to get child
        information of very specific child processes, and not all
        of them. We might not have processed the SIGCHLD event of a
        previous invocation and we don't want to maintain a
        unbounded *per-child* event queue, hence we really don't
        want anything flushed out of the kernel's queue that we
        don't care about. Since this is O(n) this means that if you
        have a lot of processes you probably want to handle SIGCHLD
        */

        HASHMAP_FOREACH(s, e->child_sources, i) {
                assert(s->type == SOURCE_CHILD);

                if (s->enabled == SD_EVENT_OFF)

                zero(s->child.siginfo);
                r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);

                /* si_pid stays 0 when nothing happened (WNOHANG) */
                if (s->child.siginfo.si_pid != 0) {
                        r = source_set_pending(s, true);
/* Drain the signalfd and queue the matching signal sources; SIGCHLD
 * additionally triggers child-source processing. */
static int process_signal(sd_event *e, uint32_t events) {
        struct signalfd_siginfo si;
        bool read_one = false;

        /* A signalfd only ever signals readability */
        if (events != EPOLLIN)

        ss = read(e->signal_fd, &si, sizeof(si));
                if (errno == EAGAIN || errno == EINTR)

        if (ss != sizeof(si))

        if (si.ssi_signo == SIGCHLD) {
                r = process_child(e);

                /* Handled as a child event, or nobody listens for SIGCHLD */
                if (r > 0 || !e->signal_sources[si.ssi_signo])

        s = e->signal_sources[si.ssi_signo];

        /* Stash the siginfo for the callback and queue the source */
        s->signal.siginfo = si;
        r = source_set_pending(s, true);
/* Invoke a source's callback. Clears the pending flag first and turns
 * ONESHOT sources off before dispatch, so the callback may re-enable. */
static int source_dispatch(sd_event_source *s) {

        assert(s->pending || s->type == SOURCE_QUIT);

        /* Defer and quit sources are not tracked via the pending prioq */
        if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
                r = source_set_pending(s, false);

        if (s->enabled == SD_EVENT_ONESHOT) {
                r = sd_event_source_set_enabled(s, SD_EVENT_OFF);

        r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);

        case SOURCE_MONOTONIC:
                r = s->time.callback(s, s->time.next, s->userdata);

        case SOURCE_REALTIME:
                r = s->time.callback(s, s->time.next, s->userdata);

        r = s->signal.callback(s, &s->signal.siginfo, s->userdata);

        r = s->child.callback(s, &s->child.siginfo, s->userdata);

        r = s->defer.callback(s, s->userdata);

        r = s->quit.callback(s, s->userdata);
/* Run the prepare callbacks of all sources that have one, at most once
 * per iteration (the prepare prioq sorts already-prepared sources last). */
static int event_prepare(sd_event *e) {

        s = prioq_peek(e->prepare);
        if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)

        /* Stamp before reshuffling so this source sinks in the queue */
        s->prepare_iteration = e->iteration;
        r = prioq_reshuffle(e->prepare, s, &s->prepare_index);

        r = s->prepare(s, s->userdata);
/* Dispatch the highest-priority enabled quit handler; with none left,
 * the loop transitions to FINISHED. */
static int dispatch_quit(sd_event *e) {

        p = prioq_peek(e->quit);
        if (!p || p->enabled == SD_EVENT_OFF) {
                e->state = SD_EVENT_FINISHED;

        e->state = SD_EVENT_QUITTING;

        r = source_dispatch(p);

        e->state = SD_EVENT_PASSIVE;
/* Peek the next dispatchable pending source, or NULL if the head of the
 * pending prioq is disabled (disabled entries sort last, so none is). */
static sd_event_source* event_next_pending(sd_event *e) {

        p = prioq_peek(e->pending);

        if (p->enabled == SD_EVENT_OFF)
/* Run one iteration of the loop: prepare sources, arm both clocks'
 * timerfds, epoll_wait, flush timers/signals/io, then dispatch one
 * pending source. timeout is in usec; (uint64_t) -1 waits forever. */
int sd_event_run(sd_event *e, uint64_t timeout) {
        struct epoll_event ev_queue[EPOLL_QUEUE_MAX];

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);

        if (e->quit_requested)
                return dispatch_quit(e);

        e->state = SD_EVENT_RUNNING;

        r = event_prepare(e);

        /* Something is already dispatchable: don't block in epoll_wait */
        if (event_next_pending(e) || e->need_process_child)

        r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);

        r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);

        /* Round the usec timeout up to whole milliseconds for epoll */
        m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
                       timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));

        dual_timestamp_get(&n);

        for (i = 0; i < m; i++) {

                /* Timer and signal fds carry a type tag in data.ptr;
                 * everything else is an IO source pointer */
                if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
                        r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
                        r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
                        r = process_signal(e, ev_queue[i].events);
                        r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);

        r = process_timer(e, n.monotonic, e->monotonic_earliest, e->monotonic_latest);

        r = process_timer(e, n.realtime, e->realtime_earliest, e->realtime_latest);

        if (e->need_process_child) {
                r = process_child(e);

        p = event_next_pending(e);

        r = source_dispatch(p);

        e->state = SD_EVENT_PASSIVE;
/* Run the loop until it reaches the FINISHED state. */
int sd_event_loop(sd_event *e) {

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
        assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);

        while (e->state != SD_EVENT_FINISHED) {
                r = sd_event_run(e, (uint64_t) -1);
/* Return the loop's current state (PASSIVE/RUNNING/QUITTING/FINISHED). */
int sd_event_get_state(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
/* Return whether a quit has been requested for this loop. */
int sd_event_get_quit(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        return e->quit_requested;
/* Ask the loop to quit: the next sd_event_run() dispatches the quit
 * handlers instead of polling. */
int sd_event_request_quit(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        e->quit_requested = true;