event: add sd_event_source_get_child_pid() call to query the PID of a child event...
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "macro.h"
27 #include "prioq.h"
28 #include "hashmap.h"
29 #include "util.h"
30 #include "time-util.h"
31 #include "sd-id128.h"
32
33 #include "sd-event.h"
34
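/* EPOLL_QUEUE_MAX bounds how many epoll events a single sd_event_run()
 * iteration fetches; DEFAULT_ACCURACY_USEC is the coalescing slack applied
 * to timer sources that were added with an accuracy of 0. */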
35 #define EPOLL_QUEUE_MAX 64
36 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
37
38 typedef enum EventSourceType {
39         SOURCE_IO,
40         SOURCE_MONOTONIC,
41         SOURCE_REALTIME,
42         SOURCE_SIGNAL,
43         SOURCE_CHILD,
44         SOURCE_DEFER,
45         SOURCE_QUIT
46 } EventSourceType;
47
48 struct sd_event_source {
49         unsigned n_ref;
50
51         sd_event *event;
52         void *userdata;
53         sd_prepare_handler_t prepare;
54
55         EventSourceType type:4;
56         int enabled:3;
57         bool pending:1;
58
59         int priority;
60         unsigned pending_index;
61         unsigned prepare_index;
62         unsigned pending_iteration;
63         unsigned prepare_iteration;
64
65         union {
66                 struct {
67                         sd_io_handler_t callback;
68                         int fd;
69                         uint32_t events;
70                         uint32_t revents;
71                         bool registered:1;
72                 } io;
73                 struct {
74                         sd_time_handler_t callback;
75                         usec_t next, accuracy;
76                         unsigned earliest_index;
77                         unsigned latest_index;
78                 } time;
79                 struct {
80                         sd_signal_handler_t callback;
81                         struct signalfd_siginfo siginfo;
82                         int sig;
83                 } signal;
84                 struct {
85                         sd_child_handler_t callback;
86                         siginfo_t siginfo;
87                         pid_t pid;
88                         int options;
89                 } child;
90                 struct {
91                         sd_defer_handler_t callback;
92                 } defer;
93                 struct {
94                         sd_quit_handler_t callback;
95                         unsigned prioq_index;
96                 } quit;
97         };
98 };
99
100 struct sd_event {
101         unsigned n_ref;
102
103         int epoll_fd;
104         int signal_fd;
105         int realtime_fd;
106         int monotonic_fd;
107
108         Prioq *pending;
109         Prioq *prepare;
110
111         /* For both clocks we maintain two priority queues each, one
112          * ordered for the earliest times the events may be
113          * dispatched, and one ordered by the latest times they must
114          * have been dispatched. The range between the top entries in
115          * the two prioqs is the time window we can freely schedule
116          * wakeups in */
117         Prioq *monotonic_earliest;
118         Prioq *monotonic_latest;
119         Prioq *realtime_earliest;
120         Prioq *realtime_latest;
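        /* Illustrative example: a monotonic source with next=1000ms and
         * accuracy=250ms may be dispatched anywhere in [1000ms, 1250ms];
         * the *_earliest prioqs order sources by "next", the *_latest
         * prioqs by "next + accuracy", and sleep_between() picks the
         * actual wakeup inside the window spanned by the two heads. */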
121
122         usec_t realtime_next, monotonic_next;
123         usec_t perturb;
124
125         sigset_t sigset;
126         sd_event_source **signal_sources;
127
128         Hashmap *child_sources;
129         unsigned n_enabled_child_sources;
130
131         Prioq *quit;
132
133         pid_t original_pid;
134
135         unsigned iteration;
136         int state;
137
138         bool quit_requested:1;
139         bool need_process_child:1;
140 };
141
142 static int pending_prioq_compare(const void *a, const void *b) {
143         const sd_event_source *x = a, *y = b;
144
145         assert(x->pending);
146         assert(y->pending);
147
148         /* Enabled ones first */
149         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
150                 return -1;
151         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
152                 return 1;
153
154         /* Lower priority values first */
155         if (x->priority < y->priority)
156                 return -1;
157         if (x->priority > y->priority)
158                 return 1;
159
160         /* Older entries first */
161         if (x->pending_iteration < y->pending_iteration)
162                 return -1;
163         if (x->pending_iteration > y->pending_iteration)
164                 return 1;
165
166         /* Stability for the rest */
167         if (x < y)
168                 return -1;
169         if (x > y)
170                 return 1;
171
172         return 0;
173 }
174
175 static int prepare_prioq_compare(const void *a, const void *b) {
176         const sd_event_source *x = a, *y = b;
177
178         assert(x->prepare);
179         assert(y->prepare);
180
181         /* Move most recently prepared ones last, so that we can stop
182          * preparing as soon as we hit one that has already been
183          * prepared in the current iteration */
184         if (x->prepare_iteration < y->prepare_iteration)
185                 return -1;
186         if (x->prepare_iteration > y->prepare_iteration)
187                 return 1;
188
189         /* Enabled ones first */
190         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
191                 return -1;
192         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
193                 return 1;
194
195         /* Lower priority values first */
196         if (x->priority < y->priority)
197                 return -1;
198         if (x->priority > y->priority)
199                 return 1;
200
201         /* Stability for the rest */
202         if (x < y)
203                 return -1;
204         if (x > y)
205                 return 1;
206
207         return 0;
208 }
209
210 static int earliest_time_prioq_compare(const void *a, const void *b) {
211         const sd_event_source *x = a, *y = b;
212
213         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
214         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
215
216         /* Enabled ones first */
217         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
218                 return -1;
219         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
220                 return 1;
221
222         /* Move the pending ones to the end */
223         if (!x->pending && y->pending)
224                 return -1;
225         if (x->pending && !y->pending)
226                 return 1;
227
228         /* Order by time */
229         if (x->time.next < y->time.next)
230                 return -1;
231         if (x->time.next > y->time.next)
232                 return 1;
233
234         /* Stability for the rest */
235         if (x < y)
236                 return -1;
237         if (x > y)
238                 return 1;
239
240         return 0;
241 }
242
243 static int latest_time_prioq_compare(const void *a, const void *b) {
244         const sd_event_source *x = a, *y = b;
245
246         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
247                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
248
249         /* Enabled ones first */
250         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
251                 return -1;
252         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
253                 return 1;
254
255         /* Move the pending ones to the end */
256         if (!x->pending && y->pending)
257                 return -1;
258         if (x->pending && !y->pending)
259                 return 1;
260
261         /* Order by time */
262         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
263                 return -1;
264         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
265                 return 1;
266
267         /* Stability for the rest */
268         if (x < y)
269                 return -1;
270         if (x > y)
271                 return 1;
272
273         return 0;
274 }
275
276 static int quit_prioq_compare(const void *a, const void *b) {
277         const sd_event_source *x = a, *y = b;
278
279         assert(x->type == SOURCE_QUIT);
280         assert(y->type == SOURCE_QUIT);
281
282         /* Enabled ones first */
283         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
284                 return -1;
285         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
286                 return 1;
287
288         /* Lower priority values first */
289         if (x->priority < y->priority)
290                 return -1;
291         if (x->priority > y->priority)
292                 return 1;
293
294         /* Stability for the rest */
295         if (x < y)
296                 return -1;
297         if (x > y)
298                 return 1;
299
300         return 0;
301 }
302
303 static void event_free(sd_event *e) {
304         assert(e);
305
306         if (e->epoll_fd >= 0)
307                 close_nointr_nofail(e->epoll_fd);
308
309         if (e->signal_fd >= 0)
310                 close_nointr_nofail(e->signal_fd);
311
312         if (e->realtime_fd >= 0)
313                 close_nointr_nofail(e->realtime_fd);
314
315         if (e->monotonic_fd >= 0)
316                 close_nointr_nofail(e->monotonic_fd);
317
318         prioq_free(e->pending);
319         prioq_free(e->prepare);
320         prioq_free(e->monotonic_earliest);
321         prioq_free(e->monotonic_latest);
322         prioq_free(e->realtime_earliest);
323         prioq_free(e->realtime_latest);
324         prioq_free(e->quit);
325
326         free(e->signal_sources);
327
328         hashmap_free(e->child_sources);
329         free(e);
330 }
331
332 int sd_event_new(sd_event** ret) {
333         sd_event *e;
334         int r;
335
336         if (!ret)
337                 return -EINVAL;
338
339         e = new0(sd_event, 1);
340         if (!e)
341                 return -ENOMEM;
342
343         e->n_ref = 1;
344         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
345         e->realtime_next = e->monotonic_next = (usec_t) -1;
346         e->original_pid = getpid();
347
348         assert_se(sigemptyset(&e->sigset) == 0);
349
350         e->pending = prioq_new(pending_prioq_compare);
351         if (!e->pending) {
352                 r = -ENOMEM;
353                 goto fail;
354         }
355
356         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
357         if (e->epoll_fd < 0) {
358                 r = -errno;
359                 goto fail;
360         }
361
362         *ret = e;
363         return 0;
364
365 fail:
366         event_free(e);
367         return r;
368 }
369
370 sd_event* sd_event_ref(sd_event *e) {
371         if (!e)
372                 return NULL;
373
374         assert(e->n_ref >= 1);
375         e->n_ref++;
376
377         return e;
378 }
379
380 sd_event* sd_event_unref(sd_event *e) {
381         if (!e)
382                 return NULL;
383
384         assert(e->n_ref >= 1);
385         e->n_ref--;
386
387         if (e->n_ref <= 0)
388                 event_free(e);
389
390         return NULL;
391 }
392
393 static bool event_pid_changed(sd_event *e) {
394         assert(e);
395
396         /* We don't support people creating an event loop and keeping
397          * it around over a fork(). Let's complain. */
398
399         return e->original_pid != getpid();
400 }
401
402 static int source_io_unregister(sd_event_source *s) {
403         int r;
404
405         assert(s);
406         assert(s->type == SOURCE_IO);
407
408         if (!s->io.registered)
409                 return 0;
410
411         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
412         if (r < 0)
413                 return -errno;
414
415         s->io.registered = false;
416         return 0;
417 }
418
419 static int source_io_register(sd_event_source *s, int enabled, uint32_t events) {
420         struct epoll_event ev = {};
421         int r;
422
423         assert(s);
424         assert(s->type == SOURCE_IO);
425         assert(enabled != SD_EVENT_OFF);
426
427         ev.events = events;
428         ev.data.ptr = s;
429
430         if (enabled == SD_EVENT_ONESHOT)
431                 ev.events |= EPOLLONESHOT;
432
433         if (s->io.registered)
434                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
435         else
436                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
437
438         if (r < 0)
439                 return -errno;
440
441         s->io.registered = true;
442
443         return 0;
444 }
445
446 static void source_free(sd_event_source *s) {
447         assert(s);
448
449         if (s->event) {
450                 switch (s->type) {
451
452                 case SOURCE_IO:
453                         if (s->io.fd >= 0)
454                                 source_io_unregister(s);
455
456                         break;
457
458                 case SOURCE_MONOTONIC:
459                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
460                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
461                         break;
462
463                 case SOURCE_REALTIME:
464                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
465                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
466                         break;
467
468                 case SOURCE_SIGNAL:
469                         if (s->signal.sig > 0) {
470                                 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
471                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
472
473                                 if (s->event->signal_sources)
474                                         s->event->signal_sources[s->signal.sig] = NULL;
475                         }
476
477                         break;
478
479                 case SOURCE_CHILD:
480                         if (s->child.pid > 0) {
481                                 if (s->enabled != SD_EVENT_OFF) {
482                                         assert(s->event->n_enabled_child_sources > 0);
483                                         s->event->n_enabled_child_sources--;
484                                 }
485
486                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
487                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
488
489                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
490                         }
491
492                         break;
493
494                 case SOURCE_QUIT:
495                         prioq_remove(s->event->quit, s, &s->quit.prioq_index);
496                         break;
497                 }
498
499                 if (s->pending)
500                         prioq_remove(s->event->pending, s, &s->pending_index);
501
502                 if (s->prepare)
503                         prioq_remove(s->event->prepare, s, &s->prepare_index);
504
505                 sd_event_unref(s->event);
506         }
507
508         free(s);
509 }
510
511 static int source_set_pending(sd_event_source *s, bool b) {
512         int r;
513
514         assert(s);
515         assert(s->type != SOURCE_QUIT);
516
517         if (s->pending == b)
518                 return 0;
519
520         s->pending = b;
521
522         if (b) {
523                 s->pending_iteration = s->event->iteration;
524
525                 r = prioq_put(s->event->pending, s, &s->pending_index);
526                 if (r < 0) {
527                         s->pending = false;
528                         return r;
529                 }
530         } else
531                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
532
533         return 0;
534 }
535
536 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
537         sd_event_source *s;
538
539         assert(e);
540
541         s = new0(sd_event_source, 1);
542         if (!s)
543                 return NULL;
544
545         s->n_ref = 1;
546         s->event = sd_event_ref(e);
547         s->type = type;
548         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
549
550         return s;
551 }
552
553 int sd_event_add_io(
554                 sd_event *e,
555                 int fd,
556                 uint32_t events,
557                 sd_io_handler_t callback,
558                 void *userdata,
559                 sd_event_source **ret) {
560
561         sd_event_source *s;
562         int r;
563
564         if (!e)
565                 return -EINVAL;
566         if (fd < 0)
567                 return -EINVAL;
568         if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
569                 return -EINVAL;
570         if (!callback)
571                 return -EINVAL;
572         if (!ret)
573                 return -EINVAL;
574         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
575         if (event_pid_changed(e))
576                 return -ECHILD;
577
578         s = source_new(e, SOURCE_IO);
579         if (!s)
580                 return -ENOMEM;
581
582         s->io.fd = fd;
583         s->io.events = events;
584         s->io.callback = callback;
585         s->userdata = userdata;
586         s->enabled = SD_EVENT_ON;
587
588         r = source_io_register(s, s->enabled, events);
589         if (r < 0) {
590                 source_free(s);
591                 return r;
592         }
593
594         *ret = s;
595         return 0;
596 }
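/* Rough usage sketch for sd_event_add_io(); the handler prototype is
 * inferred from how io.callback is invoked in source_dispatch() below:
 *
 *     static int on_io(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
 *             char buf[256];
 *             ssize_t n = read(fd, buf, sizeof(buf));
 *             return n < 0 ? -errno : 0;
 *     }
 *
 *     sd_event_source *source;
 *     r = sd_event_add_io(e, fd, EPOLLIN, on_io, NULL, &source);
 *
 * The source starts out enabled (SD_EVENT_ON); sd_event_source_set_enabled()
 * switches it to SD_EVENT_OFF or SD_EVENT_ONESHOT, and
 * sd_event_source_set_io_events() changes the watched event mask. */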
597
598 static int event_setup_timer_fd(
599                 sd_event *e,
600                 EventSourceType type,
601                 int *timer_fd,
602                 clockid_t id) {
603
604         struct epoll_event ev = {};
605         int r, fd;
606         sd_id128_t bootid;
607
608         assert(e);
609         assert(timer_fd);
610
611         if (_likely_(*timer_fd >= 0))
612                 return 0;
613
614         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
615         if (fd < 0)
616                 return -errno;
617
618         ev.events = EPOLLIN;
619         ev.data.ptr = INT_TO_PTR(type);
620
621         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
622         if (r < 0) {
623                 close_nointr_nofail(fd);
624                 return -errno;
625         }
626
627         /* When we sleep for longer, we try to realign the wakeup to
628            the same time within each second, so that events all across
629            the system can be coalesced into a single CPU
630            wakeup. However, let's take some system-specific randomness
631            for this value, so that in a network of systems with synced
632            clocks timer events are distributed a bit. Here, we
633            calculate a perturbation usec offset from the boot ID. */
634
635         if (sd_id128_get_boot(&bootid) >= 0)
636                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
637
638         *timer_fd = fd;
639         return 0;
640 }
641
642 static int event_add_time_internal(
643                 sd_event *e,
644                 EventSourceType type,
645                 int *timer_fd,
646                 clockid_t id,
647                 Prioq **earliest,
648                 Prioq **latest,
649                 uint64_t usec,
650                 uint64_t accuracy,
651                 sd_time_handler_t callback,
652                 void *userdata,
653                 sd_event_source **ret) {
654
655         sd_event_source *s;
656         int r;
657
658         if (!e)
659                 return -EINVAL;
660         if (!callback)
661                 return -EINVAL;
662         if (!ret)
663                 return -EINVAL;
664         if (usec == (uint64_t) -1)
665                 return -EINVAL;
666         if (accuracy == (uint64_t) -1)
667                 return -EINVAL;
668         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
669         if (event_pid_changed(e))
670                 return -ECHILD;
671
672         assert(timer_fd);
673         assert(earliest);
674         assert(latest);
675
676         if (!*earliest) {
677                 *earliest = prioq_new(earliest_time_prioq_compare);
678                 if (!*earliest)
679                         return -ENOMEM;
680         }
681
682         if (!*latest) {
683                 *latest = prioq_new(latest_time_prioq_compare);
684                 if (!*latest)
685                         return -ENOMEM;
686         }
687
688         if (*timer_fd < 0) {
689                 r = event_setup_timer_fd(e, type, timer_fd, id);
690                 if (r < 0)
691                         return r;
692         }
693
694         s = source_new(e, type);
695         if (!s)
696                 return -ENOMEM;
697
698         s->time.next = usec;
699         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
700         s->time.callback = callback;
701         s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
702         s->userdata = userdata;
703         s->enabled = SD_EVENT_ONESHOT;
704
705         r = prioq_put(*earliest, s, &s->time.earliest_index);
706         if (r < 0)
707                 goto fail;
708
709         r = prioq_put(*latest, s, &s->time.latest_index);
710         if (r < 0)
711                 goto fail;
712
713         *ret = s;
714         return 0;
715
716 fail:
717         source_free(s);
718         return r;
719 }
720
721 int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
722         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
723 }
724
725 int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
726         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->realtime_latest, usec, accuracy, callback, userdata, ret);
727 }
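/* Rough usage sketch for the timer calls; the handler prototype is inferred
 * from how time.callback is dispatched. This arms a one-shot monotonic timer
 * 5s from now with up to 100ms of coalescing latency, using the now()
 * helper from time-util.h:
 *
 *     static int on_time(sd_event_source *s, uint64_t usec, void *userdata) {
 *             return 0;
 *     }
 *
 *     r = sd_event_add_monotonic(e, now(CLOCK_MONOTONIC) + 5 * USEC_PER_SEC,
 *                                100 * USEC_PER_MSEC, on_time, NULL, &source);
 *
 * Passing 0 as the accuracy selects DEFAULT_ACCURACY_USEC (250ms). Timer
 * sources are created as SD_EVENT_ONESHOT; re-enable them or call
 * sd_event_source_set_time() to rearm. */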
728
729 static int event_update_signal_fd(sd_event *e) {
730         struct epoll_event ev = {};
731         bool add_to_epoll;
732         int r;
733
734         assert(e);
735
736         add_to_epoll = e->signal_fd < 0;
737
738         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
739         if (r < 0)
740                 return -errno;
741
742         e->signal_fd = r;
743
744         if (!add_to_epoll)
745                 return 0;
746
747         ev.events = EPOLLIN;
748         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
749
750         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
751         if (r < 0) {
752                 close_nointr_nofail(e->signal_fd);
753                 e->signal_fd = -1;
754
755                 return -errno;
756         }
757
758         return 0;
759 }
760
761 int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) {
762         sd_event_source *s;
763         int r;
764
765         if (!e)
766                 return -EINVAL;
767         if (sig <= 0)
768                 return -EINVAL;
769         if (sig >= _NSIG)
770                 return -EINVAL;
771         if (!callback)
772                 return -EINVAL;
773         if (!ret)
774                 return -EINVAL;
775         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
776         if (event_pid_changed(e))
777                 return -ECHILD;
778
779         if (!e->signal_sources) {
780                 e->signal_sources = new0(sd_event_source*, _NSIG);
781                 if (!e->signal_sources)
782                         return -ENOMEM;
783         } else if (e->signal_sources[sig])
784                 return -EBUSY;
785
786         s = source_new(e, SOURCE_SIGNAL);
787         if (!s)
788                 return -ENOMEM;
789
790         s->signal.sig = sig;
791         s->signal.callback = callback;
792         s->userdata = userdata;
793         s->enabled = SD_EVENT_ON;
794
795         e->signal_sources[sig] = s;
796         assert_se(sigaddset(&e->sigset, sig) == 0);
797
798         if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
799                 r = event_update_signal_fd(e);
800                 if (r < 0) {
801                         source_free(s);
802                         return r;
803                 }
804         }
805
806         *ret = s;
807         return 0;
808 }
809
810 int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) {
811         sd_event_source *s;
812         int r;
813
814         if (!e)
815                 return -EINVAL;
816         if (pid <= 1)
817                 return -EINVAL;
818         if (options & ~(WEXITED|WSTOPPED|WCONTINUED))
819                 return -EINVAL;
820         if (!callback)
821                 return -EINVAL;
822         if (!ret)
823                 return -EINVAL;
824         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
825         if (event_pid_changed(e))
826                 return -ECHILD;
827
828         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
829         if (r < 0)
830                 return r;
831
832         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
833                 return -EBUSY;
834
835         s = source_new(e, SOURCE_CHILD);
836         if (!s)
837                 return -ENOMEM;
838
839         s->child.pid = pid;
840         s->child.options = options;
841         s->child.callback = callback;
842         s->userdata = userdata;
843         s->enabled = SD_EVENT_ONESHOT;
844
845         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
846         if (r < 0) {
847                 source_free(s);
848                 return r;
849         }
850
851         e->n_enabled_child_sources ++;
852
853         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
854
855         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
856                 r = event_update_signal_fd(e);
857                 if (r < 0) {
858                         source_free(s);
859                         return r;
860                 }
861         }
862
863         e->need_process_child = true;
864
865         *ret = s;
866         return 0;
867 }
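/* Rough usage sketch for sd_event_add_child(); the handler prototype is
 * inferred from how child.callback is dispatched. Since the loop relies on
 * a signalfd, SIGCHLD needs to be blocked by the caller so it is not
 * delivered or discarded before the loop can read it:
 *
 *     static int on_child(sd_event_source *s, const siginfo_t *si, void *userdata) {
 *             // inspect si->si_pid, si->si_code, si->si_status here
 *             return 0;
 *     }
 *
 *     pid = fork();
 *     if (pid == 0)
 *             _exit(EXIT_SUCCESS);
 *
 *     r = sd_event_add_child(e, pid, WEXITED, on_child, NULL, &source);
 *
 * Child sources are one-shot, accept only WEXITED, WSTOPPED and WCONTINUED
 * as options, and at most one source may exist per PID (-EBUSY otherwise). */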
868
869 int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) {
870         sd_event_source *s;
871         int r;
872
873         if (!e)
874                 return -EINVAL;
875         if (!ret)
876                 return -EINVAL;
        if (!callback)
                return -EINVAL;
877         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
878         if (event_pid_changed(e))
879                 return -ECHILD;
880
881         s = source_new(e, SOURCE_DEFER);
882         if (!s)
883                 return -ENOMEM;
884
885         s->defer.callback = callback;
886         s->userdata = userdata;
887         s->enabled = SD_EVENT_ONESHOT;
888
889         r = source_set_pending(s, true);
890         if (r < 0) {
891                 source_free(s);
892                 return r;
893         }
894
895         *ret = s;
896         return 0;
897 }
898
899 int sd_event_add_quit(sd_event *e, sd_quit_handler_t callback, void *userdata, sd_event_source **ret) {
900         sd_event_source *s;
901         int r;
902
903         assert_return(e, -EINVAL);
904         assert_return(callback, -EINVAL);
905         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
906         assert_return(!event_pid_changed(e), -ECHILD);
907
908         if (!e->quit) {
909                 e->quit = prioq_new(quit_prioq_compare);
910                 if (!e->quit)
911                         return -ENOMEM;
912         }
913
914         s = source_new(e, SOURCE_QUIT);
915         if (!s)
916                 return -ENOMEM;
917
918         s->quit.callback = callback;
919         s->userdata = userdata;
920         s->quit.prioq_index = PRIOQ_IDX_NULL;
921         s->enabled = SD_EVENT_ONESHOT;
922
923         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
924         if (r < 0) {
925                 source_free(s);
926                 return r;
927         }
928
929         *ret = s;
930         return 0;
931 }
932
933 sd_event_source* sd_event_source_ref(sd_event_source *s) {
934         assert_return(s, NULL);
935
936         assert(s->n_ref >= 1);
937         s->n_ref++;
938
939         return s;
940 }
941
942 sd_event_source* sd_event_source_unref(sd_event_source *s) {
943         assert_return(s, NULL);
944
945         assert(s->n_ref >= 1);
946         s->n_ref--;
947
948         if (s->n_ref <= 0)
949                 source_free(s);
950
951         return NULL;
952 }
953
954 sd_event *sd_event_get(sd_event_source *s) {
955         if (!s)
956                 return NULL;
957
958         return s->event;
959 }
960
961 int sd_event_source_get_pending(sd_event_source *s) {
962         if (!s)
963                 return -EINVAL;
964         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
965         if (event_pid_changed(s->event))
966                 return -ECHILD;
967
968         return s->pending;
969 }
970
971 int sd_event_source_get_io_fd(sd_event_source *s) {
972         if (!s)
973                 return -EINVAL;
974         if (s->type != SOURCE_IO)
975                 return -EDOM;
976         if (event_pid_changed(s->event))
977                 return -ECHILD;
978
979         return s->io.fd;
980 }
981
982 int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
983         if (!s)
984                 return -EINVAL;
985         if (s->type != SOURCE_IO)
986                 return -EDOM;
987         if (!events)
988                 return -EINVAL;
989         if (event_pid_changed(s->event))
990                 return -ECHILD;
991
992         *events = s->io.events;
993         return 0;
994 }
995
996 int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
997         int r;
998
999         if (!s)
1000                 return -EINVAL;
1001         if (s->type != SOURCE_IO)
1002                 return -EDOM;
1003         if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
1004                 return -EINVAL;
1005         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1006         if (event_pid_changed(s->event))
1007                 return -ECHILD;
1008
1009         if (s->io.events == events)
1010                 return 0;
1011
1012         if (s->enabled != SD_EVENT_OFF) {
1013                 r = source_io_register(s, s->enabled, events);
1014                 if (r < 0)
1015                         return r;
1016         }
1017
1018         s->io.events = events;
1019
1020         return 0;
1021 }
1022
1023 int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1024         if (!s)
1025                 return -EINVAL;
1026         if (s->type != SOURCE_IO)
1027                 return -EDOM;
1028         if (!revents)
1029                 return -EINVAL;
1030         if (!s->pending)
1031                 return -ENODATA;
1032         if (event_pid_changed(s->event))
1033                 return -ECHILD;
1034
1035         *revents = s->io.revents;
1036         return 0;
1037 }
1038
1039 int sd_event_source_get_signal(sd_event_source *s) {
1040         if (!s)
1041                 return -EINVAL;
1042         if (s->type != SOURCE_SIGNAL)
1043                 return -EDOM;
1044         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1045         if (event_pid_changed(s->event))
1046                 return -ECHILD;
1047
1048         return s->signal.sig;
1049 }
1050
1051 int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1052         if (!s)
1053                 return -EINVAL;
1054         if (event_pid_changed(s->event))
1055                 return -ECHILD;
1056
1057         *priority = s->priority;
        return 0;
1058 }
1059
1060 int sd_event_source_set_priority(sd_event_source *s, int priority) {
1061         if (!s)
1062                 return -EINVAL;
1063         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1064         if (event_pid_changed(s->event))
1065                 return -ECHILD;
1066
1067         if (s->priority == priority)
1068                 return 0;
1069
1070         s->priority = priority;
1071
1072         if (s->pending)
1073                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1074
1075         if (s->prepare)
1076                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1077
1078         return 0;
1079 }
1080
1081 int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1082         if (!s)
1083                 return -EINVAL;
1084         if (!m)
1085                 return -EINVAL;
1086         if (event_pid_changed(s->event))
1087                 return -ECHILD;
1088
1089         *m = s->enabled;
1090         return 0;
1091 }
1092
1093 int sd_event_source_set_enabled(sd_event_source *s, int m) {
1094         int r;
1095
1096         if (!s)
1097                 return -EINVAL;
1098         if (m != SD_EVENT_OFF && m != SD_EVENT_ON && m != SD_EVENT_ONESHOT)
1099                 return -EINVAL;
1100         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1101         if (event_pid_changed(s->event))
1102                 return -ECHILD;
1103
1104         if (s->enabled == m)
1105                 return 0;
1106
1107         if (m == SD_EVENT_OFF) {
1108
1109                 switch (s->type) {
1110
1111                 case SOURCE_IO:
1112                         r = source_io_unregister(s);
1113                         if (r < 0)
1114                                 return r;
1115
1116                         s->enabled = m;
1117                         break;
1118
1119                 case SOURCE_MONOTONIC:
1120                         s->enabled = m;
1121                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1122                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1123                         break;
1124
1125                 case SOURCE_REALTIME:
1126                         s->enabled = m;
1127                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1128                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1129                         break;
1130
1131                 case SOURCE_SIGNAL:
1132                         s->enabled = m;
1133                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1134                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1135                                 event_update_signal_fd(s->event);
1136                         }
1137
1138                         break;
1139
1140                 case SOURCE_CHILD:
1141                         s->enabled = m;
1142
1143                         assert(s->event->n_enabled_child_sources > 0);
1144                         s->event->n_enabled_child_sources--;
1145
1146                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1147                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1148                                 event_update_signal_fd(s->event);
1149                         }
1150
1151                         break;
1152
1153                 case SOURCE_DEFER:
1154                 case SOURCE_QUIT:
1155                         s->enabled = m;
1156                         break;
1157                 }
1158
1159         } else {
1160                 switch (s->type) {
1161
1162                 case SOURCE_IO:
1163                         r = source_io_register(s, m, s->io.events);
1164                         if (r < 0)
1165                                 return r;
1166
1167                         s->enabled = m;
1168                         break;
1169
1170                 case SOURCE_MONOTONIC:
1171                         s->enabled = m;
1172                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1173                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1174                         break;
1175
1176                 case SOURCE_REALTIME:
1177                         s->enabled = m;
1178                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1179                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1180                         break;
1181
1182                 case SOURCE_SIGNAL:
1183                         s->enabled = m;
1184
1185                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1186                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1187                                 event_update_signal_fd(s->event);
1188                         }
1189                         break;
1190
1191                 case SOURCE_CHILD:
1192                         if (s->enabled == SD_EVENT_OFF) {
1193                                 s->event->n_enabled_child_sources++;
1194
1195                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1196                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1197                                         event_update_signal_fd(s->event);
1198                                 }
1199                         }
1200
1201                         s->enabled = m;
1202                         break;
1203
1204                 case SOURCE_DEFER:
1205                 case SOURCE_QUIT:
1206                         s->enabled = m;
1207                         break;
1208                 }
1209         }
1210
1211         if (s->pending)
1212                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1213
1214         if (s->prepare)
1215                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1216
1217         return 0;
1218 }
1219
1220 int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1221         if (!s)
1222                 return -EINVAL;
1223         if (!usec)
1224                 return -EINVAL;
1225         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1226                 return -EDOM;
1227         if (event_pid_changed(s->event))
1228                 return -ECHILD;
1229
1230         *usec = s->time.next;
1231         return 0;
1232 }
1233
1234 int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1235         if (!s)
1236                 return -EINVAL;
1237         if (usec == (uint64_t) -1)
1238                 return -EINVAL;
1239         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1240                 return -EDOM;
1241         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1242         if (event_pid_changed(s->event))
1243                 return -ECHILD;
1244
1245         if (s->time.next == usec)
1246                 return 0;
1247
1248         s->time.next = usec;
1249
1250         if (s->type == SOURCE_REALTIME) {
1251                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1252                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1253         } else {
1254                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1255                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1256         }
1257
1258         return 0;
1259 }
1260
1261 int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1262         if (!s)
1263                 return -EINVAL;
1264         if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
1265                 return -EDOM;
1266         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1267         if (event_pid_changed(s->event))
1268                 return -ECHILD;
1269
1270         if (usec == 0)
1271                 usec = DEFAULT_ACCURACY_USEC;
1272
1273         if (s->time.accuracy == usec)
1274                 return 0;
1275
1276         s->time.accuracy = usec;
1277
1278         if (s->type == SOURCE_REALTIME)
1279                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1280         else
1281                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1282
1283         return 0;
1284 }
1285
1286 int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1287         if (!s)
1288                 return -EINVAL;
1289         if (!usec)
1290                 return -EINVAL;
1291         if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
1292                 return -EDOM;
1293         if (event_pid_changed(s->event))
1294                 return -ECHILD;
1295
1296         *usec = s->time.accuracy;
1297         return 0;
1298 }
1299
1300 int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1301         assert_return(s, -EINVAL);
1302         assert_return(pid, -EINVAL);
1303         assert_return(s->type == SOURCE_CHILD, -EDOM);
1304         assert_return(!event_pid_changed(s->event), -ECHILD);
1305
1306         *pid = s->child.pid;
1307         return 0;
1308 }
1309
1310 int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1311         int r;
1312
1313         assert_return(s, -EINVAL);
1314         assert_return(s->type != SOURCE_QUIT, -EDOM);
1315         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1316         assert_return(!event_pid_changed(s->event), -ECHILD);
1317
1318         if (s->prepare == callback)
1319                 return 0;
1320
1321         if (callback && s->prepare) {
1322                 s->prepare = callback;
1323                 return 0;
1324         }
1325
1326         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1327         if (r < 0)
1328                 return r;
1329
1330         s->prepare = callback;
1331
1332         if (callback) {
1333                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1334                 if (r < 0)
1335                         return r;
1336         } else
1337                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1338
1339         return 0;
1340 }
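/* Note on prepare callbacks: event_prepare() runs them once per loop
 * iteration, in priority order, before the timerfds are armed and before
 * epoll_wait() is entered (see sd_event_run() below). A typical use is to
 * reconfigure a source right before polling, e.g. toggling EPOLLOUT via
 * sd_event_source_set_io_events() depending on whether output is queued. */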
1341
1342 void* sd_event_source_get_userdata(sd_event_source *s) {
1343         assert_return(s, NULL);
1344
1345         return s->userdata;
1346 }
1347
1348 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1349         usec_t c;
1350         assert(e);
1351         assert(a <= b);
1352
1353         if (a <= 0)
1354                 return 0;
1355
1356         if (b <= a + 1)
1357                 return a;
1358
1359         /*
1360           Find a good time to wake up again between times a and b. We
1361           have two goals here:
1362
1363           a) We want to wake up as seldom as possible, hence prefer
1364              later times over earlier times.
1365
1366           b) But if we have to wake up, then let's make sure to
1367              dispatch as much as possible on the entire system.
1368
1369           We implement this by waking up everywhere at the same time
1370           within any given second if we can, synchronised via the
1371           perturbation value determined from the boot ID. If we can't,
1372           then we try to find the same spot in every 250ms
1373           step. Otherwise, we pick the last possible time to wake up.
1374         */
1375
1376         c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1377         if (c >= b) {
1378                 if (_unlikely_(c < USEC_PER_SEC))
1379                         return b;
1380
1381                 c -= USEC_PER_SEC;
1382         }
1383
1384         if (c >= a)
1385                 return c;
1386
1387         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1388         if (c >= b) {
1389                 if (_unlikely_(c < USEC_PER_MSEC*250))
1390                         return b;
1391
1392                 c -= USEC_PER_MSEC*250;
1393         }
1394
1395         if (c >= a)
1396                 return c;
1397
1398         return b;
1399 }
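/* Worked example with illustrative numbers: with e->perturb = 300ms and a
 * window of a = 5.2s, b = 6.1s, the first candidate is
 * c = 6s + 300ms = 6.3s. That is past b, so one second is subtracted,
 * giving 5.3s, which lies within [a, b] and is returned. Every loop on the
 * machine shares the same boot-ID-derived perturbation value, so their
 * wakeups land on the same xx.3s offset and can be coalesced. */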
1400
1401 static int event_arm_timer(
1402                 sd_event *e,
1403                 int timer_fd,
1404                 Prioq *earliest,
1405                 Prioq *latest,
1406                 usec_t *next) {
1407
1408         struct itimerspec its = {};
1409         sd_event_source *a, *b;
1410         usec_t t;
1411         int r;
1412
1413         assert_se(e);
1414         assert_se(next);
1415
1416         a = prioq_peek(earliest);
1417         if (!a || a->enabled == SD_EVENT_OFF)
1418                 return 0;
1419
1420         b = prioq_peek(latest);
1421         assert_se(b && b->enabled != SD_EVENT_OFF);
1422
1423         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1424         if (*next == t)
1425                 return 0;
1426
1427         assert_se(timer_fd >= 0);
1428
1429         if (t == 0) {
1430                 /* We don't want to disarm here, so we arm it for some time loooong ago instead. */
1431                 its.it_value.tv_sec = 0;
1432                 its.it_value.tv_nsec = 1;
1433         } else
1434                 timespec_store(&its.it_value, t);
1435
1436         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1437         if (r < 0)
1438                 return -errno;
1439
1440         *next = t;
1441         return 0;
1442 }
1443
1444 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1445         assert(e);
1446         assert(s);
1447         assert(s->type == SOURCE_IO);
1448
1449         s->io.revents = events;
1450
1451         /*
1452            If this is a oneshot event source, then we added it to the
1453            epoll with EPOLLONESHOT, hence we know it's not registered
1454            anymore. We can save a syscall here...
1455         */
1456
1457         if (s->enabled == SD_EVENT_ONESHOT)
1458                 s->io.registered = false;
1459
1460         return source_set_pending(s, true);
1461 }
1462
1463 static int flush_timer(sd_event *e, int fd, uint32_t events) {
1464         uint64_t x;
1465         ssize_t ss;
1466
1467         assert(e);
1468         assert(fd >= 0);
1469
1470         if (events != EPOLLIN)
1471                 return -EIO;
1472
1473         ss = read(fd, &x, sizeof(x));
1474         if (ss < 0) {
1475                 if (errno == EAGAIN || errno == EINTR)
1476                         return 0;
1477
1478                 return -errno;
1479         }
1480
1481         if (ss != sizeof(x))
1482                 return -EIO;
1483
1484         return 0;
1485 }
1486
1487 static int process_timer(sd_event *e, usec_t n, Prioq *earliest, Prioq *latest) {
1488         sd_event_source *s;
1489         int r;
1490
1491         assert(e);
1492
1493         for (;;) {
1494                 s = prioq_peek(earliest);
1495                 if (!s ||
1496                     s->time.next > n ||
1497                     s->enabled == SD_EVENT_OFF ||
1498                     s->pending)
1499                         break;
1500
1501                 r = source_set_pending(s, true);
1502                 if (r < 0)
1503                         return r;
1504
1505                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1506                 prioq_reshuffle(latest, s, &s->time.latest_index);
1507         }
1508
1509         return 0;
1510 }
1511
1512 static int process_child(sd_event *e) {
1513         sd_event_source *s;
1514         Iterator i;
1515         int r;
1516
1517         assert(e);
1518
1519         e->need_process_child = false;
1520
1521         /*
1522            So, this is ugly. We iteratively invoke waitid() with P_PID
1523            + WNOHANG for each PID we wait for, instead of using
1524            P_ALL. This is because we only want to get child
1525            information of very specific child processes, and not all
1526            of them. We might not have processed the SIGCHLD event of a
1527            previous invocation and we don't want to maintain an
1528            unbounded *per-child* event queue, hence we really don't
1529            want anything flushed out of the kernel's queue that we
1530            don't care about. Since this is O(n) this means that if you
1531            have a lot of processes you probably want to handle SIGCHLD
1532            yourself.
1533         */
1534
1535         HASHMAP_FOREACH(s, e->child_sources, i) {
1536                 assert(s->type == SOURCE_CHILD);
1537
1538                 if (s->pending)
1539                         continue;
1540
1541                 if (s->enabled == SD_EVENT_OFF)
1542                         continue;
1543
1544                 zero(s->child.siginfo);
1545                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1546                 if (r < 0)
1547                         return -errno;
1548
1549                 if (s->child.siginfo.si_pid != 0) {
1550                         r = source_set_pending(s, true);
1551                         if (r < 0)
1552                                 return r;
1553                 }
1554         }
1555
1556         return 0;
1557 }
1558
1559 static int process_signal(sd_event *e, uint32_t events) {
1560         struct signalfd_siginfo si;
1561         bool read_one = false;
1562         ssize_t ss;
1563         int r;
1564
1565         assert(e);
1566
1567         if (events != EPOLLIN)
1568                 return -EIO;
1569
1570         for (;;) {
1571                 sd_event_source *s;
1572
1573                 ss = read(e->signal_fd, &si, sizeof(si));
1574                 if (ss < 0) {
1575                         if (errno == EAGAIN || errno == EINTR)
1576                                 return read_one;
1577
1578                         return -errno;
1579                 }
1580
1581                 if (ss != sizeof(si))
1582                         return -EIO;
1583
1584                 read_one = true;
1585
1586                 if (si.ssi_signo == SIGCHLD) {
1587                         r = process_child(e);
1588                         if (r < 0)
1589                                 return r;
1590                         if (r > 0 || !e->signal_sources || !e->signal_sources[si.ssi_signo])
1591                                 continue;

                        s = e->signal_sources[si.ssi_signo];
1592                 } else {
1593                         s = e->signal_sources[si.ssi_signo];
1594                         if (!s)
1595                                 return -EIO;
1596                 }
1597
1598                 s->signal.siginfo = si;
1599                 r = source_set_pending(s, true);
1600                 if (r < 0)
1601                         return r;
1602         }
1603
1604
1605         return 0;
1606 }
1607
1608 static int source_dispatch(sd_event_source *s) {
1609         int r;
1610
1611         assert(s);
1612         assert(s->pending || s->type == SOURCE_QUIT);
1613
1614         if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
1615                 r = source_set_pending(s, false);
1616                 if (r < 0)
1617                         return r;
1618         }
1619
1620         if (s->enabled == SD_EVENT_ONESHOT) {
1621                 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1622                 if (r < 0)
1623                         return r;
1624         }
1625
1626         switch (s->type) {
1627
1628         case SOURCE_IO:
1629                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1630                 break;
1631
1632         case SOURCE_MONOTONIC:
1633                 r = s->time.callback(s, s->time.next, s->userdata);
1634                 break;
1635
1636         case SOURCE_REALTIME:
1637                 r = s->time.callback(s, s->time.next, s->userdata);
1638                 break;
1639
1640         case SOURCE_SIGNAL:
1641                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1642                 break;
1643
1644         case SOURCE_CHILD:
1645                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1646                 break;
1647
1648         case SOURCE_DEFER:
1649                 r = s->defer.callback(s, s->userdata);
1650                 break;
1651
1652         case SOURCE_QUIT:
1653                 r = s->quit.callback(s, s->userdata);
1654                 break;
1655         }
1656
1657         return r;
1658 }
1659
1660 static int event_prepare(sd_event *e) {
1661         int r;
1662
1663         assert(e);
1664
1665         for (;;) {
1666                 sd_event_source *s;
1667
1668                 s = prioq_peek(e->prepare);
1669                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1670                         break;
1671
1672                 s->prepare_iteration = e->iteration;
1673                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1674                 if (r < 0)
1675                         return r;
1676
1677                 assert(s->prepare);
1678                 r = s->prepare(s, s->userdata);
1679                 if (r < 0)
1680                         return r;
1681
1682         }
1683
1684         return 0;
1685 }
1686
1687 static int dispatch_quit(sd_event *e) {
1688         sd_event_source *p;
1689         int r;
1690
1691         assert(e);
1692
1693         p = prioq_peek(e->quit);
1694         if (!p || p->enabled == SD_EVENT_OFF) {
1695                 e->state = SD_EVENT_FINISHED;
1696                 return 0;
1697         }
1698
1699         sd_event_ref(e);
1700         e->iteration++;
1701         e->state = SD_EVENT_QUITTING;
1702
1703         r = source_dispatch(p);
1704
1705         e->state = SD_EVENT_PASSIVE;
1706         sd_event_unref(e);
1707
1708         return r;
1709 }
1710
1711 static sd_event_source* event_next_pending(sd_event *e) {
1712         sd_event_source *p;
1713
1714         assert(e);
1715
1716         p = prioq_peek(e->pending);
1717         if (!p)
1718                 return NULL;
1719
1720         if (p->enabled == SD_EVENT_OFF)
1721                 return NULL;
1722
1723         return p;
1724 }
1725
1726 int sd_event_run(sd_event *e, uint64_t timeout) {
1727         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1728         sd_event_source *p;
1729         int r, i, m;
1730         dual_timestamp n;
1731
1732         assert_return(e, -EINVAL);
1733         assert_return(!event_pid_changed(e), -ECHILD);
1734         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1735         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1736
1737         if (e->quit_requested)
1738                 return dispatch_quit(e);
1739
1740         sd_event_ref(e);
1741         e->iteration++;
1742         e->state = SD_EVENT_RUNNING;
1743
1744         r = event_prepare(e);
1745         if (r < 0)
1746                 goto finish;
1747
1748         if (event_next_pending(e) || e->need_process_child)
1749                 timeout = 0;
1750
1751         if (timeout > 0) {
1752                 r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1753                 if (r < 0)
1754                         goto finish;
1755
1756                 r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1757                 if (r < 0)
1758                         goto finish;
1759         }
1760
1761         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1762                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1763         if (m < 0) {
1764                 r = -errno;
1765                 goto finish;
1766         }
1767
1768         dual_timestamp_get(&n);
1769
1770         for (i = 0; i < m; i++) {
1771
1772                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1773                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
1774                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1775                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
1776                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1777                         r = process_signal(e, ev_queue[i].events);
1778                 else
1779                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1780
1781                 if (r < 0)
1782                         goto finish;
1783         }
1784
1785         r = process_timer(e, n.monotonic, e->monotonic_earliest, e->monotonic_latest);
1786         if (r < 0)
1787                 goto finish;
1788
1789         r = process_timer(e, n.realtime, e->realtime_earliest, e->realtime_latest);
1790         if (r < 0)
1791                 goto finish;
1792
1793         if (e->need_process_child) {
1794                 r = process_child(e);
1795                 if (r < 0)
1796                         goto finish;
1797         }
1798
1799         p = event_next_pending(e);
1800         if (!p) {
1801                 r = 0;
1802                 goto finish;
1803         }
1804
1805         r = source_dispatch(p);
1806
1807 finish:
1808         e->state = SD_EVENT_PASSIVE;
1809         sd_event_unref(e);
1810
1811         return r;
1812 }
1813
1814 int sd_event_loop(sd_event *e) {
1815         int r;
1816
1817         assert_return(e, -EINVAL);
1818         assert_return(!event_pid_changed(e), -ECHILD);
1819         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1820
1821         sd_event_ref(e);
1822
1823         while (e->state != SD_EVENT_FINISHED) {
1824                 r = sd_event_run(e, (uint64_t) -1);
1825                 if (r < 0)
1826                         goto finish;
1827         }
1828
1829         r = 0;
1830
1831 finish:
1832         sd_event_unref(e);
1833         return r;
1834 }
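/* Rough end-to-end sketch of driving the loop; the signal and quit handler
 * prototypes are inferred from how the callbacks are dispatched, and as
 * above the watched signal must be blocked by the caller:
 *
 *     static int on_quit(sd_event_source *s, void *userdata) {
 *             return 0;
 *     }
 *
 *     static int on_term(sd_event_source *s, const struct signalfd_siginfo *si, void *userdata) {
 *             return sd_event_request_quit(sd_event_get(s));
 *     }
 *
 *     sd_event *e = NULL;
 *     sd_event_source *sig_source, *quit_source;
 *
 *     r = sd_event_new(&e);
 *     r = sd_event_add_signal(e, SIGTERM, on_term, NULL, &sig_source);
 *     r = sd_event_add_quit(e, on_quit, NULL, &quit_source);
 *     r = sd_event_loop(e);
 *
 *     sd_event_source_unref(sig_source);
 *     sd_event_source_unref(quit_source);
 *     sd_event_unref(e);
 *
 * Once sd_event_request_quit() has been called, the next iteration
 * dispatches the enabled quit sources instead of polling, and
 * sd_event_loop() returns once the state reaches SD_EVENT_FINISHED. */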
1835
1836 int sd_event_get_state(sd_event *e) {
1837         assert_return(e, -EINVAL);
1838         assert_return(!event_pid_changed(e), -ECHILD);
1839
1840         return e->state;
1841 }
1842
1843 int sd_event_get_quit(sd_event *e) {
1844         assert_return(e, -EINVAL);
1845         assert_return(!event_pid_changed(e), -ECHILD);
1846
1847         return e->quit_requested;
1848 }
1849
1850 int sd_event_request_quit(sd_event *e) {
1851         assert_return(e, -EINVAL);
1852         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1853         assert_return(!event_pid_changed(e), -ECHILD);
1854
1855         e->quit_requested = true;
1856         return 0;
1857 }