event: make sure we keep a reference to all events we dispatch while we do so.
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "macro.h"
27 #include "prioq.h"
28 #include "hashmap.h"
29 #include "util.h"
30 #include "time-util.h"
31 #include "sd-id128.h"
32
33 #include "sd-event.h"
34
35 #define EPOLL_QUEUE_MAX 64
36 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
37
38 typedef enum EventSourceType {
39         SOURCE_IO,
40         SOURCE_MONOTONIC,
41         SOURCE_REALTIME,
42         SOURCE_SIGNAL,
43         SOURCE_CHILD,
44         SOURCE_DEFER,
45         SOURCE_QUIT
46 } EventSourceType;
47
48 struct sd_event_source {
49         unsigned n_ref;
50
51         sd_event *event;
52         void *userdata;
53         sd_prepare_handler_t prepare;
54
55         EventSourceType type:4;
56         int enabled:3;
57         bool pending:1;
58
59         int priority;
60         unsigned pending_index;
61         unsigned prepare_index;
62         unsigned pending_iteration;
63         unsigned prepare_iteration;
64
65         union {
66                 struct {
67                         sd_io_handler_t callback;
68                         int fd;
69                         uint32_t events;
70                         uint32_t revents;
71                         bool registered:1;
72                 } io;
73                 struct {
74                         sd_time_handler_t callback;
75                         usec_t next, accuracy;
76                         unsigned earliest_index;
77                         unsigned latest_index;
78                 } time;
79                 struct {
80                         sd_signal_handler_t callback;
81                         struct signalfd_siginfo siginfo;
82                         int sig;
83                 } signal;
84                 struct {
85                         sd_child_handler_t callback;
86                         siginfo_t siginfo;
87                         pid_t pid;
88                         int options;
89                 } child;
90                 struct {
91                         sd_defer_handler_t callback;
92                 } defer;
93                 struct {
94                         sd_quit_handler_t callback;
95                         unsigned prioq_index;
96                 } quit;
97         };
98 };
99
100 struct sd_event {
101         unsigned n_ref;
102
103         int epoll_fd;
104         int signal_fd;
105         int realtime_fd;
106         int monotonic_fd;
107
108         Prioq *pending;
109         Prioq *prepare;
110
111         /* For both clocks we maintain two priority queues each, one
112          * ordered by the earliest times the events may be
113          * dispatched, and one ordered by the latest times they must
114          * have been dispatched. The range between the top entries in
115          * the two prioqs is the time window we can freely schedule
116          * wakeups in */
117         Prioq *monotonic_earliest;
118         Prioq *monotonic_latest;
119         Prioq *realtime_earliest;
120         Prioq *realtime_latest;
121
122         usec_t realtime_next, monotonic_next;
123         usec_t perturb;
124
125         sigset_t sigset;
126         sd_event_source **signal_sources;
127
128         Hashmap *child_sources;
129         unsigned n_enabled_child_sources;
130
131         Prioq *quit;
132
133         pid_t original_pid;
134
135         unsigned iteration;
136         dual_timestamp timestamp;
137         int state;
138
139         bool quit_requested:1;
140         bool need_process_child:1;
141 };
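/*
 * Illustrative note (added commentary, not in the original source): for a
 * timer source with time.next = T and time.accuracy = A, the "earliest"
 * prioq is keyed on T, the first moment the event may be dispatched, and
 * the "latest" prioq on T + A, the deadline by which it must have been
 * dispatched. With T = 10 * USEC_PER_SEC and the default accuracy of
 * 250 * USEC_PER_MSEC, the loop may program the timerfd for any point in
 * [10.000s, 10.250s], which is what lets wakeups from several sources be
 * coalesced into a single expiry.
 */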
142
143 static int pending_prioq_compare(const void *a, const void *b) {
144         const sd_event_source *x = a, *y = b;
145
146         assert(x->pending);
147         assert(y->pending);
148
149         /* Enabled ones first */
150         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
151                 return -1;
152         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
153                 return 1;
154
155         /* Lower priority values first */
156         if (x->priority < y->priority)
157                 return -1;
158         if (x->priority > y->priority)
159                 return 1;
160
161         /* Older entries first */
162         if (x->pending_iteration < y->pending_iteration)
163                 return -1;
164         if (x->pending_iteration > y->pending_iteration)
165                 return 1;
166
167         /* Stability for the rest */
168         if (x < y)
169                 return -1;
170         if (x > y)
171                 return 1;
172
173         return 0;
174 }
175
176 static int prepare_prioq_compare(const void *a, const void *b) {
177         const sd_event_source *x = a, *y = b;
178
179         assert(x->prepare);
180         assert(y->prepare);
181
182         /* Move most recently prepared ones last, so that we can stop
183          * preparing as soon as we hit one that has already been
184          * prepared in the current iteration */
185         if (x->prepare_iteration < y->prepare_iteration)
186                 return -1;
187         if (x->prepare_iteration > y->prepare_iteration)
188                 return 1;
189
190         /* Enabled ones first */
191         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
192                 return -1;
193         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
194                 return 1;
195
196         /* Lower priority values first */
197         if (x->priority < y->priority)
198                 return -1;
199         if (x->priority > y->priority)
200                 return 1;
201
202         /* Stability for the rest */
203         if (x < y)
204                 return -1;
205         if (x > y)
206                 return 1;
207
208         return 0;
209 }
210
211 static int earliest_time_prioq_compare(const void *a, const void *b) {
212         const sd_event_source *x = a, *y = b;
213
214         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
215         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
216
217         /* Enabled ones first */
218         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
219                 return -1;
220         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
221                 return 1;
222
223         /* Move the pending ones to the end */
224         if (!x->pending && y->pending)
225                 return -1;
226         if (x->pending && !y->pending)
227                 return 1;
228
229         /* Order by time */
230         if (x->time.next < y->time.next)
231                 return -1;
232         if (x->time.next > y->time.next)
233                 return 1;
234
235         /* Stability for the rest */
236         if (x < y)
237                 return -1;
238         if (x > y)
239                 return 1;
240
241         return 0;
242 }
243
244 static int latest_time_prioq_compare(const void *a, const void *b) {
245         const sd_event_source *x = a, *y = b;
246
247         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
248                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
249
250         /* Enabled ones first */
251         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
252                 return -1;
253         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
254                 return 1;
255
256         /* Move the pending ones to the end */
257         if (!x->pending && y->pending)
258                 return -1;
259         if (x->pending && !y->pending)
260                 return 1;
261
262         /* Order by time */
263         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
264                 return -1;
265         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
266                 return 1;
267
268         /* Stability for the rest */
269         if (x < y)
270                 return -1;
271         if (x > y)
272                 return 1;
273
274         return 0;
275 }
276
277 static int quit_prioq_compare(const void *a, const void *b) {
278         const sd_event_source *x = a, *y = b;
279
280         assert(x->type == SOURCE_QUIT);
281         assert(y->type == SOURCE_QUIT);
282
283         /* Enabled ones first */
284         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
285                 return -1;
286         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
287                 return 1;
288
289         /* Lower priority values first */
290         if (x->priority < y->priority)
291                 return -1;
292         if (x->priority > y->priority)
293                 return 1;
294
295         /* Stability for the rest */
296         if (x < y)
297                 return -1;
298         if (x > y)
299                 return 1;
300
301         return 0;
302 }
303
304 static void event_free(sd_event *e) {
305         assert(e);
306
307         if (e->epoll_fd >= 0)
308                 close_nointr_nofail(e->epoll_fd);
309
310         if (e->signal_fd >= 0)
311                 close_nointr_nofail(e->signal_fd);
312
313         if (e->realtime_fd >= 0)
314                 close_nointr_nofail(e->realtime_fd);
315
316         if (e->monotonic_fd >= 0)
317                 close_nointr_nofail(e->monotonic_fd);
318
319         prioq_free(e->pending);
320         prioq_free(e->prepare);
321         prioq_free(e->monotonic_earliest);
322         prioq_free(e->monotonic_latest);
323         prioq_free(e->realtime_earliest);
324         prioq_free(e->realtime_latest);
325         prioq_free(e->quit);
326
327         free(e->signal_sources);
328
329         hashmap_free(e->child_sources);
330         free(e);
331 }
332
333 _public_ int sd_event_new(sd_event** ret) {
334         sd_event *e;
335         int r;
336
337         assert_return(ret, -EINVAL);
338
339         e = new0(sd_event, 1);
340         if (!e)
341                 return -ENOMEM;
342
343         e->n_ref = 1;
344         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
345         e->realtime_next = e->monotonic_next = (usec_t) -1;
346         e->original_pid = getpid();
347
348         assert_se(sigemptyset(&e->sigset) == 0);
349
350         e->pending = prioq_new(pending_prioq_compare);
351         if (!e->pending) {
352                 r = -ENOMEM;
353                 goto fail;
354         }
355
356         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
357         if (e->epoll_fd < 0) {
358                 r = -errno;
359                 goto fail;
360         }
361
362         *ret = e;
363         return 0;
364
365 fail:
366         event_free(e);
367         return r;
368 }
369
370 _public_ sd_event* sd_event_ref(sd_event *e) {
371         assert_return(e, NULL);
372
373         assert(e->n_ref >= 1);
374         e->n_ref++;
375
376         return e;
377 }
378
379 _public_ sd_event* sd_event_unref(sd_event *e) {
380         assert_return(e, NULL);
381
382         assert(e->n_ref >= 1);
383         e->n_ref--;
384
385         if (e->n_ref <= 0)
386                 event_free(e);
387
388         return NULL;
389 }
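/*
 * Minimal usage sketch (added illustration, not part of the original file;
 * "on_io" and "run_loop" are hypothetical names, and the callback signature
 * is assumed to match the dispatch call in source_dispatch() below):
 *
 *     static int on_io(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
 *             return 0;
 *     }
 *
 *     static int run_loop(int fd) {
 *             sd_event *e = NULL;
 *             sd_event_source *s = NULL;
 *             int r;
 *
 *             r = sd_event_new(&e);
 *             if (r < 0)
 *                     return r;
 *
 *             r = sd_event_add_io(e, fd, EPOLLIN, on_io, NULL, &s);
 *             if (r < 0)
 *                     goto finish;
 *
 *             r = sd_event_loop(e);
 *
 *     finish:
 *             sd_event_source_unref(s);
 *             sd_event_unref(e);
 *             return r;
 *     }
 *
 * A negative return from a callback propagates out of sd_event_run() and
 * thus makes sd_event_loop() fail.
 */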
390
391 static bool event_pid_changed(sd_event *e) {
392         assert(e);
393
394         /* We don't support people creating an event loop and keeping
395          * it around over a fork(). Let's complain. */
396
397         return e->original_pid != getpid();
398 }
399
400 static int source_io_unregister(sd_event_source *s) {
401         int r;
402
403         assert(s);
404         assert(s->type == SOURCE_IO);
405
406         if (!s->io.registered)
407                 return 0;
408
409         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
410         if (r < 0)
411                 return -errno;
412
413         s->io.registered = false;
414         return 0;
415 }
416
417 static int source_io_register(
418                 sd_event_source *s,
419                 int enabled,
420                 uint32_t events) {
421
422         struct epoll_event ev = {};
423         int r;
424
425         assert(s);
426         assert(s->type == SOURCE_IO);
427         assert(enabled != SD_EVENT_OFF);
428
429         ev.events = events;
430         ev.data.ptr = s;
431
432         if (enabled == SD_EVENT_ONESHOT)
433                 ev.events |= EPOLLONESHOT;
434
435         if (s->io.registered)
436                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
437         else
438                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
439
440         if (r < 0)
441                 return -errno;
442
443         s->io.registered = true;
444
445         return 0;
446 }
447
448 static void source_free(sd_event_source *s) {
449         assert(s);
450
451         if (s->event) {
452                 switch (s->type) {
453
454                 case SOURCE_IO:
455                         if (s->io.fd >= 0)
456                                 source_io_unregister(s);
457
458                         break;
459
460                 case SOURCE_MONOTONIC:
461                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
462                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
463                         break;
464
465                 case SOURCE_REALTIME:
466                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
467                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
468                         break;
469
470                 case SOURCE_SIGNAL:
471                         if (s->signal.sig > 0) {
472                                 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
473                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
474
475                                 if (s->event->signal_sources)
476                                         s->event->signal_sources[s->signal.sig] = NULL;
477                         }
478
479                         break;
480
481                 case SOURCE_CHILD:
482                         if (s->child.pid > 0) {
483                                 if (s->enabled != SD_EVENT_OFF) {
484                                         assert(s->event->n_enabled_child_sources > 0);
485                                         s->event->n_enabled_child_sources--;
486                                 }
487
488                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
489                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
490
491                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
492                         }
493
494                         break;
495
496                 case SOURCE_DEFER:
497                         /* nothing */
498                         break;
499
500                 case SOURCE_QUIT:
501                         prioq_remove(s->event->quit, s, &s->quit.prioq_index);
502                         break;
503                 }
504
505                 if (s->pending)
506                         prioq_remove(s->event->pending, s, &s->pending_index);
507
508                 if (s->prepare)
509                         prioq_remove(s->event->prepare, s, &s->prepare_index);
510
511                 sd_event_unref(s->event);
512         }
513
514         free(s);
515 }
516
517 static int source_set_pending(sd_event_source *s, bool b) {
518         int r;
519
520         assert(s);
521         assert(s->type != SOURCE_QUIT);
522
523         if (s->pending == b)
524                 return 0;
525
526         s->pending = b;
527
528         if (b) {
529                 s->pending_iteration = s->event->iteration;
530
531                 r = prioq_put(s->event->pending, s, &s->pending_index);
532                 if (r < 0) {
533                         s->pending = false;
534                         return r;
535                 }
536         } else
537                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
538
539         return 0;
540 }
541
542 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
543         sd_event_source *s;
544
545         assert(e);
546
547         s = new0(sd_event_source, 1);
548         if (!s)
549                 return NULL;
550
551         s->n_ref = 1;
552         s->event = sd_event_ref(e);
553         s->type = type;
554         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
555
556         return s;
557 }
558
559 _public_ int sd_event_add_io(
560                 sd_event *e,
561                 int fd,
562                 uint32_t events,
563                 sd_io_handler_t callback,
564                 void *userdata,
565                 sd_event_source **ret) {
566
567         sd_event_source *s;
568         int r;
569
570         assert_return(e, -EINVAL);
571         assert_return(fd >= 0, -EINVAL);
572         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
573         assert_return(callback, -EINVAL);
574         assert_return(ret, -EINVAL);
575         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
576         assert_return(!event_pid_changed(e), -ECHILD);
577
578         s = source_new(e, SOURCE_IO);
579         if (!s)
580                 return -ENOMEM;
581
582         s->io.fd = fd;
583         s->io.events = events;
584         s->io.callback = callback;
585         s->userdata = userdata;
586         s->enabled = SD_EVENT_ON;
587
588         r = source_io_register(s, s->enabled, events);
589         if (r < 0) {
590                 source_free(s);
591                 return r;
592         }
593
594         *ret = s;
595         return 0;
596 }
597
598 static int event_setup_timer_fd(
599                 sd_event *e,
600                 EventSourceType type,
601                 int *timer_fd,
602                 clockid_t id) {
603
604         struct epoll_event ev = {};
605         int r, fd;
606         sd_id128_t bootid;
607
608         assert(e);
609         assert(timer_fd);
610
611         if (_likely_(*timer_fd >= 0))
612                 return 0;
613
614         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
615         if (fd < 0)
616                 return -errno;
617
618         ev.events = EPOLLIN;
619         ev.data.ptr = INT_TO_PTR(type);
620
621         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
622         if (r < 0) {
623                 close_nointr_nofail(fd);
624                 return -errno;
625         }
626
627         /* When we sleep for longer, we try to realign the wakeup to
628            the same time within each second, so that events all across
629            the system can be coalesced into a single CPU
630            wakeup. However, let's take some system-specific randomness
631            for this value, so that in a network of systems with synced
632            clocks timer events are distributed a bit. Here, we
633            calculate a perturbation usec offset from the boot ID. */
634
635         if (sd_id128_get_boot(&bootid) >= 0)
636                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
637
638         *timer_fd = fd;
639         return 0;
640 }
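/*
 * Added note on the perturbation value: it is simply
 * (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC, i.e. a fixed offset
 * in [0, 1s) that is stable for the lifetime of the current boot. All timer
 * wakeups on this machine are then aligned to that same sub-second offset
 * (see sleep_between() below), while other machines, having different boot
 * IDs, land on different offsets, so a fleet with synchronised clocks does
 * not wake up in lockstep.
 */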
641
642 static int event_add_time_internal(
643                 sd_event *e,
644                 EventSourceType type,
645                 int *timer_fd,
646                 clockid_t id,
647                 Prioq **earliest,
648                 Prioq **latest,
649                 uint64_t usec,
650                 uint64_t accuracy,
651                 sd_time_handler_t callback,
652                 void *userdata,
653                 sd_event_source **ret) {
654
655         sd_event_source *s;
656         int r;
657
658         assert_return(e, -EINVAL);
659         assert_return(callback, -EINVAL);
660         assert_return(ret, -EINVAL);
661         assert_return(usec != (uint64_t) -1, -EINVAL);
662         assert_return(accuracy != (uint64_t) -1, -EINVAL);
663         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
664         assert_return(!event_pid_changed(e), -ECHILD);
665
666         assert(timer_fd);
667         assert(earliest);
668         assert(latest);
669
670         if (!*earliest) {
671                 *earliest = prioq_new(earliest_time_prioq_compare);
672                 if (!*earliest)
673                         return -ENOMEM;
674         }
675
676         if (!*latest) {
677                 *latest = prioq_new(latest_time_prioq_compare);
678                 if (!*latest)
679                         return -ENOMEM;
680         }
681
682         if (*timer_fd < 0) {
683                 r = event_setup_timer_fd(e, type, timer_fd, id);
684                 if (r < 0)
685                         return r;
686         }
687
688         s = source_new(e, type);
689         if (!s)
690                 return -ENOMEM;
691
692         s->time.next = usec;
693         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
694         s->time.callback = callback;
695         s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
696         s->userdata = userdata;
697         s->enabled = SD_EVENT_ONESHOT;
698
699         r = prioq_put(*earliest, s, &s->time.earliest_index);
700         if (r < 0)
701                 goto fail;
702
703         r = prioq_put(*latest, s, &s->time.latest_index);
704         if (r < 0)
705                 goto fail;
706
707         *ret = s;
708         return 0;
709
710 fail:
711         source_free(s);
712         return r;
713 }
714
715 _public_ int sd_event_add_monotonic(sd_event *e,
716                                     uint64_t usec,
717                                     uint64_t accuracy,
718                                     sd_time_handler_t callback,
719                                     void *userdata,
720                                     sd_event_source **ret) {
721
722         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
723 }
724
725 _public_ int sd_event_add_realtime(sd_event *e,
726                                    uint64_t usec,
727                                    uint64_t accuracy,
728                                    sd_time_handler_t callback,
729                                    void *userdata,
730                                    sd_event_source **ret) {
731
732         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->realtime_latest, usec, accuracy, callback, userdata, ret);
733 }
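/*
 * Illustrative sketch of arming a relative timer (added commentary;
 * "on_timer" is a hypothetical name, now() is assumed to come from
 * time-util.h, and the handler signature is assumed to match the dispatch
 * call in source_dispatch() below):
 *
 *     static int on_timer(sd_event_source *s, uint64_t usec, void *userdata) {
 *             return 0;
 *     }
 *
 *     sd_event_source *t = NULL;
 *     r = sd_event_add_monotonic(e, now(CLOCK_MONOTONIC) + 5 * USEC_PER_SEC,
 *                                USEC_PER_SEC, on_timer, NULL, &t);
 *
 * An accuracy of USEC_PER_SEC allows the callback to fire anywhere within
 * one second after the requested time; passing 0 selects
 * DEFAULT_ACCURACY_USEC (250ms). Timer sources start as SD_EVENT_ONESHOT,
 * so after the first dispatch they are switched off unless re-armed via
 * sd_event_source_set_time() and sd_event_source_set_enabled().
 */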
734
735 static int event_update_signal_fd(sd_event *e) {
736         struct epoll_event ev = {};
737         bool add_to_epoll;
738         int r;
739
740         assert(e);
741
742         add_to_epoll = e->signal_fd < 0;
743
744         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
745         if (r < 0)
746                 return -errno;
747
748         e->signal_fd = r;
749
750         if (!add_to_epoll)
751                 return 0;
752
753         ev.events = EPOLLIN;
754         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
755
756         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
757         if (r < 0) {
758                 close_nointr_nofail(e->signal_fd);
759                 e->signal_fd = -1;
760
761                 return -errno;
762         }
763
764         return 0;
765 }
766
767 _public_ int sd_event_add_signal(
768                 sd_event *e,
769                 int sig,
770                 sd_signal_handler_t callback,
771                 void *userdata,
772                 sd_event_source **ret) {
773
774         sd_event_source *s;
775         int r;
776
777         assert_return(e, -EINVAL);
778         assert_return(sig > 0, -EINVAL);
779         assert_return(sig < _NSIG, -EINVAL);
780         assert_return(callback, -EINVAL);
781         assert_return(ret, -EINVAL);
782         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
783         assert_return(!event_pid_changed(e), -ECHILD);
784
785         if (!e->signal_sources) {
786                 e->signal_sources = new0(sd_event_source*, _NSIG);
787                 if (!e->signal_sources)
788                         return -ENOMEM;
789         } else if (e->signal_sources[sig])
790                 return -EBUSY;
791
792         s = source_new(e, SOURCE_SIGNAL);
793         if (!s)
794                 return -ENOMEM;
795
796         s->signal.sig = sig;
797         s->signal.callback = callback;
798         s->userdata = userdata;
799         s->enabled = SD_EVENT_ON;
800
801         e->signal_sources[sig] = s;
802         assert_se(sigaddset(&e->sigset, sig) == 0);
803
804         if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
805                 r = event_update_signal_fd(e);
806                 if (r < 0) {
807                         source_free(s);
808                         return r;
809                 }
810         }
811
812         *ret = s;
813         return 0;
814 }
815
816 _public_ int sd_event_add_child(
817                 sd_event *e,
818                 pid_t pid,
819                 int options,
820                 sd_child_handler_t callback,
821                 void *userdata,
822                 sd_event_source **ret) {
823
824         sd_event_source *s;
825         int r;
826
827         assert_return(e, -EINVAL);
828         assert_return(pid > 1, -EINVAL);
829         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
830         assert_return(options != 0, -EINVAL);
831         assert_return(callback, -EINVAL);
832         assert_return(ret, -EINVAL);
833         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
834         assert_return(!event_pid_changed(e), -ECHILD);
835
836         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
837         if (r < 0)
838                 return r;
839
840         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
841                 return -EBUSY;
842
843         s = source_new(e, SOURCE_CHILD);
844         if (!s)
845                 return -ENOMEM;
846
847         s->child.pid = pid;
848         s->child.options = options;
849         s->child.callback = callback;
850         s->userdata = userdata;
851         s->enabled = SD_EVENT_ONESHOT;
852
853         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
854         if (r < 0) {
855                 source_free(s);
856                 return r;
857         }
858
859         e->n_enabled_child_sources++;
860
861         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
862
863         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
864                 r = event_update_signal_fd(e);
865                 if (r < 0) {
866                         source_free(s);
867                         return r;
868                 }
869         }
870
871         e->need_process_child = true;
872
873         *ret = s;
874         return 0;
875 }
876
877 _public_ int sd_event_add_defer(
878                 sd_event *e,
879                 sd_defer_handler_t callback,
880                 void *userdata,
881                 sd_event_source **ret) {
882
883         sd_event_source *s;
884         int r;
885
886         assert_return(e, -EINVAL);
887         assert_return(callback, -EINVAL);
888         assert_return(ret, -EINVAL);
889         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
890         assert_return(!event_pid_changed(e), -ECHILD);
891
892         s = source_new(e, SOURCE_DEFER);
893         if (!s)
894                 return -ENOMEM;
895
896         s->defer.callback = callback;
897         s->userdata = userdata;
898         s->enabled = SD_EVENT_ONESHOT;
899
900         r = source_set_pending(s, true);
901         if (r < 0) {
902                 source_free(s);
903                 return r;
904         }
905
906         *ret = s;
907         return 0;
908 }
909
910 _public_ int sd_event_add_quit(
911                 sd_event *e,
912                 sd_quit_handler_t callback,
913                 void *userdata,
914                 sd_event_source **ret) {
915
916         sd_event_source *s;
917         int r;
918
919         assert_return(e, -EINVAL);
920         assert_return(callback, -EINVAL);
921         assert_return(ret, -EINVAL);
922         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
923         assert_return(!event_pid_changed(e), -ECHILD);
924
925         if (!e->quit) {
926                 e->quit = prioq_new(quit_prioq_compare);
927                 if (!e->quit)
928                         return -ENOMEM;
929         }
930
931         s = source_new(e, SOURCE_QUIT);
932         if (!s)
933                 return -ENOMEM;
934
935         s->quit.callback = callback;
936         s->userdata = userdata;
937         s->quit.prioq_index = PRIOQ_IDX_NULL;
938         s->enabled = SD_EVENT_ONESHOT;
939
940         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
941         if (r < 0) {
942                 source_free(s);
943                 return r;
944         }
945
946         *ret = s;
947         return 0;
948 }
949
950 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
951         assert_return(s, NULL);
952
953         assert(s->n_ref >= 1);
954         s->n_ref++;
955
956         return s;
957 }
958
959 _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {
960         assert_return(s, NULL);
961
962         assert(s->n_ref >= 1);
963         s->n_ref--;
964
965         if (s->n_ref <= 0)
966                 source_free(s);
967
968         return NULL;
969 }
970
971 _public_ sd_event *sd_event_get(sd_event_source *s) {
972         assert_return(s, NULL);
973
974         return s->event;
975 }
976
977 _public_ int sd_event_source_get_pending(sd_event_source *s) {
978         assert_return(s, -EINVAL);
979         assert_return(s->type != SOURCE_QUIT, -EDOM);
980         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
981         assert_return(!event_pid_changed(s->event), -ECHILD);
982
983         return s->pending;
984 }
985
986 _public_ int sd_event_source_get_io_fd(sd_event_source *s) {
987         assert_return(s, -EINVAL);
988         assert_return(s->type == SOURCE_IO, -EDOM);
989         assert_return(!event_pid_changed(s->event), -ECHILD);
990
991         return s->io.fd;
992 }
993
994 _public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
995         assert_return(s, -EINVAL);
996         assert_return(events, -EINVAL);
997         assert_return(s->type == SOURCE_IO, -EDOM);
998         assert_return(!event_pid_changed(s->event), -ECHILD);
999
1000         *events = s->io.events;
1001         return 0;
1002 }
1003
1004 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1005         int r;
1006
1007         assert_return(s, -EINVAL);
1008         assert_return(s->type == SOURCE_IO, -EDOM);
1009         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
1010         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1011         assert_return(!event_pid_changed(s->event), -ECHILD);
1012
1013         if (s->io.events == events)
1014                 return 0;
1015
1016         if (s->enabled != SD_EVENT_OFF) {
1017                 r = source_io_register(s, s->enabled, events);
1018                 if (r < 0)
1019                         return r;
1020         }
1021
1022         s->io.events = events;
1023
1024         return 0;
1025 }
1026
1027 _public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1028         assert_return(s, -EINVAL);
1029         assert_return(revents, -EINVAL);
1030         assert_return(s->type == SOURCE_IO, -EDOM);
1031         assert_return(s->pending, -ENODATA);
1032         assert_return(!event_pid_changed(s->event), -ECHILD);
1033
1034         *revents = s->io.revents;
1035         return 0;
1036 }
1037
1038 _public_ int sd_event_source_get_signal(sd_event_source *s) {
1039         assert_return(s, -EINVAL);
1040         assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1041         assert_return(!event_pid_changed(s->event), -ECHILD);
1042
1043         return s->signal.sig;
1044 }
1045
1046 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1047         assert_return(s, -EINVAL);
1048         assert_return(priority, -EINVAL);
1049         assert_return(!event_pid_changed(s->event), -ECHILD);
1050
1051         *priority = s->priority;
1052         return 0;
1053 }
1052
1053 _public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
1054         assert_return(s, -EINVAL);
1055         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1056         assert_return(!event_pid_changed(s->event), -ECHILD);
1057
1058         if (s->priority == priority)
1059                 return 0;
1060
1061         s->priority = priority;
1062
1063         if (s->pending)
1064                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1065
1066         if (s->prepare)
1067                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1068
1069         if (s->type == SOURCE_QUIT)
1070                 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1071
1072         return 0;
1073 }
1074
1075 _public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1076         assert_return(s, -EINVAL);
1077         assert_return(m, -EINVAL);
1078         assert_return(!event_pid_changed(s->event), -ECHILD);
1079
1080         *m = s->enabled;
1081         return 0;
1082 }
1083
1084 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
1085         int r;
1086
1087         assert_return(s, -EINVAL);
1088         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1089         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1090         assert_return(!event_pid_changed(s->event), -ECHILD);
1091
1092         if (s->enabled == m)
1093                 return 0;
1094
1095         if (m == SD_EVENT_OFF) {
1096
1097                 switch (s->type) {
1098
1099                 case SOURCE_IO:
1100                         r = source_io_unregister(s);
1101                         if (r < 0)
1102                                 return r;
1103
1104                         s->enabled = m;
1105                         break;
1106
1107                 case SOURCE_MONOTONIC:
1108                         s->enabled = m;
1109                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1110                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1111                         break;
1112
1113                 case SOURCE_REALTIME:
1114                         s->enabled = m;
1115                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1116                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1117                         break;
1118
1119                 case SOURCE_SIGNAL:
1120                         s->enabled = m;
1121                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1122                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1123                                 event_update_signal_fd(s->event);
1124                         }
1125
1126                         break;
1127
1128                 case SOURCE_CHILD:
1129                         s->enabled = m;
1130
1131                         assert(s->event->n_enabled_child_sources > 0);
1132                         s->event->n_enabled_child_sources--;
1133
1134                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1135                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1136                                 event_update_signal_fd(s->event);
1137                         }
1138
1139                         break;
1140
1141                 case SOURCE_QUIT:
1142                         s->enabled = m;
1143                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1144                         break;
1145
1146                 case SOURCE_DEFER:
1147                         s->enabled = m;
1148                         break;
1149                 }
1150
1151         } else {
1152                 switch (s->type) {
1153
1154                 case SOURCE_IO:
1155                         r = source_io_register(s, m, s->io.events);
1156                         if (r < 0)
1157                                 return r;
1158
1159                         s->enabled = m;
1160                         break;
1161
1162                 case SOURCE_MONOTONIC:
1163                         s->enabled = m;
1164                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1165                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1166                         break;
1167
1168                 case SOURCE_REALTIME:
1169                         s->enabled = m;
1170                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1171                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1172                         break;
1173
1174                 case SOURCE_SIGNAL:
1175                         s->enabled = m;
1176
1177                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1178                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1179                                 event_update_signal_fd(s->event);
1180                         }
1181                         break;
1182
1183                 case SOURCE_CHILD:
1184                         if (s->enabled == SD_EVENT_OFF) {
1185                                 s->event->n_enabled_child_sources++;
1186
1187                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1188                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1189                                         event_update_signal_fd(s->event);
1190                                 }
1191                         }
1192
1193                         s->enabled = m;
1194                         break;
1195
1196                 case SOURCE_QUIT:
1197                         s->enabled = m;
1198                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1199                         break;
1200
1201                 case SOURCE_DEFER:
1202                         s->enabled = m;
1203                         break;
1204                 }
1205         }
1206
1207         if (s->pending)
1208                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1209
1210         if (s->prepare)
1211                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1212
1213         return 0;
1214 }
1215
1216 _public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1217         assert_return(s, -EINVAL);
1218         assert_return(usec, -EINVAL);
1219         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1220         assert_return(!event_pid_changed(s->event), -ECHILD);
1221
1222         *usec = s->time.next;
1223         return 0;
1224 }
1225
1226 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1227         assert_return(s, -EINVAL);
1228         assert_return(usec != (uint64_t) -1, -EINVAL);
1229         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1230         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1231         assert_return(!event_pid_changed(s->event), -ECHILD);
1232
1233         if (s->time.next == usec)
1234                 return 0;
1235
1236         s->time.next = usec;
1237
1238         if (s->type == SOURCE_REALTIME) {
1239                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1240                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1241         } else {
1242                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1243                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1244         }
1245
1246         return 0;
1247 }
1248
1249 _public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1250         assert_return(s, -EINVAL);
1251         assert_return(usec, -EINVAL);
1252         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1253         assert_return(!event_pid_changed(s->event), -ECHILD);
1254
1255         *usec = s->time.accuracy;
1256         return 0;
1257 }
1258
1259 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1260         assert_return(s, -EINVAL);
1261         assert_return(usec != (uint64_t) -1, -EINVAL);
1262         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1263         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1264         assert_return(!event_pid_changed(s->event), -ECHILD);
1265
1266         if (usec == 0)
1267                 usec = DEFAULT_ACCURACY_USEC;
1268
1269         if (s->time.accuracy == usec)
1270                 return 0;
1271
1272         s->time.accuracy = usec;
1273
1274         if (s->type == SOURCE_REALTIME)
1275                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1276         else
1277                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1278
1279         return 0;
1280 }
1281
1282 _public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1283         assert_return(s, -EINVAL);
1284         assert_return(pid, -EINVAL);
1285         assert_return(s->type == SOURCE_CHILD, -EDOM);
1286         assert_return(!event_pid_changed(s->event), -ECHILD);
1287
1288         *pid = s->child.pid;
1289         return 0;
1290 }
1291
1292 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1293         int r;
1294
1295         assert_return(s, -EINVAL);
1296         assert_return(s->type != SOURCE_QUIT, -EDOM);
1297         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1298         assert_return(!event_pid_changed(s->event), -ECHILD);
1299
1300         if (s->prepare == callback)
1301                 return 0;
1302
1303         if (callback && s->prepare) {
1304                 s->prepare = callback;
1305                 return 0;
1306         }
1307
1308         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1309         if (r < 0)
1310                 return r;
1311
1312         s->prepare = callback;
1313
1314         if (callback) {
1315                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1316                 if (r < 0)
1317                         return r;
1318         } else
1319                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1320
1321         return 0;
1322 }
1323
1324 _public_ void* sd_event_source_get_userdata(sd_event_source *s) {
1325         assert_return(s, NULL);
1326
1327         return s->userdata;
1328 }
1329
1330 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1331         usec_t c;
1332         assert(e);
1333         assert(a <= b);
1334
1335         if (a <= 0)
1336                 return 0;
1337
1338         if (b <= a + 1)
1339                 return a;
1340
1341         /*
1342           Find a good time to wake up again between times a and b. We
1343           have two goals here:
1344
1345           a) We want to wake up as seldom as possible, hence prefer
1346              later times over earlier times.
1347
1348           b) But if we have to wake up, then let's make sure to
1349              dispatch as much as possible on the entire system.
1350
1351           We implement this by waking up everywhere at the same time
1352           within any given second if we can, synchronised via the
1353           perturbation value determined from the boot ID. If we can't,
1354           then we try to find the same spot within every 250ms
1355           step. Otherwise, we pick the last possible time to wake up.
1356         */
1357
1358         c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1359         if (c >= b) {
1360                 if (_unlikely_(c < USEC_PER_SEC))
1361                         return b;
1362
1363                 c -= USEC_PER_SEC;
1364         }
1365
1366         if (c >= a)
1367                 return c;
1368
1369         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1370         if (c >= b) {
1371                 if (_unlikely_(c < USEC_PER_MSEC*250))
1372                         return b;
1373
1374                 c -= USEC_PER_MSEC*250;
1375         }
1376
1377         if (c >= a)
1378                 return c;
1379
1380         return b;
1381 }
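/*
 * Worked example (added commentary, illustrative numbers): with
 * e->perturb == 300 * USEC_PER_MSEC, a == 5.1s and b == 5.8s, the first
 * candidate is c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb = 5.3s.
 * That lies within [a, b], so 5.3s is returned and the wakeup lands on this
 * machine's fixed x.3s spot of the second. Only when neither the 1s grid
 * nor the 250ms grid yields a point inside [a, b] do we fall back to b, the
 * latest permissible wakeup time.
 */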
1382
1383 static int event_arm_timer(
1384                 sd_event *e,
1385                 int timer_fd,
1386                 Prioq *earliest,
1387                 Prioq *latest,
1388                 usec_t *next) {
1389
1390         struct itimerspec its = {};
1391         sd_event_source *a, *b;
1392         usec_t t;
1393         int r;
1394
1395         assert_se(e);
1396         assert_se(next);
1397
1398         a = prioq_peek(earliest);
1399         if (!a || a->enabled == SD_EVENT_OFF) {
1400
1401                 if (*next == (usec_t) -1)
1402                         return 0;
1403
1404                 /* disarm */
1405                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1406                 if (r < 0)
1407                         return r;
1408
1409                 *next = (usec_t) -1;
1410
1411                 return 0;
1412         }
1413
1414         b = prioq_peek(latest);
1415         assert_se(b && b->enabled != SD_EVENT_OFF);
1416
1417         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1418         if (*next == t)
1419                 return 0;
1420
1421         assert_se(timer_fd >= 0);
1422
1423         if (t == 0) {
1424                 /* We don't want to disarm here, just arm it for some time looooong ago, so it fires immediately. */
1425                 its.it_value.tv_sec = 0;
1426                 its.it_value.tv_nsec = 1;
1427         } else
1428                 timespec_store(&its.it_value, t);
1429
1430         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1431         if (r < 0)
1432                 return r;
1433
1434         *next = t;
1435         return 0;
1436 }
1437
1438 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1439         assert(e);
1440         assert(s);
1441         assert(s->type == SOURCE_IO);
1442
1443         s->io.revents = events;
1444
1445         return source_set_pending(s, true);
1446 }
1447
1448 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1449         uint64_t x;
1450         ssize_t ss;
1451
1452         assert(e);
1453         assert(fd >= 0);
1454         assert(next);
1455
1456         assert_return(events == EPOLLIN, -EIO);
1457
1458         ss = read(fd, &x, sizeof(x));
1459         if (ss < 0) {
1460                 if (errno == EAGAIN || errno == EINTR)
1461                         return 0;
1462
1463                 return -errno;
1464         }
1465
1466         if (ss != sizeof(x))
1467                 return -EIO;
1468
1469         *next = (usec_t) -1;
1470
1471         return 0;
1472 }
1473
1474 static int process_timer(
1475                 sd_event *e,
1476                 usec_t n,
1477                 Prioq *earliest,
1478                 Prioq *latest) {
1479
1480         sd_event_source *s;
1481         int r;
1482
1483         assert(e);
1484
1485         for (;;) {
1486                 s = prioq_peek(earliest);
1487                 if (!s ||
1488                     s->time.next > n ||
1489                     s->enabled == SD_EVENT_OFF ||
1490                     s->pending)
1491                         break;
1492
1493                 r = source_set_pending(s, true);
1494                 if (r < 0)
1495                         return r;
1496
1497                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1498                 prioq_reshuffle(latest, s, &s->time.latest_index);
1499         }
1500
1501         return 0;
1502 }
1503
1504 static int process_child(sd_event *e) {
1505         sd_event_source *s;
1506         Iterator i;
1507         int r;
1508
1509         assert(e);
1510
1511         e->need_process_child = false;
1512
1513         /*
1514            So, this is ugly. We iteratively invoke waitid() with P_PID
1515            + WNOHANG for each PID we wait for, instead of using
1516            P_ALL. This is because we only want to get child
1517            information for very specific child processes, and not all
1518            of them. We might not have processed the SIGCHLD event of a
1519            previous invocation and we don't want to maintain an
1520            unbounded *per-child* event queue, hence we really don't
1521            want anything flushed out of the kernel's queue that we
1522            don't care about. Since this is O(n), if you have a lot of
1523            child processes you probably want to handle SIGCHLD
1524            yourself.
1525         */
1526
1527         HASHMAP_FOREACH(s, e->child_sources, i) {
1528                 assert(s->type == SOURCE_CHILD);
1529
1530                 if (s->pending)
1531                         continue;
1532
1533                 if (s->enabled == SD_EVENT_OFF)
1534                         continue;
1535
1536                 zero(s->child.siginfo);
1537                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1538                 if (r < 0)
1539                         return -errno;
1540
1541                 if (s->child.siginfo.si_pid != 0) {
1542                         r = source_set_pending(s, true);
1543                         if (r < 0)
1544                                 return r;
1545                 }
1546         }
1547
1548         return 0;
1549 }
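/*
 * Added note: the siginfo_t passed to a child callback is exactly the one
 * filled in by the waitid() call above, so si_pid is the watched PID and
 * si_code/si_status describe what happened (e.g. CLD_EXITED with the exit
 * status, or CLD_KILLED with the terminating signal), limited to whichever
 * of WEXITED/WSTOPPED/WCONTINUED were requested in sd_event_add_child().
 */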
1550
1551 static int process_signal(sd_event *e, uint32_t events) {
1552         bool read_one = false;
1553         int r;
1554
1555         assert(e);
1556         assert(e->signal_sources);
1557
1558         assert_return(events == EPOLLIN, -EIO);
1559
1560         for (;;) {
1561                 struct signalfd_siginfo si;
1562                 ssize_t ss;
1563                 sd_event_source *s;
1564
1565                 ss = read(e->signal_fd, &si, sizeof(si));
1566                 if (ss < 0) {
1567                         if (errno == EAGAIN || errno == EINTR)
1568                                 return read_one;
1569
1570                         return -errno;
1571                 }
1572
1573                 if (ss != sizeof(si))
1574                         return -EIO;
1575
1576                 read_one = true;
1577
1578                 s = e->signal_sources[si.ssi_signo];
1579                 if (si.ssi_signo == SIGCHLD) {
1580                         r = process_child(e);
1581                         if (r < 0)
1582                                 return r;
1583                         if (r > 0 || !s)
1584                                 continue;
1585                 } else
1586                         if (!s)
1587                                 return -EIO;
1588
1589                 s->signal.siginfo = si;
1590                 r = source_set_pending(s, true);
1591                 if (r < 0)
1592                         return r;
1593         }
1594
1595
1596         return 0;
1597 }
1598
1599 static int source_dispatch(sd_event_source *s) {
1600         int r = 0;
1601
1602         assert(s);
1603         assert(s->pending || s->type == SOURCE_QUIT);
1604
1605         if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
1606                 r = source_set_pending(s, false);
1607                 if (r < 0)
1608                         return r;
1609         }
1610
1611         if (s->enabled == SD_EVENT_ONESHOT) {
1612                 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1613                 if (r < 0)
1614                         return r;
1615         }
1616
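        /* Keep a reference on the source while its callback runs, so that the
         * object stays alive even if the callback drops the last user
         * reference to it; we release our reference again right below. */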
1617         sd_event_source_ref(s);
1618
1619         switch (s->type) {
1620
1621         case SOURCE_IO:
1622                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1623                 break;
1624
1625         case SOURCE_MONOTONIC:
1626                 r = s->time.callback(s, s->time.next, s->userdata);
1627                 break;
1628
1629         case SOURCE_REALTIME:
1630                 r = s->time.callback(s, s->time.next, s->userdata);
1631                 break;
1632
1633         case SOURCE_SIGNAL:
1634                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1635                 break;
1636
1637         case SOURCE_CHILD:
1638                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1639                 break;
1640
1641         case SOURCE_DEFER:
1642                 r = s->defer.callback(s, s->userdata);
1643                 break;
1644
1645         case SOURCE_QUIT:
1646                 r = s->quit.callback(s, s->userdata);
1647                 break;
1648         }
1649
1650         sd_event_source_unref(s);
1651
1652         return r;
1653 }
1654
1655 static int event_prepare(sd_event *e) {
1656         int r;
1657
1658         assert(e);
1659
1660         for (;;) {
1661                 sd_event_source *s;
1662
1663                 s = prioq_peek(e->prepare);
1664                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1665                         break;
1666
1667                 s->prepare_iteration = e->iteration;
1668                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1669                 if (r < 0)
1670                         return r;
1671
1672                 assert(s->prepare);
1673                 r = s->prepare(s, s->userdata);
1674                 if (r < 0)
1675                         return r;
1676
1677         }
1678
1679         return 0;
1680 }
1681
1682 static int dispatch_quit(sd_event *e) {
1683         sd_event_source *p;
1684         int r;
1685
1686         assert(e);
1687
1688         p = prioq_peek(e->quit);
1689         if (!p || p->enabled == SD_EVENT_OFF) {
1690                 e->state = SD_EVENT_FINISHED;
1691                 return 0;
1692         }
1693
1694         sd_event_ref(e);
1695         e->iteration++;
1696         e->state = SD_EVENT_QUITTING;
1697
1698         r = source_dispatch(p);
1699
1700         e->state = SD_EVENT_PASSIVE;
1701         sd_event_unref(e);
1702
1703         return r;
1704 }
1705
1706 static sd_event_source* event_next_pending(sd_event *e) {
1707         sd_event_source *p;
1708
1709         assert(e);
1710
1711         p = prioq_peek(e->pending);
1712         if (!p)
1713                 return NULL;
1714
1715         if (p->enabled == SD_EVENT_OFF)
1716                 return NULL;
1717
1718         return p;
1719 }
1720
1721 _public_ int sd_event_run(sd_event *e, uint64_t timeout) {
1722         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1723         sd_event_source *p;
1724         int r, i, m;
1725
1726         assert_return(e, -EINVAL);
1727         assert_return(!event_pid_changed(e), -ECHILD);
1728         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1729         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1730
1731         if (e->quit_requested)
1732                 return dispatch_quit(e);
1733
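        /* Keep a reference on the event loop itself for the duration of this
         * iteration (cf. the commit subject above): a dispatched callback
         * might otherwise drop the last reference and free the loop while we
         * are still using it. */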
1734         sd_event_ref(e);
1735         e->iteration++;
1736         e->state = SD_EVENT_RUNNING;
1737
1738         r = event_prepare(e);
1739         if (r < 0)
1740                 goto finish;
1741
1742         if (event_next_pending(e) || e->need_process_child)
1743                 timeout = 0;
1744
1745         if (timeout > 0) {
1746                 r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1747                 if (r < 0)
1748                         goto finish;
1749
1750                 r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1751                 if (r < 0)
1752                         goto finish;
1753         }
1754
1755         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1756                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1757         if (m < 0) {
1758                 r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
1759                 goto finish;
1760         }
1761
1762         dual_timestamp_get(&e->timestamp);
1763
1764         for (i = 0; i < m; i++) {
1765
1766                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1767                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
1768                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1769                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
1770                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1771                         r = process_signal(e, ev_queue[i].events);
1772                 else
1773                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1774
1775                 if (r < 0)
1776                         goto finish;
1777         }
1778
1779         r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
1780         if (r < 0)
1781                 goto finish;
1782
1783         r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
1784         if (r < 0)
1785                 goto finish;
1786
1787         if (e->need_process_child) {
1788                 r = process_child(e);
1789                 if (r < 0)
1790                         goto finish;
1791         }
1792
1793         p = event_next_pending(e);
1794         if (!p) {
1795                 r = 0;
1796                 goto finish;
1797         }
1798
1799         r = source_dispatch(p);
1800
1801 finish:
1802         e->state = SD_EVENT_PASSIVE;
1803         sd_event_unref(e);
1804
1805         return r;
1806 }
1807
1808 _public_ int sd_event_loop(sd_event *e) {
1809         int r;
1810
1811         assert_return(e, -EINVAL);
1812         assert_return(!event_pid_changed(e), -ECHILD);
1813         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1814
1815         sd_event_ref(e);
1816
1817         while (e->state != SD_EVENT_FINISHED) {
1818                 r = sd_event_run(e, (uint64_t) -1);
1819                 if (r < 0)
1820                         goto finish;
1821         }
1822
1823         r = 0;
1824
1825 finish:
1826         sd_event_unref(e);
1827         return r;
1828 }
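/*
 * Added illustration: instead of sd_event_loop(), the loop can be driven
 * manually, one iteration at a time:
 *
 *     while (sd_event_get_state(e) != SD_EVENT_FINISHED) {
 *             r = sd_event_run(e, 100 * USEC_PER_MSEC);
 *             if (r < 0)
 *                     break;
 *     }
 *
 * The timeout is an upper bound, in microseconds, on how long one iteration
 * may block in epoll_wait(); (uint64_t) -1 means "wait indefinitely", which
 * is what sd_event_loop() passes.
 */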
1829
1830 _public_ int sd_event_get_state(sd_event *e) {
1831         assert_return(e, -EINVAL);
1832         assert_return(!event_pid_changed(e), -ECHILD);
1833
1834         return e->state;
1835 }
1836
1837 _public_ int sd_event_get_quit(sd_event *e) {
1838         assert_return(e, -EINVAL);
1839         assert_return(!event_pid_changed(e), -ECHILD);
1840
1841         return e->quit_requested;
1842 }
1843
1844 _public_ int sd_event_request_quit(sd_event *e) {
1845         assert_return(e, -EINVAL);
1846         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1847         assert_return(!event_pid_changed(e), -ECHILD);
1848
1849         e->quit_requested = true;
1850         return 0;
1851 }
1852
1853 _public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
1854         assert_return(e, -EINVAL);
1855         assert_return(usec, -EINVAL);
1856         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1857         assert_return(!event_pid_changed(e), -ECHILD);
1858
1859         *usec = e->timestamp.realtime;
1860         return 0;
1861 }
1862
1863 _public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
1864         assert_return(e, -EINVAL);
1865         assert_return(usec, -EINVAL);
1866         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1867         assert_return(!event_pid_changed(e), -ECHILD);
1868
1869         *usec = e->timestamp.monotonic;
1870         return 0;
1871 }