1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
31 #include "time-util.h"
36 #define EPOLL_QUEUE_MAX 64
37 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
/* NOTE(review): truncated excerpt — the enum body, union wrappers and many
 * struct members are elided from this view; comments describe only what is
 * visible here. */
39 typedef enum EventSourceType {
/* One registered event source: an io/time/signal/child/defer callback plus
 * its bookkeeping indices into the event loop's priority queues. */
48 struct sd_event_source {
53         sd_prepare_handler_t prepare;
/* Packed into bitfields to keep the struct small. */
55         EventSourceType type:4;
56         sd_event_mute_t mute:3;
/* Positions of this source in the pending/prepare prioqs (PRIOQ_IDX_NULL
 * when not queued), and the loop iteration it was last queued/prepared in. */
60         unsigned pending_index;
61         unsigned prepare_index;
62         unsigned pending_iteration;
63         unsigned prepare_iteration;
/* Per-type payload (presumably members of an anonymous union — the union
 * keywords are elided in this excerpt). */
67                 sd_io_handler_t callback;
74                 sd_time_handler_t callback;
75                 usec_t next, accuracy;
76                 unsigned earliest_index;
77                 unsigned latest_index;
80                 sd_signal_handler_t callback;
81                 struct signalfd_siginfo siginfo;
85                 sd_child_handler_t callback;
91                 sd_defer_handler_t callback;
107         /* For both clocks we maintain two priority queues each, one
108          * ordered for the earliest times the events may be
109          * dispatched, and one ordered by the latest times they must
110          * have been dispatched. The range between the top entries in
111          * the two prioqs is the time window we can freely schedule
113         Prioq *monotonic_earliest;
114         Prioq *monotonic_latest;
115         Prioq *realtime_earliest;
116         Prioq *realtime_latest;
/* Signal sources indexed by signal number; child sources keyed by pid. */
119         sd_event_source **signal_sources;
121         Hashmap *child_sources;
122         unsigned n_unmuted_child_sources;
/* Next absolute wakeup time currently armed on each timerfd. */
126         usec_t realtime_next, monotonic_next;
130         bool need_process_child:1;
/* Orders the "pending" prioq: unmuted sources first, then ascending
 * priority value, then oldest pending iteration first.
 * NOTE(review): the return statements between comparisons are elided in
 * this excerpt. */
135 static int pending_prioq_compare(const void *a, const void *b) {
136         const sd_event_source *x = a, *y = b;
141         /* Unmuted ones first */
142         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
144         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
147         /* Lower priority values first */
148         if (x->priority < y->priority)
150         if (x->priority > y->priority)
153         /* Older entries first */
154         if (x->pending_iteration < y->pending_iteration)
156         if (x->pending_iteration > y->pending_iteration)
159         /* Stability for the rest */
/* Orders the "prepare" prioq. Unlike pending_prioq_compare(), the
 * prepare-iteration key comes FIRST so iteration over the queue can stop at
 * the first already-prepared entry. */
168 static int prepare_prioq_compare(const void *a, const void *b) {
169         const sd_event_source *x = a, *y = b;
174         /* Move most recently prepared ones last, so that we can stop
175          * preparing as soon as we hit one that has already been
176          * prepared in the current iteration */
177         if (x->prepare_iteration < y->prepare_iteration)
179         if (x->prepare_iteration > y->prepare_iteration)
182         /* Unmuted ones first */
183         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
185         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
188         /* Lower priority values first */
189         if (x->priority < y->priority)
191         if (x->priority > y->priority)
194         /* Stability for the rest */
/* Orders timer sources by the earliest time they may fire (time.next).
 * Muted and already-pending sources sink to the end so prioq_peek() yields
 * the next dispatchable deadline. */
203 static int earliest_time_prioq_compare(const void *a, const void *b) {
204         const sd_event_source *x = a, *y = b;
206         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
207         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
209         /* Unmuted ones first */
210         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
212         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
215         /* Move the pending ones to the end */
216         if (!x->pending && y->pending)
218         if (x->pending && !y->pending)
222         if (x->time.next < y->time.next)
224         if (x->time.next > y->time.next)
227         /* Stability for the rest */
/* Orders timer sources by the latest acceptable dispatch time
 * (time.next + time.accuracy); same mute/pending sinking as the earliest
 * comparator. NOTE(review): next + accuracy can wrap for values near
 * (usec_t) -1 — presumably callers keep these in range; verify. */
236 static int latest_time_prioq_compare(const void *a, const void *b) {
237         const sd_event_source *x = a, *y = b;
239         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
240         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
242         /* Unmuted ones first */
243         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
245         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
248         /* Move the pending ones to the end */
249         if (!x->pending && y->pending)
251         if (x->pending && !y->pending)
255         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
257         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
260         /* Stability for the rest */
/* Releases everything owned by the loop: the four fds (epoll, signalfd and
 * both timerfds), all six prioqs, the signal-source array and the child
 * hashmap. Called once the refcount drops to zero. */
269 static void event_free(sd_event *e) {
272         if (e->epoll_fd >= 0)
273                 close_nointr_nofail(e->epoll_fd);
275         if (e->signal_fd >= 0)
276                 close_nointr_nofail(e->signal_fd);
278         if (e->realtime_fd >= 0)
279                 close_nointr_nofail(e->realtime_fd);
281         if (e->monotonic_fd >= 0)
282                 close_nointr_nofail(e->monotonic_fd);
284         prioq_free(e->pending);
285         prioq_free(e->prepare);
286         prioq_free(e->monotonic_earliest);
287         prioq_free(e->monotonic_latest);
288         prioq_free(e->realtime_earliest);
289         prioq_free(e->realtime_latest);
291         free(e->signal_sources);
293         hashmap_free(e->child_sources);
/* Allocates a new event loop. All fds start at -1 and the timer "next"
 * fields at (usec_t) -1 (= not armed); the creating PID is recorded so a
 * fork can be detected later (see event_pid_changed()). */
297 int sd_event_new(sd_event** ret) {
304         e = new0(sd_event, 1);
308         e->n_ref = REFCNT_INIT;
309         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
310         e->realtime_next = e->monotonic_next = (usec_t) -1;
311         e->original_pid = getpid();
313         assert_se(sigemptyset(&e->sigset) == 0);
315         e->pending = prioq_new(pending_prioq_compare);
321         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
322         if (e->epoll_fd < 0) {
/* Reference counting for the loop object; unref frees it (via event_free,
 * presumably — body elided) when the count reaches zero. */
335 sd_event* sd_event_ref(sd_event *e) {
339         assert_se(REFCNT_INC(e->n_ref) >= 2);
344 sd_event* sd_event_unref(sd_event *e) {
348         if (REFCNT_DEC(e->n_ref) <= 0)
/* Returns true if the loop is being used from a different process than the
 * one that created it. */
354 static bool event_pid_changed(sd_event *e) {
357         /* We don't support people creating an event loop and keeping
358          * it around over a fork(). Let's complain. */
360         return e->original_pid != getpid();
/* Removes an IO source's fd from the epoll set (no-op if not registered). */
363 static int source_io_unregister(sd_event_source *s) {
367         assert(s->type == SOURCE_IO);
369         if (!s->io.registered)
372         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
376         s->io.registered = false;
/* Adds or updates the fd in the epoll set. Oneshot sources get
 * EPOLLONESHOT so the kernel disables them after one event. m must not be
 * SD_EVENT_MUTED here. */
380 static int source_io_register(sd_event_source *s, sd_event_mute_t m, uint32_t events) {
381         struct epoll_event ev = {};
385         assert(s->type == SOURCE_IO);
386         assert(m != SD_EVENT_MUTED);
391         if (m == SD_EVENT_ONESHOT)
392                 ev.events |= EPOLLONESHOT;
394         if (s->io.registered)
395                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
397                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
402         s->io.registered = true;
/* Detaches a source from all loop data structures before freeing it:
 * epoll (io), time prioqs, sigset/signal_sources (signal), child hashmap
 * and unmuted-child counter (child), plus the shared pending/prepare
 * prioqs; finally drops the source's reference on the loop. */
407 static void source_free(sd_event_source *s) {
415                 source_io_unregister(s);
419         case SOURCE_MONOTONIC:
420                 prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
421                 prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
424         case SOURCE_REALTIME:
425                 prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
426                 prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
430                 if (s->signal.sig > 0) {
/* Keep SIGCHLD in the sigset if unmuted child sources still need it. */
431                         if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0)
432                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
434                         if (s->event->signal_sources)
435                                 s->event->signal_sources[s->signal.sig] = NULL;
441                 if (s->child.pid > 0) {
442                         if (s->mute != SD_EVENT_MUTED) {
443                                 assert(s->event->n_unmuted_child_sources > 0);
444                                 s->event->n_unmuted_child_sources--;
/* Conversely, keep SIGCHLD if an explicit SIGCHLD signal source exists. */
447                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
448                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
450                         hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
457                 prioq_remove(s->event->pending, s, &s->pending_index);
460                 prioq_remove(s->event->prepare, s, &s->prepare_index);
462         sd_event_unref(s->event);
/* Marks a source pending/not-pending, inserting into or removing from the
 * pending prioq and stamping the current loop iteration. */
468 static int source_set_pending(sd_event_source *s, bool b) {
479                 s->pending_iteration = s->event->iteration;
481                 r = prioq_put(s->event->pending, s, &s->pending_index);
487                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
/* Allocates a source of the given type, unmuted, holding a reference on
 * the loop, with both queue indices initialized to "not queued". */
492 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
497         s = new0(sd_event_source, 1);
501         s->n_ref = REFCNT_INIT;
502         s->event = sd_event_ref(e);
504         s->mute = SD_EVENT_UNMUTED;
505         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
/* (Tail of the sd_event_add_io() signature — the function name and leading
 * parameters are elided in this excerpt.) Validates the event mask,
 * creates a SOURCE_IO source and registers the fd with epoll. */
514                 sd_io_handler_t callback,
516                 sd_event_source **ret) {
525         if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
531         if (event_pid_changed(e))
534         s = source_new(e, SOURCE_IO);
539         s->io.events = events;
540         s->io.callback = callback;
541         s->userdata = userdata;
543         r = source_io_register(s, s->mute, events);
/* Lazily creates the timerfd for one clock, adds it to the epoll set
 * (tagged with the source type as data.ptr), and derives the per-machine
 * wakeup perturbation from the boot ID. Idempotent: returns early if the
 * fd already exists. */
553 static int event_setup_timer_fd(
555                 EventSourceType type,
559         struct epoll_event ev = {};
566         if (_likely_(*timer_fd >= 0))
569         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
574         ev.data.ptr = INT_TO_PTR(type);
576         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
578                 close_nointr_nofail(fd);
582         /* When we sleep for longer, we try to realign the wakeup to
583            the same time within each second, so that events all across
584            the system can be coalesced into a single CPU
585            wakeup. However, let's take some system-specific randomness
586            for this value, so that in a network of systems with synced
587            clocks timer events are distributed a bit. Here, we
588            calculate a perturbation usec offset from the boot ID. */
590         if (sd_id128_get_boot(&bootid) >= 0)
591                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
/* Shared implementation behind sd_event_add_monotonic()/_realtime():
 * validates arguments, lazily allocates the two per-clock prioqs and the
 * timerfd, then creates the source and inserts it into both the earliest-
 * and latest-deadline queues. accuracy == 0 selects the default. */
597 static int event_add_time_internal(
599                 EventSourceType type,
606                 sd_time_handler_t callback,
608                 sd_event_source **ret) {
619         if (usec == (uint64_t) -1)
621         if (accuracy == (uint64_t) -1)
623         if (event_pid_changed(e))
631                 *earliest = prioq_new(earliest_time_prioq_compare);
637                 *latest = prioq_new(latest_time_prioq_compare);
643         r = event_setup_timer_fd(e, type, timer_fd, id);
648         s = source_new(e, type);
653         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
654         s->time.callback = callback;
655         s->time.earliest_index = PRIOQ_IDX_NULL;
656         s->time.latest_index = PRIOQ_IDX_NULL;
657         s->userdata = userdata;
659         r = prioq_put(*earliest, s, &s->time.earliest_index);
663         r = prioq_put(*latest, s, &s->time.latest_index);
/* Registers a CLOCK_MONOTONIC timer source via the shared helper, using
 * the loop's monotonic timerfd and monotonic prioq pair. */
675 int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
676         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
/* Registers a CLOCK_REALTIME timer source via the shared helper.
 * Fix: the latest-deadline queue passed here was e->monotonic_latest — a
 * copy/paste from sd_event_add_monotonic() — which would file realtime
 * sources into the monotonic queue and corrupt timer scheduling. A
 * realtime source must use the realtime prioq pair. */
679 int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
680         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->realtime_latest, usec, accuracy, callback, userdata, ret);
/* (Re)creates or updates the loop's signalfd to match e->sigset; on first
 * creation the fd is also added to the epoll set, tagged SOURCE_SIGNAL. */
683 static int event_update_signal_fd(sd_event *e) {
684         struct epoll_event ev = {};
690         add_to_epoll = e->signal_fd < 0;
692         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
702         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
704         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
706                 close_nointr_nofail(e->signal_fd);
/* Registers a handler for one signal. Only one source per signal number is
 * allowed (signal_sources is indexed by signo). The signalfd is refreshed
 * unless SIGCHLD is already watched on behalf of child sources. */
715 int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) {
729         if (event_pid_changed(e))
732         if (!e->signal_sources) {
733                 e->signal_sources = new0(sd_event_source*, _NSIG);
734                 if (!e->signal_sources)
736         } else if (e->signal_sources[sig])
739         s = source_new(e, SOURCE_SIGNAL);
744         s->signal.callback = callback;
745         s->userdata = userdata;
747         e->signal_sources[sig] = s;
748         assert_se(sigaddset(&e->sigset, sig) == 0);
750         if (sig != SIGCHLD || e->n_unmuted_child_sources == 0) {
751                 r = event_update_signal_fd(e);
/* Registers a waitid()-style watch for one child pid. At most one source
 * per pid; SIGCHLD is added to the watched set unless an explicit SIGCHLD
 * signal source already covers it. need_process_child forces an immediate
 * waitid() sweep in case the child already exited. */
762 int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) {
770         if (options & ~(WEXITED|WSTOPPED|WCONTINUED))
776         if (event_pid_changed(e))
779         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
783         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
786         s = source_new(e, SOURCE_CHILD);
791         s->child.options = options;
792         s->child.callback = callback;
793         s->userdata = userdata;
795         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
801         e->n_unmuted_child_sources ++;
803         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
805         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
806                 r = event_update_signal_fd(e);
813         e->need_process_child = true;
/* Registers a defer source: immediately marked pending, so its callback
 * runs on the next loop iteration. */
819 int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) {
827         if (event_pid_changed(e))
830         s = source_new(e, SOURCE_DEFER);
834         s->defer.callback = callback;
835         s->userdata = userdata;
837         r = source_set_pending(s, true);
/* Source refcounting and simple accessors. All getters reject use after a
 * fork (event_pid_changed). */
847 sd_event_source* sd_event_source_ref(sd_event_source *s) {
851         assert_se(REFCNT_INC(s->n_ref) >= 2);
856 sd_event_source* sd_event_source_unref(sd_event_source *s) {
860         if (REFCNT_DEC(s->n_ref) <= 0)
/* Returns the loop this source belongs to. */
867 sd_event *sd_event_get(sd_event_source *s) {
874 int sd_event_source_get_pending(sd_event_source *s) {
877         if (event_pid_changed(s->event))
/* IO-only accessors: reject non-IO sources. */
883 int sd_event_source_get_io_fd(sd_event_source *s) {
886         if (s->type != SOURCE_IO)
888         if (event_pid_changed(s->event))
894 int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
897         if (s->type != SOURCE_IO)
901         if (event_pid_changed(s->event))
904         *events = s->io.events;
/* Changes the epoll event mask of an IO source, re-registering with the
 * kernel if the source is not muted.
 * Fixes: (1) the type guard read "!s->type != SOURCE_IO" — precedence made
 * that compare a boolean against the enum, so non-IO sources slipped
 * through; (2) source_io_register() takes (source, mute-state, events)
 * (see its definition and the call in sd_event_source_set_mute()), but the
 * old event mask was being passed where the mute state belongs. */
908 int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
913         if (s->type != SOURCE_IO)
915         if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
917         if (event_pid_changed(s->event))
920         if (s->io.events == events)
923         if (s->mute != SD_EVENT_MUTED) {
924                 r = source_io_register(s, s->mute, events);
929         s->io.events = events;
/* More accessors: last-seen epoll revents, signal number, priority
 * (get/set), and mute state. Changing the priority reshuffles the shared
 * pending and prepare prioqs since priority is one of their sort keys. */
934 int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
937         if (s->type != SOURCE_IO)
943         if (event_pid_changed(s->event))
946         *revents = s->io.revents;
950 int sd_event_source_get_signal(sd_event_source *s) {
953         if (s->type != SOURCE_SIGNAL)
955         if (event_pid_changed(s->event))
958         return s->signal.sig;
961 int sd_event_source_get_priority(sd_event_source *s, int *priority) {
964         if (event_pid_changed(s->event))
970 int sd_event_source_set_priority(sd_event_source *s, int priority) {
973         if (event_pid_changed(s->event))
976         if (s->priority == priority)
979         s->priority = priority;
982         prioq_reshuffle(s->event->pending, s, &s->pending_index);
985         prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
990 int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m) {
995         if (event_pid_changed(s->event))
/* Changes a source's mute state. Muting removes its kernel-side
 * registration (epoll entry, sigset bit, unmuted-child count) and
 * reshuffles the prioqs whose order depends on the mute flag; unmuting
 * re-registers everything. SIGCHLD is only dropped from / added to the
 * sigset when neither an explicit SIGCHLD source nor unmuted child sources
 * still need it.
 * Fix: the argument guard read "... && !SD_EVENT_ONESHOT", i.e. it negated
 * the constant instead of comparing m against it, so the validation never
 * accepted/rejected SD_EVENT_ONESHOT correctly. */
1002 int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) {
1007         if (m != SD_EVENT_MUTED && m != SD_EVENT_UNMUTED && m != SD_EVENT_ONESHOT)
1009         if (event_pid_changed(s->event))
1015         if (m == SD_EVENT_MUTED) {
1020                         r = source_io_unregister(s);
1027                 case SOURCE_MONOTONIC:
1029                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1030                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1033                 case SOURCE_REALTIME:
1035                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1036                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1041                         if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) {
1042                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1043                                 event_update_signal_fd(s->event);
1051                         assert(s->event->n_unmuted_child_sources > 0);
1052                         s->event->n_unmuted_child_sources--;
1054                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1055                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1056                                 event_update_signal_fd(s->event);
1070                         r = source_io_register(s, m, s->io.events);
1077                 case SOURCE_MONOTONIC:
1079                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1080                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1083                 case SOURCE_REALTIME:
1085                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1086                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1092                         if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) {
1093                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1094                                 event_update_signal_fd(s->event);
1101                         if (s->mute == SD_EVENT_MUTED) {
1102                                 s->event->n_unmuted_child_sources++;
1104                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1105                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1106                                         event_update_signal_fd(s->event);
1118         prioq_reshuffle(s->event->pending, s, &s->pending_index);
1121         prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
/* Timer-source accessors. Setting the deadline reshuffles both the
 * earliest and latest queues of the relevant clock; setting the accuracy
 * (0 selects the default) only affects the latest queue, since accuracy
 * participates only in the latest-deadline sort key. */
1126 int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1131         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1133         if (event_pid_changed(s->event))
1136         *usec = s->time.next;
1140 int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1143         if (usec == (uint64_t) -1)
1145         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1147         if (event_pid_changed(s->event))
1150         if (s->time.next == usec)
1153         s->time.next = usec;
1155         if (s->type == SOURCE_REALTIME) {
1156                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1157                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1159                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1160                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1166 int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1169         if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
1171         if (event_pid_changed(s->event))
1175                 usec = DEFAULT_ACCURACY_USEC;
1177         if (s->time.accuracy == usec)
1181         s->time.accuracy = usec;
1183         if (s->type == SOURCE_REALTIME)
1184                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1186                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1191 int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1196         if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
1198         if (event_pid_changed(s->event))
1201         *usec = s->time.accuracy;
/* Installs or replaces a prepare callback, lazily allocating the prepare
 * prioq; a NULL callback removes the source from it. NOTE(review): the
 * branch structure between these lines is elided in this excerpt. */
1205 int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1210         if (event_pid_changed(s->event))
1213         if (s->prepare == callback)
1216         if (callback && s->prepare) {
1217                 s->prepare = callback;
1221         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1225         s->prepare = callback;
1228                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1232                 prioq_remove(s->event->prepare, s, &s->prepare_index);
/* Returns the opaque pointer registered with the source. */
1237 void* sd_event_source_get_userdata(sd_event_source *s) {
/* Picks the wakeup time within [a, b]: prefer the boot-ID-derived
 * perturbation spot within the second (or within a 250ms step), falling
 * back to the latest allowed time b. */
1244 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1256         Find a good time to wake up again between times a and b. We
1257         have two goals here:
1259         a) We want to wake up as seldom as possible, hence prefer
1260         later times over earlier times.
1262         b) But if we have to wake up, then let's make sure to
1263         dispatch as much as possible on the entire system.
1265         We implement this by waking up everywhere at the same time
1266         within any given second if we can, synchronised via the
1267         perturbation value determined from the boot ID. If we can't,
1268         then we try to find the same spot in every a 250ms
1269         step. Otherwise, we pick the last possible time to wake up.
1272         c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1274         if (_unlikely_(c < USEC_PER_SEC))
1283         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1285         if (_unlikely_(c < USEC_PER_MSEC*250))
1288                 c -= USEC_PER_MSEC*250;
/* Programs one clock's timerfd for the next wakeup, chosen by
 * sleep_between() from the earliest possible and latest permitted
 * deadlines at the top of the two prioqs. */
1297 static int event_arm_timer(
1304         struct itimerspec its = {};
1305         sd_event_source *a, *b;
1312         a = prioq_peek(earliest);
1313         if (!a || a->mute == SD_EVENT_MUTED)
/* If there is an unmuted head in "earliest" there must be one in "latest". */
1316         b = prioq_peek(latest);
1317         assert_se(b && b->mute != SD_EVENT_MUTED);
1319         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1323         assert_se(timer_fd >= 0);
1326                 /* We don't want to disarm here, just mean some time looooong ago. */
1327                 its.it_value.tv_sec = 0;
1328                 its.it_value.tv_nsec = 1;
1330                 timespec_store(&its.it_value, t);
1332         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
/* Records the revents delivered by epoll and marks the IO source pending. */
1340 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1343         assert(s->type == SOURCE_IO);
1345         s->io.revents = events;
1348         If this is a oneshot event source, then we added it to the
1349         epoll with EPOLLONESHOT, hence we know it's not registered
1350         anymore. We can save a syscall here...
1353         if (s->mute == SD_EVENT_ONESHOT)
1354                 s->io.registered = false;
1356         return source_set_pending(s, true);
/* Drains a timerfd's expiration counter; EAGAIN/EINTR are tolerated. */
1359 static int flush_timer(sd_event *e, int fd, uint32_t events) {
1365         if (events != EPOLLIN)
1368         ss = read(fd, &x, sizeof(x));
1370                 if (errno == EAGAIN || errno == EINTR)
1376         if (ss != sizeof(x))
/* Marks every elapsed (time.next <= n) unmuted, non-pending timer source
 * at the head of "earliest" as pending, reshuffling both queues as sources
 * are consumed. */
1382 static int process_timer(sd_event *e, usec_t n, Prioq *earliest, Prioq *latest) {
1389                 s = prioq_peek(earliest);
1392                     s->mute == SD_EVENT_MUTED ||
1396                 r = source_set_pending(s, true);
1400                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1401                 prioq_reshuffle(latest, s, &s->time.latest_index);
/* Polls each watched child with waitid(P_PID, ..., WNOHANG) and marks the
 * corresponding sources pending when state is available (si_pid != 0). */
1407 static int process_child(sd_event *e) {
1414         e->need_process_child = false;
1417         So, this is ugly. We iteratively invoke waitid() with P_PID
1418         + WNOHANG for each PID we wait for, instead of using
1419         P_ALL. This is because we only want to get child
1420         information of very specific child processes, and not all
1421         of them. We might not have processed the SIGCHLD event of a
1422         previous invocation and we don't want to maintain a
1423         unbounded *per-child* event queue, hence we really don't
1424         want anything flushed out of the kernel's queue that we
1425         don't care about. Since this is O(n) this means that if you
1426         have a lot of processes you probably want to handle SIGCHLD
1430         HASHMAP_FOREACH(s, e->child_sources, i) {
1431                 assert(s->type == SOURCE_CHILD);
1436                 if (s->mute == SD_EVENT_MUTED)
1439                 zero(s->child.siginfo);
1440                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1444                 if (s->child.siginfo.si_pid != 0) {
1445                         r = source_set_pending(s, true);
/* Drains the signalfd. SIGCHLD triggers the child sweep first; each other
 * signal's siginfo is copied into its registered source, which is then
 * marked pending. */
1454 static int process_signal(sd_event *e, uint32_t events) {
1455         struct signalfd_siginfo si;
1456         bool read_one = false;
1460         if (events != EPOLLIN)
1466                 ss = read(e->signal_fd, &si, sizeof(si));
1468                         if (errno == EAGAIN || errno == EINTR)
1474                 if (ss != sizeof(si))
1479                 if (si.ssi_signo == SIGCHLD) {
1480                         r = process_child(e);
/* Skip the signal-source dispatch if the child sweep consumed it, or no
 * explicit source is registered for this signal. */
1483                         if (r > 0 || !e->signal_sources[si.ssi_signo])
1486                 s = e->signal_sources[si.ssi_signo];
1491                 s->signal.siginfo = si;
1492                 r = source_set_pending(s, true);
/* Invokes one pending source's callback. Oneshot sources are muted before
 * dispatch so they fire exactly once. */
1501 static int source_dispatch(sd_event_source *s) {
1507         r = source_set_pending(s, false);
1511         if (s->mute == SD_EVENT_ONESHOT) {
1512                 r = sd_event_source_set_mute(s, SD_EVENT_MUTED);
1520                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1523         case SOURCE_MONOTONIC:
1524                 r = s->time.callback(s, s->time.next, s->userdata);
1527         case SOURCE_REALTIME:
1528                 r = s->time.callback(s, s->time.next, s->userdata);
1532                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1536                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1540                 r = s->defer.callback(s, s->userdata);
/* Runs prepare callbacks for sources not yet prepared in this iteration;
 * the prepare prioq's ordering lets the loop stop at the first
 * already-prepared (or muted) head. */
1547 static int event_prepare(sd_event *e) {
1555                 s = prioq_peek(e->prepare);
1556                 if (!s || s->prepare_iteration == e->iteration || s->mute == SD_EVENT_MUTED)
1559                 s->prepare_iteration = e->iteration;
1560                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1565                 r = s->prepare(s, s->userdata);
/* Returns the highest-priority pending, unmuted source (or NULL). */
1574 static sd_event_source* event_next_pending(sd_event *e) {
1577         p = prioq_peek(e->pending);
1581         if (p->mute == SD_EVENT_MUTED)
/* One loop iteration: run prepare callbacks, arm both timerfds, block in
 * epoll_wait (timeout in usec, (uint64_t) -1 = infinite, rounded up to
 * ms), flush timer/signal fds and queue IO sources, expire elapsed
 * timers, sweep children if requested, then dispatch one pending source. */
1587 int sd_event_run(sd_event *e, uint64_t timeout) {
1588         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1597         if (event_pid_changed(e))
1602         r = event_prepare(e);
/* Skip the wait entirely if work is already queued. */
1606         if (event_next_pending(e) || e->need_process_child)
1610         r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1614         r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1619         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1620                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1624         dual_timestamp_get(&n);
/* Demultiplex epoll events by the tag stored in data.ptr: timerfds and the
 * signalfd carry an EventSourceType, everything else is an IO source. */
1626         for (i = 0; i < m; i++) {
1628                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1629                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
1630                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1631                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
1632                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1633                         r = process_signal(e, ev_queue[i].events);
1635                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1641         r = process_timer(e, n.monotonic, e->monotonic_earliest, e->monotonic_latest);
1645         r = process_timer(e, n.realtime, e->realtime_earliest, e->realtime_latest);
1649         if (e->need_process_child) {
1650                 r = process_child(e);
1655         p = event_next_pending(e);
1659         return source_dispatch(p);
/* Runs sd_event_run() with an infinite timeout until quit is requested
 * (loop condition elided in this excerpt). */
1662 int sd_event_loop(sd_event *e) {
1667         if (event_pid_changed(e))
1671                 r = sd_event_run(e, (uint64_t) -1);
1679 int sd_event_quit(sd_event *e) {
1682 if (event_pid_changed(e))
1688 int sd_event_request_quit(sd_event *e) {
1691 if (event_pid_changed(e))