[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "sd-id128.h"
27 #include "macro.h"
28 #include "prioq.h"
29 #include "hashmap.h"
30 #include "util.h"
31 #include "time-util.h"
32 #include "missing.h"
33
34 #include "sd-event.h"
35
36 #define EPOLL_QUEUE_MAX 64
37 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
38
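/* Illustrative only, not part of the original file: a minimal sketch of how a
 * consumer would drive the public API implemented below. The handler name
 * on_stdin() and the wrapper run() are hypothetical; error handling is elided.
 *
 *     static int on_stdin(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
 *             char buf[256];
 *
 *             if (read(fd, buf, sizeof(buf)) <= 0)
 *                     sd_event_request_quit(sd_event_get(s));
 *             return 0;
 *     }
 *
 *     static int run(void) {
 *             sd_event *e = NULL;
 *             sd_event_source *s = NULL;
 *
 *             sd_event_new(&e);
 *             sd_event_add_io(e, STDIN_FILENO, EPOLLIN, on_stdin, NULL, &s);
 *             sd_event_loop(e);
 *
 *             sd_event_source_unref(s);
 *             sd_event_unref(e);
 *             return 0;
 *     }
 */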
39 typedef enum EventSourceType {
40         SOURCE_IO,
41         SOURCE_MONOTONIC,
42         SOURCE_REALTIME,
43         SOURCE_SIGNAL,
44         SOURCE_CHILD,
45         SOURCE_DEFER,
46         SOURCE_QUIT
47 } EventSourceType;
48
49 struct sd_event_source {
50         unsigned n_ref;
51
52         sd_event *event;
53         void *userdata;
54         sd_event_handler_t prepare;
55
56         EventSourceType type:4;
57         int enabled:3;
58         bool pending:1;
59
60         int priority;
61         unsigned pending_index;
62         unsigned prepare_index;
63         unsigned pending_iteration;
64         unsigned prepare_iteration;
65
66         union {
67                 struct {
68                         sd_event_io_handler_t callback;
69                         int fd;
70                         uint32_t events;
71                         uint32_t revents;
72                         bool registered:1;
73                 } io;
74                 struct {
75                         sd_event_time_handler_t callback;
76                         usec_t next, accuracy;
77                         unsigned earliest_index;
78                         unsigned latest_index;
79                 } time;
80                 struct {
81                         sd_event_signal_handler_t callback;
82                         struct signalfd_siginfo siginfo;
83                         int sig;
84                 } signal;
85                 struct {
86                         sd_event_child_handler_t callback;
87                         siginfo_t siginfo;
88                         pid_t pid;
89                         int options;
90                 } child;
91                 struct {
92                         sd_event_handler_t callback;
93                 } defer;
94                 struct {
95                         sd_event_handler_t callback;
96                         unsigned prioq_index;
97                 } quit;
98         };
99 };
100
101 struct sd_event {
102         unsigned n_ref;
103
104         int epoll_fd;
105         int signal_fd;
106         int realtime_fd;
107         int monotonic_fd;
108
109         Prioq *pending;
110         Prioq *prepare;
111
112         /* For both clocks we maintain two priority queues each, one
113          * ordered by the earliest time the events may be
114          * dispatched, and one ordered by the latest time they must
115          * have been dispatched by. The range between the top entries
116          * in the two prioqs is the time window within which we can
117          * freely schedule wakeups. */
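        /* A sketch of how that window is derived (see event_arm_timer()
         * below), assuming both queues are non-empty:
         *
         *     a = prioq_peek(earliest);  // may not fire before a->time.next
         *     b = prioq_peek(latest);    // must fire by b->time.next + b->time.accuracy
         *     t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
         */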
118         Prioq *monotonic_earliest;
119         Prioq *monotonic_latest;
120         Prioq *realtime_earliest;
121         Prioq *realtime_latest;
122
123         usec_t realtime_next, monotonic_next;
124         usec_t perturb;
125
126         sigset_t sigset;
127         sd_event_source **signal_sources;
128
129         Hashmap *child_sources;
130         unsigned n_enabled_child_sources;
131
132         Prioq *quit;
133
134         pid_t original_pid;
135
136         unsigned iteration;
137         dual_timestamp timestamp;
138         int state;
139
140         bool quit_requested:1;
141         bool need_process_child:1;
142
143         pid_t tid;
144         sd_event **default_event_ptr;
145 };
146
147 static int pending_prioq_compare(const void *a, const void *b) {
148         const sd_event_source *x = a, *y = b;
149
150         assert(x->pending);
151         assert(y->pending);
152
153         /* Enabled ones first */
154         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
155                 return -1;
156         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
157                 return 1;
158
159         /* Lower priority values first */
160         if (x->priority < y->priority)
161                 return -1;
162         if (x->priority > y->priority)
163                 return 1;
164
165         /* Older entries first */
166         if (x->pending_iteration < y->pending_iteration)
167                 return -1;
168         if (x->pending_iteration > y->pending_iteration)
169                 return 1;
170
171         /* Stability for the rest */
172         if (x < y)
173                 return -1;
174         if (x > y)
175                 return 1;
176
177         return 0;
178 }
179
180 static int prepare_prioq_compare(const void *a, const void *b) {
181         const sd_event_source *x = a, *y = b;
182
183         assert(x->prepare);
184         assert(y->prepare);
185
186         /* Move most recently prepared ones last, so that we can stop
187          * preparing as soon as we hit one that has already been
188          * prepared in the current iteration */
189         if (x->prepare_iteration < y->prepare_iteration)
190                 return -1;
191         if (x->prepare_iteration > y->prepare_iteration)
192                 return 1;
193
194         /* Enabled ones first */
195         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
196                 return -1;
197         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
198                 return 1;
199
200         /* Lower priority values first */
201         if (x->priority < y->priority)
202                 return -1;
203         if (x->priority > y->priority)
204                 return 1;
205
206         /* Stability for the rest */
207         if (x < y)
208                 return -1;
209         if (x > y)
210                 return 1;
211
212         return 0;
213 }
214
215 static int earliest_time_prioq_compare(const void *a, const void *b) {
216         const sd_event_source *x = a, *y = b;
217
218         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
219         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
220
221         /* Enabled ones first */
222         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
223                 return -1;
224         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
225                 return 1;
226
227         /* Move the pending ones to the end */
228         if (!x->pending && y->pending)
229                 return -1;
230         if (x->pending && !y->pending)
231                 return 1;
232
233         /* Order by time */
234         if (x->time.next < y->time.next)
235                 return -1;
236         if (x->time.next > y->time.next)
237                 return 1;
238
239         /* Stability for the rest */
240         if (x < y)
241                 return -1;
242         if (x > y)
243                 return 1;
244
245         return 0;
246 }
247
248 static int latest_time_prioq_compare(const void *a, const void *b) {
249         const sd_event_source *x = a, *y = b;
250
251         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
252                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
253
254         /* Enabled ones first */
255         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
256                 return -1;
257         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
258                 return 1;
259
260         /* Move the pending ones to the end */
261         if (!x->pending && y->pending)
262                 return -1;
263         if (x->pending && !y->pending)
264                 return 1;
265
266         /* Order by time */
267         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
268                 return -1;
269         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
270                 return 1;
271
272         /* Stability for the rest */
273         if (x < y)
274                 return -1;
275         if (x > y)
276                 return 1;
277
278         return 0;
279 }
280
281 static int quit_prioq_compare(const void *a, const void *b) {
282         const sd_event_source *x = a, *y = b;
283
284         assert(x->type == SOURCE_QUIT);
285         assert(y->type == SOURCE_QUIT);
286
287         /* Enabled ones first */
288         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
289                 return -1;
290         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
291                 return 1;
292
293         /* Lower priority values first */
294         if (x->priority < y->priority)
295                 return -1;
296         if (x->priority > y->priority)
297                 return 1;
298
299         /* Stability for the rest */
300         if (x < y)
301                 return -1;
302         if (x > y)
303                 return 1;
304
305         return 0;
306 }
307
308 static void event_free(sd_event *e) {
309         assert(e);
310
311         if (e->default_event_ptr)
312                 *(e->default_event_ptr) = NULL;
313
314         if (e->epoll_fd >= 0)
315                 close_nointr_nofail(e->epoll_fd);
316
317         if (e->signal_fd >= 0)
318                 close_nointr_nofail(e->signal_fd);
319
320         if (e->realtime_fd >= 0)
321                 close_nointr_nofail(e->realtime_fd);
322
323         if (e->monotonic_fd >= 0)
324                 close_nointr_nofail(e->monotonic_fd);
325
326         prioq_free(e->pending);
327         prioq_free(e->prepare);
328         prioq_free(e->monotonic_earliest);
329         prioq_free(e->monotonic_latest);
330         prioq_free(e->realtime_earliest);
331         prioq_free(e->realtime_latest);
332         prioq_free(e->quit);
333
334         free(e->signal_sources);
335
336         hashmap_free(e->child_sources);
337         free(e);
338 }
339
340 _public_ int sd_event_new(sd_event** ret) {
341         sd_event *e;
342         int r;
343
344         assert_return(ret, -EINVAL);
345
346         e = new0(sd_event, 1);
347         if (!e)
348                 return -ENOMEM;
349
350         e->n_ref = 1;
351         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
352         e->realtime_next = e->monotonic_next = (usec_t) -1;
353         e->original_pid = getpid();
354
355         assert_se(sigemptyset(&e->sigset) == 0);
356
357         e->pending = prioq_new(pending_prioq_compare);
358         if (!e->pending) {
359                 r = -ENOMEM;
360                 goto fail;
361         }
362
363         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
364         if (e->epoll_fd < 0) {
365                 r = -errno;
366                 goto fail;
367         }
368
369         *ret = e;
370         return 0;
371
372 fail:
373         event_free(e);
374         return r;
375 }
376
377 _public_ sd_event* sd_event_ref(sd_event *e) {
378         assert_return(e, NULL);
379
380         assert(e->n_ref >= 1);
381         e->n_ref++;
382
383         return e;
384 }
385
386 _public_ sd_event* sd_event_unref(sd_event *e) {
387         assert_return(e, NULL);
388
389         assert(e->n_ref >= 1);
390         e->n_ref--;
391
392         if (e->n_ref <= 0)
393                 event_free(e);
394
395         return NULL;
396 }
397
398 static bool event_pid_changed(sd_event *e) {
399         assert(e);
400
401         /* We don't support people creating an event loop and keeping
402          * it around over a fork(). Let's complain. */
403
404         return e->original_pid != getpid();
405 }
406
407 static int source_io_unregister(sd_event_source *s) {
408         int r;
409
410         assert(s);
411         assert(s->type == SOURCE_IO);
412
413         if (!s->io.registered)
414                 return 0;
415
416         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
417         if (r < 0)
418                 return -errno;
419
420         s->io.registered = false;
421         return 0;
422 }
423
424 static int source_io_register(
425                 sd_event_source *s,
426                 int enabled,
427                 uint32_t events) {
428
429         struct epoll_event ev = {};
430         int r;
431
432         assert(s);
433         assert(s->type == SOURCE_IO);
434         assert(enabled != SD_EVENT_OFF);
435
436         ev.events = events;
437         ev.data.ptr = s;
438
439         if (enabled == SD_EVENT_ONESHOT)
440                 ev.events |= EPOLLONESHOT;
441
442         if (s->io.registered)
443                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
444         else
445                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
446
447         if (r < 0)
448                 return -errno;
449
450         s->io.registered = true;
451
452         return 0;
453 }
454
455 static void source_free(sd_event_source *s) {
456         assert(s);
457
458         if (s->event) {
459                 switch (s->type) {
460
461                 case SOURCE_IO:
462                         if (s->io.fd >= 0)
463                                 source_io_unregister(s);
464
465                         break;
466
467                 case SOURCE_MONOTONIC:
468                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
469                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
470                         break;
471
472                 case SOURCE_REALTIME:
473                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
474                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
475                         break;
476
477                 case SOURCE_SIGNAL:
478                         if (s->signal.sig > 0) {
479                                 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
480                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
481
482                                 if (s->event->signal_sources)
483                                         s->event->signal_sources[s->signal.sig] = NULL;
484                         }
485
486                         break;
487
488                 case SOURCE_CHILD:
489                         if (s->child.pid > 0) {
490                                 if (s->enabled != SD_EVENT_OFF) {
491                                         assert(s->event->n_enabled_child_sources > 0);
492                                         s->event->n_enabled_child_sources--;
493                                 }
494
495                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
496                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
497
498                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
499                         }
500
501                         break;
502
503                 case SOURCE_DEFER:
504                         /* nothing */
505                         break;
506
507                 case SOURCE_QUIT:
508                         prioq_remove(s->event->quit, s, &s->quit.prioq_index);
509                         break;
510                 }
511
512                 if (s->pending)
513                         prioq_remove(s->event->pending, s, &s->pending_index);
514
515                 if (s->prepare)
516                         prioq_remove(s->event->prepare, s, &s->prepare_index);
517
518                 sd_event_unref(s->event);
519         }
520
521         free(s);
522 }
523
524 static int source_set_pending(sd_event_source *s, bool b) {
525         int r;
526
527         assert(s);
528         assert(s->type != SOURCE_QUIT);
529
530         if (s->pending == b)
531                 return 0;
532
533         s->pending = b;
534
535         if (b) {
536                 s->pending_iteration = s->event->iteration;
537
538                 r = prioq_put(s->event->pending, s, &s->pending_index);
539                 if (r < 0) {
540                         s->pending = false;
541                         return r;
542                 }
543         } else
544                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
545
546         return 0;
547 }
548
549 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
550         sd_event_source *s;
551
552         assert(e);
553
554         s = new0(sd_event_source, 1);
555         if (!s)
556                 return NULL;
557
558         s->n_ref = 1;
559         s->event = sd_event_ref(e);
560         s->type = type;
561         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
562
563         return s;
564 }
565
566 _public_ int sd_event_add_io(
567                 sd_event *e,
568                 int fd,
569                 uint32_t events,
570                 sd_event_io_handler_t callback,
571                 void *userdata,
572                 sd_event_source **ret) {
573
574         sd_event_source *s;
575         int r;
576
577         assert_return(e, -EINVAL);
578         assert_return(fd >= 0, -EINVAL);
579         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
580         assert_return(callback, -EINVAL);
581         assert_return(ret, -EINVAL);
582         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
583         assert_return(!event_pid_changed(e), -ECHILD);
584
585         s = source_new(e, SOURCE_IO);
586         if (!s)
587                 return -ENOMEM;
588
589         s->io.fd = fd;
590         s->io.events = events;
591         s->io.callback = callback;
592         s->userdata = userdata;
593         s->enabled = SD_EVENT_ON;
594
595         r = source_io_register(s, s->enabled, events);
596         if (r < 0) {
597                 source_free(s);
598                 return r;
599         }
600
601         *ret = s;
602         return 0;
603 }
604
605 static int event_setup_timer_fd(
606                 sd_event *e,
607                 EventSourceType type,
608                 int *timer_fd,
609                 clockid_t id) {
610
611         struct epoll_event ev = {};
612         int r, fd;
613         sd_id128_t bootid;
614
615         assert(e);
616         assert(timer_fd);
617
618         if (_likely_(*timer_fd >= 0))
619                 return 0;
620
621         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
622         if (fd < 0)
623                 return -errno;
624
625         ev.events = EPOLLIN;
626         ev.data.ptr = INT_TO_PTR(type);
627
628         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
629         if (r < 0) {
630                 close_nointr_nofail(fd);
631                 return -errno;
632         }
633
634         /* When we sleep for longer, we try to realign the wakeup to
635            the same time within each second, so that events all across
636            the system can be coalesced into a single CPU
637            wakeup. However, let's take some system-specific randomness
638            for this value, so that in a network of systems with synced
639            clocks timer events are distributed a bit. Here, we
640            calculate a perturbation usec offset from the boot ID. */
641
642         if (sd_id128_get_boot(&bootid) >= 0)
643                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
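        /* A made-up example of the calculation above: boot ID qwords of
         * 0x0123456789abcdefULL and 0xfedcba9876543210ULL XOR to
         * 0xffffffffffffffffULL, and modulo USEC_PER_SEC that yields a
         * perturbation of 551615 usec within each second. */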
644
645         *timer_fd = fd;
646         return 0;
647 }
648
649 static int event_add_time_internal(
650                 sd_event *e,
651                 EventSourceType type,
652                 int *timer_fd,
653                 clockid_t id,
654                 Prioq **earliest,
655                 Prioq **latest,
656                 uint64_t usec,
657                 uint64_t accuracy,
658                 sd_event_time_handler_t callback,
659                 void *userdata,
660                 sd_event_source **ret) {
661
662         sd_event_source *s;
663         int r;
664
665         assert_return(e, -EINVAL);
666         assert_return(callback, -EINVAL);
667         assert_return(ret, -EINVAL);
668         assert_return(usec != (uint64_t) -1, -EINVAL);
669         assert_return(accuracy != (uint64_t) -1, -EINVAL);
670         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
671         assert_return(!event_pid_changed(e), -ECHILD);
672
673         assert(timer_fd);
674         assert(earliest);
675         assert(latest);
676
677         if (!*earliest) {
678                 *earliest = prioq_new(earliest_time_prioq_compare);
679                 if (!*earliest)
680                         return -ENOMEM;
681         }
682
683         if (!*latest) {
684                 *latest = prioq_new(latest_time_prioq_compare);
685                 if (!*latest)
686                         return -ENOMEM;
687         }
688
689         if (*timer_fd < 0) {
690                 r = event_setup_timer_fd(e, type, timer_fd, id);
691                 if (r < 0)
692                         return r;
693         }
694
695         s = source_new(e, type);
696         if (!s)
697                 return -ENOMEM;
698
699         s->time.next = usec;
700         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
701         s->time.callback = callback;
702         s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
703         s->userdata = userdata;
704         s->enabled = SD_EVENT_ONESHOT;
705
706         r = prioq_put(*earliest, s, &s->time.earliest_index);
707         if (r < 0)
708                 goto fail;
709
710         r = prioq_put(*latest, s, &s->time.latest_index);
711         if (r < 0)
712                 goto fail;
713
714         *ret = s;
715         return 0;
716
717 fail:
718         source_free(s);
719         return r;
720 }
721
722 _public_ int sd_event_add_monotonic(sd_event *e,
723                                     uint64_t usec,
724                                     uint64_t accuracy,
725                                     sd_event_time_handler_t callback,
726                                     void *userdata,
727                                     sd_event_source **ret) {
728
729         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
730 }
731
732 _public_ int sd_event_add_realtime(sd_event *e,
733                                    uint64_t usec,
734                                    uint64_t accuracy,
735                                    sd_event_time_handler_t callback,
736                                    void *userdata,
737                                    sd_event_source **ret) {
738
739         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->realtime_latest, usec, accuracy, callback, userdata, ret);
740 }
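/* Illustrative only: arming a one-shot monotonic timer 5s from now with the
 * default accuracy (accuracy == 0 maps to DEFAULT_ACCURACY_USEC). The handler
 * name on_timer() and the source pointer ts are hypothetical:
 *
 *     static int on_timer(sd_event_source *s, uint64_t usec, void *userdata) {
 *             return 0;
 *     }
 *
 *     sd_event_source *ts = NULL;
 *     sd_event_add_monotonic(e, now(CLOCK_MONOTONIC) + 5 * USEC_PER_SEC, 0, on_timer, NULL, &ts);
 */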
741
742 static int event_update_signal_fd(sd_event *e) {
743         struct epoll_event ev = {};
744         bool add_to_epoll;
745         int r;
746
747         assert(e);
748
749         add_to_epoll = e->signal_fd < 0;
750
751         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
752         if (r < 0)
753                 return -errno;
754
755         e->signal_fd = r;
756
757         if (!add_to_epoll)
758                 return 0;
759
760         ev.events = EPOLLIN;
761         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
762
763         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
764         if (r < 0) {
765                 close_nointr_nofail(e->signal_fd);
766                 e->signal_fd = -1;
767
768                 return -errno;
769         }
770
771         return 0;
772 }
773
774 _public_ int sd_event_add_signal(
775                 sd_event *e,
776                 int sig,
777                 sd_event_signal_handler_t callback,
778                 void *userdata,
779                 sd_event_source **ret) {
780
781         sd_event_source *s;
782         int r;
783
784         assert_return(e, -EINVAL);
785         assert_return(sig > 0, -EINVAL);
786         assert_return(sig < _NSIG, -EINVAL);
787         assert_return(callback, -EINVAL);
788         assert_return(ret, -EINVAL);
789         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
790         assert_return(!event_pid_changed(e), -ECHILD);
791
792         if (!e->signal_sources) {
793                 e->signal_sources = new0(sd_event_source*, _NSIG);
794                 if (!e->signal_sources)
795                         return -ENOMEM;
796         } else if (e->signal_sources[sig])
797                 return -EBUSY;
798
799         s = source_new(e, SOURCE_SIGNAL);
800         if (!s)
801                 return -ENOMEM;
802
803         s->signal.sig = sig;
804         s->signal.callback = callback;
805         s->userdata = userdata;
806         s->enabled = SD_EVENT_ON;
807
808         e->signal_sources[sig] = s;
809         assert_se(sigaddset(&e->sigset, sig) == 0);
810
811         if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
812                 r = event_update_signal_fd(e);
813                 if (r < 0) {
814                         source_free(s);
815                         return r;
816                 }
817         }
818
819         *ret = s;
820         return 0;
821 }
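/* Illustrative only: signalfd-based delivery requires that the signal is
 * blocked in the calling thread, otherwise the default disposition may still
 * run. A caller would typically do something like this before adding the
 * source (the handler on_sigterm() is hypothetical):
 *
 *     sigset_t mask;
 *
 *     sigemptyset(&mask);
 *     sigaddset(&mask, SIGTERM);
 *     sigprocmask(SIG_BLOCK, &mask, NULL);
 *
 *     sd_event_add_signal(e, SIGTERM, on_sigterm, NULL, &source);
 */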
822
823 _public_ int sd_event_add_child(
824                 sd_event *e,
825                 pid_t pid,
826                 int options,
827                 sd_event_child_handler_t callback,
828                 void *userdata,
829                 sd_event_source **ret) {
830
831         sd_event_source *s;
832         int r;
833
834         assert_return(e, -EINVAL);
835         assert_return(pid > 1, -EINVAL);
836         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
837         assert_return(options != 0, -EINVAL);
838         assert_return(callback, -EINVAL);
839         assert_return(ret, -EINVAL);
840         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
841         assert_return(!event_pid_changed(e), -ECHILD);
842
843         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
844         if (r < 0)
845                 return r;
846
847         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
848                 return -EBUSY;
849
850         s = source_new(e, SOURCE_CHILD);
851         if (!s)
852                 return -ENOMEM;
853
854         s->child.pid = pid;
855         s->child.options = options;
856         s->child.callback = callback;
857         s->userdata = userdata;
858         s->enabled = SD_EVENT_ONESHOT;
859
860         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
861         if (r < 0) {
862                 source_free(s);
863                 return r;
864         }
865
866         e->n_enabled_child_sources ++;
867
868         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
869
870         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
871                 r = event_update_signal_fd(e);
872                 if (r < 0) {
873                         source_free(s);
874                         return r;
875                 }
876         }
877
878         e->need_process_child = true;
879
880         *ret = s;
881         return 0;
882 }
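/* Illustrative only: watching a child the caller forked off earlier. The
 * handler on_child() is hypothetical; it receives the siginfo_t that
 * process_child() below fills in via waitid():
 *
 *     static int on_child(sd_event_source *s, const siginfo_t *si, void *userdata) {
 *             if (si->si_code == CLD_EXITED && si->si_status != 0)
 *                     sd_event_request_quit(sd_event_get(s));
 *             return 0;
 *     }
 *
 *     sd_event_source *cs = NULL;
 *     sd_event_add_child(e, pid, WEXITED, on_child, NULL, &cs);
 */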
883
884 _public_ int sd_event_add_defer(
885                 sd_event *e,
886                 sd_event_handler_t callback,
887                 void *userdata,
888                 sd_event_source **ret) {
889
890         sd_event_source *s;
891         int r;
892
893         assert_return(e, -EINVAL);
894         assert_return(callback, -EINVAL);
895         assert_return(ret, -EINVAL);
896         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
897         assert_return(!event_pid_changed(e), -ECHILD);
898
899         s = source_new(e, SOURCE_DEFER);
900         if (!s)
901                 return -ENOMEM;
902
903         s->defer.callback = callback;
904         s->userdata = userdata;
905         s->enabled = SD_EVENT_ONESHOT;
906
907         r = source_set_pending(s, true);
908         if (r < 0) {
909                 source_free(s);
910                 return r;
911         }
912
913         *ret = s;
914         return 0;
915 }
916
917 _public_ int sd_event_add_quit(
918                 sd_event *e,
919                 sd_event_handler_t callback,
920                 void *userdata,
921                 sd_event_source **ret) {
922
923         sd_event_source *s;
924         int r;
925
926         assert_return(e, -EINVAL);
927         assert_return(callback, -EINVAL);
928         assert_return(ret, -EINVAL);
929         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
930         assert_return(!event_pid_changed(e), -ECHILD);
931
932         if (!e->quit) {
933                 e->quit = prioq_new(quit_prioq_compare);
934                 if (!e->quit)
935                         return -ENOMEM;
936         }
937
938         s = source_new(e, SOURCE_QUIT);
939         if (!s)
940                 return -ENOMEM;
941
942         s->quit.callback = callback;
943         s->userdata = userdata;
944         s->quit.prioq_index = PRIOQ_IDX_NULL;
945         s->enabled = SD_EVENT_ONESHOT;
946
947         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
948         if (r < 0) {
949                 source_free(s);
950                 return r;
951         }
952
953         *ret = s;
954         return 0;
955 }
956
957 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
958         assert_return(s, NULL);
959
960         assert(s->n_ref >= 1);
961         s->n_ref++;
962
963         return s;
964 }
965
966 _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {
967         assert_return(s, NULL);
968
969         assert(s->n_ref >= 1);
970         s->n_ref--;
971
972         if (s->n_ref <= 0)
973                 source_free(s);
974
975         return NULL;
976 }
977
978 _public_ sd_event *sd_event_get(sd_event_source *s) {
979         assert_return(s, NULL);
980
981         return s->event;
982 }
983
984 _public_ int sd_event_source_get_pending(sd_event_source *s) {
985         assert_return(s, -EINVAL);
986         assert_return(s->type != SOURCE_QUIT, -EDOM);
987         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
988         assert_return(!event_pid_changed(s->event), -ECHILD);
989
990         return s->pending;
991 }
992
993 _public_ int sd_event_source_get_io_fd(sd_event_source *s) {
994         assert_return(s, -EINVAL);
995         assert_return(s->type == SOURCE_IO, -EDOM);
996         assert_return(!event_pid_changed(s->event), -ECHILD);
997
998         return s->io.fd;
999 }
1000
1001 _public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
1002         assert_return(s, -EINVAL);
1003         assert_return(events, -EINVAL);
1004         assert_return(s->type == SOURCE_IO, -EDOM);
1005         assert_return(!event_pid_changed(s->event), -ECHILD);
1006
1007         *events = s->io.events;
1008         return 0;
1009 }
1010
1011 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1012         int r;
1013
1014         assert_return(s, -EINVAL);
1015         assert_return(s->type == SOURCE_IO, -EDOM);
1016         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
1017         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1018         assert_return(!event_pid_changed(s->event), -ECHILD);
1019
1020         if (s->io.events == events)
1021                 return 0;
1022
1023         if (s->enabled != SD_EVENT_OFF) {
1024                 r = source_io_register(s, s->enabled, events);
1025                 if (r < 0)
1026                         return r;
1027         }
1028
1029         s->io.events = events;
1030         source_set_pending(s, false);
1031
1032         return 0;
1033 }
1034
1035 _public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1036         assert_return(s, -EINVAL);
1037         assert_return(revents, -EINVAL);
1038         assert_return(s->type == SOURCE_IO, -EDOM);
1039         assert_return(s->pending, -ENODATA);
1040         assert_return(!event_pid_changed(s->event), -ECHILD);
1041
1042         *revents = s->io.revents;
1043         return 0;
1044 }
1045
1046 _public_ int sd_event_source_get_signal(sd_event_source *s) {
1047         assert_return(s, -EINVAL);
1048         assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1049         assert_return(!event_pid_changed(s->event), -ECHILD);
1050
1051         return s->signal.sig;
1052 }
1053
1054 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1055         assert_return(s, -EINVAL);
1056         assert_return(priority, -EINVAL);
1057         assert_return(!event_pid_changed(s->event), -ECHILD);
1058
1059         *priority = s->priority;
1060         return 0;
1061 }
1060
1061 _public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
1062         assert_return(s, -EINVAL);
1063         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1064         assert_return(!event_pid_changed(s->event), -ECHILD);
1065
1066         if (s->priority == priority)
1067                 return 0;
1068
1069         s->priority = priority;
1070
1071         if (s->pending)
1072                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1073
1074         if (s->prepare)
1075                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1076
1077         if (s->type == SOURCE_QUIT)
1078                 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1079
1080         return 0;
1081 }
1082
1083 _public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1084         assert_return(s, -EINVAL);
1085         assert_return(m, -EINVAL);
1086         assert_return(!event_pid_changed(s->event), -ECHILD);
1087
1088         *m = s->enabled;
1089         return 0;
1090 }
1091
1092 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
1093         int r;
1094
1095         assert_return(s, -EINVAL);
1096         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1097         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1098         assert_return(!event_pid_changed(s->event), -ECHILD);
1099
1100         if (s->enabled == m)
1101                 return 0;
1102
1103         if (m == SD_EVENT_OFF) {
1104
1105                 switch (s->type) {
1106
1107                 case SOURCE_IO:
1108                         r = source_io_unregister(s);
1109                         if (r < 0)
1110                                 return r;
1111
1112                         s->enabled = m;
1113                         break;
1114
1115                 case SOURCE_MONOTONIC:
1116                         s->enabled = m;
1117                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1118                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1119                         break;
1120
1121                 case SOURCE_REALTIME:
1122                         s->enabled = m;
1123                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1124                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1125                         break;
1126
1127                 case SOURCE_SIGNAL:
1128                         s->enabled = m;
1129                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1130                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1131                                 event_update_signal_fd(s->event);
1132                         }
1133
1134                         break;
1135
1136                 case SOURCE_CHILD:
1137                         s->enabled = m;
1138
1139                         assert(s->event->n_enabled_child_sources > 0);
1140                         s->event->n_enabled_child_sources--;
1141
1142                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1143                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1144                                 event_update_signal_fd(s->event);
1145                         }
1146
1147                         break;
1148
1149                 case SOURCE_QUIT:
1150                         s->enabled = m;
1151                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1152                         break;
1153
1154                 case SOURCE_DEFER:
1155                         s->enabled = m;
1156                         break;
1157                 }
1158
1159         } else {
1160                 switch (s->type) {
1161
1162                 case SOURCE_IO:
1163                         r = source_io_register(s, m, s->io.events);
1164                         if (r < 0)
1165                                 return r;
1166
1167                         s->enabled = m;
1168                         break;
1169
1170                 case SOURCE_MONOTONIC:
1171                         s->enabled = m;
1172                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1173                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1174                         break;
1175
1176                 case SOURCE_REALTIME:
1177                         s->enabled = m;
1178                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1179                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1180                         break;
1181
1182                 case SOURCE_SIGNAL:
1183                         s->enabled = m;
1184
1185                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1186                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1187                                 event_update_signal_fd(s->event);
1188                         }
1189                         break;
1190
1191                 case SOURCE_CHILD:
1192                         if (s->enabled == SD_EVENT_OFF) {
1193                                 s->event->n_enabled_child_sources++;
1194
1195                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1196                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1197                                         event_update_signal_fd(s->event);
1198                                 }
1199                         }
1200
1201                         s->enabled = m;
1202                         break;
1203
1204                 case SOURCE_QUIT:
1205                         s->enabled = m;
1206                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1207                         break;
1208
1209                 case SOURCE_DEFER:
1210                         s->enabled = m;
1211                         break;
1212                 }
1213         }
1214
1215         if (s->pending)
1216                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1217
1218         if (s->prepare)
1219                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1220
1221         return 0;
1222 }
1223
1224 _public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1225         assert_return(s, -EINVAL);
1226         assert_return(usec, -EINVAL);
1227         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1228         assert_return(!event_pid_changed(s->event), -ECHILD);
1229
1230         *usec = s->time.next;
1231         return 0;
1232 }
1233
1234 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1235         assert_return(s, -EINVAL);
1236         assert_return(usec != (uint64_t) -1, -EINVAL);
1237         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1238         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1239         assert_return(!event_pid_changed(s->event), -ECHILD);
1240
1241         if (s->time.next == usec)
1242                 return 0;
1243
1244         s->time.next = usec;
1245         source_set_pending(s, false);
1246
1247         if (s->type == SOURCE_REALTIME) {
1248                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1249                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1250         } else {
1251                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1252                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1253         }
1254
1255         return 0;
1256 }
1257
1258 _public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1259         assert_return(s, -EINVAL);
1260         assert_return(usec, -EINVAL);
1261         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1262         assert_return(!event_pid_changed(s->event), -ECHILD);
1263
1264         *usec = s->time.accuracy;
1265         return 0;
1266 }
1267
1268 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1269         assert_return(s, -EINVAL);
1270         assert_return(usec != (uint64_t) -1, -EINVAL);
1271         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1272         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1273         assert_return(!event_pid_changed(s->event), -ECHILD);
1274
1275         if (usec == 0)
1276                 usec = DEFAULT_ACCURACY_USEC;
1277
1278         if (s->time.accuracy == usec)
1279                 return 0;
1280
1281         s->time.accuracy = usec;
1282
1283         if (s->type == SOURCE_REALTIME)
1284                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1285         else
1286                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1287
1288         return 0;
1289 }
1290
1291 _public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1292         assert_return(s, -EINVAL);
1293         assert_return(pid, -EINVAL);
1294         assert_return(s->type == SOURCE_CHILD, -EDOM);
1295         assert_return(!event_pid_changed(s->event), -ECHILD);
1296
1297         *pid = s->child.pid;
1298         return 0;
1299 }
1300
1301 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_event_handler_t callback) {
1302         int r;
1303
1304         assert_return(s, -EINVAL);
1305         assert_return(s->type != SOURCE_QUIT, -EDOM);
1306         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1307         assert_return(!event_pid_changed(s->event), -ECHILD);
1308
1309         if (s->prepare == callback)
1310                 return 0;
1311
1312         if (callback && s->prepare) {
1313                 s->prepare = callback;
1314                 return 0;
1315         }
1316
1317         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1318         if (r < 0)
1319                 return r;
1320
1321         s->prepare = callback;
1322
1323         if (callback) {
1324                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1325                 if (r < 0)
1326                         return r;
1327         } else
1328                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1329
1330         return 0;
1331 }
1332
1333 _public_ void* sd_event_source_get_userdata(sd_event_source *s) {
1334         assert_return(s, NULL);
1335
1336         return s->userdata;
1337 }
1338
1339 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1340         usec_t c;
1341         assert(e);
1342         assert(a <= b);
1343
1344         if (a <= 0)
1345                 return 0;
1346
1347         if (b <= a + 1)
1348                 return a;
1349
1350         /*
1351           Find a good time to wake up again between times a and b. We
1352           have two goals here:
1353
1354           a) We want to wake up as seldom as possible, hence prefer
1355              later times over earlier times.
1356
1357           b) But if we have to wake up, then let's make sure to
1358              dispatch as much as possible on the entire system.
1359
1360           We implement this by waking up everywhere at the same time
1361           within any given second if we can, synchronised via the
1362           perturbation value determined from the boot ID. If we can't,
1363           then we try to find the same spot in every a 250ms
1364           then we try to find the same spot within each 250ms
1365         */
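        /* A worked example with made-up numbers: with perturb = 300000 (0.3s),
         * a = 5200000 and b = 6500000, the first candidate below is
         * c = 6 * USEC_PER_SEC + perturb = 6300000, which already lies in
         * [a, b), so we return 6.3s and all loops on this machine align on
         * the same 0.3s offset within the second. */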
1366
1367         c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1368         if (c >= b) {
1369                 if (_unlikely_(c < USEC_PER_SEC))
1370                         return b;
1371
1372                 c -= USEC_PER_SEC;
1373         }
1374
1375         if (c >= a)
1376                 return c;
1377
1378         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1379         if (c >= b) {
1380                 if (_unlikely_(c < USEC_PER_MSEC*250))
1381                         return b;
1382
1383                 c -= USEC_PER_MSEC*250;
1384         }
1385
1386         if (c >= a)
1387                 return c;
1388
1389         return b;
1390 }
1391
1392 static int event_arm_timer(
1393                 sd_event *e,
1394                 int timer_fd,
1395                 Prioq *earliest,
1396                 Prioq *latest,
1397                 usec_t *next) {
1398
1399         struct itimerspec its = {};
1400         sd_event_source *a, *b;
1401         usec_t t;
1402         int r;
1403
1404         assert_se(e);
1405         assert_se(next);
1406
1407         a = prioq_peek(earliest);
1408         if (!a || a->enabled == SD_EVENT_OFF) {
1409
1410                 if (*next == (usec_t) -1)
1411                         return 0;
1412
1413                 /* disarm */
1414                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1415                 if (r < 0)
1416                         return r;
1417
1418                 *next = (usec_t) -1;
1419
1420                 return 0;
1421         }
1422
1423         b = prioq_peek(latest);
1424         assert_se(b && b->enabled != SD_EVENT_OFF);
1425
1426         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1427         if (*next == t)
1428                 return 0;
1429
1430         assert_se(timer_fd >= 0);
1431
1432         if (t == 0) {
1433                 /* We don't want to disarm here, so we just set the timer to some time looooong ago. */
1434                 its.it_value.tv_sec = 0;
1435                 its.it_value.tv_nsec = 1;
1436         } else
1437                 timespec_store(&its.it_value, t);
1438
1439         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1440         if (r < 0)
1441                 return r;
1442
1443         *next = t;
1444         return 0;
1445 }
1446
1447 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1448         assert(e);
1449         assert(s);
1450         assert(s->type == SOURCE_IO);
1451
1452         s->io.revents = events;
1453
1454         return source_set_pending(s, true);
1455 }
1456
1457 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1458         uint64_t x;
1459         ssize_t ss;
1460
1461         assert(e);
1462         assert(fd >= 0);
1463         assert(next);
1464
1465         assert_return(events == EPOLLIN, -EIO);
1466
1467         ss = read(fd, &x, sizeof(x));
1468         if (ss < 0) {
1469                 if (errno == EAGAIN || errno == EINTR)
1470                         return 0;
1471
1472                 return -errno;
1473         }
1474
1475         if (ss != sizeof(x))
1476                 return -EIO;
1477
1478         *next = (usec_t) -1;
1479
1480         return 0;
1481 }
1482
1483 static int process_timer(
1484                 sd_event *e,
1485                 usec_t n,
1486                 Prioq *earliest,
1487                 Prioq *latest) {
1488
1489         sd_event_source *s;
1490         int r;
1491
1492         assert(e);
1493
1494         for (;;) {
1495                 s = prioq_peek(earliest);
1496                 if (!s ||
1497                     s->time.next > n ||
1498                     s->enabled == SD_EVENT_OFF ||
1499                     s->pending)
1500                         break;
1501
1502                 r = source_set_pending(s, true);
1503                 if (r < 0)
1504                         return r;
1505
1506                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1507                 prioq_reshuffle(latest, s, &s->time.latest_index);
1508         }
1509
1510         return 0;
1511 }
1512
1513 static int process_child(sd_event *e) {
1514         sd_event_source *s;
1515         Iterator i;
1516         int r;
1517
1518         assert(e);
1519
1520         e->need_process_child = false;
1521
1522         /*
1523            So, this is ugly. We iteratively invoke waitid() with P_PID
1524            + WNOHANG for each PID we wait for, instead of using
1525            P_ALL. This is because we only want to get child
1526            information of very specific child processes, and not all
1527            of them. We might not have processed the SIGCHLD event of a
1528            previous invocation and we don't want to maintain an
1529            unbounded *per-child* event queue, hence we really don't
1530            want anything flushed out of the kernel's queue that we
1531            don't care about. Since this is O(n) this means that if you
1532            have a lot of processes you probably want to handle SIGCHLD
1533            yourself.
1534         */
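        /* A hedged sketch of the alternative alluded to above for callers with
         * very many children: reap everything with P_ALL and skip per-PID
         * child sources entirely (handle_exit() is hypothetical):
         *
         *     siginfo_t si = {};
         *
         *     while (waitid(P_ALL, 0, &si, WEXITED|WNOHANG) == 0 && si.si_pid > 0) {
         *             handle_exit(si.si_pid, si.si_status);
         *             si.si_pid = 0;
         *     }
         */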
1535
1536         HASHMAP_FOREACH(s, e->child_sources, i) {
1537                 assert(s->type == SOURCE_CHILD);
1538
1539                 if (s->pending)
1540                         continue;
1541
1542                 if (s->enabled == SD_EVENT_OFF)
1543                         continue;
1544
1545                 zero(s->child.siginfo);
1546                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1547                 if (r < 0)
1548                         return -errno;
1549
1550                 if (s->child.siginfo.si_pid != 0) {
1551                         r = source_set_pending(s, true);
1552                         if (r < 0)
1553                                 return r;
1554                 }
1555         }
1556
1557         return 0;
1558 }
1559
1560 static int process_signal(sd_event *e, uint32_t events) {
1561         bool read_one = false;
1562         int r;
1563
1564         assert(e);
1565         assert(e->signal_sources);
1566
1567         assert_return(events == EPOLLIN, -EIO);
1568
1569         for (;;) {
1570                 struct signalfd_siginfo si;
1571                 ssize_t ss;
1572                 sd_event_source *s;
1573
1574                 ss = read(e->signal_fd, &si, sizeof(si));
1575                 if (ss < 0) {
1576                         if (errno == EAGAIN || errno == EINTR)
1577                                 return read_one;
1578
1579                         return -errno;
1580                 }
1581
1582                 if (ss != sizeof(si))
1583                         return -EIO;
1584
1585                 read_one = true;
1586
1587                 s = e->signal_sources[si.ssi_signo];
1588                 if (si.ssi_signo == SIGCHLD) {
1589                         r = process_child(e);
1590                         if (r < 0)
1591                                 return r;
1592                         if (r > 0 || !s)
1593                                 continue;
1594                 } else
1595                         if (!s)
1596                                 return -EIO;
1597
1598                 s->signal.siginfo = si;
1599                 r = source_set_pending(s, true);
1600                 if (r < 0)
1601                         return r;
1602         }
1603
1605         return 0;
1606 }
1607
1608 static int source_dispatch(sd_event_source *s) {
1609         int r = 0;
1610
1611         assert(s);
1612         assert(s->pending || s->type == SOURCE_QUIT);
1613
1614         if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
1615                 r = source_set_pending(s, false);
1616                 if (r < 0)
1617                         return r;
1618         }
1619
1620         if (s->enabled == SD_EVENT_ONESHOT) {
1621                 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1622                 if (r < 0)
1623                         return r;
1624         }
1625
1626         sd_event_source_ref(s);
1627
1628         switch (s->type) {
1629
1630         case SOURCE_IO:
1631                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1632                 break;
1633
1634         case SOURCE_MONOTONIC:
1635                 r = s->time.callback(s, s->time.next, s->userdata);
1636                 break;
1637
1638         case SOURCE_REALTIME:
1639                 r = s->time.callback(s, s->time.next, s->userdata);
1640                 break;
1641
1642         case SOURCE_SIGNAL:
1643                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1644                 break;
1645
1646         case SOURCE_CHILD:
1647                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1648                 break;
1649
1650         case SOURCE_DEFER:
1651                 r = s->defer.callback(s, s->userdata);
1652                 break;
1653
1654         case SOURCE_QUIT:
1655                 r = s->quit.callback(s, s->userdata);
1656                 break;
1657         }
1658
1659         sd_event_source_unref(s);
1660
1661         return r;
1662 }
1663
1664 static int event_prepare(sd_event *e) {
1665         int r;
1666
1667         assert(e);
1668
1669         for (;;) {
1670                 sd_event_source *s;
1671
1672                 s = prioq_peek(e->prepare);
1673                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1674                         break;
1675
1676                 s->prepare_iteration = e->iteration;
1677                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1678                 if (r < 0)
1679                         return r;
1680
1681                 assert(s->prepare);
1682                 r = s->prepare(s, s->userdata);
1683                 if (r < 0)
1684                         return r;
1685
1686         }
1687
1688         return 0;
1689 }
1690
1691 static int dispatch_quit(sd_event *e) {
1692         sd_event_source *p;
1693         int r;
1694
1695         assert(e);
1696
1697         p = prioq_peek(e->quit);
1698         if (!p || p->enabled == SD_EVENT_OFF) {
1699                 e->state = SD_EVENT_FINISHED;
1700                 return 0;
1701         }
1702
1703         sd_event_ref(e);
1704         e->iteration++;
1705         e->state = SD_EVENT_QUITTING;
1706
1707         r = source_dispatch(p);
1708
1709         e->state = SD_EVENT_PASSIVE;
1710         sd_event_unref(e);
1711
1712         return r;
1713 }
1714
1715 static sd_event_source* event_next_pending(sd_event *e) {
1716         sd_event_source *p;
1717
1718         assert(e);
1719
1720         p = prioq_peek(e->pending);
1721         if (!p)
1722                 return NULL;
1723
1724         if (p->enabled == SD_EVENT_OFF)
1725                 return NULL;
1726
1727         return p;
1728 }
1729
1730 _public_ int sd_event_run(sd_event *e, uint64_t timeout) {
1731         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1732         sd_event_source *p;
1733         int r, i, m;
1734
1735         assert_return(e, -EINVAL);
1736         assert_return(!event_pid_changed(e), -ECHILD);
1737         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1738         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1739
1740         if (e->quit_requested)
1741                 return dispatch_quit(e);
1742
1743         sd_event_ref(e);
1744         e->iteration++;
1745         e->state = SD_EVENT_RUNNING;
1746
1747         r = event_prepare(e);
1748         if (r < 0)
1749                 goto finish;
1750
1751         if (event_next_pending(e) || e->need_process_child)
1752                 timeout = 0;
1753
1754         if (timeout > 0) {
1755                 r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1756                 if (r < 0)
1757                         goto finish;
1758
1759                 r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1760                 if (r < 0)
1761                         goto finish;
1762         }
1763
1764         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1765                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1766         if (m < 0) {
1767                 r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
1768                 goto finish;
1769         }
1770
1771         dual_timestamp_get(&e->timestamp);
1772
1773         for (i = 0; i < m; i++) {
1774
1775                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1776                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
1777                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1778                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
1779                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1780                         r = process_signal(e, ev_queue[i].events);
1781                 else
1782                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1783
1784                 if (r < 0)
1785                         goto finish;
1786         }
1787
1788         r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
1789         if (r < 0)
1790                 goto finish;
1791
1792         r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
1793         if (r < 0)
1794                 goto finish;
1795
1796         if (e->need_process_child) {
1797                 r = process_child(e);
1798                 if (r < 0)
1799                         goto finish;
1800         }
1801
1802         p = event_next_pending(e);
1803         if (!p) {
1804                 r = 0;
1805                 goto finish;
1806         }
1807
1808         r = source_dispatch(p);
1809
1810 finish:
1811         e->state = SD_EVENT_PASSIVE;
1812         sd_event_unref(e);
1813
1814         return r;
1815 }
1816
1817 _public_ int sd_event_loop(sd_event *e) {
1818         int r;
1819
1820         assert_return(e, -EINVAL);
1821         assert_return(!event_pid_changed(e), -ECHILD);
1822         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1823
1824         sd_event_ref(e);
1825
1826         while (e->state != SD_EVENT_FINISHED) {
1827                 r = sd_event_run(e, (uint64_t) -1);
1828                 if (r < 0)
1829                         goto finish;
1830         }
1831
1832         r = 0;
1833
1834 finish:
1835         sd_event_unref(e);
1836         return r;
1837 }
1838
1839 _public_ int sd_event_get_state(sd_event *e) {
1840         assert_return(e, -EINVAL);
1841         assert_return(!event_pid_changed(e), -ECHILD);
1842
1843         return e->state;
1844 }
1845
1846 _public_ int sd_event_get_quit(sd_event *e) {
1847         assert_return(e, -EINVAL);
1848         assert_return(!event_pid_changed(e), -ECHILD);
1849
1850         return e->quit_requested;
1851 }
1852
1853 _public_ int sd_event_request_quit(sd_event *e) {
1854         assert_return(e, -EINVAL);
1855         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1856         assert_return(!event_pid_changed(e), -ECHILD);
1857
1858         e->quit_requested = true;
1859         return 0;
1860 }
1861
1862 _public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
1863         assert_return(e, -EINVAL);
1864         assert_return(usec, -EINVAL);
1865         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1866         assert_return(!event_pid_changed(e), -ECHILD);
1867
1868         *usec = e->timestamp.realtime;
1869         return 0;
1870 }
1871
1872 _public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
1873         assert_return(e, -EINVAL);
1874         assert_return(usec, -EINVAL);
1875         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1876         assert_return(!event_pid_changed(e), -ECHILD);
1877
1878         *usec = e->timestamp.monotonic;
1879         return 0;
1880 }
1881
1882 _public_ int sd_event_default(sd_event **ret) {
1883
1884         static __thread sd_event *default_event = NULL;
1885         sd_event *e;
1886         int r;
1887
1888         if (!ret)
1889                 return !!default_event;
1890
1891         if (default_event) {
1892                 *ret = sd_event_ref(default_event);
1893                 return 0;
1894         }
1895
1896         r = sd_event_new(&e);
1897         if (r < 0)
1898                 return r;
1899
1900         e->default_event_ptr = &default_event;
1901         e->tid = gettid();
1902         default_event = e;
1903
1904         *ret = e;
1905         return 1;
1906 }
1907
1908 _public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
1909         assert_return(e, -EINVAL);
1910         assert_return(tid, -EINVAL);
1911         assert_return(!event_pid_changed(e), -ECHILD);
1912
1913         if (e->tid != 0) {
1914                 *tid = e->tid;
1915                 return 0;
1916         }
1917
1918         return -ENXIO;
1919 }