1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
31 #include "time-util.h"
36 #define EPOLL_QUEUE_MAX 64
37 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
39 typedef enum EventSourceType {
/* (enum members elided in this excerpt) */
49 struct sd_event_source {
/* Bookkeeping shared by every source type. */
54 sd_event_handler_t prepare;
56 EventSourceType type:4;
/* prioq back-indices and the iteration counters used by the pending/
 * prepare comparators for fair ordering. */
61 unsigned pending_index;
62 unsigned prepare_index;
63 unsigned pending_iteration;
64 unsigned prepare_iteration;
/* Per-type payload. NOTE(review): the union/struct framing lines are
 * elided in this excerpt; the repeated "callback" fields below
 * presumably live in per-type union members (io, time, signal, child,
 * defer, quit) — confirm against the full file. */
68 sd_event_io_handler_t callback;
75 sd_event_time_handler_t callback;
76 usec_t next, accuracy;
77 unsigned earliest_index;
78 unsigned latest_index;
81 sd_event_signal_handler_t callback;
82 struct signalfd_siginfo siginfo;
86 sd_event_child_handler_t callback;
92 sd_event_handler_t callback;
95 sd_event_handler_t callback;
112 /* For both clocks we maintain two priority queues each, one
113 * ordered for the earliest times the events may be
114 * dispatched, and one ordered by the latest times they must
115 * have been dispatched. The range between the top entries in
116 * the two prioqs is the time window we can freely schedule
118 Prioq *monotonic_earliest;
119 Prioq *monotonic_latest;
120 Prioq *realtime_earliest;
121 Prioq *realtime_latest;
/* Absolute wakeup times currently armed on the timerfds;
 * (usec_t) -1 means "not armed" (see event_arm_timer()/flush_timer()). */
123 usec_t realtime_next, monotonic_next;
/* Indexed by signal number, allocated lazily in sd_event_add_signal(). */
127 sd_event_source **signal_sources;
/* Maps pid (as INT_TO_PTR) -> sd_event_source*, for SIGCHLD dispatch. */
129 Hashmap *child_sources;
130 unsigned n_enabled_child_sources;
/* Timestamp taken right after epoll_wait() returns in sd_event_run(). */
137 dual_timestamp timestamp;
140 bool quit_requested:1;
141 bool need_process_child:1;
/* Points back at the thread-local default-event variable so event_free()
 * can reset it to NULL. */
144 sd_event **default_event_ptr;
147 static int pending_prioq_compare(const void *a, const void *b) {
148 const sd_event_source *x = a, *y = b;
/* Ordering for the "pending" prioq: enabled before disabled, then by
 * ascending priority, then older pending iteration first so dispatch is
 * fair across loop iterations. NOTE(review): the return statements
 * between the comparisons are elided in this excerpt. */
153 /* Enabled ones first */
154 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
156 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
159 /* Lower priority values first */
160 if (x->priority < y->priority)
162 if (x->priority > y->priority)
165 /* Older entries first */
166 if (x->pending_iteration < y->pending_iteration)
168 if (x->pending_iteration > y->pending_iteration)
171 /* Stability for the rest */
180 static int prepare_prioq_compare(const void *a, const void *b) {
181 const sd_event_source *x = a, *y = b;
/* Ordering for the "prepare" prioq: sources prepared in the current
 * iteration sort last, so event_prepare() can stop at the first one it
 * has already handled this iteration. */
186 /* Move most recently prepared ones last, so that we can stop
187 * preparing as soon as we hit one that has already been
188 * prepared in the current iteration */
189 if (x->prepare_iteration < y->prepare_iteration)
191 if (x->prepare_iteration > y->prepare_iteration)
194 /* Enabled ones first */
195 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
197 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
200 /* Lower priority values first */
201 if (x->priority < y->priority)
203 if (x->priority > y->priority)
206 /* Stability for the rest */
215 static int earliest_time_prioq_compare(const void *a, const void *b) {
216 const sd_event_source *x = a, *y = b;
/* Orders timer sources by the earliest time they may fire; disabled and
 * already-pending entries sink to the end so prioq_peek() yields the
 * next source actually needing a wakeup. */
218 assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
219 assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
221 /* Enabled ones first */
222 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
224 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
227 /* Move the pending ones to the end */
228 if (!x->pending && y->pending)
230 if (x->pending && !y->pending)
234 if (x->time.next < y->time.next)
236 if (x->time.next > y->time.next)
239 /* Stability for the rest */
248 static int latest_time_prioq_compare(const void *a, const void *b) {
249 const sd_event_source *x = a, *y = b;
/* Orders timer sources by the latest permissible dispatch time
 * (next + accuracy); both entries must be of the same clock. */
251 assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
252 (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
254 /* Enabled ones first */
255 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
257 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
260 /* Move the pending ones to the end */
261 if (!x->pending && y->pending)
263 if (x->pending && !y->pending)
/* NOTE(review): next + accuracy can wrap for usec_t values near
 * UINT64_MAX; (uint64_t) -1 is rejected at add time, but extreme
 * values are not — worth confirming upstream behavior. */
267 if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
269 if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
272 /* Stability for the rest */
281 static int quit_prioq_compare(const void *a, const void *b) {
282 const sd_event_source *x = a, *y = b;
/* Ordering for the quit-handler prioq: enabled first, then by
 * ascending priority. */
284 assert(x->type == SOURCE_QUIT);
285 assert(y->type == SOURCE_QUIT);
287 /* Enabled ones first */
288 if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
290 if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
293 /* Lower priority values first */
294 if (x->priority < y->priority)
296 if (x->priority > y->priority)
299 /* Stability for the rest */
308 static void event_free(sd_event *e) {
/* Destroys the loop object: clears the thread-local default pointer,
 * closes all owned fds, frees all prioqs and lookup tables. */
311 if (e->default_event_ptr)
312 *(e->default_event_ptr) = NULL;
314 if (e->epoll_fd >= 0)
315 close_nointr_nofail(e->epoll_fd);
317 if (e->signal_fd >= 0)
318 close_nointr_nofail(e->signal_fd);
320 if (e->realtime_fd >= 0)
321 close_nointr_nofail(e->realtime_fd);
323 if (e->monotonic_fd >= 0)
324 close_nointr_nofail(e->monotonic_fd);
326 prioq_free(e->pending);
327 prioq_free(e->prepare);
328 prioq_free(e->monotonic_earliest);
329 prioq_free(e->monotonic_latest);
330 prioq_free(e->realtime_earliest);
331 prioq_free(e->realtime_latest);
334 free(e->signal_sources);
336 hashmap_free(e->child_sources);
340 _public_ int sd_event_new(sd_event** ret) {
/* Allocates a new event loop: all fds start at -1, both timer "next"
 * caches at (usec_t) -1 (unarmed), and the epoll instance is created
 * CLOEXEC. Records the creating pid for fork detection. */
344 assert_return(ret, -EINVAL);
346 e = new0(sd_event, 1);
351 e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
352 e->realtime_next = e->monotonic_next = (usec_t) -1;
353 e->original_pid = getpid();
355 assert_se(sigemptyset(&e->sigset) == 0);
357 e->pending = prioq_new(pending_prioq_compare);
363 e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
364 if (e->epoll_fd < 0) {
377 _public_ sd_event* sd_event_ref(sd_event *e) {
/* Takes an additional reference on the loop object. */
378 assert_return(e, NULL);
380 assert(e->n_ref >= 1);
386 _public_ sd_event* sd_event_unref(sd_event *e) {
/* Drops a reference; frees the loop when the count reaches zero
 * (body elided in this excerpt). */
387 assert_return(e, NULL);
389 assert(e->n_ref >= 1);
398 static bool event_pid_changed(sd_event *e) {
401 /* We don't support people creating an event loop and keeping
402 * it around over a fork(). Let's complain. */
404 return e->original_pid != getpid();
407 static int source_io_unregister(sd_event_source *s) {
/* Removes the IO source's fd from the epoll set, if registered. */
411 assert(s->type == SOURCE_IO);
413 if (!s->io.registered)
416 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
420 s->io.registered = false;
424 static int source_io_register(
/* Adds or updates the IO source's fd in the epoll set. ONESHOT sources
 * get EPOLLONESHOT so the kernel disables them after one event. */
429 struct epoll_event ev = {};
433 assert(s->type == SOURCE_IO);
434 assert(enabled != SD_EVENT_OFF);
439 if (enabled == SD_EVENT_ONESHOT)
440 ev.events |= EPOLLONESHOT;
442 if (s->io.registered)
443 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
445 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
450 s->io.registered = true;
455 static void source_free(sd_event_source *s) {
/* Tears down a source: per-type deregistration (epoll / time prioqs /
 * sigset / child map / quit prioq), then removal from the generic
 * pending and prepare prioqs, finally dropping the loop reference. */
463 source_io_unregister(s);
467 case SOURCE_MONOTONIC:
468 prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
469 prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
472 case SOURCE_REALTIME:
473 prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
474 prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
/* Keep SIGCHLD in the sigset while enabled child sources still need it. */
478 if (s->signal.sig > 0) {
479 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
480 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
482 if (s->event->signal_sources)
483 s->event->signal_sources[s->signal.sig] = NULL;
489 if (s->child.pid > 0) {
490 if (s->enabled != SD_EVENT_OFF) {
491 assert(s->event->n_enabled_child_sources > 0);
492 s->event->n_enabled_child_sources--;
/* SIGCHLD may also be claimed by an explicit signal source. */
495 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
496 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
498 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
508 prioq_remove(s->event->quit, s, &s->quit.prioq_index);
513 prioq_remove(s->event->pending, s, &s->pending_index);
516 prioq_remove(s->event->prepare, s, &s->prepare_index);
518 sd_event_unref(s->event);
524 static int source_set_pending(sd_event_source *s, bool b) {
/* Marks a source as (not) pending, inserting it into or removing it
 * from the pending prioq; records the iteration for fair ordering. */
528 assert(s->type != SOURCE_QUIT);
536 s->pending_iteration = s->event->iteration;
538 r = prioq_put(s->event->pending, s, &s->pending_index);
544 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
549 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
/* Allocates a source of the given type; the source keeps a reference
 * on the loop and starts outside every prioq (PRIOQ_IDX_NULL). */
554 s = new0(sd_event_source, 1);
559 s->event = sd_event_ref(e);
561 s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
566 _public_ int sd_event_add_io(
/* Registers an fd-based IO source; only the listed EPOLL* event bits
 * are allowed. New source starts enabled (SD_EVENT_ON) and is
 * registered with epoll immediately. */
570 sd_event_io_handler_t callback,
572 sd_event_source **ret) {
577 assert_return(e, -EINVAL);
578 assert_return(fd >= 0, -EINVAL);
579 assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
580 assert_return(callback, -EINVAL);
581 assert_return(ret, -EINVAL);
582 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
583 assert_return(!event_pid_changed(e), -ECHILD);
585 s = source_new(e, SOURCE_IO);
590 s->io.events = events;
591 s->io.callback = callback;
592 s->userdata = userdata;
593 s->enabled = SD_EVENT_ON;
595 r = source_io_register(s, s->enabled, events);
605 static int event_setup_timer_fd(
/* Lazily creates the timerfd for one clock, adds it to epoll (tagged
 * with the source type in data.ptr so sd_event_run() can demultiplex),
 * and derives the per-boot wakeup perturbation. */
607 EventSourceType type,
611 struct epoll_event ev = {};
618 if (_likely_(*timer_fd >= 0))
621 fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
626 ev.data.ptr = INT_TO_PTR(type);
628 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
630 close_nointr_nofail(fd);
634 /* When we sleep for longer, we try to realign the wakeup to
635 the same time within each second, so that events all across
636 the system can be coalesced into a single CPU
637 wakeup. However, let's take some system-specific randomness
638 for this value, so that in a network of systems with synced
639 clocks timer events are distributed a bit. Here, we
640 calculate a perturbation usec offset from the boot ID. */
642 if (sd_id128_get_boot(&bootid) >= 0)
643 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
649 static int event_add_time_internal(
/* Common implementation behind sd_event_add_monotonic/realtime:
 * ensures the two time prioqs and the clock's timerfd exist, then
 * creates a ONESHOT source inserted into both earliest and latest
 * queues. accuracy == 0 selects DEFAULT_ACCURACY_USEC. */
651 EventSourceType type,
658 sd_event_time_handler_t callback,
660 sd_event_source **ret) {
665 assert_return(e, -EINVAL);
666 assert_return(callback, -EINVAL);
667 assert_return(ret, -EINVAL);
668 assert_return(usec != (uint64_t) -1, -EINVAL);
669 assert_return(accuracy != (uint64_t) -1, -EINVAL);
670 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
671 assert_return(!event_pid_changed(e), -ECHILD);
678 *earliest = prioq_new(earliest_time_prioq_compare);
684 *latest = prioq_new(latest_time_prioq_compare);
690 r = event_setup_timer_fd(e, type, timer_fd, id);
695 s = source_new(e, type);
700 s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
701 s->time.callback = callback;
702 s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
703 s->userdata = userdata;
704 s->enabled = SD_EVENT_ONESHOT;
706 r = prioq_put(*earliest, s, &s->time.earliest_index);
710 r = prioq_put(*latest, s, &s->time.latest_index);
722 _public_ int sd_event_add_monotonic(sd_event *e,
/* Registers a CLOCK_MONOTONIC timer source (thin wrapper over
 * event_add_time_internal with the monotonic fd and prioqs). */
725 sd_event_time_handler_t callback,
727 sd_event_source **ret) {
729 return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
732 _public_ int sd_event_add_realtime(sd_event *e,
/* Registers a CLOCK_REALTIME timer source.
 * BUGFIX: the latest-times prioq argument was &e->monotonic_latest
 * (copy-paste from sd_event_add_monotonic), so realtime sources were
 * inserted into the monotonic latest queue while source_free(),
 * sd_event_source_set_enabled(), sd_event_source_set_time() and
 * process_timer() all operate on realtime_latest for SOURCE_REALTIME —
 * corrupting the scheduling of both clocks. It must be
 * &e->realtime_latest. */
735 sd_event_time_handler_t callback,
737 sd_event_source **ret) {
739 return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->realtime_latest, usec, accuracy, callback, userdata, ret);
742 static int event_update_signal_fd(sd_event *e) {
/* (Re)creates the signalfd from the current sigset; on first creation
 * the fd is added to epoll tagged as SOURCE_SIGNAL. */
743 struct epoll_event ev = {};
749 add_to_epoll = e->signal_fd < 0;
751 r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
761 ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
763 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
765 close_nointr_nofail(e->signal_fd);
774 _public_ int sd_event_add_signal(
/* Registers a source for one signal; only one source per signal is
 * allowed. Adds the signal to the sigset and refreshes the signalfd,
 * unless SIGCHLD is already watched via enabled child sources. */
777 sd_event_signal_handler_t callback,
779 sd_event_source **ret) {
784 assert_return(e, -EINVAL);
785 assert_return(sig > 0, -EINVAL);
786 assert_return(sig < _NSIG, -EINVAL);
787 assert_return(callback, -EINVAL);
788 assert_return(ret, -EINVAL);
789 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
790 assert_return(!event_pid_changed(e), -ECHILD);
792 if (!e->signal_sources) {
793 e->signal_sources = new0(sd_event_source*, _NSIG);
794 if (!e->signal_sources)
796 } else if (e->signal_sources[sig])
799 s = source_new(e, SOURCE_SIGNAL);
804 s->signal.callback = callback;
805 s->userdata = userdata;
806 s->enabled = SD_EVENT_ON;
808 e->signal_sources[sig] = s;
809 assert_se(sigaddset(&e->sigset, sig) == 0);
811 if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
812 r = event_update_signal_fd(e);
823 _public_ int sd_event_add_child(
/* Registers a waitid()-based watch for one child pid (one source per
 * pid). Adds SIGCHLD to the sigset (refreshing the signalfd unless an
 * explicit SIGCHLD source exists) and forces an immediate
 * process_child() pass in case the child already exited. */
827 sd_event_child_handler_t callback,
829 sd_event_source **ret) {
834 assert_return(e, -EINVAL);
835 assert_return(pid > 1, -EINVAL);
836 assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
837 assert_return(options != 0, -EINVAL);
838 assert_return(callback, -EINVAL);
839 assert_return(ret, -EINVAL);
840 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
841 assert_return(!event_pid_changed(e), -ECHILD);
843 r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
847 if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
850 s = source_new(e, SOURCE_CHILD);
855 s->child.options = options;
856 s->child.callback = callback;
857 s->userdata = userdata;
858 s->enabled = SD_EVENT_ONESHOT;
860 r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
866 e->n_enabled_child_sources ++;
868 assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
870 if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
871 r = event_update_signal_fd(e);
878 e->need_process_child = true;
884 _public_ int sd_event_add_defer(
/* Registers a defer source: ONESHOT and immediately pending, so it is
 * dispatched on the next loop iteration. */
886 sd_event_handler_t callback,
888 sd_event_source **ret) {
893 assert_return(e, -EINVAL);
894 assert_return(callback, -EINVAL);
895 assert_return(ret, -EINVAL);
896 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
897 assert_return(!event_pid_changed(e), -ECHILD);
899 s = source_new(e, SOURCE_DEFER);
903 s->defer.callback = callback;
904 s->userdata = userdata;
905 s->enabled = SD_EVENT_ONESHOT;
907 r = source_set_pending(s, true);
917 _public_ int sd_event_add_quit(
/* Registers a quit handler, run from dispatch_quit() when the loop is
 * asked to quit. Quit sources live in their own prioq, never the
 * pending queue. */
919 sd_event_handler_t callback,
921 sd_event_source **ret) {
926 assert_return(e, -EINVAL);
927 assert_return(callback, -EINVAL);
928 assert_return(ret, -EINVAL);
929 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
930 assert_return(!event_pid_changed(e), -ECHILD);
933 e->quit = prioq_new(quit_prioq_compare);
938 s = source_new(e, SOURCE_QUIT);
942 s->quit.callback = callback;
943 s->userdata = userdata;
944 s->quit.prioq_index = PRIOQ_IDX_NULL;
945 s->enabled = SD_EVENT_ONESHOT;
947 r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
957 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
/* Takes an additional reference on the source. */
958 assert_return(s, NULL);
960 assert(s->n_ref >= 1);
966 _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {
/* Drops a reference; frees the source when the count reaches zero
 * (body elided in this excerpt). */
967 assert_return(s, NULL);
969 assert(s->n_ref >= 1);
978 _public_ sd_event *sd_event_get(sd_event_source *s) {
/* Returns the event loop this source belongs to. */
979 assert_return(s, NULL);
984 _public_ int sd_event_source_get_pending(sd_event_source *s) {
/* Returns whether the source is pending; quit sources have no pending
 * state (-EDOM). */
985 assert_return(s, -EINVAL);
986 assert_return(s->type != SOURCE_QUIT, -EDOM);
987 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
988 assert_return(!event_pid_changed(s->event), -ECHILD);
993 _public_ int sd_event_source_get_io_fd(sd_event_source *s) {
/* Returns the fd watched by an IO source. */
994 assert_return(s, -EINVAL);
995 assert_return(s->type == SOURCE_IO, -EDOM);
996 assert_return(!event_pid_changed(s->event), -ECHILD);
1001 _public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
/* Returns the EPOLL* mask the IO source is subscribed to. */
1002 assert_return(s, -EINVAL);
1003 assert_return(events, -EINVAL);
1004 assert_return(s->type == SOURCE_IO, -EDOM);
1005 assert_return(!event_pid_changed(s->event), -ECHILD);
1007 *events = s->io.events;
1011 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
/* Changes the subscribed EPOLL* mask; re-registers with epoll only if
 * the source is currently enabled. */
1014 assert_return(s, -EINVAL);
1015 assert_return(s->type == SOURCE_IO, -EDOM);
1016 assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
1017 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1018 assert_return(!event_pid_changed(s->event), -ECHILD);
1020 if (s->io.events == events)
1023 if (s->enabled != SD_EVENT_OFF) {
1024 r = source_io_register(s, s->enabled, events);
1029 s->io.events = events;
1034 _public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
/* Returns the events that actually fired; only valid while pending. */
1035 assert_return(s, -EINVAL);
1036 assert_return(revents, -EINVAL);
1037 assert_return(s->type == SOURCE_IO, -EDOM);
1038 assert_return(s->pending, -ENODATA);
1039 assert_return(!event_pid_changed(s->event), -ECHILD);
1041 *revents = s->io.revents;
1045 _public_ int sd_event_source_get_signal(sd_event_source *s) {
/* Returns the signal number a signal source watches. */
1046 assert_return(s, -EINVAL);
1047 assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1048 assert_return(!event_pid_changed(s->event), -ECHILD);
1050 return s->signal.sig;
1053 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
/* Returns the source's dispatch priority. */
1054 assert_return(s, -EINVAL);
1055 assert_return(!event_pid_changed(s->event), -ECHILD);
1060 _public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
/* Changes the priority and reshuffles every prioq whose ordering
 * depends on it (pending, prepare and, for quit sources, quit). */
1061 assert_return(s, -EINVAL);
1062 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1063 assert_return(!event_pid_changed(s->event), -ECHILD);
1065 if (s->priority == priority)
1068 s->priority = priority;
1071 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1074 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1076 if (s->type == SOURCE_QUIT)
1077 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1082 _public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
/* Returns the enable state (SD_EVENT_OFF/ON/ONESHOT). */
1083 assert_return(s, -EINVAL);
1084 assert_return(m, -EINVAL);
1085 assert_return(!event_pid_changed(s->event), -ECHILD);
1091 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
/* Enables or disables a source. Per type this means: deregistering/
 * registering with epoll (io), reshuffling both time prioqs (timers),
 * removing/adding the signal in the sigset and refreshing the signalfd
 * (signal/child, with SIGCHLD shared-ownership accounting), and
 * reshuffling the quit prioq. Finally the generic pending/prepare
 * prioqs are reshuffled since their comparators look at "enabled".
 * NOTE(review): many framing lines (switch statements, returns, braces)
 * are elided in this excerpt. */
1094 assert_return(s, -EINVAL);
1095 assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1096 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1097 assert_return(!event_pid_changed(s->event), -ECHILD);
1099 if (s->enabled == m)
/* --- disabling --- */
1102 if (m == SD_EVENT_OFF) {
1107 r = source_io_unregister(s);
1114 case SOURCE_MONOTONIC:
1116 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1117 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1120 case SOURCE_REALTIME:
1122 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1123 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1128 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1129 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1130 event_update_signal_fd(s->event);
1138 assert(s->event->n_enabled_child_sources > 0);
1139 s->event->n_enabled_child_sources--;
1141 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1142 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1143 event_update_signal_fd(s->event);
1150 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
/* --- enabling (ON or ONESHOT) --- */
1162 r = source_io_register(s, m, s->io.events);
1169 case SOURCE_MONOTONIC:
1171 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1172 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1175 case SOURCE_REALTIME:
1177 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1178 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1184 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1185 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1186 event_update_signal_fd(s->event);
1193 if (s->enabled == SD_EVENT_OFF) {
1194 s->event->n_enabled_child_sources++;
1196 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1197 assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1198 event_update_signal_fd(s->event);
1205 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1215 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1218 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1223 _public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
/* Returns the absolute trigger time of a timer source. */
1224 assert_return(s, -EINVAL);
1225 assert_return(usec, -EINVAL);
1226 assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1227 assert_return(!event_pid_changed(s->event), -ECHILD);
1229 *usec = s->time.next;
1233 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
/* Re-arms a timer source at a new absolute time: clears any stale
 * pending state and reshuffles both time prioqs of its clock. */
1234 assert_return(s, -EINVAL);
1235 assert_return(usec != (uint64_t) -1, -EINVAL);
1236 assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1237 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1238 assert_return(!event_pid_changed(s->event), -ECHILD);
1240 if (s->time.next == usec)
1243 s->time.next = usec;
1244 source_set_pending(s, false);
1246 if (s->type == SOURCE_REALTIME) {
1247 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1248 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1250 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1251 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1257 _public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
/* Returns the timer's permitted dispatch slack. */
1258 assert_return(s, -EINVAL);
1259 assert_return(usec, -EINVAL);
1260 assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1261 assert_return(!event_pid_changed(s->event), -ECHILD);
1263 *usec = s->time.accuracy;
1267 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
/* Changes the timer's slack (0 selects the default) and reshuffles the
 * clock's latest prioq, whose order depends on next + accuracy. */
1268 assert_return(s, -EINVAL);
1269 assert_return(usec != (uint64_t) -1, -EINVAL);
1270 assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1271 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1272 assert_return(!event_pid_changed(s->event), -ECHILD);
1275 usec = DEFAULT_ACCURACY_USEC;
1277 if (s->time.accuracy == usec)
1280 s->time.accuracy = usec;
1282 if (s->type == SOURCE_REALTIME)
1283 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1285 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1290 _public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
/* Returns the pid a child source watches. */
1291 assert_return(s, -EINVAL);
1292 assert_return(pid, -EINVAL);
1293 assert_return(s->type == SOURCE_CHILD, -EDOM);
1294 assert_return(!event_pid_changed(s->event), -ECHILD);
1296 *pid = s->child.pid;
1300 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_event_handler_t callback) {
/* Installs, replaces or (callback == NULL) removes a prepare hook.
 * First installation inserts the source into the prepare prioq;
 * removal takes it back out. */
1303 assert_return(s, -EINVAL);
1304 assert_return(s->type != SOURCE_QUIT, -EDOM);
1305 assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1306 assert_return(!event_pid_changed(s->event), -ECHILD);
1308 if (s->prepare == callback)
/* Swapping one non-NULL callback for another needs no prioq change. */
1311 if (callback && s->prepare) {
1312 s->prepare = callback;
1316 r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1320 s->prepare = callback;
1323 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1327 prioq_remove(s->event->prepare, s, &s->prepare_index);
1332 _public_ void* sd_event_source_get_userdata(sd_event_source *s) {
/* Returns the opaque userdata pointer supplied at registration. */
1333 assert_return(s, NULL);
1338 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1350 Find a good time to wake up again between times a and b. We
1351 have two goals here:
1353 a) We want to wake up as seldom as possible, hence prefer
1354 later times over earlier times.
1356 b) But if we have to wake up, then let's make sure to
1357 dispatch as much as possible on the entire system.
1359 We implement this by waking up everywhere at the same time
1360 within any given second if we can, synchronised via the
1361 perturbation value determined from the boot ID. If we can't,
1362 then we try to find the same spot in every 250ms
1363 step. Otherwise, we pick the last possible time to wake up.
/* Candidate 1: the per-second slot defined by the boot perturbation,
 * taken from the second containing b and stepped back if needed. */
1366 c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1368 if (_unlikely_(c < USEC_PER_SEC))
/* Candidate 2: the same idea at 250ms granularity. */
1377 c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1379 if (_unlikely_(c < USEC_PER_MSEC*250))
1382 c -= USEC_PER_MSEC*250;
1391 static int event_arm_timer(
/* Programs the given timerfd for the next due timer of one clock:
 * disarms it if nothing is enabled, otherwise picks a wakeup via
 * sleep_between() inside [earliest top, latest top] and arms with
 * TFD_TIMER_ABSTIME. *next caches the armed time to skip redundant
 * timerfd_settime() calls. */
1398 struct itimerspec its = {};
1399 sd_event_source *a, *b;
1406 a = prioq_peek(earliest);
1407 if (!a || a->enabled == SD_EVENT_OFF) {
/* Nothing to schedule: disarm unless already disarmed. */
1409 if (*next == (usec_t) -1)
1413 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1417 *next = (usec_t) -1;
1422 b = prioq_peek(latest);
1423 assert_se(b && b->enabled != SD_EVENT_OFF);
1425 t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1429 assert_se(timer_fd >= 0);
1432 /* We don't want to disarm here, just mean some time looooong ago. */
1433 its.it_value.tv_sec = 0;
1434 its.it_value.tv_nsec = 1;
1436 timespec_store(&its.it_value, t);
1438 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1446 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
/* Records the fired epoll events on the IO source and marks it pending. */
1449 assert(s->type == SOURCE_IO);
1451 s->io.revents = events;
1453 return source_set_pending(s, true);
1456 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
/* Drains the expiration counter from a fired timerfd and invalidates
 * the cached armed time so event_arm_timer() re-programs it. */
1464 assert_return(events == EPOLLIN, -EIO);
1466 ss = read(fd, &x, sizeof(x));
1468 if (errno == EAGAIN || errno == EINTR)
1474 if (ss != sizeof(x))
1477 *next = (usec_t) -1;
1482 static int process_timer(
/* Marks every enabled, non-pending timer source whose trigger time has
 * been reached as pending, reshuffling the prioqs as pending state
 * changes their order. NOTE(review): the loop framing is elided in
 * this excerpt. */
1494 s = prioq_peek(earliest);
1497 s->enabled == SD_EVENT_OFF ||
1501 r = source_set_pending(s, true);
1505 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1506 prioq_reshuffle(latest, s, &s->time.latest_index);
1512 static int process_child(sd_event *e) {
/* Polls every watched child with non-destructive waitid() and marks
 * sources whose child changed state as pending. */
1519 e->need_process_child = false;
1522 So, this is ugly. We iteratively invoke waitid() with P_PID
1523 + WNOHANG for each PID we wait for, instead of using
1524 P_ALL. This is because we only want to get child
1525 information of very specific child processes, and not all
1526 of them. We might not have processed the SIGCHLD event of a
1527 previous invocation and we don't want to maintain an
1528 unbounded *per-child* event queue, hence we really don't
1529 want anything flushed out of the kernel's queue that we
1530 don't care about. Since this is O(n) this means that if you
1531 have a lot of processes you probably want to handle SIGCHLD
1535 HASHMAP_FOREACH(s, e->child_sources, i) {
1536 assert(s->type == SOURCE_CHILD);
1541 if (s->enabled == SD_EVENT_OFF)
1544 zero(s->child.siginfo);
1545 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
/* si_pid stays 0 when WNOHANG found no state change. */
1549 if (s->child.siginfo.si_pid != 0) {
1550 r = source_set_pending(s, true);
1559 static int process_signal(sd_event *e, uint32_t events) {
/* Drains the signalfd: SIGCHLD additionally triggers process_child();
 * other signals stash the siginfo on their source and mark it pending.
 * NOTE(review): the read loop framing is elided in this excerpt. */
1560 bool read_one = false;
1564 assert(e->signal_sources);
1566 assert_return(events == EPOLLIN, -EIO);
1569 struct signalfd_siginfo si;
1573 ss = read(e->signal_fd, &si, sizeof(si));
1575 if (errno == EAGAIN || errno == EINTR)
1581 if (ss != sizeof(si))
1586 s = e->signal_sources[si.ssi_signo];
1587 if (si.ssi_signo == SIGCHLD) {
1588 r = process_child(e);
1597 s->signal.siginfo = si;
1598 r = source_set_pending(s, true);
1607 static int source_dispatch(sd_event_source *s) {
/* Invokes the user callback for one source: clears pending first (so
 * callbacks may re-trigger themselves), auto-disables ONESHOT sources,
 * and holds a temporary ref across the callback so the callback may
 * safely unref its own source. */
1611 assert(s->pending || s->type == SOURCE_QUIT);
1613 if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
1614 r = source_set_pending(s, false);
1619 if (s->enabled == SD_EVENT_ONESHOT) {
1620 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1625 sd_event_source_ref(s);
1630 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1633 case SOURCE_MONOTONIC:
1634 r = s->time.callback(s, s->time.next, s->userdata);
1637 case SOURCE_REALTIME:
1638 r = s->time.callback(s, s->time.next, s->userdata);
1642 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1646 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1650 r = s->defer.callback(s, s->userdata);
1654 r = s->quit.callback(s, s->userdata);
1658 sd_event_source_unref(s);
1663 static int event_prepare(sd_event *e) {
/* Runs prepare hooks for all enabled sources not yet prepared this
 * iteration; the prioq order guarantees we can stop at the first
 * already-prepared entry. Reshuffling after bumping prepare_iteration
 * moves the handled source towards the back. */
1671 s = prioq_peek(e->prepare);
1672 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1675 s->prepare_iteration = e->iteration;
1676 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1681 r = s->prepare(s, s->userdata);
1690 static int dispatch_quit(sd_event *e) {
/* Runs the highest-priority enabled quit handler; with none left the
 * loop transitions to FINISHED. State is QUITTING while the handler
 * runs, then back to PASSIVE. */
1696 p = prioq_peek(e->quit);
1697 if (!p || p->enabled == SD_EVENT_OFF) {
1698 e->state = SD_EVENT_FINISHED;
1704 e->state = SD_EVENT_QUITTING;
1706 r = source_dispatch(p);
1708 e->state = SD_EVENT_PASSIVE;
1714 static sd_event_source* event_next_pending(sd_event *e) {
/* Returns the next dispatchable pending source, or NULL if the top of
 * the pending prioq is disabled (comparator sinks disabled entries). */
1719 p = prioq_peek(e->pending);
1723 if (p->enabled == SD_EVENT_OFF)
1729 _public_ int sd_event_run(sd_event *e, uint64_t timeout) {
/* One loop iteration: run prepare hooks, arm both clock timerfds,
 * epoll_wait (timeout in usec, (uint64_t) -1 = infinite), demultiplex
 * fired events via the data.ptr tags, collect due timers and child
 * state changes, then dispatch one pending source. NOTE(review):
 * several framing lines (gotos/labels/returns) are elided in this
 * excerpt. */
1730 struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1734 assert_return(e, -EINVAL);
1735 assert_return(!event_pid_changed(e), -ECHILD);
1736 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1737 assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1739 if (e->quit_requested)
1740 return dispatch_quit(e);
1744 e->state = SD_EVENT_RUNNING;
1746 r = event_prepare(e);
/* Skip the wait entirely if work is already queued. */
1750 if (event_next_pending(e) || e->need_process_child)
1754 r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1758 r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1763 m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1764 timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1766 r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
1770 dual_timestamp_get(&e->timestamp);
1772 for (i = 0; i < m; i++) {
1774 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1775 r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
1776 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1777 r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
1778 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1779 r = process_signal(e, ev_queue[i].events);
/* Everything else carries the sd_event_source* directly. */
1781 r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1787 r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
1791 r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
1795 if (e->need_process_child) {
1796 r = process_child(e);
1801 p = event_next_pending(e);
1807 r = source_dispatch(p);
1810 e->state = SD_EVENT_PASSIVE;
1816 _public_ int sd_event_loop(sd_event *e) {
/* Runs sd_event_run() with infinite timeout until the loop reaches
 * SD_EVENT_FINISHED. */
1819 assert_return(e, -EINVAL);
1820 assert_return(!event_pid_changed(e), -ECHILD);
1821 assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1825 while (e->state != SD_EVENT_FINISHED) {
1826 r = sd_event_run(e, (uint64_t) -1);
1838 _public_ int sd_event_get_state(sd_event *e) {
/* Returns the loop's current state. */
1839 assert_return(e, -EINVAL);
1840 assert_return(!event_pid_changed(e), -ECHILD);
1845 _public_ int sd_event_get_quit(sd_event *e) {
/* Returns whether a quit has been requested. */
1846 assert_return(e, -EINVAL);
1847 assert_return(!event_pid_changed(e), -ECHILD);
1849 return e->quit_requested;
1852 _public_ int sd_event_request_quit(sd_event *e) {
/* Requests loop termination; honored at the start of the next
 * sd_event_run() iteration. */
1853 assert_return(e, -EINVAL);
1854 assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1855 assert_return(!event_pid_changed(e), -ECHILD);
1857 e->quit_requested = true;
1861 _public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
/* Returns the CLOCK_REALTIME timestamp of the current/last iteration's
 * wakeup; -ENODATA before the first wakeup. */
1862 assert_return(e, -EINVAL);
1863 assert_return(usec, -EINVAL);
1864 assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1865 assert_return(!event_pid_changed(e), -ECHILD);
1867 *usec = e->timestamp.realtime;
1871 _public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
/* Returns the CLOCK_MONOTONIC timestamp of the current/last
 * iteration's wakeup; -ENODATA before the first wakeup. */
1872 assert_return(e, -EINVAL);
1873 assert_return(usec, -EINVAL);
1874 assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1875 assert_return(!event_pid_changed(e), -ECHILD);
1877 *usec = e->timestamp.monotonic;
1881 _public_ int sd_event_default(sd_event **ret) {
/* Returns (creating on first use) the calling thread's default event
 * loop. With ret == NULL it merely reports whether one exists. The
 * loop registers default_event_ptr so event_free() can clear the
 * thread-local on destruction. */
1883 static __thread sd_event *default_event = NULL;
1888 return !!default_event;
1890 if (default_event) {
1891 *ret = sd_event_ref(default_event);
1895 r = sd_event_new(&e);
1899 e->default_event_ptr = &default_event;
1907 _public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
/* Returns the tid the loop is bound to (body elided in this excerpt). */
1908 assert_return(e, -EINVAL);
1909 assert_return(tid, -EINVAL);
1910 assert_return(!event_pid_changed(e), -ECHILD);