chiark / gitweb /
e3110c3d6bc50aa7b9fbd8a36fbfc8681c4bbcbd
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "macro.h"
27 #include "refcnt.h"
28 #include "prioq.h"
29 #include "hashmap.h"
30 #include "util.h"
31 #include "time-util.h"
32 #include "sd-id128.h"
33
34 #include "sd-event.h"
35
/* NOTE(review): presumably the maximum number of epoll events fetched per
 * wait call; it is not referenced in this part of the file -- confirm at the
 * use site. */
#define EPOLL_QUEUE_MAX 64

/* Accuracy assigned to a timer source when the caller passes 0 as accuracy */
#define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
38
/* Kind of an event source. The value selects which member of the per-type
 * union inside struct sd_event_source is in use. */
typedef enum EventSourceType {
        SOURCE_IO,              /* file descriptor watched through epoll ("io" member) */
        SOURCE_MONOTONIC,       /* timer on CLOCK_MONOTONIC ("time" member) */
        SOURCE_REALTIME,        /* timer on CLOCK_REALTIME ("time" member) */
        SOURCE_SIGNAL,          /* UNIX signal ("signal" member) */
        SOURCE_CHILD,           /* child process state change ("child" member) */
        SOURCE_DEFER            /* deferred callback ("defer" member) */
} EventSourceType;
47
48 struct sd_event_source {
49         RefCount n_ref;
50
51         sd_event *event;
52         void *userdata;
53         sd_prepare_handler_t prepare;
54
55         EventSourceType type:4;
56         sd_event_mute_t mute:3;
57         bool pending:1;
58
59         int priority;
60         unsigned pending_index;
61         unsigned prepare_index;
62         unsigned pending_iteration;
63         unsigned prepare_iteration;
64
65         union {
66                 struct {
67                         sd_io_handler_t callback;
68                         int fd;
69                         uint32_t events;
70                         uint32_t revents;
71                         bool registered:1;
72                 } io;
73                 struct {
74                         sd_time_handler_t callback;
75                         usec_t next, accuracy;
76                         unsigned earliest_index;
77                         unsigned latest_index;
78                 } time;
79                 struct {
80                         sd_signal_handler_t callback;
81                         struct signalfd_siginfo siginfo;
82                         int sig;
83                 } signal;
84                 struct {
85                         sd_child_handler_t callback;
86                         siginfo_t siginfo;
87                         pid_t pid;
88                         int options;
89                 } child;
90                 struct {
91                         sd_defer_handler_t callback;
92                 } defer;
93         };
94 };
95
96 struct sd_event {
97         RefCount n_ref;
98
99         int epoll_fd;
100         int signal_fd;
101         int realtime_fd;
102         int monotonic_fd;
103
104         Prioq *pending;
105         Prioq *prepare;
106
107         /* For both clocks we maintain two priority queues each, one
108          * ordered for the earliest times the events may be
109          * dispatched, and one ordered by the latest times they must
110          * have been dispatched. The range between the top entries in
111          * the two prioqs is the time window we can freely schedule
112          * wakeups in */
113         Prioq *monotonic_earliest;
114         Prioq *monotonic_latest;
115         Prioq *realtime_earliest;
116         Prioq *realtime_latest;
117
118         sigset_t sigset;
119         sd_event_source **signal_sources;
120
121         Hashmap *child_sources;
122         unsigned n_unmuted_child_sources;
123
124         unsigned iteration;
125
126         usec_t realtime_next, monotonic_next;
127         usec_t perturb;
128
129         bool quit:1;
130         bool need_process_child:1;
131
132         pid_t original_pid;
133 };
134
135 static int pending_prioq_compare(const void *a, const void *b) {
136         const sd_event_source *x = a, *y = b;
137
138         assert(x->pending);
139         assert(y->pending);
140
141         /* Unmuted ones first */
142         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
143                 return -1;
144         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
145                 return 1;
146
147         /* Lower priority values first */
148         if (x->priority < y->priority)
149                 return -1;
150         if (x->priority > y->priority)
151                 return 1;
152
153         /* Older entries first */
154         if (x->pending_iteration < y->pending_iteration)
155                 return -1;
156         if (x->pending_iteration > y->pending_iteration)
157                 return 1;
158
159         /* Stability for the rest */
160         if (x < y)
161                 return -1;
162         if (x > y)
163                 return 1;
164
165         return 0;
166 }
167
168 static int prepare_prioq_compare(const void *a, const void *b) {
169         const sd_event_source *x = a, *y = b;
170
171         assert(x->prepare);
172         assert(y->prepare);
173
174         /* Move most recently prepared ones last, so that we can stop
175          * preparing as soon as we hit one that has already been
176          * prepared in the current iteration */
177         if (x->prepare_iteration < y->prepare_iteration)
178                 return -1;
179         if (x->prepare_iteration > y->prepare_iteration)
180                 return 1;
181
182         /* Unmuted ones first */
183         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
184                 return -1;
185         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
186                 return 1;
187
188         /* Lower priority values first */
189         if (x->priority < y->priority)
190                 return -1;
191         if (x->priority > y->priority)
192                 return 1;
193
194         /* Stability for the rest */
195         if (x < y)
196                 return -1;
197         if (x > y)
198                 return 1;
199
200         return 0;
201 }
202
203 static int earliest_time_prioq_compare(const void *a, const void *b) {
204         const sd_event_source *x = a, *y = b;
205
206         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
207         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
208
209         /* Unmuted ones first */
210         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
211                 return -1;
212         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
213                 return 1;
214
215         /* Move the pending ones to the end */
216         if (!x->pending && y->pending)
217                 return -1;
218         if (x->pending && !y->pending)
219                 return 1;
220
221         /* Order by time */
222         if (x->time.next < y->time.next)
223                 return -1;
224         if (x->time.next > y->time.next)
225                 return -1;
226
227         /* Stability for the rest */
228         if (x < y)
229                 return -1;
230         if (x > y)
231                 return 1;
232
233         return 0;
234 }
235
236 static int latest_time_prioq_compare(const void *a, const void *b) {
237         const sd_event_source *x = a, *y = b;
238
239         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
240         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
241
242         /* Unmuted ones first */
243         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
244                 return -1;
245         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
246                 return 1;
247
248         /* Move the pending ones to the end */
249         if (!x->pending && y->pending)
250                 return -1;
251         if (x->pending && !y->pending)
252                 return 1;
253
254         /* Order by time */
255         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
256                 return -1;
257         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
258                 return -1;
259
260         /* Stability for the rest */
261         if (x < y)
262                 return -1;
263         if (x > y)
264                 return 1;
265
266         return 0;
267 }
268
269 static void event_free(sd_event *e) {
270         assert(e);
271
272         if (e->epoll_fd >= 0)
273                 close_nointr_nofail(e->epoll_fd);
274
275         if (e->signal_fd >= 0)
276                 close_nointr_nofail(e->signal_fd);
277
278         if (e->realtime_fd >= 0)
279                 close_nointr_nofail(e->realtime_fd);
280
281         if (e->monotonic_fd >= 0)
282                 close_nointr_nofail(e->monotonic_fd);
283
284         prioq_free(e->pending);
285         prioq_free(e->prepare);
286         prioq_free(e->monotonic_earliest);
287         prioq_free(e->monotonic_latest);
288         prioq_free(e->realtime_earliest);
289         prioq_free(e->realtime_latest);
290
291         free(e->signal_sources);
292
293         hashmap_free(e->child_sources);
294         free(e);
295 }
296
297 int sd_event_new(sd_event** ret) {
298         sd_event *e;
299         int r;
300
301         if (!ret)
302                 return -EINVAL;
303
304         e = new0(sd_event, 1);
305         if (!e)
306                 return -ENOMEM;
307
308         e->n_ref = REFCNT_INIT;
309         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
310         e->realtime_next = e->monotonic_next = (usec_t) -1;
311         e->original_pid = getpid();
312
313         assert_se(sigemptyset(&e->sigset) == 0);
314
315         e->pending = prioq_new(pending_prioq_compare);
316         if (!e->pending) {
317                 r = -ENOMEM;
318                 goto fail;
319         }
320
321         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
322         if (e->epoll_fd < 0) {
323                 r = -errno;
324                 goto fail;
325         }
326
327         *ret = e;
328         return 0;
329
330 fail:
331         event_free(e);
332         return r;
333 }
334
335 sd_event* sd_event_ref(sd_event *e) {
336         if (!e)
337                 return NULL;
338
339         assert_se(REFCNT_INC(e->n_ref) >= 2);
340
341         return e;
342 }
343
344 sd_event* sd_event_unref(sd_event *e) {
345         if (!e)
346                 return NULL;
347
348         if (REFCNT_DEC(e->n_ref) <= 0)
349                 event_free(e);
350
351         return NULL;
352 }
353
354 static bool event_pid_changed(sd_event *e) {
355         assert(e);
356
357         /* We don't support people creating am event loop and keeping
358          * it around over a fork(). Let's complain. */
359
360         return e->original_pid != getpid();
361 }
362
363 static int source_io_unregister(sd_event_source *s) {
364         int r;
365
366         assert(s);
367         assert(s->type == SOURCE_IO);
368
369         if (!s->io.registered)
370                 return 0;
371
372         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
373         if (r < 0)
374                 return -errno;
375
376         s->io.registered = false;
377         return 0;
378 }
379
380 static int source_io_register(sd_event_source *s, sd_event_mute_t m, uint32_t events) {
381         struct epoll_event ev = {};
382         int r;
383
384         assert(s);
385         assert(s->type == SOURCE_IO);
386         assert(m != SD_EVENT_MUTED);
387
388         ev.events = events;
389         ev.data.ptr = s;
390
391         if (m == SD_EVENT_ONESHOT)
392                 ev.events |= EPOLLONESHOT;
393
394         if (s->io.registered)
395                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
396         else
397                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
398
399         if (r < 0)
400                 return -errno;
401
402         s->io.registered = true;
403
404         return 0;
405 }
406
407 static void source_free(sd_event_source *s) {
408         assert(s);
409
410         if (s->event) {
411                 switch (s->type) {
412
413                 case SOURCE_IO:
414                         if (s->io.fd >= 0)
415                                 source_io_unregister(s);
416
417                         break;
418
419                 case SOURCE_MONOTONIC:
420                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
421                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
422                         break;
423
424                 case SOURCE_REALTIME:
425                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
426                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
427                         break;
428
429                 case SOURCE_SIGNAL:
430                         if (s->signal.sig > 0) {
431                                 if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0)
432                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
433
434                                 if (s->event->signal_sources)
435                                         s->event->signal_sources[s->signal.sig] = NULL;
436                         }
437
438                         break;
439
440                 case SOURCE_CHILD:
441                         if (s->child.pid > 0) {
442                                 if (s->mute != SD_EVENT_MUTED) {
443                                         assert(s->event->n_unmuted_child_sources > 0);
444                                         s->event->n_unmuted_child_sources--;
445                                 }
446
447                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
448                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
449
450                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
451                         }
452
453                         break;
454                 }
455
456                 if (s->pending)
457                         prioq_remove(s->event->pending, s, &s->pending_index);
458
459                 if (s->prepare)
460                         prioq_remove(s->event->prepare, s, &s->prepare_index);
461
462                 sd_event_unref(s->event);
463         }
464
465         free(s);
466 }
467
468 static int source_set_pending(sd_event_source *s, bool b) {
469         int r;
470
471         assert(s);
472
473         if (s->pending == b)
474                 return 0;
475
476         s->pending = b;
477
478         if (b) {
479                 s->pending_iteration = s->event->iteration;
480
481                 r = prioq_put(s->event->pending, s, &s->pending_index);
482                 if (r < 0) {
483                         s->pending = false;
484                         return r;
485                 }
486         } else
487                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
488
489         return 0;
490 }
491
492 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
493         sd_event_source *s;
494
495         assert(e);
496
497         s = new0(sd_event_source, 1);
498         if (!s)
499                 return NULL;
500
501         s->n_ref = REFCNT_INIT;
502         s->event = sd_event_ref(e);
503         s->type = type;
504         s->mute = SD_EVENT_UNMUTED;
505         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
506
507         return s;
508 }
509
510 int sd_event_add_io(
511                 sd_event *e,
512                 int fd,
513                 uint32_t events,
514                 sd_io_handler_t callback,
515                 void *userdata,
516                 sd_event_source **ret) {
517
518         sd_event_source *s;
519         int r;
520
521         if (!e)
522                 return -EINVAL;
523         if (fd < 0)
524                 return -EINVAL;
525         if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
526                 return -EINVAL;
527         if (!callback)
528                 return -EINVAL;
529         if (!ret)
530                 return -EINVAL;
531         if (event_pid_changed(e))
532                 return -ECHILD;
533
534         s = source_new(e, SOURCE_IO);
535         if (!s)
536                 return -ENOMEM;
537
538         s->io.fd = fd;
539         s->io.events = events;
540         s->io.callback = callback;
541         s->userdata = userdata;
542
543         r = source_io_register(s, s->mute, events);
544         if (r < 0) {
545                 source_free(s);
546                 return -errno;
547         }
548
549         *ret = s;
550         return 0;
551 }
552
553 static int event_setup_timer_fd(
554                 sd_event *e,
555                 EventSourceType type,
556                 int *timer_fd,
557                 clockid_t id) {
558
559         struct epoll_event ev = {};
560         int r, fd;
561         sd_id128_t bootid;
562
563         assert(e);
564         assert(timer_fd);
565
566         if (_likely_(*timer_fd >= 0))
567                 return 0;
568
569         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
570         if (fd < 0)
571                 return -errno;
572
573         ev.events = EPOLLIN;
574         ev.data.ptr = INT_TO_PTR(type);
575
576         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
577         if (r < 0) {
578                 close_nointr_nofail(fd);
579                 return -errno;
580         }
581
582         /* When we sleep for longer, we try to realign the wakeup to
583            the same time wihtin each second, so that events all across
584            the system can be coalesced into a single CPU
585            wakeup. However, let's take some system-specific randomness
586            for this value, so that in a network of systems with synced
587            clocks timer events are distributed a bit. Here, we
588            calculate a perturbation usec offset from the boot ID. */
589
590         if (sd_id128_get_boot(&bootid) >= 0)
591                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
592
593         *timer_fd = fd;
594         return 0;
595 }
596
597 static int event_add_time_internal(
598                 sd_event *e,
599                 EventSourceType type,
600                 int *timer_fd,
601                 clockid_t id,
602                 Prioq **earliest,
603                 Prioq **latest,
604                 uint64_t usec,
605                 uint64_t accuracy,
606                 sd_time_handler_t callback,
607                 void *userdata,
608                 sd_event_source **ret) {
609
610         sd_event_source *s;
611         int r;
612
613         if (!e)
614                 return -EINVAL;
615         if (!callback)
616                 return -EINVAL;
617         if (!ret)
618                 return -EINVAL;
619         if (usec == (uint64_t) -1)
620                 return -EINVAL;
621         if (accuracy == (uint64_t) -1)
622                 return -EINVAL;
623         if (event_pid_changed(e))
624                 return -ECHILD;
625
626         assert(timer_fd);
627         assert(earliest);
628         assert(latest);
629
630         if (!*earliest) {
631                 *earliest = prioq_new(earliest_time_prioq_compare);
632                 if (!*earliest)
633                         return -ENOMEM;
634         }
635
636         if (!*latest) {
637                 *latest = prioq_new(latest_time_prioq_compare);
638                 if (!*latest)
639                         return -ENOMEM;
640         }
641
642         if (*timer_fd < 0) {
643                 r = event_setup_timer_fd(e, type, timer_fd, id);
644                 if (r < 0)
645                         return r;
646         }
647
648         s = source_new(e, type);
649         if (!s)
650                 return -ENOMEM;
651
652         s->time.next = usec;
653         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
654         s->time.callback = callback;
655         s->time.earliest_index = PRIOQ_IDX_NULL;
656         s->time.latest_index = PRIOQ_IDX_NULL;
657         s->userdata = userdata;
658
659         r = prioq_put(*earliest, s, &s->time.earliest_index);
660         if (r < 0)
661                 goto fail;
662
663         r = prioq_put(*latest, s, &s->time.latest_index);
664         if (r < 0)
665                 goto fail;
666
667         *ret = s;
668         return 0;
669
670 fail:
671         source_free(s);
672         return r;
673 }
674
675 int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
676         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
677 }
678
679 int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
680         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
681 }
682
683 static int event_update_signal_fd(sd_event *e) {
684         struct epoll_event ev = {};
685         bool add_to_epoll;
686         int r;
687
688         assert(e);
689
690         add_to_epoll = e->signal_fd < 0;
691
692         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
693         if (r < 0)
694                 return -errno;
695
696         e->signal_fd = r;
697
698         if (!add_to_epoll)
699                 return 0;
700
701         ev.events = EPOLLIN;
702         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
703
704         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
705         if (r < 0) {
706                 close_nointr_nofail(e->signal_fd);
707                 e->signal_fd = -1;
708
709                 return -errno;
710         }
711
712         return 0;
713 }
714
715 int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) {
716         sd_event_source *s;
717         int r;
718
719         if (!e)
720                 return -EINVAL;
721         if (sig <= 0)
722                 return -EINVAL;
723         if (sig >= _NSIG)
724                 return -EINVAL;
725         if (!callback)
726                 return -EINVAL;
727         if (!ret)
728                 return -EINVAL;
729         if (event_pid_changed(e))
730                 return -ECHILD;
731
732         if (!e->signal_sources) {
733                 e->signal_sources = new0(sd_event_source*, _NSIG);
734                 if (!e->signal_sources)
735                         return -ENOMEM;
736         } else if (e->signal_sources[sig])
737                 return -EBUSY;
738
739         s = source_new(e, SOURCE_SIGNAL);
740         if (!s)
741                 return -ENOMEM;
742
743         s->signal.sig = sig;
744         s->signal.callback = callback;
745         s->userdata = userdata;
746
747         e->signal_sources[sig] = s;
748         assert_se(sigaddset(&e->sigset, sig) == 0);
749
750         if (sig != SIGCHLD || e->n_unmuted_child_sources == 0) {
751                 r = event_update_signal_fd(e);
752                 if (r < 0) {
753                         source_free(s);
754                         return r;
755                 }
756         }
757
758         *ret = s;
759         return 0;
760 }
761
762 int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) {
763         sd_event_source *s;
764         int r;
765
766         if (!e)
767                 return -EINVAL;
768         if (pid <= 1)
769                 return -EINVAL;
770         if (options & ~(WEXITED|WSTOPPED|WCONTINUED))
771                 return -EINVAL;
772         if (!callback)
773                 return -EINVAL;
774         if (!ret)
775                 return -EINVAL;
776         if (event_pid_changed(e))
777                 return -ECHILD;
778
779         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
780         if (r < 0)
781                 return r;
782
783         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
784                 return -EBUSY;
785
786         s = source_new(e, SOURCE_CHILD);
787         if (!s)
788                 return -ENOMEM;
789
790         s->child.pid = pid;
791         s->child.options = options;
792         s->child.callback = callback;
793         s->userdata = userdata;
794
795         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
796         if (r < 0) {
797                 source_free(s);
798                 return r;
799         }
800
801         e->n_unmuted_child_sources ++;
802
803         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
804
805         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
806                 r = event_update_signal_fd(e);
807                 if (r < 0) {
808                         source_free(s);
809                         return -errno;
810                 }
811         }
812
813         e->need_process_child = true;
814
815         *ret = s;
816         return 0;
817 }
818
819 int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) {
820         sd_event_source *s;
821         int r;
822
823         if (!e)
824                 return -EINVAL;
825         if (!ret)
826                 return -EINVAL;
827         if (event_pid_changed(e))
828                 return -ECHILD;
829
830         s = source_new(e, SOURCE_DEFER);
831         if (!s)
832                 return -ENOMEM;
833
834         s->defer.callback = callback;
835         s->userdata = userdata;
836
837         r = source_set_pending(s, true);
838         if (r < 0) {
839                 source_free(s);
840                 return r;
841         }
842
843         *ret = s;
844         return 0;
845 }
846
847 sd_event_source* sd_event_source_ref(sd_event_source *s) {
848         if (!s)
849                 return NULL;
850
851         assert_se(REFCNT_INC(s->n_ref) >= 2);
852
853         return s;
854 }
855
856 sd_event_source* sd_event_source_unref(sd_event_source *s) {
857         if (!s)
858                 return NULL;
859
860         if (REFCNT_DEC(s->n_ref) <= 0)
861                 source_free(s);
862
863         return NULL;
864 }
865
866
867 sd_event *sd_event_get(sd_event_source *s) {
868         if (!s)
869                 return NULL;
870
871         return s->event;
872 }
873
874 int sd_event_source_get_pending(sd_event_source *s) {
875         if (!s)
876                 return -EINVAL;
877         if (event_pid_changed(s->event))
878                 return -ECHILD;
879
880         return s->pending;
881 }
882
883 int sd_event_source_get_io_fd(sd_event_source *s) {
884         if (!s)
885                 return -EINVAL;
886         if (s->type != SOURCE_IO)
887                 return -EDOM;
888         if (event_pid_changed(s->event))
889                 return -ECHILD;
890
891         return s->io.fd;
892 }
893
894 int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
895         if (!s)
896                 return -EINVAL;
897         if (s->type != SOURCE_IO)
898                 return -EDOM;
899         if (!events)
900                 return -EINVAL;
901         if (event_pid_changed(s->event))
902                 return -ECHILD;
903
904         *events = s->io.events;
905         return 0;
906 }
907
908 int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
909         int r;
910
911         if (!s)
912                 return -EINVAL;
913         if (!s->type != SOURCE_IO)
914                 return -EDOM;
915         if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
916                 return -EINVAL;
917         if (event_pid_changed(s->event))
918                 return -ECHILD;
919
920         if (s->io.events == events)
921                 return 0;
922
923         if (s->mute != SD_EVENT_MUTED) {
924                 r = source_io_register(s, s->io.events, events);
925                 if (r < 0)
926                         return r;
927         }
928
929         s->io.events = events;
930
931         return 0;
932 }
933
934 int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
935         if (!s)
936                 return -EINVAL;
937         if (s->type != SOURCE_IO)
938                 return -EDOM;
939         if (!revents)
940                 return -EINVAL;
941         if (!s->pending)
942                 return -ENODATA;
943         if (event_pid_changed(s->event))
944                 return -ECHILD;
945
946         *revents = s->io.revents;
947         return 0;
948 }
949
950 int sd_event_source_get_signal(sd_event_source *s) {
951         if (!s)
952                 return -EINVAL;
953         if (s->type != SOURCE_SIGNAL)
954                 return -EDOM;
955         if (event_pid_changed(s->event))
956                 return -ECHILD;
957
958         return s->signal.sig;
959 }
960
961 int sd_event_source_get_priority(sd_event_source *s, int *priority) {
962         if (!s)
963                 return -EINVAL;
964         if (event_pid_changed(s->event))
965                 return -ECHILD;
966
967         return s->priority;
968 }
969
970 int sd_event_source_set_priority(sd_event_source *s, int priority) {
971         if (!s)
972                 return -EINVAL;
973         if (event_pid_changed(s->event))
974                 return -ECHILD;
975
976         if (s->priority == priority)
977                 return 0;
978
979         s->priority = priority;
980
981         if (s->pending)
982                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
983
984         if (s->prepare)
985                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
986
987         return 0;
988 }
989
990 int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m) {
991         if (!s)
992                 return -EINVAL;
993         if (!m)
994                 return -EINVAL;
995         if (event_pid_changed(s->event))
996                 return -ECHILD;
997
998         *m = s->mute;
999         return 0;
1000 }
1001
1002 int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) {
1003         int r;
1004
1005         if (!s)
1006                 return -EINVAL;
1007         if (m != SD_EVENT_MUTED && m != SD_EVENT_UNMUTED && !SD_EVENT_ONESHOT)
1008                 return -EINVAL;
1009         if (event_pid_changed(s->event))
1010                 return -ECHILD;
1011
1012         if (s->mute == m)
1013                 return 0;
1014
1015         if (m == SD_EVENT_MUTED) {
1016
1017                 switch (s->type) {
1018
1019                 case SOURCE_IO:
1020                         r = source_io_unregister(s);
1021                         if (r < 0)
1022                                 return r;
1023
1024                         s->mute = m;
1025                         break;
1026
1027                 case SOURCE_MONOTONIC:
1028                         s->mute = m;
1029                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1030                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1031                         break;
1032
1033                 case SOURCE_REALTIME:
1034                         s->mute = m;
1035                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1036                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1037                         break;
1038
1039                 case SOURCE_SIGNAL:
1040                         s->mute = m;
1041                         if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) {
1042                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1043                                 event_update_signal_fd(s->event);
1044                         }
1045
1046                         break;
1047
1048                 case SOURCE_CHILD:
1049                         s->mute = m;
1050
1051                         assert(s->event->n_unmuted_child_sources > 0);
1052                         s->event->n_unmuted_child_sources--;
1053
1054                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1055                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1056                                 event_update_signal_fd(s->event);
1057                         }
1058
1059                         break;
1060
1061                 case SOURCE_DEFER:
1062                         s->mute = m;
1063                         break;
1064                 }
1065
1066         } else {
1067                 switch (s->type) {
1068
1069                 case SOURCE_IO:
1070                         r = source_io_register(s, m, s->io.events);
1071                         if (r < 0)
1072                                 return r;
1073
1074                         s->mute = m;
1075                         break;
1076
1077                 case SOURCE_MONOTONIC:
1078                         s->mute = m;
1079                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1080                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1081                         break;
1082
1083                 case SOURCE_REALTIME:
1084                         s->mute = m;
1085                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1086                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1087                         break;
1088
1089                 case SOURCE_SIGNAL:
1090                         s->mute = m;
1091
1092                         if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0)  {
1093                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1094                                 event_update_signal_fd(s->event);
1095                         }
1096                         break;
1097
1098                 case SOURCE_CHILD:
1099                         s->mute = m;
1100
1101                         if (s->mute == SD_EVENT_MUTED) {
1102                                 s->event->n_unmuted_child_sources++;
1103
1104                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1105                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1106                                         event_update_signal_fd(s->event);
1107                                 }
1108                         }
1109                         break;
1110
1111                 case SOURCE_DEFER:
1112                         s->mute = m;
1113                         break;
1114                 }
1115         }
1116
1117         if (s->pending)
1118                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1119
1120         if (s->prepare)
1121                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1122
1123         return 0;
1124 }
1125
1126 int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1127         if (!s)
1128                 return -EINVAL;
1129         if (!usec)
1130                 return -EINVAL;
1131         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1132                 return -EDOM;
1133         if (event_pid_changed(s->event))
1134                 return -ECHILD;
1135
1136         *usec = s->time.next;
1137         return 0;
1138 }
1139
1140 int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1141         if (!s)
1142                 return -EINVAL;
1143         if (usec == (uint64_t) -1)
1144                 return -EINVAL;
1145         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1146                 return -EDOM;
1147         if (event_pid_changed(s->event))
1148                 return -ECHILD;
1149
1150         if (s->time.next == usec)
1151                 return 0;
1152
1153         s->time.next = usec;
1154
1155         if (s->type == SOURCE_REALTIME) {
1156                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1157                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1158         } else {
1159                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1160                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1161         }
1162
1163         return 0;
1164 }
1165
1166 int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1167         if (!s)
1168                 return -EINVAL;
1169         if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
1170                 return -EDOM;
1171         if (event_pid_changed(s->event))
1172                 return -ECHILD;
1173
1174         if (usec == 0)
1175                 usec = DEFAULT_ACCURACY_USEC;
1176
1177         if (s->time.accuracy == usec)
1178                 return 0;
1179
1180
1181         s->time.accuracy = usec;
1182
1183         if (s->type == SOURCE_REALTIME)
1184                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1185         else
1186                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1187
1188         return 0;
1189 }
1190
1191 int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1192         if (!s)
1193                 return -EINVAL;
1194         if (!usec)
1195                 return -EINVAL;
1196         if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
1197                 return -EDOM;
1198         if (event_pid_changed(s->event))
1199                 return -ECHILD;
1200
1201         *usec = s->time.accuracy;
1202         return 0;
1203 }
1204
1205 int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1206         int r;
1207
1208         if (!s)
1209                 return -EINVAL;
1210         if (event_pid_changed(s->event))
1211                 return -ECHILD;
1212
1213         if (s->prepare == callback)
1214                 return 0;
1215
1216         if (callback && s->prepare) {
1217                 s->prepare = callback;
1218                 return 0;
1219         }
1220
1221         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1222         if (r < 0)
1223                 return r;
1224
1225         s->prepare = callback;
1226
1227         if (callback) {
1228                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1229                 if (r < 0)
1230                         return r;
1231         } else
1232                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1233
1234         return 0;
1235 }
1236
1237 void* sd_event_source_get_userdata(sd_event_source *s) {
1238         if (!s)
1239                 return NULL;
1240
1241         return s->userdata;
1242 }
1243
1244 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1245         usec_t c;
1246         assert(e);
1247         assert(a <= b);
1248
1249         if (a <= 0)
1250                 return 0;
1251
1252         if (b <= a + 1)
1253                 return a;
1254
1255         /*
1256           Find a good time to wake up again between times a and b. We
1257           have two goals here:
1258
1259           a) We want to wake up as seldom as possible, hence prefer
1260              later times over earlier times.
1261
1262           b) But if we have to wake up, then let's make sure to
1263              dispatch as much as possible on the entire system.
1264
1265           We implement this by waking up everywhere at the same time
1266           within any given second if we can, synchronised via the
1267           perturbation value determined from the boot ID. If we can't,
1268           then we try to find the same spot in every a 250ms
1269           step. Otherwise, we pick the last possible time to wake up.
1270         */
1271
1272         c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1273         if (c >= b) {
1274                 if (_unlikely_(c < USEC_PER_SEC))
1275                         return b;
1276
1277                 c -= USEC_PER_SEC;
1278         }
1279
1280         if (c >= a)
1281                 return c;
1282
1283         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1284         if (c >= b) {
1285                 if (_unlikely_(c < USEC_PER_MSEC*250))
1286                         return b;
1287
1288                 c -= USEC_PER_MSEC*250;
1289         }
1290
1291         if (c >= a)
1292                 return c;
1293
1294         return b;
1295 }
1296
1297 static int event_arm_timer(
1298                 sd_event *e,
1299                 int timer_fd,
1300                 Prioq *earliest,
1301                 Prioq *latest,
1302                 usec_t *next) {
1303
1304         struct itimerspec its = {};
1305         sd_event_source *a, *b;
1306         usec_t t;
1307         int r;
1308
1309         assert_se(e);
1310         assert_se(next);
1311
1312         a = prioq_peek(earliest);
1313         if (!a || a->mute == SD_EVENT_MUTED)
1314                 return 0;
1315
1316         b = prioq_peek(latest);
1317         assert_se(b && b->mute != SD_EVENT_MUTED);
1318
1319         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1320         if (*next == t)
1321                 return 0;
1322
1323         assert_se(timer_fd >= 0);
1324
1325         if (t == 0) {
1326                 /* We don' want to disarm here, just mean some time looooong ago. */
1327                 its.it_value.tv_sec = 0;
1328                 its.it_value.tv_nsec = 1;
1329         } else
1330                 timespec_store(&its.it_value, t);
1331
1332         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1333         if (r < 0)
1334                 return r;
1335
1336         *next = t;
1337         return 0;
1338 }
1339
1340 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1341         assert(e);
1342         assert(s);
1343         assert(s->type == SOURCE_IO);
1344
1345         s->io.revents = events;
1346
1347         /*
1348            If this is a oneshot event source, then we added it to the
1349            epoll with EPOLLONESHOT, hence we know it's not registered
1350            anymore. We can save a syscall here...
1351         */
1352
1353         if (s->mute == SD_EVENT_ONESHOT)
1354                 s->io.registered = false;
1355
1356         return source_set_pending(s, true);
1357 }
1358
1359 static int flush_timer(sd_event *e, int fd, uint32_t events) {
1360         uint64_t x;
1361         ssize_t ss;
1362
1363         assert(e);
1364
1365         if (events != EPOLLIN)
1366                 return -EIO;
1367
1368         ss = read(fd, &x, sizeof(x));
1369         if (ss < 0) {
1370                 if (errno == EAGAIN || errno == EINTR)
1371                         return 0;
1372
1373                 return -errno;
1374         }
1375
1376         if (ss != sizeof(x))
1377                 return -EIO;
1378
1379         return 0;
1380 }
1381
1382 static int process_timer(sd_event *e, usec_t n, Prioq *earliest, Prioq *latest) {
1383         sd_event_source *s;
1384         int r;
1385
1386         assert(e);
1387
1388         for (;;) {
1389                 s = prioq_peek(earliest);
1390                 if (!s ||
1391                     s->time.next > n ||
1392                     s->mute == SD_EVENT_MUTED ||
1393                     s->pending)
1394                         break;
1395
1396                 r = source_set_pending(s, true);
1397                 if (r < 0)
1398                         return r;
1399
1400                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1401                 prioq_reshuffle(latest, s, &s->time.latest_index);
1402         }
1403
1404         return 0;
1405 }
1406
1407 static int process_child(sd_event *e) {
1408         sd_event_source *s;
1409         Iterator i;
1410         int r;
1411
1412         assert(e);
1413
1414         e->need_process_child = false;
1415
1416         /*
1417            So, this is ugly. We iteratively invoke waitid() with P_PID
1418            + WNOHANG for each PID we wait for, instead of using
1419            P_ALL. This is because we only want to get child
1420            information of very specific child processes, and not all
1421            of them. We might not have processed the SIGCHLD even of a
1422            previous invocation and we don't want to maintain a
1423            unbounded *per-child* event queue, hence we really don't
1424            want anything flushed out of the kernel's queue that we
1425            don't care about. Since this is O(n) this means that if you
1426            have a lot of processes you probably want to handle SIGCHLD
1427            yourself.
1428         */
1429
1430         HASHMAP_FOREACH(s, e->child_sources, i) {
1431                 assert(s->type == SOURCE_CHILD);
1432
1433                 if (s->pending)
1434                         continue;
1435
1436                 if (s->mute == SD_EVENT_MUTED)
1437                         continue;
1438
1439                 zero(s->child.siginfo);
1440                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1441                 if (r < 0)
1442                         return -errno;
1443
1444                 if (s->child.siginfo.si_pid != 0) {
1445                         r = source_set_pending(s, true);
1446                         if (r < 0)
1447                                 return r;
1448                 }
1449         }
1450
1451         return 0;
1452 }
1453
1454 static int process_signal(sd_event *e, uint32_t events) {
1455         struct signalfd_siginfo si;
1456         bool read_one = false;
1457         ssize_t ss;
1458         int r;
1459
1460         if (events != EPOLLIN)
1461                 return -EIO;
1462
1463         for (;;) {
1464                 sd_event_source *s;
1465
1466                 ss = read(e->signal_fd, &si, sizeof(si));
1467                 if (ss < 0) {
1468                         if (errno == EAGAIN || errno == EINTR)
1469                                 return read_one;
1470
1471                         return -errno;
1472                 }
1473
1474                 if (ss != sizeof(si))
1475                         return -EIO;
1476
1477                 read_one = true;
1478
1479                 if (si.ssi_signo == SIGCHLD) {
1480                         r = process_child(e);
1481                         if (r < 0)
1482                                 return r;
1483                         if (r > 0 || !e->signal_sources[si.ssi_signo])
1484                                 continue;
1485                 } else {
1486                         s = e->signal_sources[si.ssi_signo];
1487                         if (!s)
1488                                 return -EIO;
1489                 }
1490
1491                 s->signal.siginfo = si;
1492                 r = source_set_pending(s, true);
1493                 if (r < 0)
1494                         return r;
1495         }
1496
1497
1498         return 0;
1499 }
1500
1501 static int source_dispatch(sd_event_source *s) {
1502         int r;
1503
1504         assert(s);
1505         assert(s->pending);
1506
1507         r = source_set_pending(s, false);
1508         if (r < 0)
1509                 return r;
1510
1511         if (s->mute == SD_EVENT_ONESHOT) {
1512                 r = sd_event_source_set_mute(s, SD_EVENT_MUTED);
1513                 if (r < 0)
1514                         return r;
1515         }
1516
1517         switch (s->type) {
1518
1519         case SOURCE_IO:
1520                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1521                 break;
1522
1523         case SOURCE_MONOTONIC:
1524                 r = s->time.callback(s, s->time.next, s->userdata);
1525                 break;
1526
1527         case SOURCE_REALTIME:
1528                 r = s->time.callback(s, s->time.next, s->userdata);
1529                 break;
1530
1531         case SOURCE_SIGNAL:
1532                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1533                 break;
1534
1535         case SOURCE_CHILD:
1536                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1537                 break;
1538
1539         case SOURCE_DEFER:
1540                 r = s->defer.callback(s, s->userdata);
1541                 break;
1542         }
1543
1544         return r;
1545 }
1546
1547 static int event_prepare(sd_event *e) {
1548         int r;
1549
1550         assert(e);
1551
1552         for (;;) {
1553                 sd_event_source *s;
1554
1555                 s = prioq_peek(e->prepare);
1556                 if (!s || s->prepare_iteration == e->iteration || s->mute == SD_EVENT_MUTED)
1557                         break;
1558
1559                 s->prepare_iteration = e->iteration;
1560                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1561                 if (r < 0)
1562                         return r;
1563
1564                 assert(s->prepare);
1565                 r = s->prepare(s, s->userdata);
1566                 if (r < 0)
1567                         return r;
1568
1569         }
1570
1571         return 0;
1572 }
1573
1574 static sd_event_source* event_next_pending(sd_event *e) {
1575         sd_event_source *p;
1576
1577         p = prioq_peek(e->pending);
1578         if (!p)
1579                 return NULL;
1580
1581         if (p->mute == SD_EVENT_MUTED)
1582                 return NULL;
1583
1584         return p;
1585 }
1586
1587 int sd_event_run(sd_event *e, uint64_t timeout) {
1588         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1589         sd_event_source *p;
1590         int r, i, m;
1591         dual_timestamp n;
1592
1593         if (!e)
1594                 return -EINVAL;
1595         if (e->quit)
1596                 return -ESTALE;
1597         if (event_pid_changed(e))
1598                 return -ECHILD;
1599
1600         e->iteration++;
1601
1602         r = event_prepare(e);
1603         if (r < 0)
1604                 return r;
1605
1606         if (event_next_pending(e) || e->need_process_child)
1607                 timeout = 0;
1608
1609         if (timeout > 0) {
1610                 r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1611                 if (r < 0)
1612                         return r;
1613
1614                 r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1615                 if (r < 0)
1616                         return r;
1617         }
1618
1619         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1620                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1621         if (m < 0)
1622                 return m;
1623
1624         dual_timestamp_get(&n);
1625
1626         for (i = 0; i < m; i++) {
1627
1628                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1629                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
1630                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1631                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
1632                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1633                         r = process_signal(e, ev_queue[i].events);
1634                 else
1635                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1636
1637                 if (r < 0)
1638                         return r;
1639         }
1640
1641         r = process_timer(e, n.monotonic, e->monotonic_earliest, e->monotonic_latest);
1642         if (r < 0)
1643                 return r;
1644
1645         r = process_timer(e, n.realtime, e->realtime_earliest, e->realtime_latest);
1646         if (r < 0)
1647                 return r;
1648
1649         if (e->need_process_child) {
1650                 r = process_child(e);
1651                 if (r < 0)
1652                         return r;
1653         }
1654
1655         p = event_next_pending(e);
1656         if (!p)
1657                 return 0;
1658
1659         return source_dispatch(p);
1660 }
1661
1662 int sd_event_loop(sd_event *e) {
1663         int r;
1664
1665         if (!e)
1666                 return -EINVAL;
1667         if (event_pid_changed(e))
1668                 return -ECHILD;
1669
1670         while (!e->quit) {
1671                 r = sd_event_run(e, (uint64_t) -1);
1672                 if (r < 0)
1673                         return r;
1674         }
1675
1676         return 0;
1677 }
1678
1679 int sd_event_quit(sd_event *e) {
1680         if (!e)
1681                 return EINVAL;
1682         if (event_pid_changed(e))
1683                 return -ECHILD;
1684
1685         return e->quit;
1686 }
1687
1688 int sd_event_request_quit(sd_event *e) {
1689         if (!e)
1690                 return -EINVAL;
1691         if (event_pid_changed(e))
1692                 return -ECHILD;
1693
1694         e->quit = true;
1695         return 0;
1696 }