src/libsystemd-bus/sd-event.c (elogind.git)
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "sd-id128.h"
27 #include "macro.h"
28 #include "prioq.h"
29 #include "hashmap.h"
30 #include "util.h"
31 #include "time-util.h"
32 #include "missing.h"
33
34 #include "sd-event.h"
35
36 #define EPOLL_QUEUE_MAX 64
37 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
38
39 typedef enum EventSourceType {
40         SOURCE_IO,
41         SOURCE_MONOTONIC,
42         SOURCE_REALTIME,
43         SOURCE_SIGNAL,
44         SOURCE_CHILD,
45         SOURCE_DEFER,
46         SOURCE_QUIT
47 } EventSourceType;
48
49 struct sd_event_source {
50         unsigned n_ref;
51
52         sd_event *event;
53         void *userdata;
54         sd_prepare_handler_t prepare;
55
56         EventSourceType type:4;
57         int enabled:3;
58         bool pending:1;
59
60         int priority;
61         unsigned pending_index;
62         unsigned prepare_index;
63         unsigned pending_iteration;
64         unsigned prepare_iteration;
65
66         union {
67                 struct {
68                         sd_io_handler_t callback;
69                         int fd;
70                         uint32_t events;
71                         uint32_t revents;
72                         bool registered:1;
73                 } io;
74                 struct {
75                         sd_time_handler_t callback;
76                         usec_t next, accuracy;
77                         unsigned earliest_index;
78                         unsigned latest_index;
79                 } time;
80                 struct {
81                         sd_signal_handler_t callback;
82                         struct signalfd_siginfo siginfo;
83                         int sig;
84                 } signal;
85                 struct {
86                         sd_child_handler_t callback;
87                         siginfo_t siginfo;
88                         pid_t pid;
89                         int options;
90                 } child;
91                 struct {
92                         sd_defer_handler_t callback;
93                 } defer;
94                 struct {
95                         sd_quit_handler_t callback;
96                         unsigned prioq_index;
97                 } quit;
98         };
99 };
100
101 struct sd_event {
102         unsigned n_ref;
103
104         int epoll_fd;
105         int signal_fd;
106         int realtime_fd;
107         int monotonic_fd;
108
109         Prioq *pending;
110         Prioq *prepare;
111
112         /* For both clocks we maintain two priority queues each, one
113          * ordered by the earliest times the events may be
114          * dispatched, and one ordered by the latest times they must
115          * have been dispatched. The range between the top entries in
116          * the two prioqs is the time window we can freely schedule
117          * wakeups in */
118         Prioq *monotonic_earliest;
119         Prioq *monotonic_latest;
120         Prioq *realtime_earliest;
121         Prioq *realtime_latest;
122
123         usec_t realtime_next, monotonic_next;
124         usec_t perturb;
125
126         sigset_t sigset;
127         sd_event_source **signal_sources;
128
129         Hashmap *child_sources;
130         unsigned n_enabled_child_sources;
131
132         Prioq *quit;
133
134         pid_t original_pid;
135
136         unsigned iteration;
137         dual_timestamp timestamp;
138         int state;
139
140         bool quit_requested:1;
141         bool need_process_child:1;
142
143         pid_t tid;
144         sd_event **default_event_ptr;
145 };
146
147 static int pending_prioq_compare(const void *a, const void *b) {
148         const sd_event_source *x = a, *y = b;
149
150         assert(x->pending);
151         assert(y->pending);
152
153         /* Enabled ones first */
154         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
155                 return -1;
156         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
157                 return 1;
158
159         /* Lower priority values first */
160         if (x->priority < y->priority)
161                 return -1;
162         if (x->priority > y->priority)
163                 return 1;
164
165         /* Older entries first */
166         if (x->pending_iteration < y->pending_iteration)
167                 return -1;
168         if (x->pending_iteration > y->pending_iteration)
169                 return 1;
170
171         /* Stability for the rest */
172         if (x < y)
173                 return -1;
174         if (x > y)
175                 return 1;
176
177         return 0;
178 }
179
180 static int prepare_prioq_compare(const void *a, const void *b) {
181         const sd_event_source *x = a, *y = b;
182
183         assert(x->prepare);
184         assert(y->prepare);
185
186         /* Move most recently prepared ones last, so that we can stop
187          * preparing as soon as we hit one that has already been
188          * prepared in the current iteration */
189         if (x->prepare_iteration < y->prepare_iteration)
190                 return -1;
191         if (x->prepare_iteration > y->prepare_iteration)
192                 return 1;
193
194         /* Enabled ones first */
195         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
196                 return -1;
197         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
198                 return 1;
199
200         /* Lower priority values first */
201         if (x->priority < y->priority)
202                 return -1;
203         if (x->priority > y->priority)
204                 return 1;
205
206         /* Stability for the rest */
207         if (x < y)
208                 return -1;
209         if (x > y)
210                 return 1;
211
212         return 0;
213 }
214
215 static int earliest_time_prioq_compare(const void *a, const void *b) {
216         const sd_event_source *x = a, *y = b;
217
218         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
219         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
220
221         /* Enabled ones first */
222         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
223                 return -1;
224         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
225                 return 1;
226
227         /* Move the pending ones to the end */
228         if (!x->pending && y->pending)
229                 return -1;
230         if (x->pending && !y->pending)
231                 return 1;
232
233         /* Order by time */
234         if (x->time.next < y->time.next)
235                 return -1;
236         if (x->time.next > y->time.next)
237                 return 1;
238
239         /* Stability for the rest */
240         if (x < y)
241                 return -1;
242         if (x > y)
243                 return 1;
244
245         return 0;
246 }
247
248 static int latest_time_prioq_compare(const void *a, const void *b) {
249         const sd_event_source *x = a, *y = b;
250
251         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
252                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
253
254         /* Enabled ones first */
255         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
256                 return -1;
257         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
258                 return 1;
259
260         /* Move the pending ones to the end */
261         if (!x->pending && y->pending)
262                 return -1;
263         if (x->pending && !y->pending)
264                 return 1;
265
266         /* Order by time */
267         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
268                 return -1;
269         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
270                 return 1;
271
272         /* Stability for the rest */
273         if (x < y)
274                 return -1;
275         if (x > y)
276                 return 1;
277
278         return 0;
279 }
280
281 static int quit_prioq_compare(const void *a, const void *b) {
282         const sd_event_source *x = a, *y = b;
283
284         assert(x->type == SOURCE_QUIT);
285         assert(y->type == SOURCE_QUIT);
286
287         /* Enabled ones first */
288         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
289                 return -1;
290         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
291                 return 1;
292
293         /* Lower priority values first */
294         if (x->priority < y->priority)
295                 return -1;
296         if (x->priority > y->priority)
297                 return 1;
298
299         /* Stability for the rest */
300         if (x < y)
301                 return -1;
302         if (x > y)
303                 return 1;
304
305         return 0;
306 }
307
308 static void event_free(sd_event *e) {
309         assert(e);
310
311         if (e->default_event_ptr)
312                 *(e->default_event_ptr) = NULL;
313
314         if (e->epoll_fd >= 0)
315                 close_nointr_nofail(e->epoll_fd);
316
317         if (e->signal_fd >= 0)
318                 close_nointr_nofail(e->signal_fd);
319
320         if (e->realtime_fd >= 0)
321                 close_nointr_nofail(e->realtime_fd);
322
323         if (e->monotonic_fd >= 0)
324                 close_nointr_nofail(e->monotonic_fd);
325
326         prioq_free(e->pending);
327         prioq_free(e->prepare);
328         prioq_free(e->monotonic_earliest);
329         prioq_free(e->monotonic_latest);
330         prioq_free(e->realtime_earliest);
331         prioq_free(e->realtime_latest);
332         prioq_free(e->quit);
333
334         free(e->signal_sources);
335
336         hashmap_free(e->child_sources);
337         free(e);
338 }
339
340 _public_ int sd_event_new(sd_event** ret) {
341         sd_event *e;
342         int r;
343
344         assert_return(ret, -EINVAL);
345
346         e = new0(sd_event, 1);
347         if (!e)
348                 return -ENOMEM;
349
350         e->n_ref = 1;
351         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
352         e->realtime_next = e->monotonic_next = (usec_t) -1;
353         e->original_pid = getpid();
354
355         assert_se(sigemptyset(&e->sigset) == 0);
356
357         e->pending = prioq_new(pending_prioq_compare);
358         if (!e->pending) {
359                 r = -ENOMEM;
360                 goto fail;
361         }
362
363         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
364         if (e->epoll_fd < 0) {
365                 r = -errno;
366                 goto fail;
367         }
368
369         *ret = e;
370         return 0;
371
372 fail:
373         event_free(e);
374         return r;
375 }
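/* Usage sketch (illustrative only; the variable names are made up):
 * allocating an event loop with sd_event_new() and releasing it again once
 * all sources have been detached:
 *
 *     sd_event *e = NULL;
 *     int r;
 *
 *     r = sd_event_new(&e);
 *     if (r < 0)
 *             return r;
 *
 *     ... attach sources, run the loop ...
 *
 *     sd_event_unref(e);
 */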
376
377 _public_ sd_event* sd_event_ref(sd_event *e) {
378         assert_return(e, NULL);
379
380         assert(e->n_ref >= 1);
381         e->n_ref++;
382
383         return e;
384 }
385
386 _public_ sd_event* sd_event_unref(sd_event *e) {
387         assert_return(e, NULL);
388
389         assert(e->n_ref >= 1);
390         e->n_ref--;
391
392         if (e->n_ref <= 0)
393                 event_free(e);
394
395         return NULL;
396 }
397
398 static bool event_pid_changed(sd_event *e) {
399         assert(e);
400
401         /* We don't support people creating an event loop and keeping
402          * it around over a fork(). Let's complain. */
403
404         return e->original_pid != getpid();
405 }
406
407 static int source_io_unregister(sd_event_source *s) {
408         int r;
409
410         assert(s);
411         assert(s->type == SOURCE_IO);
412
413         if (!s->io.registered)
414                 return 0;
415
416         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
417         if (r < 0)
418                 return -errno;
419
420         s->io.registered = false;
421         return 0;
422 }
423
424 static int source_io_register(
425                 sd_event_source *s,
426                 int enabled,
427                 uint32_t events) {
428
429         struct epoll_event ev = {};
430         int r;
431
432         assert(s);
433         assert(s->type == SOURCE_IO);
434         assert(enabled != SD_EVENT_OFF);
435
436         ev.events = events;
437         ev.data.ptr = s;
438
439         if (enabled == SD_EVENT_ONESHOT)
440                 ev.events |= EPOLLONESHOT;
441
442         if (s->io.registered)
443                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
444         else
445                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
446
447         if (r < 0)
448                 return -errno;
449
450         s->io.registered = true;
451
452         return 0;
453 }
454
455 static void source_free(sd_event_source *s) {
456         assert(s);
457
458         if (s->event) {
459                 switch (s->type) {
460
461                 case SOURCE_IO:
462                         if (s->io.fd >= 0)
463                                 source_io_unregister(s);
464
465                         break;
466
467                 case SOURCE_MONOTONIC:
468                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
469                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
470                         break;
471
472                 case SOURCE_REALTIME:
473                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
474                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
475                         break;
476
477                 case SOURCE_SIGNAL:
478                         if (s->signal.sig > 0) {
479                                 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
480                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
481
482                                 if (s->event->signal_sources)
483                                         s->event->signal_sources[s->signal.sig] = NULL;
484                         }
485
486                         break;
487
488                 case SOURCE_CHILD:
489                         if (s->child.pid > 0) {
490                                 if (s->enabled != SD_EVENT_OFF) {
491                                         assert(s->event->n_enabled_child_sources > 0);
492                                         s->event->n_enabled_child_sources--;
493                                 }
494
495                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
496                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
497
498                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
499                         }
500
501                         break;
502
503                 case SOURCE_DEFER:
504                         /* nothing */
505                         break;
506
507                 case SOURCE_QUIT:
508                         prioq_remove(s->event->quit, s, &s->quit.prioq_index);
509                         break;
510                 }
511
512                 if (s->pending)
513                         prioq_remove(s->event->pending, s, &s->pending_index);
514
515                 if (s->prepare)
516                         prioq_remove(s->event->prepare, s, &s->prepare_index);
517
518                 sd_event_unref(s->event);
519         }
520
521         free(s);
522 }
523
524 static int source_set_pending(sd_event_source *s, bool b) {
525         int r;
526
527         assert(s);
528         assert(s->type != SOURCE_QUIT);
529
530         if (s->pending == b)
531                 return 0;
532
533         s->pending = b;
534
535         if (b) {
536                 s->pending_iteration = s->event->iteration;
537
538                 r = prioq_put(s->event->pending, s, &s->pending_index);
539                 if (r < 0) {
540                         s->pending = false;
541                         return r;
542                 }
543         } else
544                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
545
546         return 0;
547 }
548
549 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
550         sd_event_source *s;
551
552         assert(e);
553
554         s = new0(sd_event_source, 1);
555         if (!s)
556                 return NULL;
557
558         s->n_ref = 1;
559         s->event = sd_event_ref(e);
560         s->type = type;
561         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
562
563         return s;
564 }
565
566 _public_ int sd_event_add_io(
567                 sd_event *e,
568                 int fd,
569                 uint32_t events,
570                 sd_io_handler_t callback,
571                 void *userdata,
572                 sd_event_source **ret) {
573
574         sd_event_source *s;
575         int r;
576
577         assert_return(e, -EINVAL);
578         assert_return(fd >= 0, -EINVAL);
579         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
580         assert_return(callback, -EINVAL);
581         assert_return(ret, -EINVAL);
582         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
583         assert_return(!event_pid_changed(e), -ECHILD);
584
585         s = source_new(e, SOURCE_IO);
586         if (!s)
587                 return -ENOMEM;
588
589         s->io.fd = fd;
590         s->io.events = events;
591         s->io.callback = callback;
592         s->userdata = userdata;
593         s->enabled = SD_EVENT_ON;
594
595         r = source_io_register(s, s->enabled, events);
596         if (r < 0) {
597                 source_free(s);
598                 return r;
599         }
600
601         *ret = s;
602         return 0;
603 }
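/* Usage sketch for sd_event_add_io() (illustrative only; the handler and
 * variable names are made up, the callback signature follows the dispatch
 * call in source_dispatch() below):
 *
 *     static int on_socket_io(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
 *             char buf[4096];
 *             ssize_t n;
 *
 *             n = read(fd, buf, sizeof(buf));
 *             if (n < 0)
 *                     return errno == EAGAIN ? 0 : -errno;
 *
 *             ... consume the data ...
 *             return 0;
 *     }
 *
 *     sd_event_source *io = NULL;
 *
 *     r = sd_event_add_io(e, fd, EPOLLIN, on_socket_io, NULL, &io);
 *     if (r < 0)
 *             return r;
 *
 * Note that this version insists on a non-NULL "ret" argument; the caller
 * owns the returned reference and drops it with sd_event_source_unref()
 * when the source is no longer needed. */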
604
605 static int event_setup_timer_fd(
606                 sd_event *e,
607                 EventSourceType type,
608                 int *timer_fd,
609                 clockid_t id) {
610
611         struct epoll_event ev = {};
612         int r, fd;
613         sd_id128_t bootid;
614
615         assert(e);
616         assert(timer_fd);
617
618         if (_likely_(*timer_fd >= 0))
619                 return 0;
620
621         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
622         if (fd < 0)
623                 return -errno;
624
625         ev.events = EPOLLIN;
626         ev.data.ptr = INT_TO_PTR(type);
627
628         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
629         if (r < 0) {
630                 close_nointr_nofail(fd);
631                 return -errno;
632         }
633
634         /* When we sleep for longer, we try to realign the wakeup to
635            the same time within each second, so that events all across
636            the system can be coalesced into a single CPU
637            wakeup. However, let's take some system-specific randomness
638            for this value, so that in a network of systems with synced
639            clocks timer events are distributed a bit. Here, we
640            calculate a perturbation usec offset from the boot ID. */
641
642         if (sd_id128_get_boot(&bootid) >= 0)
643                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
644
645         *timer_fd = fd;
646         return 0;
647 }
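/* Worked example for the perturbation value (the boot ID is made up): if
 * bootid.qwords[0] ^ bootid.qwords[1] happens to be 0x1234567890ABCDEF
 * (decimal 1311768467294899695), then
 *
 *     e->perturb = 1311768467294899695 % USEC_PER_SEC = 899695
 *
 * i.e. timers on this machine preferentially fire 899.695 ms past each
 * full second, while a machine with a different boot ID settles on a
 * different sub-second offset. sleep_between() below makes use of this
 * value. */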
648
649 static int event_add_time_internal(
650                 sd_event *e,
651                 EventSourceType type,
652                 int *timer_fd,
653                 clockid_t id,
654                 Prioq **earliest,
655                 Prioq **latest,
656                 uint64_t usec,
657                 uint64_t accuracy,
658                 sd_time_handler_t callback,
659                 void *userdata,
660                 sd_event_source **ret) {
661
662         sd_event_source *s;
663         int r;
664
665         assert_return(e, -EINVAL);
666         assert_return(callback, -EINVAL);
667         assert_return(ret, -EINVAL);
668         assert_return(usec != (uint64_t) -1, -EINVAL);
669         assert_return(accuracy != (uint64_t) -1, -EINVAL);
670         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
671         assert_return(!event_pid_changed(e), -ECHILD);
672
673         assert(timer_fd);
674         assert(earliest);
675         assert(latest);
676
677         if (!*earliest) {
678                 *earliest = prioq_new(earliest_time_prioq_compare);
679                 if (!*earliest)
680                         return -ENOMEM;
681         }
682
683         if (!*latest) {
684                 *latest = prioq_new(latest_time_prioq_compare);
685                 if (!*latest)
686                         return -ENOMEM;
687         }
688
689         if (*timer_fd < 0) {
690                 r = event_setup_timer_fd(e, type, timer_fd, id);
691                 if (r < 0)
692                         return r;
693         }
694
695         s = source_new(e, type);
696         if (!s)
697                 return -ENOMEM;
698
699         s->time.next = usec;
700         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
701         s->time.callback = callback;
702         s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
703         s->userdata = userdata;
704         s->enabled = SD_EVENT_ONESHOT;
705
706         r = prioq_put(*earliest, s, &s->time.earliest_index);
707         if (r < 0)
708                 goto fail;
709
710         r = prioq_put(*latest, s, &s->time.latest_index);
711         if (r < 0)
712                 goto fail;
713
714         *ret = s;
715         return 0;
716
717 fail:
718         source_free(s);
719         return r;
720 }
721
722 _public_ int sd_event_add_monotonic(sd_event *e,
723                                     uint64_t usec,
724                                     uint64_t accuracy,
725                                     sd_time_handler_t callback,
726                                     void *userdata,
727                                     sd_event_source **ret) {
728
729         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
730 }
731
732 _public_ int sd_event_add_realtime(sd_event *e,
733                                    uint64_t usec,
734                                    uint64_t accuracy,
735                                    sd_time_handler_t callback,
736                                    void *userdata,
737                                    sd_event_source **ret) {
738
739         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->realtime_latest, usec, accuracy, callback, userdata, ret);
740 }
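/* Usage sketch for the two timer constructors above (illustrative only;
 * handler and variable names are made up). The "usec" argument is an
 * absolute time on the respective clock, since the timer fd is armed with
 * TFD_TIMER_ABSTIME, so a relative timeout is expressed by adding it to
 * the current time, e.g. via now() from time-util.h:
 *
 *     static int on_timer(sd_event_source *s, uint64_t usec, void *userdata) {
 *             ... do the delayed work ...
 *             return 0;
 *     }
 *
 *     sd_event_source *t = NULL;
 *
 *     r = sd_event_add_monotonic(e,
 *                                now(CLOCK_MONOTONIC) + 5 * USEC_PER_SEC,
 *                                250 * USEC_PER_MSEC,
 *                                on_timer, NULL, &t);
 *     if (r < 0)
 *             return r;
 *
 * Timer sources start out as SD_EVENT_ONESHOT; to re-arm one, call
 * sd_event_source_set_time() and sd_event_source_set_enabled() again from
 * the handler. */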
741
742 static int event_update_signal_fd(sd_event *e) {
743         struct epoll_event ev = {};
744         bool add_to_epoll;
745         int r;
746
747         assert(e);
748
749         add_to_epoll = e->signal_fd < 0;
750
751         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
752         if (r < 0)
753                 return -errno;
754
755         e->signal_fd = r;
756
757         if (!add_to_epoll)
758                 return 0;
759
760         ev.events = EPOLLIN;
761         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
762
763         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
764         if (r < 0) {
765                 close_nointr_nofail(e->signal_fd);
766                 e->signal_fd = -1;
767
768                 return -errno;
769         }
770
771         return 0;
772 }
773
774 _public_ int sd_event_add_signal(
775                 sd_event *e,
776                 int sig,
777                 sd_signal_handler_t callback,
778                 void *userdata,
779                 sd_event_source **ret) {
780
781         sd_event_source *s;
782         int r;
783
784         assert_return(e, -EINVAL);
785         assert_return(sig > 0, -EINVAL);
786         assert_return(sig < _NSIG, -EINVAL);
787         assert_return(callback, -EINVAL);
788         assert_return(ret, -EINVAL);
789         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
790         assert_return(!event_pid_changed(e), -ECHILD);
791
792         if (!e->signal_sources) {
793                 e->signal_sources = new0(sd_event_source*, _NSIG);
794                 if (!e->signal_sources)
795                         return -ENOMEM;
796         } else if (e->signal_sources[sig])
797                 return -EBUSY;
798
799         s = source_new(e, SOURCE_SIGNAL);
800         if (!s)
801                 return -ENOMEM;
802
803         s->signal.sig = sig;
804         s->signal.callback = callback;
805         s->userdata = userdata;
806         s->enabled = SD_EVENT_ON;
807
808         e->signal_sources[sig] = s;
809         assert_se(sigaddset(&e->sigset, sig) == 0);
810
811         if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
812                 r = event_update_signal_fd(e);
813                 if (r < 0) {
814                         source_free(s);
815                         return r;
816                 }
817         }
818
819         *ret = s;
820         return 0;
821 }
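/* Usage sketch for sd_event_add_signal() (illustrative only; handler and
 * variable names are made up). The signal is consumed through the loop's
 * signalfd, so the caller will usually want to block it first with
 * sigprocmask(), since otherwise regular signal delivery competes with the
 * signalfd:
 *
 *     static int on_sigterm(sd_event_source *s, const struct signalfd_siginfo *si, void *userdata) {
 *             return sd_event_request_quit(sd_event_get(s));
 *     }
 *
 *     sigset_t mask;
 *
 *     assert_se(sigemptyset(&mask) == 0);
 *     assert_se(sigaddset(&mask, SIGTERM) == 0);
 *     assert_se(sigprocmask(SIG_BLOCK, &mask, NULL) == 0);
 *
 *     sd_event_source *sig = NULL;
 *
 *     r = sd_event_add_signal(e, SIGTERM, on_sigterm, NULL, &sig);
 *     if (r < 0)
 *             return r;
 */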
822
823 _public_ int sd_event_add_child(
824                 sd_event *e,
825                 pid_t pid,
826                 int options,
827                 sd_child_handler_t callback,
828                 void *userdata,
829                 sd_event_source **ret) {
830
831         sd_event_source *s;
832         int r;
833
834         assert_return(e, -EINVAL);
835         assert_return(pid > 1, -EINVAL);
836         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
837         assert_return(options != 0, -EINVAL);
838         assert_return(callback, -EINVAL);
839         assert_return(ret, -EINVAL);
840         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
841         assert_return(!event_pid_changed(e), -ECHILD);
842
843         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
844         if (r < 0)
845                 return r;
846
847         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
848                 return -EBUSY;
849
850         s = source_new(e, SOURCE_CHILD);
851         if (!s)
852                 return -ENOMEM;
853
854         s->child.pid = pid;
855         s->child.options = options;
856         s->child.callback = callback;
857         s->userdata = userdata;
858         s->enabled = SD_EVENT_ONESHOT;
859
860         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
861         if (r < 0) {
862                 source_free(s);
863                 return r;
864         }
865
866         e->n_enabled_child_sources ++;
867
868         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
869
870         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
871                 r = event_update_signal_fd(e);
872                 if (r < 0) {
873                         source_free(s);
874                         return r;
875                 }
876         }
877
878         e->need_process_child = true;
879
880         *ret = s;
881         return 0;
882 }
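/* Usage sketch for sd_event_add_child() (illustrative only; handler and
 * variable names are made up). As with signal sources, SIGCHLD should be
 * blocked in the calling thread so that the loop's signalfd gets to see
 * it:
 *
 *     static int on_child(sd_event_source *s, const siginfo_t *si, void *userdata) {
 *             ... inspect si->si_pid, si->si_code, si->si_status ...
 *             return 0;
 *     }
 *
 *     sd_event_source *child = NULL;
 *     pid_t pid;
 *
 *     pid = fork();
 *     if (pid == 0) {
 *             execl("/bin/true", "true", (char*) NULL);
 *             _exit(EXIT_FAILURE);
 *     }
 *
 *     r = sd_event_add_child(e, pid, WEXITED, on_child, NULL, &child);
 *     if (r < 0)
 *             return r;
 *
 * Since process_child() below collects the status with waitid() but
 * without WNOWAIT, the child has already been reaped by the time the
 * callback runs. */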
883
884 _public_ int sd_event_add_defer(
885                 sd_event *e,
886                 sd_defer_handler_t callback,
887                 void *userdata,
888                 sd_event_source **ret) {
889
890         sd_event_source *s;
891         int r;
892
893         assert_return(e, -EINVAL);
894         assert_return(callback, -EINVAL);
895         assert_return(ret, -EINVAL);
896         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
897         assert_return(!event_pid_changed(e), -ECHILD);
898
899         s = source_new(e, SOURCE_DEFER);
900         if (!s)
901                 return -ENOMEM;
902
903         s->defer.callback = callback;
904         s->userdata = userdata;
905         s->enabled = SD_EVENT_ONESHOT;
906
907         r = source_set_pending(s, true);
908         if (r < 0) {
909                 source_free(s);
910                 return r;
911         }
912
913         *ret = s;
914         return 0;
915 }
916
917 _public_ int sd_event_add_quit(
918                 sd_event *e,
919                 sd_quit_handler_t callback,
920                 void *userdata,
921                 sd_event_source **ret) {
922
923         sd_event_source *s;
924         int r;
925
926         assert_return(e, -EINVAL);
927         assert_return(callback, -EINVAL);
928         assert_return(ret, -EINVAL);
929         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
930         assert_return(!event_pid_changed(e), -ECHILD);
931
932         if (!e->quit) {
933                 e->quit = prioq_new(quit_prioq_compare);
934                 if (!e->quit)
935                         return -ENOMEM;
936         }
937
938         s = source_new(e, SOURCE_QUIT);
939         if (!s)
940                 return -ENOMEM;
941
942         s->quit.callback = callback;
943         s->userdata = userdata;
944         s->quit.prioq_index = PRIOQ_IDX_NULL;
945         s->enabled = SD_EVENT_ONESHOT;
946
947         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
948         if (r < 0) {
949                 source_free(s);
950                 return r;
951         }
952
953         *ret = s;
954         return 0;
955 }
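/* Usage sketch for the two constructors above (illustrative only; handler
 * names are made up). A defer source fires once on the next iteration of
 * the loop; a quit source fires while the loop is shutting down after
 * sd_event_request_quit() has been called:
 *
 *     static int on_idle(sd_event_source *s, void *userdata) {
 *             ... runs once, on the next iteration ...
 *             return 0;
 *     }
 *
 *     static int on_quit(sd_event_source *s, void *userdata) {
 *             ... flush caches, close connections ...
 *             return 0;
 *     }
 *
 *     sd_event_source *d = NULL, *q = NULL;
 *
 *     r = sd_event_add_defer(e, on_idle, NULL, &d);
 *     if (r < 0)
 *             return r;
 *
 *     r = sd_event_add_quit(e, on_quit, NULL, &q);
 *     if (r < 0)
 *             return r;
 */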
956
957 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
958         assert_return(s, NULL);
959
960         assert(s->n_ref >= 1);
961         s->n_ref++;
962
963         return s;
964 }
965
966 _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {
967         assert_return(s, NULL);
968
969         assert(s->n_ref >= 1);
970         s->n_ref--;
971
972         if (s->n_ref <= 0)
973                 source_free(s);
974
975         return NULL;
976 }
977
978 _public_ sd_event *sd_event_get(sd_event_source *s) {
979         assert_return(s, NULL);
980
981         return s->event;
982 }
983
984 _public_ int sd_event_source_get_pending(sd_event_source *s) {
985         assert_return(s, -EINVAL);
986         assert_return(s->type != SOURCE_QUIT, -EDOM);
987         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
988         assert_return(!event_pid_changed(s->event), -ECHILD);
989
990         return s->pending;
991 }
992
993 _public_ int sd_event_source_get_io_fd(sd_event_source *s) {
994         assert_return(s, -EINVAL);
995         assert_return(s->type == SOURCE_IO, -EDOM);
996         assert_return(!event_pid_changed(s->event), -ECHILD);
997
998         return s->io.fd;
999 }
1000
1001 _public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
1002         assert_return(s, -EINVAL);
1003         assert_return(events, -EINVAL);
1004         assert_return(s->type == SOURCE_IO, -EDOM);
1005         assert_return(!event_pid_changed(s->event), -ECHILD);
1006
1007         *events = s->io.events;
1008         return 0;
1009 }
1010
1011 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1012         int r;
1013
1014         assert_return(s, -EINVAL);
1015         assert_return(s->type == SOURCE_IO, -EDOM);
1016         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
1017         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1018         assert_return(!event_pid_changed(s->event), -ECHILD);
1019
1020         if (s->io.events == events)
1021                 return 0;
1022
1023         if (s->enabled != SD_EVENT_OFF) {
1024                 r = source_io_register(s, s->enabled, events);
1025                 if (r < 0)
1026                         return r;
1027         }
1028
1029         s->io.events = events;
1030
1031         return 0;
1032 }
1033
1034 _public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1035         assert_return(s, -EINVAL);
1036         assert_return(revents, -EINVAL);
1037         assert_return(s->type == SOURCE_IO, -EDOM);
1038         assert_return(s->pending, -ENODATA);
1039         assert_return(!event_pid_changed(s->event), -ECHILD);
1040
1041         *revents = s->io.revents;
1042         return 0;
1043 }
1044
1045 _public_ int sd_event_source_get_signal(sd_event_source *s) {
1046         assert_return(s, -EINVAL);
1047         assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1048         assert_return(!event_pid_changed(s->event), -ECHILD);
1049
1050         return s->signal.sig;
1051 }
1052
1053 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1054         assert_return(s, -EINVAL);
1055         assert_return(priority, -EINVAL);
1056         assert_return(!event_pid_changed(s->event), -ECHILD);
1057
1058         *priority = s->priority;
1059         return 0;
1060 }
1059
1060 _public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
1061         assert_return(s, -EINVAL);
1062         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1063         assert_return(!event_pid_changed(s->event), -ECHILD);
1064
1065         if (s->priority == priority)
1066                 return 0;
1067
1068         s->priority = priority;
1069
1070         if (s->pending)
1071                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1072
1073         if (s->prepare)
1074                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1075
1076         if (s->type == SOURCE_QUIT)
1077                 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1078
1079         return 0;
1080 }
1081
1082 _public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1083         assert_return(s, -EINVAL);
1084         assert_return(m, -EINVAL);
1085         assert_return(!event_pid_changed(s->event), -ECHILD);
1086
1087         *m = s->enabled;
1088         return 0;
1089 }
1090
1091 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
1092         int r;
1093
1094         assert_return(s, -EINVAL);
1095         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1096         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1097         assert_return(!event_pid_changed(s->event), -ECHILD);
1098
1099         if (s->enabled == m)
1100                 return 0;
1101
1102         if (m == SD_EVENT_OFF) {
1103
1104                 switch (s->type) {
1105
1106                 case SOURCE_IO:
1107                         r = source_io_unregister(s);
1108                         if (r < 0)
1109                                 return r;
1110
1111                         s->enabled = m;
1112                         break;
1113
1114                 case SOURCE_MONOTONIC:
1115                         s->enabled = m;
1116                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1117                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1118                         break;
1119
1120                 case SOURCE_REALTIME:
1121                         s->enabled = m;
1122                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1123                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1124                         break;
1125
1126                 case SOURCE_SIGNAL:
1127                         s->enabled = m;
1128                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1129                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1130                                 event_update_signal_fd(s->event);
1131                         }
1132
1133                         break;
1134
1135                 case SOURCE_CHILD:
1136                         s->enabled = m;
1137
1138                         assert(s->event->n_enabled_child_sources > 0);
1139                         s->event->n_enabled_child_sources--;
1140
1141                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1142                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1143                                 event_update_signal_fd(s->event);
1144                         }
1145
1146                         break;
1147
1148                 case SOURCE_QUIT:
1149                         s->enabled = m;
1150                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1151                         break;
1152
1153                 case SOURCE_DEFER:
1154                         s->enabled = m;
1155                         break;
1156                 }
1157
1158         } else {
1159                 switch (s->type) {
1160
1161                 case SOURCE_IO:
1162                         r = source_io_register(s, m, s->io.events);
1163                         if (r < 0)
1164                                 return r;
1165
1166                         s->enabled = m;
1167                         break;
1168
1169                 case SOURCE_MONOTONIC:
1170                         s->enabled = m;
1171                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1172                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1173                         break;
1174
1175                 case SOURCE_REALTIME:
1176                         s->enabled = m;
1177                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1178                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1179                         break;
1180
1181                 case SOURCE_SIGNAL:
1182                         s->enabled = m;
1183
1184                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1185                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1186                                 event_update_signal_fd(s->event);
1187                         }
1188                         break;
1189
1190                 case SOURCE_CHILD:
1191                         if (s->enabled == SD_EVENT_OFF) {
1192                                 s->event->n_enabled_child_sources++;
1193
1194                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1195                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1196                                         event_update_signal_fd(s->event);
1197                                 }
1198                         }
1199
1200                         s->enabled = m;
1201                         break;
1202
1203                 case SOURCE_QUIT:
1204                         s->enabled = m;
1205                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1206                         break;
1207
1208                 case SOURCE_DEFER:
1209                         s->enabled = m;
1210                         break;
1211                 }
1212         }
1213
1214         if (s->pending)
1215                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1216
1217         if (s->prepare)
1218                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1219
1220         return 0;
1221 }
1222
1223 _public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1224         assert_return(s, -EINVAL);
1225         assert_return(usec, -EINVAL);
1226         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1227         assert_return(!event_pid_changed(s->event), -ECHILD);
1228
1229         *usec = s->time.next;
1230         return 0;
1231 }
1232
1233 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1234         assert_return(s, -EINVAL);
1235         assert_return(usec != (uint64_t) -1, -EINVAL);
1236         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1237         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1238         assert_return(!event_pid_changed(s->event), -ECHILD);
1239
1240         if (s->time.next == usec)
1241                 return 0;
1242
1243         s->time.next = usec;
1244
1245         if (s->type == SOURCE_REALTIME) {
1246                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1247                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1248         } else {
1249                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1250                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1251         }
1252
1253         return 0;
1254 }
1255
1256 _public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1257         assert_return(s, -EINVAL);
1258         assert_return(usec, -EINVAL);
1259         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1260         assert_return(!event_pid_changed(s->event), -ECHILD);
1261
1262         *usec = s->time.accuracy;
1263         return 0;
1264 }
1265
1266 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1267         assert_return(s, -EINVAL);
1268         assert_return(usec != (uint64_t) -1, -EINVAL);
1269         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1270         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1271         assert_return(!event_pid_changed(s->event), -ECHILD);
1272
1273         if (usec == 0)
1274                 usec = DEFAULT_ACCURACY_USEC;
1275
1276         if (s->time.accuracy == usec)
1277                 return 0;
1278
1279         s->time.accuracy = usec;
1280
1281         if (s->type == SOURCE_REALTIME)
1282                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1283         else
1284                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1285
1286         return 0;
1287 }
1288
1289 _public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1290         assert_return(s, -EINVAL);
1291         assert_return(pid, -EINVAL);
1292         assert_return(s->type == SOURCE_CHILD, -EDOM);
1293         assert_return(!event_pid_changed(s->event), -ECHILD);
1294
1295         *pid = s->child.pid;
1296         return 0;
1297 }
1298
1299 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1300         int r;
1301
1302         assert_return(s, -EINVAL);
1303         assert_return(s->type != SOURCE_QUIT, -EDOM);
1304         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1305         assert_return(!event_pid_changed(s->event), -ECHILD);
1306
1307         if (s->prepare == callback)
1308                 return 0;
1309
1310         if (callback && s->prepare) {
1311                 s->prepare = callback;
1312                 return 0;
1313         }
1314
1315         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1316         if (r < 0)
1317                 return r;
1318
1319         s->prepare = callback;
1320
1321         if (callback) {
1322                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1323                 if (r < 0)
1324                         return r;
1325         } else
1326                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1327
1328         return 0;
1329 }
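/* Usage sketch for sd_event_source_set_prepare() (illustrative only; the
 * Connection type and connection_has_output() helper are hypothetical).
 * The prepare callback runs right before the loop polls, see
 * event_prepare() below, and is typically used to adjust a source to the
 * current application state, e.g. the watched I/O events:
 *
 *     static int prepare_io(sd_event_source *s, void *userdata) {
 *             Connection *c = userdata;
 *
 *             return sd_event_source_set_io_events(s,
 *                             EPOLLIN | (connection_has_output(c) ? EPOLLOUT : 0));
 *     }
 *
 *     r = sd_event_source_set_prepare(io_source, prepare_io);
 *     if (r < 0)
 *             return r;
 */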
1330
1331 _public_ void* sd_event_source_get_userdata(sd_event_source *s) {
1332         assert_return(s, NULL);
1333
1334         return s->userdata;
1335 }
1336
1337 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1338         usec_t c;
1339         assert(e);
1340         assert(a <= b);
1341
1342         if (a <= 0)
1343                 return 0;
1344
1345         if (b <= a + 1)
1346                 return a;
1347
1348         /*
1349           Find a good time to wake up again between times a and b. We
1350           have two goals here:
1351
1352           a) We want to wake up as seldom as possible, hence prefer
1353              later times over earlier times.
1354
1355           b) But if we have to wake up, then let's make sure to
1356              dispatch as much as possible on the entire system.
1357
1358           We implement this by waking up everywhere at the same time
1359           within any given second if we can, synchronised via the
1360           perturbation value determined from the boot ID. If we can't,
1361            then we try to find the same spot within every 250ms
1362           step. Otherwise, we pick the last possible time to wake up.
1363         */
1364
1365         c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1366         if (c >= b) {
1367                 if (_unlikely_(c < USEC_PER_SEC))
1368                         return b;
1369
1370                 c -= USEC_PER_SEC;
1371         }
1372
1373         if (c >= a)
1374                 return c;
1375
1376         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1377         if (c >= b) {
1378                 if (_unlikely_(c < USEC_PER_MSEC*250))
1379                         return b;
1380
1381                 c -= USEC_PER_MSEC*250;
1382         }
1383
1384         if (c >= a)
1385                 return c;
1386
1387         return b;
1388 }
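/* Worked example (all numbers are made up): assume e->perturb = 899695
 * (see event_setup_timer_fd() above) and a scheduling window of
 * a = 10200000, b = 10950000 usec. Then
 *
 *     c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb
 *       = 10000000 + 899695 = 10899695
 *
 * which is below b and above a, so we wake up at the machine-wide
 * per-second slot 10899695. With a narrower window, say b = 10500000,
 * that c exceeds b, subtracting one second drops it below a, and the
 * 250 ms fallback picks
 *
 *     c = 10500000 + (899695 % 250000) - 250000 = 10399695
 *
 * which again lies inside [a, b]; only if that also failed would we give
 * up and return b. */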
1389
1390 static int event_arm_timer(
1391                 sd_event *e,
1392                 int timer_fd,
1393                 Prioq *earliest,
1394                 Prioq *latest,
1395                 usec_t *next) {
1396
1397         struct itimerspec its = {};
1398         sd_event_source *a, *b;
1399         usec_t t;
1400         int r;
1401
1402         assert_se(e);
1403         assert_se(next);
1404
1405         a = prioq_peek(earliest);
1406         if (!a || a->enabled == SD_EVENT_OFF) {
1407
1408                 if (*next == (usec_t) -1)
1409                         return 0;
1410
1411                 /* disarm */
1412                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1413                 if (r < 0)
1414                         return -errno;
1415
1416                 *next = (usec_t) -1;
1417
1418                 return 0;
1419         }
1420
1421         b = prioq_peek(latest);
1422         assert_se(b && b->enabled != SD_EVENT_OFF);
1423
1424         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1425         if (*next == t)
1426                 return 0;
1427
1428         assert_se(timer_fd >= 0);
1429
1430         if (t == 0) {
1431                 /* We don't want to disarm here (an all-zero it_value would), so just point at some time looooong ago. */
1432                 its.it_value.tv_sec = 0;
1433                 its.it_value.tv_nsec = 1;
1434         } else
1435                 timespec_store(&its.it_value, t);
1436
1437         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1438         if (r < 0)
1439                 return -errno;
1440
1441         *next = t;
1442         return 0;
1443 }
1444
1445 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1446         assert(e);
1447         assert(s);
1448         assert(s->type == SOURCE_IO);
1449
1450         s->io.revents = events;
1451
1452         return source_set_pending(s, true);
1453 }
1454
1455 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1456         uint64_t x;
1457         ssize_t ss;
1458
1459         assert(e);
1460         assert(fd >= 0);
1461         assert(next);
1462
1463         assert_return(events == EPOLLIN, -EIO);
1464
1465         ss = read(fd, &x, sizeof(x));
1466         if (ss < 0) {
1467                 if (errno == EAGAIN || errno == EINTR)
1468                         return 0;
1469
1470                 return -errno;
1471         }
1472
1473         if (ss != sizeof(x))
1474                 return -EIO;
1475
1476         *next = (usec_t) -1;
1477
1478         return 0;
1479 }
1480
1481 static int process_timer(
1482                 sd_event *e,
1483                 usec_t n,
1484                 Prioq *earliest,
1485                 Prioq *latest) {
1486
1487         sd_event_source *s;
1488         int r;
1489
1490         assert(e);
1491
1492         for (;;) {
1493                 s = prioq_peek(earliest);
1494                 if (!s ||
1495                     s->time.next > n ||
1496                     s->enabled == SD_EVENT_OFF ||
1497                     s->pending)
1498                         break;
1499
1500                 r = source_set_pending(s, true);
1501                 if (r < 0)
1502                         return r;
1503
1504                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1505                 prioq_reshuffle(latest, s, &s->time.latest_index);
1506         }
1507
1508         return 0;
1509 }
1510
1511 static int process_child(sd_event *e) {
1512         sd_event_source *s;
1513         Iterator i;
1514         int r;
1515
1516         assert(e);
1517
1518         e->need_process_child = false;
1519
1520         /*
1521            So, this is ugly. We iteratively invoke waitid() with P_PID
1522            + WNOHANG for each PID we wait for, instead of using
1523            P_ALL. This is because we only want to get child
1524            information of very specific child processes, and not all
1525            of them. We might not have processed the SIGCHLD event of a
1526            previous invocation and we don't want to maintain an
1527            unbounded *per-child* event queue, hence we really don't
1528            want anything flushed out of the kernel's queue that we
1529            don't care about. Since this is O(n) this means that if you
1530            have a lot of processes you probably want to handle SIGCHLD
1531            yourself.
1532         */
1533
1534         HASHMAP_FOREACH(s, e->child_sources, i) {
1535                 assert(s->type == SOURCE_CHILD);
1536
1537                 if (s->pending)
1538                         continue;
1539
1540                 if (s->enabled == SD_EVENT_OFF)
1541                         continue;
1542
1543                 zero(s->child.siginfo);
1544                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1545                 if (r < 0)
1546                         return -errno;
1547
1548                 if (s->child.siginfo.si_pid != 0) {
1549                         r = source_set_pending(s, true);
1550                         if (r < 0)
1551                                 return r;
1552                 }
1553         }
1554
1555         return 0;
1556 }
1557
1558 static int process_signal(sd_event *e, uint32_t events) {
1559         bool read_one = false;
1560         int r;
1561
1562         assert(e);
1563         assert(e->signal_sources);
1564
1565         assert_return(events == EPOLLIN, -EIO);
1566
1567         for (;;) {
1568                 struct signalfd_siginfo si;
1569                 ssize_t ss;
1570                 sd_event_source *s;
1571
1572                 ss = read(e->signal_fd, &si, sizeof(si));
1573                 if (ss < 0) {
1574                         if (errno == EAGAIN || errno == EINTR)
1575                                 return read_one;
1576
1577                         return -errno;
1578                 }
1579
1580                 if (ss != sizeof(si))
1581                         return -EIO;
1582
1583                 read_one = true;
1584
1585                 s = e->signal_sources[si.ssi_signo];
1586                 if (si.ssi_signo == SIGCHLD) {
1587                         r = process_child(e);
1588                         if (r < 0)
1589                                 return r;
1590                         if (r > 0 || !s)
1591                                 continue;
1592                 } else
1593                         if (!s)
1594                                 return -EIO;
1595
1596                 s->signal.siginfo = si;
1597                 r = source_set_pending(s, true);
1598                 if (r < 0)
1599                         return r;
1600         }
1601
1602
1603         return 0;
1604 }
1605
1606 static int source_dispatch(sd_event_source *s) {
1607         int r = 0;
1608
1609         assert(s);
1610         assert(s->pending || s->type == SOURCE_QUIT);
1611
1612         if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
1613                 r = source_set_pending(s, false);
1614                 if (r < 0)
1615                         return r;
1616         }
1617
1618         if (s->enabled == SD_EVENT_ONESHOT) {
1619                 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1620                 if (r < 0)
1621                         return r;
1622         }
1623
1624         sd_event_source_ref(s);
1625
1626         switch (s->type) {
1627
1628         case SOURCE_IO:
1629                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1630                 break;
1631
1632         case SOURCE_MONOTONIC:
1633                 r = s->time.callback(s, s->time.next, s->userdata);
1634                 break;
1635
1636         case SOURCE_REALTIME:
1637                 r = s->time.callback(s, s->time.next, s->userdata);
1638                 break;
1639
1640         case SOURCE_SIGNAL:
1641                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1642                 break;
1643
1644         case SOURCE_CHILD:
1645                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1646                 break;
1647
1648         case SOURCE_DEFER:
1649                 r = s->defer.callback(s, s->userdata);
1650                 break;
1651
1652         case SOURCE_QUIT:
1653                 r = s->quit.callback(s, s->userdata);
1654                 break;
1655         }
1656
1657         sd_event_source_unref(s);
1658
1659         return r;
1660 }
1661
1662 static int event_prepare(sd_event *e) {
1663         int r;
1664
1665         assert(e);
1666
1667         for (;;) {
1668                 sd_event_source *s;
1669
1670                 s = prioq_peek(e->prepare);
1671                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1672                         break;
1673
1674                 s->prepare_iteration = e->iteration;
1675                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1676                 if (r < 0)
1677                         return r;
1678
1679                 assert(s->prepare);
1680                 r = s->prepare(s, s->userdata);
1681                 if (r < 0)
1682                         return r;
1683
1684         }
1685
1686         return 0;
1687 }
1688
1689 static int dispatch_quit(sd_event *e) {
1690         sd_event_source *p;
1691         int r;
1692
1693         assert(e);
1694
1695         p = prioq_peek(e->quit);
1696         if (!p || p->enabled == SD_EVENT_OFF) {
1697                 e->state = SD_EVENT_FINISHED;
1698                 return 0;
1699         }
1700
1701         sd_event_ref(e);
1702         e->iteration++;
1703         e->state = SD_EVENT_QUITTING;
1704
1705         r = source_dispatch(p);
1706
1707         e->state = SD_EVENT_PASSIVE;
1708         sd_event_unref(e);
1709
1710         return r;
1711 }
1712
1713 static sd_event_source* event_next_pending(sd_event *e) {
1714         sd_event_source *p;
1715
1716         assert(e);
1717
1718         p = prioq_peek(e->pending);
1719         if (!p)
1720                 return NULL;
1721
1722         if (p->enabled == SD_EVENT_OFF)
1723                 return NULL;
1724
1725         return p;
1726 }
1727
1728 _public_ int sd_event_run(sd_event *e, uint64_t timeout) {
1729         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1730         sd_event_source *p;
1731         int r, i, m;
1732
1733         assert_return(e, -EINVAL);
1734         assert_return(!event_pid_changed(e), -ECHILD);
1735         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1736         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1737
1738         if (e->quit_requested)
1739                 return dispatch_quit(e);
1740
1741         sd_event_ref(e);
1742         e->iteration++;
1743         e->state = SD_EVENT_RUNNING;
1744
1745         r = event_prepare(e);
1746         if (r < 0)
1747                 goto finish;
1748
1749         if (event_next_pending(e) || e->need_process_child)
1750                 timeout = 0;
1751
1752         if (timeout > 0) {
1753                 r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1754                 if (r < 0)
1755                         goto finish;
1756
1757                 r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1758                 if (r < 0)
1759                         goto finish;
1760         }
1761
1762         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1763                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1764         if (m < 0) {
1765                 r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
1766                 goto finish;
1767         }
1768
1769         dual_timestamp_get(&e->timestamp);
1770
1771         for (i = 0; i < m; i++) {
1772
1773                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1774                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
1775                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1776                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
1777                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1778                         r = process_signal(e, ev_queue[i].events);
1779                 else
1780                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1781
1782                 if (r < 0)
1783                         goto finish;
1784         }
1785
1786         r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
1787         if (r < 0)
1788                 goto finish;
1789
1790         r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
1791         if (r < 0)
1792                 goto finish;
1793
1794         if (e->need_process_child) {
1795                 r = process_child(e);
1796                 if (r < 0)
1797                         goto finish;
1798         }
1799
1800         p = event_next_pending(e);
1801         if (!p) {
1802                 r = 0;
1803                 goto finish;
1804         }
1805
1806         r = source_dispatch(p);
1807
1808 finish:
1809         e->state = SD_EVENT_PASSIVE;
1810         sd_event_unref(e);
1811
1812         return r;
1813 }
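/* Usage sketch: driving the loop manually with sd_event_run(), e.g. when
 * embedding it into a foreign main loop (illustrative only). The function
 * returns the dispatched handler's non-negative return value, 0 if nothing
 * was dispatched, or a negative errno-style error:
 *
 *     for (;;) {
 *             r = sd_event_run(e, 100 * USEC_PER_MSEC);
 *             if (r < 0)
 *                     break;
 *
 *             if (sd_event_get_state(e) == SD_EVENT_FINISHED)
 *                     break;
 *
 *             ... interleave other work here ...
 *     }
 */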
1814
1815 _public_ int sd_event_loop(sd_event *e) {
1816         int r;
1817
1818         assert_return(e, -EINVAL);
1819         assert_return(!event_pid_changed(e), -ECHILD);
1820         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1821
1822         sd_event_ref(e);
1823
1824         while (e->state != SD_EVENT_FINISHED) {
1825                 r = sd_event_run(e, (uint64_t) -1);
1826                 if (r < 0)
1827                         goto finish;
1828         }
1829
1830         r = 0;
1831
1832 finish:
1833         sd_event_unref(e);
1834         return r;
1835 }
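/* End-to-end sketch tying the calls above together (illustrative only;
 * listen_fd, on_socket_io() and on_timer() are assumed to be provided by
 * the caller, see the sketches next to sd_event_add_io() and
 * sd_event_add_monotonic()):
 *
 *     int run_loop(int listen_fd) {
 *             sd_event *e = NULL;
 *             sd_event_source *io = NULL, *timer = NULL;
 *             int r;
 *
 *             r = sd_event_new(&e);
 *             if (r < 0)
 *                     return r;
 *
 *             r = sd_event_add_io(e, listen_fd, EPOLLIN, on_socket_io, NULL, &io);
 *             if (r < 0)
 *                     goto finish;
 *
 *             r = sd_event_add_monotonic(e, now(CLOCK_MONOTONIC) + USEC_PER_SEC,
 *                                        0, on_timer, NULL, &timer);
 *             if (r < 0)
 *                     goto finish;
 *
 *             r = sd_event_loop(e);
 *
 *     finish:
 *             sd_event_source_unref(io);
 *             sd_event_source_unref(timer);
 *             sd_event_unref(e);
 *             return r;
 *     }
 */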
1836
1837 _public_ int sd_event_get_state(sd_event *e) {
1838         assert_return(e, -EINVAL);
1839         assert_return(!event_pid_changed(e), -ECHILD);
1840
1841         return e->state;
1842 }
1843
1844 _public_ int sd_event_get_quit(sd_event *e) {
1845         assert_return(e, -EINVAL);
1846         assert_return(!event_pid_changed(e), -ECHILD);
1847
1848         return e->quit_requested;
1849 }
1850
1851 _public_ int sd_event_request_quit(sd_event *e) {
1852         assert_return(e, -EINVAL);
1853         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1854         assert_return(!event_pid_changed(e), -ECHILD);
1855
1856         e->quit_requested = true;
1857         return 0;
1858 }
1859
1860 _public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
1861         assert_return(e, -EINVAL);
1862         assert_return(usec, -EINVAL);
1863         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1864         assert_return(!event_pid_changed(e), -ECHILD);
1865
1866         *usec = e->timestamp.realtime;
1867         return 0;
1868 }
1869
1870 _public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
1871         assert_return(e, -EINVAL);
1872         assert_return(usec, -EINVAL);
1873         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1874         assert_return(!event_pid_changed(e), -ECHILD);
1875
1876         *usec = e->timestamp.monotonic;
1877         return 0;
1878 }
1879
1880 _public_ int sd_event_default(sd_event **ret) {
1881
1882         static __thread sd_event *default_event = NULL;
1883         sd_event *e;
1884         int r;
1885
1886         if (!ret)
1887                 return !!default_event;
1888
1889         if (default_event) {
1890                 *ret = sd_event_ref(default_event);
1891                 return 0;
1892         }
1893
1894         r = sd_event_new(&e);
1895         if (r < 0)
1896                 return r;
1897
1898         e->default_event_ptr = &default_event;
1899         e->tid = gettid();
1900         default_event = e;
1901
1902         *ret = e;
1903         return 1;
1904 }
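/* Usage sketch for sd_event_default() (illustrative only): library code
 * that wants to share one loop per thread rather than allocate its own.
 * The returned reference is dropped with sd_event_unref() as usual:
 *
 *     sd_event *e = NULL;
 *
 *     r = sd_event_default(&e);
 *     if (r < 0)
 *             return r;
 *
 *     ... attach sources, run the loop ...
 *
 *     sd_event_unref(e);
 *
 * Calling sd_event_default(NULL) merely reports whether a default loop
 * already exists for the calling thread (1) or not (0). */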
1905
1906 _public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
1907         assert_return(e, -EINVAL);
1908         assert_return(tid, -EINVAL);
1909         assert_return(!event_pid_changed(e), -ECHILD);
1910
1911         if (e->tid != 0) {
1912                 *tid = e->tid;
1913                 return 0;
1914         }
1915
1916         return -ENXIO;
1917 }