1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "sd-id128.h"
27 #include "macro.h"
28 #include "prioq.h"
29 #include "hashmap.h"
30 #include "util.h"
31 #include "time-util.h"
32 #include "missing.h"
33
34 #include "sd-event.h"
35
36 #define EPOLL_QUEUE_MAX 64
37 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
38
39 typedef enum EventSourceType {
40         SOURCE_IO,
41         SOURCE_MONOTONIC,
42         SOURCE_REALTIME,
43         SOURCE_SIGNAL,
44         SOURCE_CHILD,
45         SOURCE_DEFER,
46         SOURCE_QUIT
47 } EventSourceType;
48
49 struct sd_event_source {
50         unsigned n_ref;
51
52         sd_event *event;
53         void *userdata;
54         sd_event_handler_t prepare;
55
56         EventSourceType type:4;
57         int enabled:3;
58         bool pending:1;
59
60         int priority;
61         unsigned pending_index;
62         unsigned prepare_index;
63         unsigned pending_iteration;
64         unsigned prepare_iteration;
65
66         union {
67                 struct {
68                         sd_event_io_handler_t callback;
69                         int fd;
70                         uint32_t events;
71                         uint32_t revents;
72                         bool registered:1;
73                 } io;
74                 struct {
75                         sd_event_time_handler_t callback;
76                         usec_t next, accuracy;
77                         unsigned earliest_index;
78                         unsigned latest_index;
79                 } time;
80                 struct {
81                         sd_event_signal_handler_t callback;
82                         struct signalfd_siginfo siginfo;
83                         int sig;
84                 } signal;
85                 struct {
86                         sd_event_child_handler_t callback;
87                         siginfo_t siginfo;
88                         pid_t pid;
89                         int options;
90                 } child;
91                 struct {
92                         sd_event_handler_t callback;
93                 } defer;
94                 struct {
95                         sd_event_handler_t callback;
96                         unsigned prioq_index;
97                 } quit;
98         };
99 };
100
101 struct sd_event {
102         unsigned n_ref;
103
104         int epoll_fd;
105         int signal_fd;
106         int realtime_fd;
107         int monotonic_fd;
108
109         Prioq *pending;
110         Prioq *prepare;
111
112         /* For both clocks we maintain two priority queues each: one
113          * ordered by the earliest time the events may be
114          * dispatched, and one ordered by the latest time by which
115          * they must have been dispatched. The range between the top
116          * entries of the two prioqs is the time window within which
117          * we can freely schedule wakeups. */
118         Prioq *monotonic_earliest;
119         Prioq *monotonic_latest;
120         Prioq *realtime_earliest;
121         Prioq *realtime_latest;
122
123         usec_t realtime_next, monotonic_next;
124         usec_t perturb;
125
126         sigset_t sigset;
127         sd_event_source **signal_sources;
128
129         Hashmap *child_sources;
130         unsigned n_enabled_child_sources;
131
132         Prioq *quit;
133
134         pid_t original_pid;
135
136         unsigned iteration;
137         dual_timestamp timestamp;
138         int state;
139
140         bool quit_requested:1;
141         bool need_process_child:1;
142
143         pid_t tid;
144         sd_event **default_event_ptr;
145 };
146
147 static int pending_prioq_compare(const void *a, const void *b) {
148         const sd_event_source *x = a, *y = b;
149
150         assert(x->pending);
151         assert(y->pending);
152
153         /* Enabled ones first */
154         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
155                 return -1;
156         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
157                 return 1;
158
159         /* Lower priority values first */
160         if (x->priority < y->priority)
161                 return -1;
162         if (x->priority > y->priority)
163                 return 1;
164
165         /* Older entries first */
166         if (x->pending_iteration < y->pending_iteration)
167                 return -1;
168         if (x->pending_iteration > y->pending_iteration)
169                 return 1;
170
171         /* Stability for the rest */
172         if (x < y)
173                 return -1;
174         if (x > y)
175                 return 1;
176
177         return 0;
178 }
179
180 static int prepare_prioq_compare(const void *a, const void *b) {
181         const sd_event_source *x = a, *y = b;
182
183         assert(x->prepare);
184         assert(y->prepare);
185
186         /* Move most recently prepared ones last, so that we can stop
187          * preparing as soon as we hit one that has already been
188          * prepared in the current iteration */
189         if (x->prepare_iteration < y->prepare_iteration)
190                 return -1;
191         if (x->prepare_iteration > y->prepare_iteration)
192                 return 1;
193
194         /* Enabled ones first */
195         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
196                 return -1;
197         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
198                 return 1;
199
200         /* Lower priority values first */
201         if (x->priority < y->priority)
202                 return -1;
203         if (x->priority > y->priority)
204                 return 1;
205
206         /* Stability for the rest */
207         if (x < y)
208                 return -1;
209         if (x > y)
210                 return 1;
211
212         return 0;
213 }
214
215 static int earliest_time_prioq_compare(const void *a, const void *b) {
216         const sd_event_source *x = a, *y = b;
217
218         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
219         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
220
221         /* Enabled ones first */
222         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
223                 return -1;
224         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
225                 return 1;
226
227         /* Move the pending ones to the end */
228         if (!x->pending && y->pending)
229                 return -1;
230         if (x->pending && !y->pending)
231                 return 1;
232
233         /* Order by time */
234         if (x->time.next < y->time.next)
235                 return -1;
236         if (x->time.next > y->time.next)
237                 return 1;
238
239         /* Stability for the rest */
240         if (x < y)
241                 return -1;
242         if (x > y)
243                 return 1;
244
245         return 0;
246 }
247
248 static int latest_time_prioq_compare(const void *a, const void *b) {
249         const sd_event_source *x = a, *y = b;
250
251         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
252                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
253
254         /* Enabled ones first */
255         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
256                 return -1;
257         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
258                 return 1;
259
260         /* Move the pending ones to the end */
261         if (!x->pending && y->pending)
262                 return -1;
263         if (x->pending && !y->pending)
264                 return 1;
265
266         /* Order by time */
267         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
268                 return -1;
269         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
270                 return 1;
271
272         /* Stability for the rest */
273         if (x < y)
274                 return -1;
275         if (x > y)
276                 return 1;
277
278         return 0;
279 }
280
281 static int quit_prioq_compare(const void *a, const void *b) {
282         const sd_event_source *x = a, *y = b;
283
284         assert(x->type == SOURCE_QUIT);
285         assert(y->type == SOURCE_QUIT);
286
287         /* Enabled ones first */
288         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
289                 return -1;
290         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
291                 return 1;
292
293         /* Lower priority values first */
294         if (x->priority < y->priority)
295                 return -1;
296         if (x->priority > y->priority)
297                 return 1;
298
299         /* Stability for the rest */
300         if (x < y)
301                 return -1;
302         if (x > y)
303                 return 1;
304
305         return 0;
306 }
307
308 static void event_free(sd_event *e) {
309         assert(e);
310
311         if (e->default_event_ptr)
312                 *(e->default_event_ptr) = NULL;
313
314         if (e->epoll_fd >= 0)
315                 close_nointr_nofail(e->epoll_fd);
316
317         if (e->signal_fd >= 0)
318                 close_nointr_nofail(e->signal_fd);
319
320         if (e->realtime_fd >= 0)
321                 close_nointr_nofail(e->realtime_fd);
322
323         if (e->monotonic_fd >= 0)
324                 close_nointr_nofail(e->monotonic_fd);
325
326         prioq_free(e->pending);
327         prioq_free(e->prepare);
328         prioq_free(e->monotonic_earliest);
329         prioq_free(e->monotonic_latest);
330         prioq_free(e->realtime_earliest);
331         prioq_free(e->realtime_latest);
332         prioq_free(e->quit);
333
334         free(e->signal_sources);
335
336         hashmap_free(e->child_sources);
337         free(e);
338 }
339
340 _public_ int sd_event_new(sd_event** ret) {
341         sd_event *e;
342         int r;
343
344         assert_return(ret, -EINVAL);
345
346         e = new0(sd_event, 1);
347         if (!e)
348                 return -ENOMEM;
349
350         e->n_ref = 1;
351         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
352         e->realtime_next = e->monotonic_next = (usec_t) -1;
353         e->original_pid = getpid();
354
355         assert_se(sigemptyset(&e->sigset) == 0);
356
357         e->pending = prioq_new(pending_prioq_compare);
358         if (!e->pending) {
359                 r = -ENOMEM;
360                 goto fail;
361         }
362
363         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
364         if (e->epoll_fd < 0) {
365                 r = -errno;
366                 goto fail;
367         }
368
369         *ret = e;
370         return 0;
371
372 fail:
373         event_free(e);
374         return r;
375 }
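
/* A minimal lifecycle sketch (illustrative; assumes sd-event.h declares these
 * calls as defined in this file, and that a timeout of (uint64_t) -1 means
 * "wait indefinitely"):
 *
 *     sd_event *e = NULL;
 *     int r;
 *
 *     r = sd_event_new(&e);
 *     if (r < 0)
 *             return r;
 *
 *     attach sources here, e.g. with sd_event_add_io() or sd_event_add_monotonic();
 *
 *     r = sd_event_run(e, (uint64_t) -1);
 *     sd_event_unref(e);
 */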
376
377 _public_ sd_event* sd_event_ref(sd_event *e) {
378         assert_return(e, NULL);
379
380         assert(e->n_ref >= 1);
381         e->n_ref++;
382
383         return e;
384 }
385
386 _public_ sd_event* sd_event_unref(sd_event *e) {
387
388         if (!e)
389                 return NULL;
390
391         assert(e->n_ref >= 1);
392         e->n_ref--;
393
394         if (e->n_ref <= 0)
395                 event_free(e);
396
397         return NULL;
398 }
399
400 static bool event_pid_changed(sd_event *e) {
401         assert(e);
402
403         /* We don't support people creating an event loop and keeping
404          * it around over a fork(). Let's complain. */
405
406         return e->original_pid != getpid();
407 }
408
409 static int source_io_unregister(sd_event_source *s) {
410         int r;
411
412         assert(s);
413         assert(s->type == SOURCE_IO);
414
415         if (!s->io.registered)
416                 return 0;
417
418         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
419         if (r < 0)
420                 return -errno;
421
422         s->io.registered = false;
423         return 0;
424 }
425
426 static int source_io_register(
427                 sd_event_source *s,
428                 int enabled,
429                 uint32_t events) {
430
431         struct epoll_event ev = {};
432         int r;
433
434         assert(s);
435         assert(s->type == SOURCE_IO);
436         assert(enabled != SD_EVENT_OFF);
437
438         ev.events = events;
439         ev.data.ptr = s;
440
441         if (enabled == SD_EVENT_ONESHOT)
442                 ev.events |= EPOLLONESHOT;
443
444         if (s->io.registered)
445                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
446         else
447                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
448
449         if (r < 0)
450                 return -errno;
451
452         s->io.registered = true;
453
454         return 0;
455 }
456
457 static void source_free(sd_event_source *s) {
458         assert(s);
459
460         if (s->event) {
461                 switch (s->type) {
462
463                 case SOURCE_IO:
464                         if (s->io.fd >= 0)
465                                 source_io_unregister(s);
466
467                         break;
468
469                 case SOURCE_MONOTONIC:
470                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
471                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
472                         break;
473
474                 case SOURCE_REALTIME:
475                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
476                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
477                         break;
478
479                 case SOURCE_SIGNAL:
480                         if (s->signal.sig > 0) {
481                                 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
482                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
483
484                                 if (s->event->signal_sources)
485                                         s->event->signal_sources[s->signal.sig] = NULL;
486                         }
487
488                         break;
489
490                 case SOURCE_CHILD:
491                         if (s->child.pid > 0) {
492                                 if (s->enabled != SD_EVENT_OFF) {
493                                         assert(s->event->n_enabled_child_sources > 0);
494                                         s->event->n_enabled_child_sources--;
495                                 }
496
497                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
498                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
499
500                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
501                         }
502
503                         break;
504
505                 case SOURCE_DEFER:
506                         /* nothing */
507                         break;
508
509                 case SOURCE_QUIT:
510                         prioq_remove(s->event->quit, s, &s->quit.prioq_index);
511                         break;
512                 }
513
514                 if (s->pending)
515                         prioq_remove(s->event->pending, s, &s->pending_index);
516
517                 if (s->prepare)
518                         prioq_remove(s->event->prepare, s, &s->prepare_index);
519
520                 sd_event_unref(s->event);
521         }
522
523         free(s);
524 }
525
526 static int source_set_pending(sd_event_source *s, bool b) {
527         int r;
528
529         assert(s);
530         assert(s->type != SOURCE_QUIT);
531
532         if (s->pending == b)
533                 return 0;
534
535         s->pending = b;
536
537         if (b) {
538                 s->pending_iteration = s->event->iteration;
539
540                 r = prioq_put(s->event->pending, s, &s->pending_index);
541                 if (r < 0) {
542                         s->pending = false;
543                         return r;
544                 }
545         } else
546                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
547
548         if (s->type == SOURCE_REALTIME) {
549                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
550                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
551         } else if (s->type == SOURCE_MONOTONIC) {
552                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
553                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
554         }
555
556         return 0;
557 }
558
559 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
560         sd_event_source *s;
561
562         assert(e);
563
564         s = new0(sd_event_source, 1);
565         if (!s)
566                 return NULL;
567
568         s->n_ref = 1;
569         s->event = sd_event_ref(e);
570         s->type = type;
571         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
572
573         return s;
574 }
575
576 _public_ int sd_event_add_io(
577                 sd_event *e,
578                 int fd,
579                 uint32_t events,
580                 sd_event_io_handler_t callback,
581                 void *userdata,
582                 sd_event_source **ret) {
583
584         sd_event_source *s;
585         int r;
586
587         assert_return(e, -EINVAL);
588         assert_return(fd >= 0, -EINVAL);
589         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
590         assert_return(callback, -EINVAL);
591         assert_return(ret, -EINVAL);
592         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
593         assert_return(!event_pid_changed(e), -ECHILD);
594
595         s = source_new(e, SOURCE_IO);
596         if (!s)
597                 return -ENOMEM;
598
599         s->io.fd = fd;
600         s->io.events = events;
601         s->io.callback = callback;
602         s->userdata = userdata;
603         s->enabled = SD_EVENT_ON;
604
605         r = source_io_register(s, s->enabled, events);
606         if (r < 0) {
607                 source_free(s);
608                 return r;
609         }
610
611         *ret = s;
612         return 0;
613 }
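
/* Usage sketch for an I/O source (illustrative; on_io and connection_fd are
 * hypothetical, the handler signature follows the dispatch call in
 * source_dispatch() below):
 *
 *     static int on_io(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
 *             if (revents & EPOLLIN) {
 *                     char buf[256];
 *                     ssize_t n = read(fd, buf, sizeof(buf));
 *                     (handle the n bytes read; n < 0 with EAGAIN means the fd is drained)
 *             }
 *             return 0;
 *     }
 *
 *     sd_event_source *s = NULL;
 *     r = sd_event_add_io(e, connection_fd, EPOLLIN, on_io, NULL, &s);
 */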
614
615 static int event_setup_timer_fd(
616                 sd_event *e,
617                 EventSourceType type,
618                 int *timer_fd,
619                 clockid_t id) {
620
621         struct epoll_event ev = {};
622         int r, fd;
623         sd_id128_t bootid;
624
625         assert(e);
626         assert(timer_fd);
627
628         if (_likely_(*timer_fd >= 0))
629                 return 0;
630
631         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
632         if (fd < 0)
633                 return -errno;
634
635         ev.events = EPOLLIN;
636         ev.data.ptr = INT_TO_PTR(type);
637
638         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
639         if (r < 0) {
640                 close_nointr_nofail(fd);
641                 return -errno;
642         }
643
644         /* When we sleep for longer, we try to realign the wakeup to
645            the same time within each minute/second/250ms, so that
646            events all across the system can be coalesced into a single
647            CPU wakeup. However, let's take some system-specific
648            randomness for this value, so that in a network of systems
649            with synced clocks timer events are distributed a
650            bit. Here, we calculate a perturbation usec offset from the
651            boot ID. */
652
653         if (sd_id128_get_boot(&bootid) >= 0)
654                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_MINUTE;
655
656         *timer_fd = fd;
657         return 0;
658 }
659
660 static int event_add_time_internal(
661                 sd_event *e,
662                 EventSourceType type,
663                 int *timer_fd,
664                 clockid_t id,
665                 Prioq **earliest,
666                 Prioq **latest,
667                 uint64_t usec,
668                 uint64_t accuracy,
669                 sd_event_time_handler_t callback,
670                 void *userdata,
671                 sd_event_source **ret) {
672
673         sd_event_source *s;
674         int r;
675
676         assert_return(e, -EINVAL);
677         assert_return(callback, -EINVAL);
678         assert_return(ret, -EINVAL);
679         assert_return(usec != (uint64_t) -1, -EINVAL);
680         assert_return(accuracy != (uint64_t) -1, -EINVAL);
681         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
682         assert_return(!event_pid_changed(e), -ECHILD);
683
684         assert(timer_fd);
685         assert(earliest);
686         assert(latest);
687
688         if (!*earliest) {
689                 *earliest = prioq_new(earliest_time_prioq_compare);
690                 if (!*earliest)
691                         return -ENOMEM;
692         }
693
694         if (!*latest) {
695                 *latest = prioq_new(latest_time_prioq_compare);
696                 if (!*latest)
697                         return -ENOMEM;
698         }
699
700         if (*timer_fd < 0) {
701                 r = event_setup_timer_fd(e, type, timer_fd, id);
702                 if (r < 0)
703                         return r;
704         }
705
706         s = source_new(e, type);
707         if (!s)
708                 return -ENOMEM;
709
710         s->time.next = usec;
711         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
712         s->time.callback = callback;
713         s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
714         s->userdata = userdata;
715         s->enabled = SD_EVENT_ONESHOT;
716
717         r = prioq_put(*earliest, s, &s->time.earliest_index);
718         if (r < 0)
719                 goto fail;
720
721         r = prioq_put(*latest, s, &s->time.latest_index);
722         if (r < 0)
723                 goto fail;
724
725         *ret = s;
726         return 0;
727
728 fail:
729         source_free(s);
730         return r;
731 }
732
733 _public_ int sd_event_add_monotonic(sd_event *e,
734                                     uint64_t usec,
735                                     uint64_t accuracy,
736                                     sd_event_time_handler_t callback,
737                                     void *userdata,
738                                     sd_event_source **ret) {
739
740         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
741 }
742
743 _public_ int sd_event_add_realtime(sd_event *e,
744                                    uint64_t usec,
745                                    uint64_t accuracy,
746                                    sd_event_time_handler_t callback,
747                                    void *userdata,
748                                    sd_event_source **ret) {
749
750         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->realtime_latest, usec, accuracy, callback, userdata, ret);
751 }
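
/* Usage sketch for a one-shot monotonic timer (illustrative; on_time is a
 * hypothetical handler, and now() from time-util.h is assumed to return the
 * current time of the given clock in usec):
 *
 *     static int on_time(sd_event_source *s, uint64_t usec, void *userdata) {
 *             (fires once, at most "accuracy" usec after the requested time)
 *             return 0;
 *     }
 *
 *     sd_event_source *s = NULL;
 *     r = sd_event_add_monotonic(e, now(CLOCK_MONOTONIC) + 5 * USEC_PER_SEC,
 *                                250 * USEC_PER_MSEC, on_time, NULL, &s);
 */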
752
753 static int event_update_signal_fd(sd_event *e) {
754         struct epoll_event ev = {};
755         bool add_to_epoll;
756         int r;
757
758         assert(e);
759
760         add_to_epoll = e->signal_fd < 0;
761
762         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
763         if (r < 0)
764                 return -errno;
765
766         e->signal_fd = r;
767
768         if (!add_to_epoll)
769                 return 0;
770
771         ev.events = EPOLLIN;
772         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
773
774         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
775         if (r < 0) {
776                 close_nointr_nofail(e->signal_fd);
777                 e->signal_fd = -1;
778
779                 return -errno;
780         }
781
782         return 0;
783 }
784
785 _public_ int sd_event_add_signal(
786                 sd_event *e,
787                 int sig,
788                 sd_event_signal_handler_t callback,
789                 void *userdata,
790                 sd_event_source **ret) {
791
792         sd_event_source *s;
793         int r;
794
795         assert_return(e, -EINVAL);
796         assert_return(sig > 0, -EINVAL);
797         assert_return(sig < _NSIG, -EINVAL);
798         assert_return(callback, -EINVAL);
799         assert_return(ret, -EINVAL);
800         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
801         assert_return(!event_pid_changed(e), -ECHILD);
802
803         if (!e->signal_sources) {
804                 e->signal_sources = new0(sd_event_source*, _NSIG);
805                 if (!e->signal_sources)
806                         return -ENOMEM;
807         } else if (e->signal_sources[sig])
808                 return -EBUSY;
809
810         s = source_new(e, SOURCE_SIGNAL);
811         if (!s)
812                 return -ENOMEM;
813
814         s->signal.sig = sig;
815         s->signal.callback = callback;
816         s->userdata = userdata;
817         s->enabled = SD_EVENT_ON;
818
819         e->signal_sources[sig] = s;
820         assert_se(sigaddset(&e->sigset, sig) == 0);
821
822         if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
823                 r = event_update_signal_fd(e);
824                 if (r < 0) {
825                         source_free(s);
826                         return r;
827                 }
828         }
829
830         *ret = s;
831         return 0;
832 }
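
/* Usage sketch for a signal source (illustrative; on_sigterm is a hypothetical
 * handler). Delivery goes through a signalfd, so the signal is expected to be
 * blocked in the calling thread first, otherwise the default disposition may
 * still run:
 *
 *     static int on_sigterm(sd_event_source *s, const struct signalfd_siginfo *si, void *userdata) {
 *             (initiate shutdown here)
 *             return 0;
 *     }
 *
 *     sigset_t mask;
 *     sigemptyset(&mask);
 *     sigaddset(&mask, SIGTERM);
 *     sigprocmask(SIG_BLOCK, &mask, NULL);
 *
 *     r = sd_event_add_signal(e, SIGTERM, on_sigterm, NULL, &s);
 */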
833
834 _public_ int sd_event_add_child(
835                 sd_event *e,
836                 pid_t pid,
837                 int options,
838                 sd_event_child_handler_t callback,
839                 void *userdata,
840                 sd_event_source **ret) {
841
842         sd_event_source *s;
843         int r;
844
845         assert_return(e, -EINVAL);
846         assert_return(pid > 1, -EINVAL);
847         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
848         assert_return(options != 0, -EINVAL);
849         assert_return(callback, -EINVAL);
850         assert_return(ret, -EINVAL);
851         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
852         assert_return(!event_pid_changed(e), -ECHILD);
853
854         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
855         if (r < 0)
856                 return r;
857
858         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
859                 return -EBUSY;
860
861         s = source_new(e, SOURCE_CHILD);
862         if (!s)
863                 return -ENOMEM;
864
865         s->child.pid = pid;
866         s->child.options = options;
867         s->child.callback = callback;
868         s->userdata = userdata;
869         s->enabled = SD_EVENT_ONESHOT;
870
871         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
872         if (r < 0) {
873                 source_free(s);
874                 return r;
875         }
876
877         e->n_enabled_child_sources ++;
878
879         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
880
881         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
882                 r = event_update_signal_fd(e);
883                 if (r < 0) {
884                         source_free(s);
885                         return r;
886                 }
887         }
888
889         e->need_process_child = true;
890
891         *ret = s;
892         return 0;
893 }
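
/* Usage sketch for a child source (illustrative; on_child is a hypothetical
 * handler). As with signal sources, SIGCHLD is expected to be blocked by the
 * caller; the handler sees the child while it is still a zombie, and the child
 * is reaped right after dispatch (see source_dispatch() and process_child()):
 *
 *     static int on_child(sd_event_source *s, const siginfo_t *si, void *userdata) {
 *             (inspect si->si_pid, si->si_code, si->si_status)
 *             return 0;
 *     }
 *
 *     pid_t pid = fork();
 *     if (pid == 0)
 *             _exit(EXIT_SUCCESS);
 *
 *     r = sd_event_add_child(e, pid, WEXITED, on_child, NULL, &s);
 */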
894
895 _public_ int sd_event_add_defer(
896                 sd_event *e,
897                 sd_event_handler_t callback,
898                 void *userdata,
899                 sd_event_source **ret) {
900
901         sd_event_source *s;
902         int r;
903
904         assert_return(e, -EINVAL);
905         assert_return(callback, -EINVAL);
906         assert_return(ret, -EINVAL);
907         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
908         assert_return(!event_pid_changed(e), -ECHILD);
909
910         s = source_new(e, SOURCE_DEFER);
911         if (!s)
912                 return -ENOMEM;
913
914         s->defer.callback = callback;
915         s->userdata = userdata;
916         s->enabled = SD_EVENT_ONESHOT;
917
918         r = source_set_pending(s, true);
919         if (r < 0) {
920                 source_free(s);
921                 return r;
922         }
923
924         *ret = s;
925         return 0;
926 }
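
/* Usage sketch for a defer source (illustrative; on_defer is hypothetical).
 * The source is created enabled as SD_EVENT_ONESHOT and immediately marked
 * pending, so it runs once on the next loop iteration and is then disabled:
 *
 *     static int on_defer(sd_event_source *s, void *userdata) {
 *             (runs from the next sd_event_run() iteration)
 *             return 0;
 *     }
 *
 *     r = sd_event_add_defer(e, on_defer, NULL, &s);
 */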
927
928 _public_ int sd_event_add_quit(
929                 sd_event *e,
930                 sd_event_handler_t callback,
931                 void *userdata,
932                 sd_event_source **ret) {
933
934         sd_event_source *s;
935         int r;
936
937         assert_return(e, -EINVAL);
938         assert_return(callback, -EINVAL);
939         assert_return(ret, -EINVAL);
940         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
941         assert_return(!event_pid_changed(e), -ECHILD);
942
943         if (!e->quit) {
944                 e->quit = prioq_new(quit_prioq_compare);
945                 if (!e->quit)
946                         return -ENOMEM;
947         }
948
949         s = source_new(e, SOURCE_QUIT);
950         if (!s)
951                 return -ENOMEM;
952
953         s->quit.callback = callback;
954         s->userdata = userdata;
955         s->quit.prioq_index = PRIOQ_IDX_NULL;
956         s->enabled = SD_EVENT_ONESHOT;
957
958         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
959         if (r < 0) {
960                 source_free(s);
961                 return r;
962         }
963
964         *ret = s;
965         return 0;
966 }
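
/* Usage sketch for a quit source (illustrative; on_quit is hypothetical).
 * Quit sources are not dispatched from the pending queue; they run from
 * dispatch_quit() once quitting has been requested on the loop, which makes
 * them a natural place for cleanup work:
 *
 *     static int on_quit(sd_event_source *s, void *userdata) {
 *             (release resources before the loop finishes)
 *             return 0;
 *     }
 *
 *     r = sd_event_add_quit(e, on_quit, NULL, &s);
 */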
967
968 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
969         assert_return(s, NULL);
970
971         assert(s->n_ref >= 1);
972         s->n_ref++;
973
974         return s;
975 }
976
977 _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {
978
979         if (!s)
980                 return NULL;
981
982         assert(s->n_ref >= 1);
983         s->n_ref--;
984
985         if (s->n_ref <= 0)
986                 source_free(s);
987
988         return NULL;
989 }
990
991 _public_ sd_event *sd_event_source_get_event(sd_event_source *s) {
992         assert_return(s, NULL);
993
994         return s->event;
995 }
996
997 _public_ int sd_event_source_get_pending(sd_event_source *s) {
998         assert_return(s, -EINVAL);
999         assert_return(s->type != SOURCE_QUIT, -EDOM);
1000         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1001         assert_return(!event_pid_changed(s->event), -ECHILD);
1002
1003         return s->pending;
1004 }
1005
1006 _public_ int sd_event_source_get_io_fd(sd_event_source *s) {
1007         assert_return(s, -EINVAL);
1008         assert_return(s->type == SOURCE_IO, -EDOM);
1009         assert_return(!event_pid_changed(s->event), -ECHILD);
1010
1011         return s->io.fd;
1012 }
1013
1014 _public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
1015         assert_return(s, -EINVAL);
1016         assert_return(events, -EINVAL);
1017         assert_return(s->type == SOURCE_IO, -EDOM);
1018         assert_return(!event_pid_changed(s->event), -ECHILD);
1019
1020         *events = s->io.events;
1021         return 0;
1022 }
1023
1024 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1025         int r;
1026
1027         assert_return(s, -EINVAL);
1028         assert_return(s->type == SOURCE_IO, -EDOM);
1029         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
1030         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1031         assert_return(!event_pid_changed(s->event), -ECHILD);
1032
1033         if (s->io.events == events)
1034                 return 0;
1035
1036         if (s->enabled != SD_EVENT_OFF) {
1037                 r = source_io_register(s, s->enabled, events);
1038                 if (r < 0)
1039                         return r;
1040         }
1041
1042         s->io.events = events;
1043         source_set_pending(s, false);
1044
1045         return 0;
1046 }
1047
1048 _public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1049         assert_return(s, -EINVAL);
1050         assert_return(revents, -EINVAL);
1051         assert_return(s->type == SOURCE_IO, -EDOM);
1052         assert_return(s->pending, -ENODATA);
1053         assert_return(!event_pid_changed(s->event), -ECHILD);
1054
1055         *revents = s->io.revents;
1056         return 0;
1057 }
1058
1059 _public_ int sd_event_source_get_signal(sd_event_source *s) {
1060         assert_return(s, -EINVAL);
1061         assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1062         assert_return(!event_pid_changed(s->event), -ECHILD);
1063
1064         return s->signal.sig;
1065 }
1066
1067 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1068         assert_return(s, -EINVAL);
1069         assert_return(priority, -EINVAL);
1070         assert_return(!event_pid_changed(s->event), -ECHILD);
1071
1072         *priority = s->priority;
1073         return 0;
1074 }
1073
1074 _public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
1075         assert_return(s, -EINVAL);
1076         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1077         assert_return(!event_pid_changed(s->event), -ECHILD);
1078
1079         if (s->priority == priority)
1080                 return 0;
1081
1082         s->priority = priority;
1083
1084         if (s->pending)
1085                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1086
1087         if (s->prepare)
1088                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1089
1090         if (s->type == SOURCE_QUIT)
1091                 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1092
1093         return 0;
1094 }
1095
1096 _public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1097         assert_return(s, -EINVAL);
1098         assert_return(m, -EINVAL);
1099         assert_return(!event_pid_changed(s->event), -ECHILD);
1100
1101         *m = s->enabled;
1102         return 0;
1103 }
1104
1105 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
1106         int r;
1107
1108         assert_return(s, -EINVAL);
1109         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1110         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1111         assert_return(!event_pid_changed(s->event), -ECHILD);
1112
1113         if (s->enabled == m)
1114                 return 0;
1115
1116         if (m == SD_EVENT_OFF) {
1117
1118                 switch (s->type) {
1119
1120                 case SOURCE_IO:
1121                         r = source_io_unregister(s);
1122                         if (r < 0)
1123                                 return r;
1124
1125                         s->enabled = m;
1126                         break;
1127
1128                 case SOURCE_MONOTONIC:
1129                         s->enabled = m;
1130                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1131                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1132                         break;
1133
1134                 case SOURCE_REALTIME:
1135                         s->enabled = m;
1136                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1137                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1138                         break;
1139
1140                 case SOURCE_SIGNAL:
1141                         s->enabled = m;
1142                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1143                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1144                                 event_update_signal_fd(s->event);
1145                         }
1146
1147                         break;
1148
1149                 case SOURCE_CHILD:
1150                         s->enabled = m;
1151
1152                         assert(s->event->n_enabled_child_sources > 0);
1153                         s->event->n_enabled_child_sources--;
1154
1155                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1156                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1157                                 event_update_signal_fd(s->event);
1158                         }
1159
1160                         break;
1161
1162                 case SOURCE_QUIT:
1163                         s->enabled = m;
1164                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1165                         break;
1166
1167                 case SOURCE_DEFER:
1168                         s->enabled = m;
1169                         break;
1170                 }
1171
1172         } else {
1173                 switch (s->type) {
1174
1175                 case SOURCE_IO:
1176                         r = source_io_register(s, m, s->io.events);
1177                         if (r < 0)
1178                                 return r;
1179
1180                         s->enabled = m;
1181                         break;
1182
1183                 case SOURCE_MONOTONIC:
1184                         s->enabled = m;
1185                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1186                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1187                         break;
1188
1189                 case SOURCE_REALTIME:
1190                         s->enabled = m;
1191                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1192                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1193                         break;
1194
1195                 case SOURCE_SIGNAL:
1196                         s->enabled = m;
1197
1198                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1199                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1200                                 event_update_signal_fd(s->event);
1201                         }
1202                         break;
1203
1204                 case SOURCE_CHILD:
1205                         if (s->enabled == SD_EVENT_OFF) {
1206                                 s->event->n_enabled_child_sources++;
1207
1208                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1209                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1210                                         event_update_signal_fd(s->event);
1211                                 }
1212                         }
1213
1214                         s->enabled = m;
1215                         break;
1216
1217                 case SOURCE_QUIT:
1218                         s->enabled = m;
1219                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1220                         break;
1221
1222                 case SOURCE_DEFER:
1223                         s->enabled = m;
1224                         break;
1225                 }
1226         }
1227
1228         if (s->pending)
1229                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1230
1231         if (s->prepare)
1232                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1233
1234         return 0;
1235 }
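
/* A short sketch of the enable states (illustrative): SD_EVENT_ON keeps the
 * source active, SD_EVENT_ONESHOT disables it again right before its next
 * dispatch (see source_dispatch()), and SD_EVENT_OFF detaches it from polling
 * without freeing it:
 *
 *     sd_event_source_set_enabled(s, SD_EVENT_OFF);        (pause the source)
 *     sd_event_source_set_enabled(s, SD_EVENT_ONESHOT);    (let it fire once more)
 */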
1236
1237 _public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1238         assert_return(s, -EINVAL);
1239         assert_return(usec, -EINVAL);
1240         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1241         assert_return(!event_pid_changed(s->event), -ECHILD);
1242
1243         *usec = s->time.next;
1244         return 0;
1245 }
1246
1247 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1248         assert_return(s, -EINVAL);
1249         assert_return(usec != (uint64_t) -1, -EINVAL);
1250         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1251         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1252         assert_return(!event_pid_changed(s->event), -ECHILD);
1253
1254         s->time.next = usec;
1255
1256         source_set_pending(s, false);
1257
1258         if (s->type == SOURCE_REALTIME) {
1259                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1260                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1261         } else {
1262                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1263                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1264         }
1265
1266         return 0;
1267 }
1268
1269 _public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1270         assert_return(s, -EINVAL);
1271         assert_return(usec, -EINVAL);
1272         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1273         assert_return(!event_pid_changed(s->event), -ECHILD);
1274
1275         *usec = s->time.accuracy;
1276         return 0;
1277 }
1278
1279 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1280         assert_return(s, -EINVAL);
1281         assert_return(usec != (uint64_t) -1, -EINVAL);
1282         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1283         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1284         assert_return(!event_pid_changed(s->event), -ECHILD);
1285
1286         if (usec == 0)
1287                 usec = DEFAULT_ACCURACY_USEC;
1288
1289         s->time.accuracy = usec;
1290
1291         source_set_pending(s, false);
1292
1293         if (s->type == SOURCE_REALTIME)
1294                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1295         else
1296                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1297
1298         return 0;
1299 }
1300
1301 _public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1302         assert_return(s, -EINVAL);
1303         assert_return(pid, -EINVAL);
1304         assert_return(s->type == SOURCE_CHILD, -EDOM);
1305         assert_return(!event_pid_changed(s->event), -ECHILD);
1306
1307         *pid = s->child.pid;
1308         return 0;
1309 }
1310
1311 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_event_handler_t callback) {
1312         int r;
1313
1314         assert_return(s, -EINVAL);
1315         assert_return(s->type != SOURCE_QUIT, -EDOM);
1316         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1317         assert_return(!event_pid_changed(s->event), -ECHILD);
1318
1319         if (s->prepare == callback)
1320                 return 0;
1321
1322         if (callback && s->prepare) {
1323                 s->prepare = callback;
1324                 return 0;
1325         }
1326
1327         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1328         if (r < 0)
1329                 return r;
1330
1331         s->prepare = callback;
1332
1333         if (callback) {
1334                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1335                 if (r < 0)
1336                         return r;
1337         } else
1338                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1339
1340         return 0;
1341 }
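
/* Usage sketch for a prepare callback (illustrative; on_prepare is
 * hypothetical). It runs from event_prepare() at the start of each iteration,
 * before the loop decides what to wait for, which is useful for sources that
 * buffer data internally:
 *
 *     static int on_prepare(sd_event_source *s, void *userdata) {
 *             (e.g. adjust the watched events with sd_event_source_set_io_events())
 *             return 0;
 *     }
 *
 *     r = sd_event_source_set_prepare(s, on_prepare);
 */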
1342
1343 _public_ void* sd_event_source_get_userdata(sd_event_source *s) {
1344         assert_return(s, NULL);
1345
1346         return s->userdata;
1347 }
1348
1349 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1350         usec_t c;
1351         assert(e);
1352         assert(a <= b);
1353
1354         if (a <= 0)
1355                 return 0;
1356
1357         if (b <= a + 1)
1358                 return a;
1359
1360         /*
1361           Find a good time to wake up again between times a and b. We
1362           have two goals here:
1363
1364           a) We want to wake up as seldom as possible, hence prefer
1365              later times over earlier times.
1366
1367           b) But if we have to wake up, then let's make sure to
1368              dispatch as much as possible on the entire system.
1369
1370           We implement this by waking up everywhere at the same time
1371           within any given minute if we can, synchronised via the
1372           perturbation value determined from the boot ID. If we can't,
1373           then we try to find the same spot in every 1s and then 250ms
1374           step. Otherwise, we pick the last possible time to wake up.
1375         */
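        /* Worked example with made-up numbers: a = 130.2s, b = 190.5s,
         * perturb = 0.3s. The minute step yields
         * c = (b / 1 min) * 1 min + perturb = 180.3s, which is below b and
         * at or above a, so we schedule the wakeup at 180.3s instead of
         * waiting until the deadline b. */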
1376
1377         c = (b / USEC_PER_MINUTE) * USEC_PER_MINUTE + e->perturb;
1378         if (c >= b) {
1379                 if (_unlikely_(c < USEC_PER_MINUTE))
1380                         return b;
1381
1382                 c -= USEC_PER_MINUTE;
1383         }
1384
1385         if (c >= a)
1386                 return c;
1387
1388         c = (b / USEC_PER_SEC) * USEC_PER_SEC + (e->perturb % USEC_PER_SEC);
1389         if (c >= b) {
1390                 if (_unlikely_(c < USEC_PER_SEC))
1391                         return b;
1392
1393                 c -= USEC_PER_SEC;
1394         }
1395
1396         if (c >= a)
1397                 return c;
1398
1399         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1400         if (c >= b) {
1401                 if (_unlikely_(c < USEC_PER_MSEC*250))
1402                         return b;
1403
1404                 c -= USEC_PER_MSEC*250;
1405         }
1406
1407         if (c >= a)
1408                 return c;
1409
1410         return b;
1411 }
1412
1413 static int event_arm_timer(
1414                 sd_event *e,
1415                 int timer_fd,
1416                 Prioq *earliest,
1417                 Prioq *latest,
1418                 usec_t *next) {
1419
1420         struct itimerspec its = {};
1421         sd_event_source *a, *b;
1422         usec_t t;
1423         int r;
1424
1425         assert_se(e);
1426         assert_se(next);
1427
1428         a = prioq_peek(earliest);
1429         if (!a || a->enabled == SD_EVENT_OFF) {
1430
1431                 if (timer_fd < 0)
1432                         return 0;
1433
1434                 if (*next == (usec_t) -1)
1435                         return 0;
1436
1437                 /* disarm */
1438                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1439                 if (r < 0)
1440                         return r;
1441
1442                 *next = (usec_t) -1;
1443
1444                 return 0;
1445         }
1446
1447         b = prioq_peek(latest);
1448         assert_se(b && b->enabled != SD_EVENT_OFF);
1449
1450         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1451         if (*next == t)
1452                 return 0;
1453
1454         assert_se(timer_fd >= 0);
1455
1456         if (t == 0) {
1457                 /* A zero it_value would disarm the timer, so point it at a moment long in the past instead. */
1458                 its.it_value.tv_sec = 0;
1459                 its.it_value.tv_nsec = 1;
1460         } else
1461                 timespec_store(&its.it_value, t);
1462
1463         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1464         if (r < 0)
1465                 return r;
1466
1467         *next = t;
1468         return 0;
1469 }
1470
1471 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1472         assert(e);
1473         assert(s);
1474         assert(s->type == SOURCE_IO);
1475
1476         s->io.revents = events;
1477
1478         return source_set_pending(s, true);
1479 }
1480
1481 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1482         uint64_t x;
1483         ssize_t ss;
1484
1485         assert(e);
1486         assert(fd >= 0);
1487         assert(next);
1488
1489         assert_return(events == EPOLLIN, -EIO);
1490
1491         ss = read(fd, &x, sizeof(x));
1492         if (ss < 0) {
1493                 if (errno == EAGAIN || errno == EINTR)
1494                         return 0;
1495
1496                 return -errno;
1497         }
1498
1499         if (ss != sizeof(x))
1500                 return -EIO;
1501
1502         *next = (usec_t) -1;
1503
1504         return 0;
1505 }
1506
1507 static int process_timer(
1508                 sd_event *e,
1509                 usec_t n,
1510                 Prioq *earliest,
1511                 Prioq *latest) {
1512
1513         sd_event_source *s;
1514         int r;
1515
1516         assert(e);
1517
1518         for (;;) {
1519                 s = prioq_peek(earliest);
1520                 if (!s ||
1521                     s->time.next > n ||
1522                     s->enabled == SD_EVENT_OFF ||
1523                     s->pending)
1524                         break;
1525
1526                 r = source_set_pending(s, true);
1527                 if (r < 0)
1528                         return r;
1529
1530                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1531                 prioq_reshuffle(latest, s, &s->time.latest_index);
1532         }
1533
1534         return 0;
1535 }
1536
1537 static int process_child(sd_event *e) {
1538         sd_event_source *s;
1539         Iterator i;
1540         int r;
1541
1542         assert(e);
1543
1544         e->need_process_child = false;
1545
1546         /*
1547            So, this is ugly. We iteratively invoke waitid() with P_PID
1548            + WNOHANG for each PID we wait for, instead of using
1549            P_ALL. This is because we only want to get child
1550            information of very specific child processes, and not all
1551            of them. We might not have processed the SIGCHLD event of a
1552            previous invocation and we don't want to maintain an
1553            unbounded *per-child* event queue, hence we really don't
1554            want anything flushed out of the kernel's queue that we
1555            don't care about. Since this is O(n) this means that if you
1556            have a lot of processes you probably want to handle SIGCHLD
1557            yourself.
1558
1559            We do not reap the children here (by using WNOWAIT), this
1560            is only done after the event source is dispatched so that
1561            the callback still sees the process as a zombie.
1562         */
1563
1564         HASHMAP_FOREACH(s, e->child_sources, i) {
1565                 assert(s->type == SOURCE_CHILD);
1566
1567                 if (s->pending)
1568                         continue;
1569
1570                 if (s->enabled == SD_EVENT_OFF)
1571                         continue;
1572
1573                 zero(s->child.siginfo);
1574                 r = waitid(P_PID, s->child.pid, &s->child.siginfo,
1575                            WNOHANG | (s->child.options & WEXITED ? WNOWAIT : 0) | s->child.options);
1576                 if (r < 0)
1577                         return -errno;
1578
1579                 if (s->child.siginfo.si_pid != 0) {
1580                         bool zombie =
1581                                 s->child.siginfo.si_code == CLD_EXITED ||
1582                                 s->child.siginfo.si_code == CLD_KILLED ||
1583                                 s->child.siginfo.si_code == CLD_DUMPED;
1584
1585                         if (!zombie && (s->child.options & WEXITED)) {
1586                                 /* If the child isn't dead then let's
1587                                  * immediately remove the state change
1588                                  * from the queue, since there's no
1589                                  * benefit in leaving it queued */
1590
1591                                 assert(s->child.options & (WSTOPPED|WCONTINUED));
1592                                 waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|(s->child.options & (WSTOPPED|WCONTINUED)));
1593                         }
1594
1595                         r = source_set_pending(s, true);
1596                         if (r < 0)
1597                                 return r;
1598                 }
1599         }
1600
1601         return 0;
1602 }
1603
1604 static int process_signal(sd_event *e, uint32_t events) {
1605         bool read_one = false;
1606         int r;
1607
1608         assert(e);
1609         assert(e->signal_sources);
1610
1611         assert_return(events == EPOLLIN, -EIO);
1612
1613         for (;;) {
1614                 struct signalfd_siginfo si;
1615                 ssize_t ss;
1616                 sd_event_source *s;
1617
1618                 ss = read(e->signal_fd, &si, sizeof(si));
1619                 if (ss < 0) {
1620                         if (errno == EAGAIN || errno == EINTR)
1621                                 return read_one;
1622
1623                         return -errno;
1624                 }
1625
1626                 if (ss != sizeof(si))
1627                         return -EIO;
1628
1629                 read_one = true;
1630
1631                 s = e->signal_sources[si.ssi_signo];
1632                 if (si.ssi_signo == SIGCHLD) {
1633                         r = process_child(e);
1634                         if (r < 0)
1635                                 return r;
1636                         if (r > 0 || !s)
1637                                 continue;
1638                 } else
1639                         if (!s)
1640                                 return -EIO;
1641
1642                 s->signal.siginfo = si;
1643                 r = source_set_pending(s, true);
1644                 if (r < 0)
1645                         return r;
1646         }
1647
1648         return 0;
1649 }
1650
static int source_dispatch(sd_event_source *s) {
        int r = 0;

        assert(s);
        assert(s->pending || s->type == SOURCE_QUIT);

        if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
                r = source_set_pending(s, false);
                if (r < 0)
                        return r;
        }

        if (s->enabled == SD_EVENT_ONESHOT) {
                r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
                if (r < 0)
                        return r;
        }

        sd_event_source_ref(s);

        switch (s->type) {

        case SOURCE_IO:
                r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
                break;

        case SOURCE_MONOTONIC:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_REALTIME:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_SIGNAL:
                r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
                break;

        case SOURCE_CHILD: {
                bool zombie;

                zombie = s->child.siginfo.si_code == CLD_EXITED ||
                         s->child.siginfo.si_code == CLD_KILLED ||
                         s->child.siginfo.si_code == CLD_DUMPED;

                r = s->child.callback(s, &s->child.siginfo, s->userdata);

                /* Now, reap the PID for good. */
                if (zombie)
                        waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|WEXITED);

                break;
        }

        case SOURCE_DEFER:
                r = s->defer.callback(s, s->userdata);
                break;

        case SOURCE_QUIT:
                r = s->quit.callback(s, s->userdata);
                break;
        }

        sd_event_source_unref(s);

        return r;
}

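/* Run the registered prepare callbacks, each at most once per event loop
 * iteration: a source is stamped with the current iteration number and
 * reshuffled in the prepare queue before its callback is invoked. */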
static int event_prepare(sd_event *e) {
        int r;

        assert(e);

        for (;;) {
                sd_event_source *s;

                s = prioq_peek(e->prepare);
                if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
                        break;

                s->prepare_iteration = e->iteration;
                r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
                if (r < 0)
                        return r;

                assert(s->prepare);
                r = s->prepare(s, s->userdata);
                if (r < 0)
                        return r;
        }

        return 0;
}

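/* Dispatch the next enabled quit source, if there is one; otherwise mark the
 * event loop as SD_EVENT_FINISHED. The loop state is set to SD_EVENT_QUITTING
 * around the callback and restored to SD_EVENT_PASSIVE afterwards. */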
static int dispatch_quit(sd_event *e) {
        sd_event_source *p;
        int r;

        assert(e);

        p = prioq_peek(e->quit);
        if (!p || p->enabled == SD_EVENT_OFF) {
                e->state = SD_EVENT_FINISHED;
                return 0;
        }

        sd_event_ref(e);
        e->iteration++;
        e->state = SD_EVENT_QUITTING;

        r = source_dispatch(p);

        e->state = SD_EVENT_PASSIVE;
        sd_event_unref(e);

        return r;
}

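/* Peek at the pending queue and return the source that should be dispatched
 * next, or NULL if nothing is pending (or the head of the queue is disabled). */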
static sd_event_source* event_next_pending(sd_event *e) {
        sd_event_source *p;

        assert(e);

        p = prioq_peek(e->pending);
        if (!p)
                return NULL;

        if (p->enabled == SD_EVENT_OFF)
                return NULL;

        return p;
}

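/* Run a single iteration of the event loop: run prepare callbacks, rearm the
 * monotonic and realtime timerfds, wait for events (up to the given timeout
 * in microseconds, or forever if it is (uint64_t) -1), process whatever the
 * kernel reported, and finally dispatch one pending event source. Returns a
 * negative errno-style value on failure, otherwise whatever the dispatched
 * handler returned (or 0 if nothing was dispatched). */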
_public_ int sd_event_run(sd_event *e, uint64_t timeout) {
        struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
        sd_event_source *p;
        int r, i, m;

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);

        if (e->quit_requested)
                return dispatch_quit(e);

        sd_event_ref(e);
        e->iteration++;
        e->state = SD_EVENT_RUNNING;

        r = event_prepare(e);
        if (r < 0)
                goto finish;

        r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
        if (r < 0)
                goto finish;

        r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
        if (r < 0)
                goto finish;

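        /* If a source is already pending, or SIGCHLD state is still waiting to
         * be collected, don't block in epoll_wait(): poll only. */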
        if (event_next_pending(e) || e->need_process_child)
                timeout = 0;

        m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
                       timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
        if (m < 0) {
                r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
                goto finish;
        }

        dual_timestamp_get(&e->timestamp);

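        /* Dispatch on the epoll user data: the timer and signal fds were
         * registered with the SOURCE_* constants as markers, anything else is
         * treated as the pointer of an I/O event source. */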
        for (i = 0; i < m; i++) {

                if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
                        r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
                        r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
                        r = process_signal(e, ev_queue[i].events);
                else
                        r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);

                if (r < 0)
                        goto finish;
        }

        r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
        if (r < 0)
                goto finish;

        r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
        if (r < 0)
                goto finish;

        if (e->need_process_child) {
                r = process_child(e);
                if (r < 0)
                        goto finish;
        }

        p = event_next_pending(e);
        if (!p) {
                r = 0;
                goto finish;
        }

        r = source_dispatch(p);

finish:
        e->state = SD_EVENT_PASSIVE;
        sd_event_unref(e);

        return r;
}

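/* Convenience wrapper that keeps calling sd_event_run() with an infinite
 * timeout until the loop reaches SD_EVENT_FINISHED or an iteration returns an
 * error.
 *
 * Minimal usage sketch (illustrative only, using just the functions defined
 * in this file):
 *
 *     sd_event *ev = NULL;
 *     int r = sd_event_default(&ev);
 *     if (r < 0)
 *             return r;
 *     // ... add event sources here ...
 *     r = sd_event_loop(ev);
 *     sd_event_unref(ev);
 *     return r;
 */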
_public_ int sd_event_loop(sd_event *e) {
        int r;

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
        assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);

        sd_event_ref(e);

        while (e->state != SD_EVENT_FINISHED) {
                r = sd_event_run(e, (uint64_t) -1);
                if (r < 0)
                        goto finish;
        }

        r = 0;

finish:
        sd_event_unref(e);
        return r;
}

_public_ int sd_event_get_state(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        return e->state;
}

_public_ int sd_event_get_quit(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        return e->quit_requested;
}

_public_ int sd_event_request_quit(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        e->quit_requested = true;
        return 0;
}

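/* Note: these two getters return the timestamp cached at the start of the
 * current/most recent iteration (taken right after epoll_wait() returns in
 * sd_event_run()), not a fresh clock sample. */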
_public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
        assert_return(e, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
        assert_return(!event_pid_changed(e), -ECHILD);

        *usec = e->timestamp.realtime;
        return 0;
}

_public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
        assert_return(e, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
        assert_return(!event_pid_changed(e), -ECHILD);

        *usec = e->timestamp.monotonic;
        return 0;
}

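/* Return (a new reference to) the per-thread default event loop, allocating
 * it on first use. With a NULL argument this merely reports whether a default
 * loop already exists for the calling thread. Returns 1 if a new loop was
 * allocated, 0 if an existing one was reused. */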
_public_ int sd_event_default(sd_event **ret) {

        static __thread sd_event *default_event = NULL;
        sd_event *e;
        int r;

        if (!ret)
                return !!default_event;

        if (default_event) {
                *ret = sd_event_ref(default_event);
                return 0;
        }

        r = sd_event_new(&e);
        if (r < 0)
                return r;

        e->default_event_ptr = &default_event;
        e->tid = gettid();
        default_event = e;

        *ret = e;
        return 1;
}

_public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
        assert_return(e, -EINVAL);
        assert_return(tid, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        if (e->tid != 0) {
                *tid = e->tid;
                return 0;
        }

        return -ENXIO;
}