src/libsystemd-bus/sd-event.c (elogind.git, commit 6a6581bec616878719614c65136e36d6f0fe197b)
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "sd-id128.h"
27 #include "macro.h"
28 #include "prioq.h"
29 #include "hashmap.h"
30 #include "util.h"
31 #include "time-util.h"
32 #include "missing.h"
33
34 #include "sd-event.h"
35
36 #define EPOLL_QUEUE_MAX 64
37 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
38
39 typedef enum EventSourceType {
40         SOURCE_IO,
41         SOURCE_MONOTONIC,
42         SOURCE_REALTIME,
43         SOURCE_SIGNAL,
44         SOURCE_CHILD,
45         SOURCE_DEFER,
46         SOURCE_QUIT
47 } EventSourceType;
48
49 struct sd_event_source {
50         unsigned n_ref;
51
52         sd_event *event;
53         void *userdata;
54         sd_event_handler_t prepare;
55
56         EventSourceType type:4;
57         int enabled:3;
58         bool pending:1;
59
60         int priority;
61         unsigned pending_index;
62         unsigned prepare_index;
63         unsigned pending_iteration;
64         unsigned prepare_iteration;
65
66         union {
67                 struct {
68                         sd_event_io_handler_t callback;
69                         int fd;
70                         uint32_t events;
71                         uint32_t revents;
72                         bool registered:1;
73                 } io;
74                 struct {
75                         sd_event_time_handler_t callback;
76                         usec_t next, accuracy;
77                         unsigned earliest_index;
78                         unsigned latest_index;
79                 } time;
80                 struct {
81                         sd_event_signal_handler_t callback;
82                         struct signalfd_siginfo siginfo;
83                         int sig;
84                 } signal;
85                 struct {
86                         sd_event_child_handler_t callback;
87                         siginfo_t siginfo;
88                         pid_t pid;
89                         int options;
90                 } child;
91                 struct {
92                         sd_event_handler_t callback;
93                 } defer;
94                 struct {
95                         sd_event_handler_t callback;
96                         unsigned prioq_index;
97                 } quit;
98         };
99 };
100
101 struct sd_event {
102         unsigned n_ref;
103
104         int epoll_fd;
105         int signal_fd;
106         int realtime_fd;
107         int monotonic_fd;
108
109         Prioq *pending;
110         Prioq *prepare;
111
112         /* For both clocks we maintain two priority queues each, one
113          * ordered by the earliest times the events may be
114          * dispatched, and one ordered by the latest times they must
115          * have been dispatched. The range between the top entries in
116          * the two prioqs is the time window we can freely schedule
117          * wakeups in. */
118         Prioq *monotonic_earliest;
119         Prioq *monotonic_latest;
120         Prioq *realtime_earliest;
121         Prioq *realtime_latest;
122
123         usec_t realtime_next, monotonic_next;
124         usec_t perturb;
125
126         sigset_t sigset;
127         sd_event_source **signal_sources;
128
129         Hashmap *child_sources;
130         unsigned n_enabled_child_sources;
131
132         Prioq *quit;
133
134         pid_t original_pid;
135
136         unsigned iteration;
137         dual_timestamp timestamp;
138         int state;
139
140         bool quit_requested:1;
141         bool need_process_child:1;
142
143         pid_t tid;
144         sd_event **default_event_ptr;
145 };
146
147 static int pending_prioq_compare(const void *a, const void *b) {
148         const sd_event_source *x = a, *y = b;
149
150         assert(x->pending);
151         assert(y->pending);
152
153         /* Enabled ones first */
154         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
155                 return -1;
156         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
157                 return 1;
158
159         /* Lower priority values first */
160         if (x->priority < y->priority)
161                 return -1;
162         if (x->priority > y->priority)
163                 return 1;
164
165         /* Older entries first */
166         if (x->pending_iteration < y->pending_iteration)
167                 return -1;
168         if (x->pending_iteration > y->pending_iteration)
169                 return 1;
170
171         /* Stability for the rest */
172         if (x < y)
173                 return -1;
174         if (x > y)
175                 return 1;
176
177         return 0;
178 }
179
180 static int prepare_prioq_compare(const void *a, const void *b) {
181         const sd_event_source *x = a, *y = b;
182
183         assert(x->prepare);
184         assert(y->prepare);
185
186         /* Move most recently prepared ones last, so that we can stop
187          * preparing as soon as we hit one that has already been
188          * prepared in the current iteration */
189         if (x->prepare_iteration < y->prepare_iteration)
190                 return -1;
191         if (x->prepare_iteration > y->prepare_iteration)
192                 return 1;
193
194         /* Enabled ones first */
195         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
196                 return -1;
197         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
198                 return 1;
199
200         /* Lower priority values first */
201         if (x->priority < y->priority)
202                 return -1;
203         if (x->priority > y->priority)
204                 return 1;
205
206         /* Stability for the rest */
207         if (x < y)
208                 return -1;
209         if (x > y)
210                 return 1;
211
212         return 0;
213 }
214
215 static int earliest_time_prioq_compare(const void *a, const void *b) {
216         const sd_event_source *x = a, *y = b;
217
218         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
219         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
220
221         /* Enabled ones first */
222         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
223                 return -1;
224         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
225                 return 1;
226
227         /* Move the pending ones to the end */
228         if (!x->pending && y->pending)
229                 return -1;
230         if (x->pending && !y->pending)
231                 return 1;
232
233         /* Order by time */
234         if (x->time.next < y->time.next)
235                 return -1;
236         if (x->time.next > y->time.next)
237                 return 1;
238
239         /* Stability for the rest */
240         if (x < y)
241                 return -1;
242         if (x > y)
243                 return 1;
244
245         return 0;
246 }
247
248 static int latest_time_prioq_compare(const void *a, const void *b) {
249         const sd_event_source *x = a, *y = b;
250
251         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
252                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
253
254         /* Enabled ones first */
255         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
256                 return -1;
257         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
258                 return 1;
259
260         /* Move the pending ones to the end */
261         if (!x->pending && y->pending)
262                 return -1;
263         if (x->pending && !y->pending)
264                 return 1;
265
266         /* Order by time */
267         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
268                 return -1;
269         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
270                 return 1;
271
272         /* Stability for the rest */
273         if (x < y)
274                 return -1;
275         if (x > y)
276                 return 1;
277
278         return 0;
279 }
280
281 static int quit_prioq_compare(const void *a, const void *b) {
282         const sd_event_source *x = a, *y = b;
283
284         assert(x->type == SOURCE_QUIT);
285         assert(y->type == SOURCE_QUIT);
286
287         /* Enabled ones first */
288         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
289                 return -1;
290         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
291                 return 1;
292
293         /* Lower priority values first */
294         if (x->priority < y->priority)
295                 return -1;
296         if (x->priority > y->priority)
297                 return 1;
298
299         /* Stability for the rest */
300         if (x < y)
301                 return -1;
302         if (x > y)
303                 return 1;
304
305         return 0;
306 }
307
308 static void event_free(sd_event *e) {
309         assert(e);
310
311         if (e->default_event_ptr)
312                 *(e->default_event_ptr) = NULL;
313
314         if (e->epoll_fd >= 0)
315                 close_nointr_nofail(e->epoll_fd);
316
317         if (e->signal_fd >= 0)
318                 close_nointr_nofail(e->signal_fd);
319
320         if (e->realtime_fd >= 0)
321                 close_nointr_nofail(e->realtime_fd);
322
323         if (e->monotonic_fd >= 0)
324                 close_nointr_nofail(e->monotonic_fd);
325
326         prioq_free(e->pending);
327         prioq_free(e->prepare);
328         prioq_free(e->monotonic_earliest);
329         prioq_free(e->monotonic_latest);
330         prioq_free(e->realtime_earliest);
331         prioq_free(e->realtime_latest);
332         prioq_free(e->quit);
333
334         free(e->signal_sources);
335
336         hashmap_free(e->child_sources);
337         free(e);
338 }
339
340 _public_ int sd_event_new(sd_event** ret) {
341         sd_event *e;
342         int r;
343
344         assert_return(ret, -EINVAL);
345
346         e = new0(sd_event, 1);
347         if (!e)
348                 return -ENOMEM;
349
350         e->n_ref = 1;
351         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
352         e->realtime_next = e->monotonic_next = (usec_t) -1;
353         e->original_pid = getpid();
354
355         assert_se(sigemptyset(&e->sigset) == 0);
356
357         e->pending = prioq_new(pending_prioq_compare);
358         if (!e->pending) {
359                 r = -ENOMEM;
360                 goto fail;
361         }
362
363         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
364         if (e->epoll_fd < 0) {
365                 r = -errno;
366                 goto fail;
367         }
368
369         *ret = e;
370         return 0;
371
372 fail:
373         event_free(e);
374         return r;
375 }
376
377 _public_ sd_event* sd_event_ref(sd_event *e) {
378         assert_return(e, NULL);
379
380         assert(e->n_ref >= 1);
381         e->n_ref++;
382
383         return e;
384 }
385
386 _public_ sd_event* sd_event_unref(sd_event *e) {
387         assert_return(e, NULL);
388
389         assert(e->n_ref >= 1);
390         e->n_ref--;
391
392         if (e->n_ref <= 0)
393                 event_free(e);
394
395         return NULL;
396 }
397
398 static bool event_pid_changed(sd_event *e) {
399         assert(e);
400
401         /* We don't support people creating an event loop and keeping
402          * it around over a fork(). Let's complain. */
403
404         return e->original_pid != getpid();
405 }
406
407 static int source_io_unregister(sd_event_source *s) {
408         int r;
409
410         assert(s);
411         assert(s->type == SOURCE_IO);
412
413         if (!s->io.registered)
414                 return 0;
415
416         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
417         if (r < 0)
418                 return -errno;
419
420         s->io.registered = false;
421         return 0;
422 }
423
424 static int source_io_register(
425                 sd_event_source *s,
426                 int enabled,
427                 uint32_t events) {
428
429         struct epoll_event ev = {};
430         int r;
431
432         assert(s);
433         assert(s->type == SOURCE_IO);
434         assert(enabled != SD_EVENT_OFF);
435
436         ev.events = events;
437         ev.data.ptr = s;
438
439         if (enabled == SD_EVENT_ONESHOT)
440                 ev.events |= EPOLLONESHOT;
441
442         if (s->io.registered)
443                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
444         else
445                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
446
447         if (r < 0)
448                 return -errno;
449
450         s->io.registered = true;
451
452         return 0;
453 }
454
455 static void source_free(sd_event_source *s) {
456         assert(s);
457
458         if (s->event) {
459                 switch (s->type) {
460
461                 case SOURCE_IO:
462                         if (s->io.fd >= 0)
463                                 source_io_unregister(s);
464
465                         break;
466
467                 case SOURCE_MONOTONIC:
468                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
469                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
470                         break;
471
472                 case SOURCE_REALTIME:
473                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
474                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
475                         break;
476
477                 case SOURCE_SIGNAL:
478                         if (s->signal.sig > 0) {
479                                 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
480                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
481
482                                 if (s->event->signal_sources)
483                                         s->event->signal_sources[s->signal.sig] = NULL;
484                         }
485
486                         break;
487
488                 case SOURCE_CHILD:
489                         if (s->child.pid > 0) {
490                                 if (s->enabled != SD_EVENT_OFF) {
491                                         assert(s->event->n_enabled_child_sources > 0);
492                                         s->event->n_enabled_child_sources--;
493                                 }
494
495                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
496                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
497
498                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
499                         }
500
501                         break;
502
503                 case SOURCE_DEFER:
504                         /* nothing */
505                         break;
506
507                 case SOURCE_QUIT:
508                         prioq_remove(s->event->quit, s, &s->quit.prioq_index);
509                         break;
510                 }
511
512                 if (s->pending)
513                         prioq_remove(s->event->pending, s, &s->pending_index);
514
515                 if (s->prepare)
516                         prioq_remove(s->event->prepare, s, &s->prepare_index);
517
518                 sd_event_unref(s->event);
519         }
520
521         free(s);
522 }
523
524 static int source_set_pending(sd_event_source *s, bool b) {
525         int r;
526
527         assert(s);
528         assert(s->type != SOURCE_QUIT);
529
530         if (s->pending == b)
531                 return 0;
532
533         s->pending = b;
534
535         if (b) {
536                 s->pending_iteration = s->event->iteration;
537
538                 r = prioq_put(s->event->pending, s, &s->pending_index);
539                 if (r < 0) {
540                         s->pending = false;
541                         return r;
542                 }
543         } else
544                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
545
546         if (s->type == SOURCE_REALTIME) {
547                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
548                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
549         } else if (s->type == SOURCE_MONOTONIC) {
550                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
551                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
552         }
553
554         return 0;
555 }
556
557 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
558         sd_event_source *s;
559
560         assert(e);
561
562         s = new0(sd_event_source, 1);
563         if (!s)
564                 return NULL;
565
566         s->n_ref = 1;
567         s->event = sd_event_ref(e);
568         s->type = type;
569         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
570
571         return s;
572 }
573
574 _public_ int sd_event_add_io(
575                 sd_event *e,
576                 int fd,
577                 uint32_t events,
578                 sd_event_io_handler_t callback,
579                 void *userdata,
580                 sd_event_source **ret) {
581
582         sd_event_source *s;
583         int r;
584
585         assert_return(e, -EINVAL);
586         assert_return(fd >= 0, -EINVAL);
587         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
588         assert_return(callback, -EINVAL);
589         assert_return(ret, -EINVAL);
590         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
591         assert_return(!event_pid_changed(e), -ECHILD);
592
593         s = source_new(e, SOURCE_IO);
594         if (!s)
595                 return -ENOMEM;
596
597         s->io.fd = fd;
598         s->io.events = events;
599         s->io.callback = callback;
600         s->userdata = userdata;
601         s->enabled = SD_EVENT_ON;
602
603         r = source_io_register(s, s->enabled, events);
604         if (r < 0) {
605                 source_free(s);
606                 return r;
607         }
608
609         *ret = s;
610         return 0;
611 }
612
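/* Usage sketch (illustrative only, kept as a comment): how the pieces above are meant
 * to fit together. Create a loop with sd_event_new(), attach a file descriptor with
 * sd_event_add_io(), then drive it with sd_event_run(). The fd, the callback body and
 * the (uint64_t) -1 "wait forever" timeout are assumptions made for this sketch;
 * source_dispatch() below forwards the callback's return value, so returning a
 * negative errno is the conventional way to signal failure. A real caller would also
 * stop iterating once a quit source has been handled.
 *
 *     static int on_io(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
 *             char buf[256];
 *             ssize_t n = read(fd, buf, sizeof(buf));    (drain the fd so EPOLLIN clears)
 *             return n < 0 ? -errno : 0;
 *     }
 *
 *     int run_loop(int fd) {
 *             sd_event *e = NULL;
 *             sd_event_source *s = NULL;
 *             int r;
 *
 *             r = sd_event_new(&e);
 *             if (r < 0)
 *                     return r;
 *
 *             r = sd_event_add_io(e, fd, EPOLLIN, on_io, NULL, &s);
 *             if (r < 0)
 *                     goto finish;
 *
 *             for (;;) {
 *                     r = sd_event_run(e, (uint64_t) -1);
 *                     if (r < 0)
 *                             break;
 *             }
 *
 *     finish:
 *             sd_event_source_unref(s);
 *             sd_event_unref(e);
 *             return r;
 *     }
 */
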
613 static int event_setup_timer_fd(
614                 sd_event *e,
615                 EventSourceType type,
616                 int *timer_fd,
617                 clockid_t id) {
618
619         struct epoll_event ev = {};
620         int r, fd;
621         sd_id128_t bootid;
622
623         assert(e);
624         assert(timer_fd);
625
626         if (_likely_(*timer_fd >= 0))
627                 return 0;
628
629         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
630         if (fd < 0)
631                 return -errno;
632
633         ev.events = EPOLLIN;
634         ev.data.ptr = INT_TO_PTR(type);
635
636         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
637         if (r < 0) {
638                 close_nointr_nofail(fd);
639                 return -errno;
640         }
641
642         /* When we sleep for longer, we try to realign the wakeup to
643            the same time within each minute/second/250ms, so that
644            events all across the system can be coalesced into a single
645            CPU wakeup. However, let's take some system-specific
646            randomness for this value, so that in a network of systems
647            with synced clocks timer events are distributed a
648            bit. Here, we calculate a perturbation usec offset from the
649            boot ID. */
650
651         if (sd_id128_get_boot(&bootid) >= 0)
652                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_MINUTE;
653
654         *timer_fd = fd;
655         return 0;
656 }
657
658 static int event_add_time_internal(
659                 sd_event *e,
660                 EventSourceType type,
661                 int *timer_fd,
662                 clockid_t id,
663                 Prioq **earliest,
664                 Prioq **latest,
665                 uint64_t usec,
666                 uint64_t accuracy,
667                 sd_event_time_handler_t callback,
668                 void *userdata,
669                 sd_event_source **ret) {
670
671         sd_event_source *s;
672         int r;
673
674         assert_return(e, -EINVAL);
675         assert_return(callback, -EINVAL);
676         assert_return(ret, -EINVAL);
677         assert_return(usec != (uint64_t) -1, -EINVAL);
678         assert_return(accuracy != (uint64_t) -1, -EINVAL);
679         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
680         assert_return(!event_pid_changed(e), -ECHILD);
681
682         assert(timer_fd);
683         assert(earliest);
684         assert(latest);
685
686         if (!*earliest) {
687                 *earliest = prioq_new(earliest_time_prioq_compare);
688                 if (!*earliest)
689                         return -ENOMEM;
690         }
691
692         if (!*latest) {
693                 *latest = prioq_new(latest_time_prioq_compare);
694                 if (!*latest)
695                         return -ENOMEM;
696         }
697
698         if (*timer_fd < 0) {
699                 r = event_setup_timer_fd(e, type, timer_fd, id);
700                 if (r < 0)
701                         return r;
702         }
703
704         s = source_new(e, type);
705         if (!s)
706                 return -ENOMEM;
707
708         s->time.next = usec;
709         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
710         s->time.callback = callback;
711         s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
712         s->userdata = userdata;
713         s->enabled = SD_EVENT_ONESHOT;
714
715         r = prioq_put(*earliest, s, &s->time.earliest_index);
716         if (r < 0)
717                 goto fail;
718
719         r = prioq_put(*latest, s, &s->time.latest_index);
720         if (r < 0)
721                 goto fail;
722
723         *ret = s;
724         return 0;
725
726 fail:
727         source_free(s);
728         return r;
729 }
730
731 _public_ int sd_event_add_monotonic(sd_event *e,
732                                     uint64_t usec,
733                                     uint64_t accuracy,
734                                     sd_event_time_handler_t callback,
735                                     void *userdata,
736                                     sd_event_source **ret) {
737
738         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
739 }
740
741 _public_ int sd_event_add_realtime(sd_event *e,
742                                    uint64_t usec,
743                                    uint64_t accuracy,
744                                    sd_event_time_handler_t callback,
745                                    void *userdata,
746                                    sd_event_source **ret) {
747
748         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->realtime_latest, usec, accuracy, callback, userdata, ret);
749 }
750
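/* Illustrative sketch (comment only): arming a one-shot monotonic timer with the
 * helper above. now() from time-util.h and the 5s/100ms values are assumptions made
 * for the sketch; passing accuracy == 0 would fall back to DEFAULT_ACCURACY_USEC as
 * implemented in event_add_time_internal() above. The usec argument handed to the
 * callback is the requested wakeup time (s->time.next), not the actual dispatch time.
 *
 *     static int on_timer(sd_event_source *s, uint64_t usec, void *userdata) {
 *             return 0;
 *     }
 *
 *     r = sd_event_add_monotonic(e, now(CLOCK_MONOTONIC) + 5 * USEC_PER_SEC,
 *                                100 * USEC_PER_MSEC, on_timer, NULL, &s);
 */
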
751 static int event_update_signal_fd(sd_event *e) {
752         struct epoll_event ev = {};
753         bool add_to_epoll;
754         int r;
755
756         assert(e);
757
758         add_to_epoll = e->signal_fd < 0;
759
760         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
761         if (r < 0)
762                 return -errno;
763
764         e->signal_fd = r;
765
766         if (!add_to_epoll)
767                 return 0;
768
769         ev.events = EPOLLIN;
770         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
771
772         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
773         if (r < 0) {
774                 close_nointr_nofail(e->signal_fd);
775                 e->signal_fd = -1;
776
777                 return -errno;
778         }
779
780         return 0;
781 }
782
783 _public_ int sd_event_add_signal(
784                 sd_event *e,
785                 int sig,
786                 sd_event_signal_handler_t callback,
787                 void *userdata,
788                 sd_event_source **ret) {
789
790         sd_event_source *s;
791         int r;
792
793         assert_return(e, -EINVAL);
794         assert_return(sig > 0, -EINVAL);
795         assert_return(sig < _NSIG, -EINVAL);
796         assert_return(callback, -EINVAL);
797         assert_return(ret, -EINVAL);
798         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
799         assert_return(!event_pid_changed(e), -ECHILD);
800
801         if (!e->signal_sources) {
802                 e->signal_sources = new0(sd_event_source*, _NSIG);
803                 if (!e->signal_sources)
804                         return -ENOMEM;
805         } else if (e->signal_sources[sig])
806                 return -EBUSY;
807
808         s = source_new(e, SOURCE_SIGNAL);
809         if (!s)
810                 return -ENOMEM;
811
812         s->signal.sig = sig;
813         s->signal.callback = callback;
814         s->userdata = userdata;
815         s->enabled = SD_EVENT_ON;
816
817         e->signal_sources[sig] = s;
818         assert_se(sigaddset(&e->sigset, sig) == 0);
819
820         if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
821                 r = event_update_signal_fd(e);
822                 if (r < 0) {
823                         source_free(s);
824                         return r;
825                 }
826         }
827
828         *ret = s;
829         return 0;
830 }
831
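/* Illustrative sketch (comment only): sd_event_add_signal() above adds the signal to
 * e->sigset and (re)creates the signalfd, but nothing in this file blocks the signal
 * for the calling thread; following usual signalfd semantics, the sketch assumes the
 * caller blocks it first so it is only delivered through the event loop:
 *
 *     sigset_t mask;
 *     assert_se(sigemptyset(&mask) == 0);
 *     assert_se(sigaddset(&mask, SIGTERM) == 0);
 *     assert_se(sigprocmask(SIG_BLOCK, &mask, NULL) == 0);
 *
 *     r = sd_event_add_signal(e, SIGTERM, on_sigterm, NULL, &s);
 *
 * on_sigterm is a hypothetical sd_event_signal_handler_t; it receives the
 * struct signalfd_siginfo read in process_signal() below.
 */
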
832 _public_ int sd_event_add_child(
833                 sd_event *e,
834                 pid_t pid,
835                 int options,
836                 sd_event_child_handler_t callback,
837                 void *userdata,
838                 sd_event_source **ret) {
839
840         sd_event_source *s;
841         int r;
842
843         assert_return(e, -EINVAL);
844         assert_return(pid > 1, -EINVAL);
845         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
846         assert_return(options != 0, -EINVAL);
847         assert_return(callback, -EINVAL);
848         assert_return(ret, -EINVAL);
849         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
850         assert_return(!event_pid_changed(e), -ECHILD);
851
852         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
853         if (r < 0)
854                 return r;
855
856         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
857                 return -EBUSY;
858
859         s = source_new(e, SOURCE_CHILD);
860         if (!s)
861                 return -ENOMEM;
862
863         s->child.pid = pid;
864         s->child.options = options;
865         s->child.callback = callback;
866         s->userdata = userdata;
867         s->enabled = SD_EVENT_ONESHOT;
868
869         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
870         if (r < 0) {
871                 source_free(s);
872                 return r;
873         }
874
875         e->n_enabled_child_sources++;
876
877         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
878
879         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
880                 r = event_update_signal_fd(e);
881                 if (r < 0) {
882                         source_free(s);
883                         return r;
884                 }
885         }
886
887         e->need_process_child = true;
888
889         *ret = s;
890         return 0;
891 }
892
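/* Illustrative sketch (comment only): watching a forked child with the helper above.
 * The fork() and the handler name are assumptions made for the sketch; WEXITED limits
 * the source to termination, and the handler receives the siginfo_t that waitid()
 * fills in inside process_child() below.
 *
 *     static int on_child(sd_event_source *s, const siginfo_t *si, void *userdata) {
 *             return 0;
 *     }
 *
 *     pid_t pid = fork();
 *     if (pid == 0)
 *             _exit(EXIT_SUCCESS);
 *     else if (pid > 0)
 *             r = sd_event_add_child(e, pid, WEXITED, on_child, NULL, &s);
 */
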
893 _public_ int sd_event_add_defer(
894                 sd_event *e,
895                 sd_event_handler_t callback,
896                 void *userdata,
897                 sd_event_source **ret) {
898
899         sd_event_source *s;
900         int r;
901
902         assert_return(e, -EINVAL);
903         assert_return(callback, -EINVAL);
904         assert_return(ret, -EINVAL);
905         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
906         assert_return(!event_pid_changed(e), -ECHILD);
907
908         s = source_new(e, SOURCE_DEFER);
909         if (!s)
910                 return -ENOMEM;
911
912         s->defer.callback = callback;
913         s->userdata = userdata;
914         s->enabled = SD_EVENT_ONESHOT;
915
916         r = source_set_pending(s, true);
917         if (r < 0) {
918                 source_free(s);
919                 return r;
920         }
921
922         *ret = s;
923         return 0;
924 }
925
926 _public_ int sd_event_add_quit(
927                 sd_event *e,
928                 sd_event_handler_t callback,
929                 void *userdata,
930                 sd_event_source **ret) {
931
932         sd_event_source *s;
933         int r;
934
935         assert_return(e, -EINVAL);
936         assert_return(callback, -EINVAL);
937         assert_return(ret, -EINVAL);
938         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
939         assert_return(!event_pid_changed(e), -ECHILD);
940
941         if (!e->quit) {
942                 e->quit = prioq_new(quit_prioq_compare);
943                 if (!e->quit)
944                         return -ENOMEM;
945         }
946
947         s = source_new(e, SOURCE_QUIT);
948         if (!s)
949                 return -ENOMEM;
950
951         s->quit.callback = callback;
952         s->userdata = userdata;
953         s->quit.prioq_index = PRIOQ_IDX_NULL;
954         s->enabled = SD_EVENT_ONESHOT;
955
956         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
957         if (r < 0) {
958                 source_free(s);
959                 return r;
960         }
961
962         *ret = s;
963         return 0;
964 }
965
966 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
967         assert_return(s, NULL);
968
969         assert(s->n_ref >= 1);
970         s->n_ref++;
971
972         return s;
973 }
974
975 _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {
976         assert_return(s, NULL);
977
978         assert(s->n_ref >= 1);
979         s->n_ref--;
980
981         if (s->n_ref <= 0)
982                 source_free(s);
983
984         return NULL;
985 }
986
987 _public_ sd_event *sd_event_source_get_event(sd_event_source *s) {
988         assert_return(s, NULL);
989
990         return s->event;
991 }
992
993 _public_ int sd_event_source_get_pending(sd_event_source *s) {
994         assert_return(s, -EINVAL);
995         assert_return(s->type != SOURCE_QUIT, -EDOM);
996         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
997         assert_return(!event_pid_changed(s->event), -ECHILD);
998
999         return s->pending;
1000 }
1001
1002 _public_ int sd_event_source_get_io_fd(sd_event_source *s) {
1003         assert_return(s, -EINVAL);
1004         assert_return(s->type == SOURCE_IO, -EDOM);
1005         assert_return(!event_pid_changed(s->event), -ECHILD);
1006
1007         return s->io.fd;
1008 }
1009
1010 _public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
1011         assert_return(s, -EINVAL);
1012         assert_return(events, -EINVAL);
1013         assert_return(s->type == SOURCE_IO, -EDOM);
1014         assert_return(!event_pid_changed(s->event), -ECHILD);
1015
1016         *events = s->io.events;
1017         return 0;
1018 }
1019
1020 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1021         int r;
1022
1023         assert_return(s, -EINVAL);
1024         assert_return(s->type == SOURCE_IO, -EDOM);
1025         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
1026         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1027         assert_return(!event_pid_changed(s->event), -ECHILD);
1028
1029         if (s->io.events == events)
1030                 return 0;
1031
1032         if (s->enabled != SD_EVENT_OFF) {
1033                 r = source_io_register(s, s->enabled, events);
1034                 if (r < 0)
1035                         return r;
1036         }
1037
1038         s->io.events = events;
1039         source_set_pending(s, false);
1040
1041         return 0;
1042 }
1043
1044 _public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1045         assert_return(s, -EINVAL);
1046         assert_return(revents, -EINVAL);
1047         assert_return(s->type == SOURCE_IO, -EDOM);
1048         assert_return(s->pending, -ENODATA);
1049         assert_return(!event_pid_changed(s->event), -ECHILD);
1050
1051         *revents = s->io.revents;
1052         return 0;
1053 }
1054
1055 _public_ int sd_event_source_get_signal(sd_event_source *s) {
1056         assert_return(s, -EINVAL);
1057         assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1058         assert_return(!event_pid_changed(s->event), -ECHILD);
1059
1060         return s->signal.sig;
1061 }
1062
1063 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1064         assert_return(s, -EINVAL);
1065         assert_return(priority, -EINVAL);
1066         assert_return(!event_pid_changed(s->event), -ECHILD);
1067
1068         *priority = s->priority;
1069         return 0;
1070 }
1069
1070 _public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
1071         assert_return(s, -EINVAL);
1072         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1073         assert_return(!event_pid_changed(s->event), -ECHILD);
1074
1075         if (s->priority == priority)
1076                 return 0;
1077
1078         s->priority = priority;
1079
1080         if (s->pending)
1081                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1082
1083         if (s->prepare)
1084                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1085
1086         if (s->type == SOURCE_QUIT)
1087                 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1088
1089         return 0;
1090 }
1091
1092 _public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1093         assert_return(s, -EINVAL);
1094         assert_return(m, -EINVAL);
1095         assert_return(!event_pid_changed(s->event), -ECHILD);
1096
1097         *m = s->enabled;
1098         return 0;
1099 }
1100
1101 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
1102         int r;
1103
1104         assert_return(s, -EINVAL);
1105         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1106         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1107         assert_return(!event_pid_changed(s->event), -ECHILD);
1108
1109         if (s->enabled == m)
1110                 return 0;
1111
1112         if (m == SD_EVENT_OFF) {
1113
1114                 switch (s->type) {
1115
1116                 case SOURCE_IO:
1117                         r = source_io_unregister(s);
1118                         if (r < 0)
1119                                 return r;
1120
1121                         s->enabled = m;
1122                         break;
1123
1124                 case SOURCE_MONOTONIC:
1125                         s->enabled = m;
1126                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1127                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1128                         break;
1129
1130                 case SOURCE_REALTIME:
1131                         s->enabled = m;
1132                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1133                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1134                         break;
1135
1136                 case SOURCE_SIGNAL:
1137                         s->enabled = m;
1138                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1139                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1140                                 event_update_signal_fd(s->event);
1141                         }
1142
1143                         break;
1144
1145                 case SOURCE_CHILD:
1146                         s->enabled = m;
1147
1148                         assert(s->event->n_enabled_child_sources > 0);
1149                         s->event->n_enabled_child_sources--;
1150
1151                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1152                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1153                                 event_update_signal_fd(s->event);
1154                         }
1155
1156                         break;
1157
1158                 case SOURCE_QUIT:
1159                         s->enabled = m;
1160                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1161                         break;
1162
1163                 case SOURCE_DEFER:
1164                         s->enabled = m;
1165                         break;
1166                 }
1167
1168         } else {
1169                 switch (s->type) {
1170
1171                 case SOURCE_IO:
1172                         r = source_io_register(s, m, s->io.events);
1173                         if (r < 0)
1174                                 return r;
1175
1176                         s->enabled = m;
1177                         break;
1178
1179                 case SOURCE_MONOTONIC:
1180                         s->enabled = m;
1181                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1182                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1183                         break;
1184
1185                 case SOURCE_REALTIME:
1186                         s->enabled = m;
1187                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1188                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1189                         break;
1190
1191                 case SOURCE_SIGNAL:
1192                         s->enabled = m;
1193
1194                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1195                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1196                                 event_update_signal_fd(s->event);
1197                         }
1198                         break;
1199
1200                 case SOURCE_CHILD:
1201                         if (s->enabled == SD_EVENT_OFF) {
1202                                 s->event->n_enabled_child_sources++;
1203
1204                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1205                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1206                                         event_update_signal_fd(s->event);
1207                                 }
1208                         }
1209
1210                         s->enabled = m;
1211                         break;
1212
1213                 case SOURCE_QUIT:
1214                         s->enabled = m;
1215                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1216                         break;
1217
1218                 case SOURCE_DEFER:
1219                         s->enabled = m;
1220                         break;
1221                 }
1222         }
1223
1224         if (s->pending)
1225                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1226
1227         if (s->prepare)
1228                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1229
1230         return 0;
1231 }
1232
1233 _public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1234         assert_return(s, -EINVAL);
1235         assert_return(usec, -EINVAL);
1236         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1237         assert_return(!event_pid_changed(s->event), -ECHILD);
1238
1239         *usec = s->time.next;
1240         return 0;
1241 }
1242
1243 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1244         assert_return(s, -EINVAL);
1245         assert_return(usec != (uint64_t) -1, -EINVAL);
1246         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1247         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1248         assert_return(!event_pid_changed(s->event), -ECHILD);
1249
1250         s->time.next = usec;
1251
1252         source_set_pending(s, false);
1253
1254         if (s->type == SOURCE_REALTIME) {
1255                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1256                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1257         } else {
1258                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1259                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1260         }
1261
1262         return 0;
1263 }
1264
1265 _public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1266         assert_return(s, -EINVAL);
1267         assert_return(usec, -EINVAL);
1268         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1269         assert_return(!event_pid_changed(s->event), -ECHILD);
1270
1271         *usec = s->time.accuracy;
1272         return 0;
1273 }
1274
1275 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1276         assert_return(s, -EINVAL);
1277         assert_return(usec != (uint64_t) -1, -EINVAL);
1278         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1279         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1280         assert_return(!event_pid_changed(s->event), -ECHILD);
1281
1282         if (usec == 0)
1283                 usec = DEFAULT_ACCURACY_USEC;
1284
1285         s->time.accuracy = usec;
1286
1287         source_set_pending(s, false);
1288
1289         if (s->type == SOURCE_REALTIME)
1290                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1291         else
1292                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1293
1294         return 0;
1295 }
1296
1297 _public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1298         assert_return(s, -EINVAL);
1299         assert_return(pid, -EINVAL);
1300         assert_return(s->type == SOURCE_CHILD, -EDOM);
1301         assert_return(!event_pid_changed(s->event), -ECHILD);
1302
1303         *pid = s->child.pid;
1304         return 0;
1305 }
1306
1307 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_event_handler_t callback) {
1308         int r;
1309
1310         assert_return(s, -EINVAL);
1311         assert_return(s->type != SOURCE_QUIT, -EDOM);
1312         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1313         assert_return(!event_pid_changed(s->event), -ECHILD);
1314
1315         if (s->prepare == callback)
1316                 return 0;
1317
1318         if (callback && s->prepare) {
1319                 s->prepare = callback;
1320                 return 0;
1321         }
1322
1323         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1324         if (r < 0)
1325                 return r;
1326
1327         s->prepare = callback;
1328
1329         if (callback) {
1330                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1331                 if (r < 0)
1332                         return r;
1333         } else
1334                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1335
1336         return 0;
1337 }
1338
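/* Illustrative sketch (comment only): a prepare callback runs once per loop iteration
 * just before polling (see event_prepare() below), which makes it a natural place to
 * refresh a source from application state. have_data_to_write() is a hypothetical
 * predicate used only for this sketch.
 *
 *     static int prepare_io(sd_event_source *s, void *userdata) {
 *             uint32_t ev = EPOLLIN;
 *
 *             if (have_data_to_write(userdata))
 *                     ev |= EPOLLOUT;
 *
 *             return sd_event_source_set_io_events(s, ev);
 *     }
 *
 *     r = sd_event_source_set_prepare(io_source, prepare_io);
 */
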
1339 _public_ void* sd_event_source_get_userdata(sd_event_source *s) {
1340         assert_return(s, NULL);
1341
1342         return s->userdata;
1343 }
1344
1345 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1346         usec_t c;
1347         assert(e);
1348         assert(a <= b);
1349
1350         if (a <= 0)
1351                 return 0;
1352
1353         if (b <= a + 1)
1354                 return a;
1355
1356         /*
1357           Find a good time to wake up again between times a and b. We
1358           have two goals here:
1359
1360           a) We want to wake up as seldom as possible, hence prefer
1361              later times over earlier times.
1362
1363           b) But if we have to wake up, then let's make sure to
1364              dispatch as much as possible on the entire system.
1365
1366           We implement this by waking up everywhere at the same time
1367           within any given minute if we can, synchronised via the
1368           perturbation value determined from the boot ID. If we can't,
1369           then we try to find the same spot in every 1s and then 250ms
1370           step. Otherwise, we pick the last possible time to wake up.
1371         */
1372
1373         c = (b / USEC_PER_MINUTE) * USEC_PER_MINUTE + e->perturb;
1374         if (c >= b) {
1375                 if (_unlikely_(c < USEC_PER_MINUTE))
1376                         return b;
1377
1378                 c -= USEC_PER_MINUTE;
1379         }
1380
1381         if (c >= a)
1382                 return c;
1383
1384         c = (b / USEC_PER_SEC) * USEC_PER_SEC + (e->perturb % USEC_PER_SEC);
1385         if (c >= b) {
1386                 if (_unlikely_(c < USEC_PER_SEC))
1387                         return b;
1388
1389                 c -= USEC_PER_SEC;
1390         }
1391
1392         if (c >= a)
1393                 return c;
1394
1395         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1396         if (c >= b) {
1397                 if (_unlikely_(c < USEC_PER_MSEC*250))
1398                         return b;
1399
1400                 c -= USEC_PER_MSEC*250;
1401         }
1402
1403         if (c >= a)
1404                 return c;
1405
1406         return b;
1407 }
1408
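/* Worked example for the coalescing above (illustrative values; assumes
 * e->perturb == 7 * USEC_PER_SEC). With a == 100 * USEC_PER_SEC and
 * b == 130 * USEC_PER_SEC:
 *
 *     c = (b / USEC_PER_MINUTE) * USEC_PER_MINUTE + e->perturb
 *       = 2 * USEC_PER_MINUTE + 7 * USEC_PER_SEC
 *       = 127 * USEC_PER_SEC
 *
 * c < b, so no minute is subtracted, and c >= a, so 127s is returned: every loop on a
 * machine with this boot ID that can afford to wait past the 2:07 mark wakes up at
 * that same instant, while still staying inside the [a, b] window.
 */
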
1409 static int event_arm_timer(
1410                 sd_event *e,
1411                 int timer_fd,
1412                 Prioq *earliest,
1413                 Prioq *latest,
1414                 usec_t *next) {
1415
1416         struct itimerspec its = {};
1417         sd_event_source *a, *b;
1418         usec_t t;
1419         int r;
1420
1421         assert_se(e);
1422         assert_se(next);
1423
1424         a = prioq_peek(earliest);
1425         if (!a || a->enabled == SD_EVENT_OFF) {
1426
1427                 if (timer_fd < 0)
1428                         return 0;
1429
1430                 if (*next == (usec_t) -1)
1431                         return 0;
1432
1433                 /* disarm */
1434                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1435                 if (r < 0)
1436                         return r;
1437
1438                 *next = (usec_t) -1;
1439
1440                 return 0;
1441         }
1442
1443         b = prioq_peek(latest);
1444         assert_se(b && b->enabled != SD_EVENT_OFF);
1445
1446         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1447         if (*next == t)
1448                 return 0;
1449
1450         assert_se(timer_fd >= 0);
1451
1452         if (t == 0) {
1453                 /* We don't want to disarm here, just arm it for some time looooong ago. */
1454                 its.it_value.tv_sec = 0;
1455                 its.it_value.tv_nsec = 1;
1456         } else
1457                 timespec_store(&its.it_value, t);
1458
1459         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1460         if (r < 0)
1461                 return r;
1462
1463         *next = t;
1464         return 0;
1465 }
1466
1467 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1468         assert(e);
1469         assert(s);
1470         assert(s->type == SOURCE_IO);
1471
1472         s->io.revents = events;
1473
1474         return source_set_pending(s, true);
1475 }
1476
1477 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1478         uint64_t x;
1479         ssize_t ss;
1480
1481         assert(e);
1482         assert(fd >= 0);
1483         assert(next);
1484
1485         assert_return(events == EPOLLIN, -EIO);
1486
1487         ss = read(fd, &x, sizeof(x));
1488         if (ss < 0) {
1489                 if (errno == EAGAIN || errno == EINTR)
1490                         return 0;
1491
1492                 return -errno;
1493         }
1494
1495         if (ss != sizeof(x))
1496                 return -EIO;
1497
1498         *next = (usec_t) -1;
1499
1500         return 0;
1501 }
1502
1503 static int process_timer(
1504                 sd_event *e,
1505                 usec_t n,
1506                 Prioq *earliest,
1507                 Prioq *latest) {
1508
1509         sd_event_source *s;
1510         int r;
1511
1512         assert(e);
1513
1514         for (;;) {
1515                 s = prioq_peek(earliest);
1516                 if (!s ||
1517                     s->time.next > n ||
1518                     s->enabled == SD_EVENT_OFF ||
1519                     s->pending)
1520                         break;
1521
1522                 r = source_set_pending(s, true);
1523                 if (r < 0)
1524                         return r;
1525
1526                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1527                 prioq_reshuffle(latest, s, &s->time.latest_index);
1528         }
1529
1530         return 0;
1531 }
1532
1533 static int process_child(sd_event *e) {
1534         sd_event_source *s;
1535         Iterator i;
1536         int r;
1537
1538         assert(e);
1539
1540         e->need_process_child = false;
1541
1542         /*
1543            So, this is ugly. We iteratively invoke waitid() with P_PID
1544            + WNOHANG for each PID we wait for, instead of using
1545            P_ALL. This is because we only want to get child
1546            information of very specific child processes, and not all
1547            of them. We might not have processed the SIGCHLD event of a
1548            previous invocation and we don't want to maintain an
1549            unbounded *per-child* event queue, hence we really don't
1550            want anything flushed out of the kernel's queue that we
1551            don't care about. Since this is O(n) this means that if you
1552            have a lot of processes you probably want to handle SIGCHLD
1553            yourself.
1554         */
1555
1556         HASHMAP_FOREACH(s, e->child_sources, i) {
1557                 assert(s->type == SOURCE_CHILD);
1558
1559                 if (s->pending)
1560                         continue;
1561
1562                 if (s->enabled == SD_EVENT_OFF)
1563                         continue;
1564
1565                 zero(s->child.siginfo);
1566                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1567                 if (r < 0)
1568                         return -errno;
1569
1570                 if (s->child.siginfo.si_pid != 0) {
1571                         r = source_set_pending(s, true);
1572                         if (r < 0)
1573                                 return r;
1574                 }
1575         }
1576
1577         return 0;
1578 }
1579
1580 static int process_signal(sd_event *e, uint32_t events) {
1581         bool read_one = false;
1582         int r;
1583
1584         assert(e);
1585         assert(e->signal_sources);
1586
1587         assert_return(events == EPOLLIN, -EIO);
1588
1589         for (;;) {
1590                 struct signalfd_siginfo si;
1591                 ssize_t ss;
1592                 sd_event_source *s;
1593
1594                 ss = read(e->signal_fd, &si, sizeof(si));
1595                 if (ss < 0) {
1596                         if (errno == EAGAIN || errno == EINTR)
1597                                 return read_one;
1598
1599                         return -errno;
1600                 }
1601
1602                 if (ss != sizeof(si))
1603                         return -EIO;
1604
1605                 read_one = true;
1606
1607                 s = e->signal_sources[si.ssi_signo];
1608                 if (si.ssi_signo == SIGCHLD) {
1609                         r = process_child(e);
1610                         if (r < 0)
1611                                 return r;
1612                         if (r > 0 || !s)
1613                                 continue;
1614                 } else
1615                         if (!s)
1616                                 return -EIO;
1617
1618                 s->signal.siginfo = si;
1619                 r = source_set_pending(s, true);
1620                 if (r < 0)
1621                         return r;
1622         }
1623
1624
1625         return 0;
1626 }
1627
1628 static int source_dispatch(sd_event_source *s) {
1629         int r = 0;
1630
1631         assert(s);
1632         assert(s->pending || s->type == SOURCE_QUIT);
1633
1634         if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
1635                 r = source_set_pending(s, false);
1636                 if (r < 0)
1637                         return r;
1638         }
1639
1640         if (s->enabled == SD_EVENT_ONESHOT) {
1641                 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1642                 if (r < 0)
1643                         return r;
1644         }
1645
1646         sd_event_source_ref(s);
1647
1648         switch (s->type) {
1649
1650         case SOURCE_IO:
1651                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1652                 break;
1653
1654         case SOURCE_MONOTONIC:
1655                 r = s->time.callback(s, s->time.next, s->userdata);
1656                 break;
1657
1658         case SOURCE_REALTIME:
1659                 r = s->time.callback(s, s->time.next, s->userdata);
1660                 break;
1661
1662         case SOURCE_SIGNAL:
1663                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1664                 break;
1665
1666         case SOURCE_CHILD:
1667                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1668                 break;
1669
1670         case SOURCE_DEFER:
1671                 r = s->defer.callback(s, s->userdata);
1672                 break;
1673
1674         case SOURCE_QUIT:
1675                 r = s->quit.callback(s, s->userdata);
1676                 break;
1677         }
1678
1679         sd_event_source_unref(s);
1680
1681         return r;
1682 }

static int event_prepare(sd_event *e) {
        int r;

        assert(e);

        for (;;) {
                sd_event_source *s;

                s = prioq_peek(e->prepare);
                if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
                        break;

                s->prepare_iteration = e->iteration;
                r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
                if (r < 0)
                        return r;

                assert(s->prepare);
                r = s->prepare(s, s->userdata);
                if (r < 0)
                        return r;
        }

        return 0;
}
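
/* Prepare callbacks take only the source and its userdata, exactly as
 * s->prepare is invoked above, and run at most once per loop iteration for
 * each enabled source. A minimal sketch (the name is illustrative):
 *
 *     static int on_prepare(sd_event_source *s, void *userdata) {
 *             // Refresh whatever state the source needs before polling.
 *             return 0;
 *     }
 */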

static int dispatch_quit(sd_event *e) {
        sd_event_source *p;
        int r;

        assert(e);

        p = prioq_peek(e->quit);
        if (!p || p->enabled == SD_EVENT_OFF) {
                e->state = SD_EVENT_FINISHED;
                return 0;
        }

        sd_event_ref(e);
        e->iteration++;
        e->state = SD_EVENT_QUITTING;

        r = source_dispatch(p);

        e->state = SD_EVENT_PASSIVE;
        sd_event_unref(e);

        return r;
}
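
/* Quit callbacks likewise take only the source and its userdata. Once a
 * quit has been requested, each sd_event_run() call dispatches the
 * highest-priority enabled quit source; when no enabled quit source is
 * left, the state switches to SD_EVENT_FINISHED. A minimal sketch (the
 * name is illustrative):
 *
 *     static int on_quit(sd_event_source *s, void *userdata) {
 *             // Release resources owned by userdata before the loop finishes.
 *             return 0;
 *     }
 */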

static sd_event_source* event_next_pending(sd_event *e) {
        sd_event_source *p;

        assert(e);

        p = prioq_peek(e->pending);
        if (!p)
                return NULL;

        if (p->enabled == SD_EVENT_OFF)
                return NULL;

        return p;
}

_public_ int sd_event_run(sd_event *e, uint64_t timeout) {
        struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
        sd_event_source *p;
        int r, i, m;

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);

        if (e->quit_requested)
                return dispatch_quit(e);

        sd_event_ref(e);
        e->iteration++;
        e->state = SD_EVENT_RUNNING;

        r = event_prepare(e);
        if (r < 0)
                goto finish;

        r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
        if (r < 0)
                goto finish;

        r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
        if (r < 0)
                goto finish;

        if (event_next_pending(e) || e->need_process_child)
                timeout = 0;

        m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
                       timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
        if (m < 0) {
                r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
                goto finish;
        }

        dual_timestamp_get(&e->timestamp);

        for (i = 0; i < m; i++) {

                if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
                        r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
                        r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
                        r = process_signal(e, ev_queue[i].events);
                else
                        r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);

                if (r < 0)
                        goto finish;
        }

        r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
        if (r < 0)
                goto finish;

        r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
        if (r < 0)
                goto finish;

        if (e->need_process_child) {
                r = process_child(e);
                if (r < 0)
                        goto finish;
        }

        p = event_next_pending(e);
        if (!p) {
                r = 0;
                goto finish;
        }

        r = source_dispatch(p);

finish:
        e->state = SD_EVENT_PASSIVE;
        sd_event_unref(e);

        return r;
}
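
/* sd_event_run() performs a single loop iteration: prepare, poll for at
 * most "timeout" microseconds ((uint64_t) -1 means no timeout), then
 * dispatch one pending source. A caller driving the loop manually might do
 * something like this (sketch; error handling abbreviated):
 *
 *     for (;;) {
 *             int k = sd_event_run(e, 100 * USEC_PER_MSEC);
 *             if (k < 0)
 *                     break;
 *             if (sd_event_get_state(e) == SD_EVENT_FINISHED)
 *                     break;
 *
 *             // ... interleave other periodic work here ...
 *     }
 */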

_public_ int sd_event_loop(sd_event *e) {
        int r;

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
        assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);

        sd_event_ref(e);

        while (e->state != SD_EVENT_FINISHED) {
                r = sd_event_run(e, (uint64_t) -1);
                if (r < 0)
                        goto finish;
        }

        r = 0;

finish:
        sd_event_unref(e);
        return r;
}
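
/* Typical use of the loop as a whole (sketch; error handling abbreviated):
 *
 *     sd_event *e = NULL;
 *     int r;
 *
 *     r = sd_event_default(&e);
 *     if (r < 0)
 *             return r;
 *
 *     // ... add event sources here ...
 *
 *     r = sd_event_loop(e);
 *     sd_event_unref(e);
 *     return r;
 */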

_public_ int sd_event_get_state(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        return e->state;
}

_public_ int sd_event_get_quit(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        return e->quit_requested;
}

_public_ int sd_event_request_quit(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        e->quit_requested = true;
        return 0;
}
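
/* Note that sd_event_request_quit() only sets a flag: the next call to
 * sd_event_run() (or the next iteration of sd_event_loop()) dispatches quit
 * sources via dispatch_quit() instead of polling, which makes it safe to
 * call from within any event callback. */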

_public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
        assert_return(e, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
        assert_return(!event_pid_changed(e), -ECHILD);

        *usec = e->timestamp.realtime;
        return 0;
}

_public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
        assert_return(e, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
        assert_return(!event_pid_changed(e), -ECHILD);

        *usec = e->timestamp.monotonic;
        return 0;
}
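
/* Both getters return the timestamp taken right after epoll_wait() in
 * sd_event_run(), not the current time, so they are cheap to call from
 * dispatch callbacks. A sketch for a timer callback (the name and the use
 * of userdata as the event pointer are illustrative):
 *
 *     static int on_timer(sd_event_source *s, uint64_t usec, void *userdata) {
 *             sd_event *e = userdata;
 *             uint64_t now;
 *
 *             sd_event_get_now_monotonic(e, &now);
 *             // "now - usec" approximates how late the timer fired.
 *             return 0;
 *     }
 */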

_public_ int sd_event_default(sd_event **ret) {

        static __thread sd_event *default_event = NULL;
        sd_event *e;
        int r;

        if (!ret)
                return !!default_event;

        if (default_event) {
                *ret = sd_event_ref(default_event);
                return 0;
        }

        r = sd_event_new(&e);
        if (r < 0)
                return r;

        e->default_event_ptr = &default_event;
        e->tid = gettid();
        default_event = e;

        *ret = e;
        return 1;
}
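
/* The default event object is per-thread: the first successful call on a
 * thread allocates it and returns 1, later calls take a new reference to
 * the same object and return 0, and passing a NULL "ret" merely reports
 * whether a default object already exists. The address of the per-thread
 * pointer is stored in default_event_ptr, presumably so that it can be
 * invalidated again when the object is freed. */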

_public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
        assert_return(e, -EINVAL);
        assert_return(tid, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        if (e->tid != 0) {
                *tid = e->tid;
                return 0;
        }

        return -ENXIO;
}
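
/* e->tid is filled in by sd_event_default() above; for event objects where
 * it was never set, this returns -ENXIO. */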