chiark / gitweb /
event: implement quit handlers
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "macro.h"
27 #include "prioq.h"
28 #include "hashmap.h"
29 #include "util.h"
30 #include "time-util.h"
31 #include "sd-id128.h"
32
33 #include "sd-event.h"
34
35 #define EPOLL_QUEUE_MAX 64
36 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
37
/* Kinds of event sources an event loop can carry. The values are the
 * same ones the compiler would assign implicitly (0..6); they are
 * spelled out because the tag is stored in a 4-bit bitfield and is
 * also passed through INT_TO_PTR() as epoll user data in this file. */
typedef enum EventSourceType {
        SOURCE_IO        = 0,
        SOURCE_MONOTONIC = 1,
        SOURCE_REALTIME  = 2,
        SOURCE_SIGNAL    = 3,
        SOURCE_CHILD     = 4,
        SOURCE_DEFER     = 5,
        SOURCE_QUIT      = 6
} EventSourceType;
47
48 struct sd_event_source {
49         unsigned n_ref;
50
51         sd_event *event;
52         void *userdata;
53         sd_prepare_handler_t prepare;
54
55         EventSourceType type:4;
56         int mute:3;
57         bool pending:1;
58
59         int priority;
60         unsigned pending_index;
61         unsigned prepare_index;
62         unsigned pending_iteration;
63         unsigned prepare_iteration;
64
65         union {
66                 struct {
67                         sd_io_handler_t callback;
68                         int fd;
69                         uint32_t events;
70                         uint32_t revents;
71                         bool registered:1;
72                 } io;
73                 struct {
74                         sd_time_handler_t callback;
75                         usec_t next, accuracy;
76                         unsigned earliest_index;
77                         unsigned latest_index;
78                 } time;
79                 struct {
80                         sd_signal_handler_t callback;
81                         struct signalfd_siginfo siginfo;
82                         int sig;
83                 } signal;
84                 struct {
85                         sd_child_handler_t callback;
86                         siginfo_t siginfo;
87                         pid_t pid;
88                         int options;
89                 } child;
90                 struct {
91                         sd_defer_handler_t callback;
92                 } defer;
93                 struct {
94                         sd_quit_handler_t callback;
95                         unsigned prioq_index;
96                 } quit;
97         };
98 };
99
100 struct sd_event {
101         unsigned n_ref;
102
103         int epoll_fd;
104         int signal_fd;
105         int realtime_fd;
106         int monotonic_fd;
107
108         Prioq *pending;
109         Prioq *prepare;
110
111         /* For both clocks we maintain two priority queues each, one
112          * ordered for the earliest times the events may be
113          * dispatched, and one ordered by the latest times they must
114          * have been dispatched. The range between the top entries in
115          * the two prioqs is the time window we can freely schedule
116          * wakeups in */
117         Prioq *monotonic_earliest;
118         Prioq *monotonic_latest;
119         Prioq *realtime_earliest;
120         Prioq *realtime_latest;
121
122         usec_t realtime_next, monotonic_next;
123         usec_t perturb;
124
125         sigset_t sigset;
126         sd_event_source **signal_sources;
127
128         Hashmap *child_sources;
129         unsigned n_unmuted_child_sources;
130
131         Prioq *quit;
132
133         pid_t original_pid;
134
135         unsigned iteration;
136         int state;
137
138         bool quit_requested:1;
139         bool need_process_child:1;
140 };
141
142 static int pending_prioq_compare(const void *a, const void *b) {
143         const sd_event_source *x = a, *y = b;
144
145         assert(x->pending);
146         assert(y->pending);
147
148         /* Unmuted ones first */
149         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
150                 return -1;
151         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
152                 return 1;
153
154         /* Lower priority values first */
155         if (x->priority < y->priority)
156                 return -1;
157         if (x->priority > y->priority)
158                 return 1;
159
160         /* Older entries first */
161         if (x->pending_iteration < y->pending_iteration)
162                 return -1;
163         if (x->pending_iteration > y->pending_iteration)
164                 return 1;
165
166         /* Stability for the rest */
167         if (x < y)
168                 return -1;
169         if (x > y)
170                 return 1;
171
172         return 0;
173 }
174
175 static int prepare_prioq_compare(const void *a, const void *b) {
176         const sd_event_source *x = a, *y = b;
177
178         assert(x->prepare);
179         assert(y->prepare);
180
181         /* Move most recently prepared ones last, so that we can stop
182          * preparing as soon as we hit one that has already been
183          * prepared in the current iteration */
184         if (x->prepare_iteration < y->prepare_iteration)
185                 return -1;
186         if (x->prepare_iteration > y->prepare_iteration)
187                 return 1;
188
189         /* Unmuted ones first */
190         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
191                 return -1;
192         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
193                 return 1;
194
195         /* Lower priority values first */
196         if (x->priority < y->priority)
197                 return -1;
198         if (x->priority > y->priority)
199                 return 1;
200
201         /* Stability for the rest */
202         if (x < y)
203                 return -1;
204         if (x > y)
205                 return 1;
206
207         return 0;
208 }
209
210 static int earliest_time_prioq_compare(const void *a, const void *b) {
211         const sd_event_source *x = a, *y = b;
212
213         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
214         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
215
216         /* Unmuted ones first */
217         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
218                 return -1;
219         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
220                 return 1;
221
222         /* Move the pending ones to the end */
223         if (!x->pending && y->pending)
224                 return -1;
225         if (x->pending && !y->pending)
226                 return 1;
227
228         /* Order by time */
229         if (x->time.next < y->time.next)
230                 return -1;
231         if (x->time.next > y->time.next)
232                 return -1;
233
234         /* Stability for the rest */
235         if (x < y)
236                 return -1;
237         if (x > y)
238                 return 1;
239
240         return 0;
241 }
242
243 static int latest_time_prioq_compare(const void *a, const void *b) {
244         const sd_event_source *x = a, *y = b;
245
246         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
247                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
248
249         /* Unmuted ones first */
250         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
251                 return -1;
252         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
253                 return 1;
254
255         /* Move the pending ones to the end */
256         if (!x->pending && y->pending)
257                 return -1;
258         if (x->pending && !y->pending)
259                 return 1;
260
261         /* Order by time */
262         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
263                 return -1;
264         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
265                 return -1;
266
267         /* Stability for the rest */
268         if (x < y)
269                 return -1;
270         if (x > y)
271                 return 1;
272
273         return 0;
274 }
275
276 static int quit_prioq_compare(const void *a, const void *b) {
277         const sd_event_source *x = a, *y = b;
278
279         assert(x->type == SOURCE_QUIT);
280         assert(y->type == SOURCE_QUIT);
281
282         /* Unmuted ones first */
283         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
284                 return -1;
285         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
286                 return 1;
287
288         /* Lower priority values first */
289         if (x->priority < y->priority)
290                 return -1;
291         if (x->priority > y->priority)
292                 return 1;
293
294         /* Stability for the rest */
295         if (x < y)
296                 return -1;
297         if (x > y)
298                 return 1;
299
300         return 0;
301 }
302
303 static void event_free(sd_event *e) {
304         assert(e);
305
306         if (e->epoll_fd >= 0)
307                 close_nointr_nofail(e->epoll_fd);
308
309         if (e->signal_fd >= 0)
310                 close_nointr_nofail(e->signal_fd);
311
312         if (e->realtime_fd >= 0)
313                 close_nointr_nofail(e->realtime_fd);
314
315         if (e->monotonic_fd >= 0)
316                 close_nointr_nofail(e->monotonic_fd);
317
318         prioq_free(e->pending);
319         prioq_free(e->prepare);
320         prioq_free(e->monotonic_earliest);
321         prioq_free(e->monotonic_latest);
322         prioq_free(e->realtime_earliest);
323         prioq_free(e->realtime_latest);
324         prioq_free(e->quit);
325
326         free(e->signal_sources);
327
328         hashmap_free(e->child_sources);
329         free(e);
330 }
331
332 int sd_event_new(sd_event** ret) {
333         sd_event *e;
334         int r;
335
336         if (!ret)
337                 return -EINVAL;
338
339         e = new0(sd_event, 1);
340         if (!e)
341                 return -ENOMEM;
342
343         e->n_ref = 1;
344         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
345         e->realtime_next = e->monotonic_next = (usec_t) -1;
346         e->original_pid = getpid();
347
348         assert_se(sigemptyset(&e->sigset) == 0);
349
350         e->pending = prioq_new(pending_prioq_compare);
351         if (!e->pending) {
352                 r = -ENOMEM;
353                 goto fail;
354         }
355
356         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
357         if (e->epoll_fd < 0) {
358                 r = -errno;
359                 goto fail;
360         }
361
362         *ret = e;
363         return 0;
364
365 fail:
366         event_free(e);
367         return r;
368 }
369
370 sd_event* sd_event_ref(sd_event *e) {
371         if (!e)
372                 return NULL;
373
374         assert(e->n_ref >= 1);
375         e->n_ref++;
376
377         return e;
378 }
379
380 sd_event* sd_event_unref(sd_event *e) {
381         if (!e)
382                 return NULL;
383
384         assert(e->n_ref >= 1);
385         e->n_ref--;
386
387         if (e->n_ref <= 0)
388                 event_free(e);
389
390         return NULL;
391 }
392
393 static bool event_pid_changed(sd_event *e) {
394         assert(e);
395
396         /* We don't support people creating am event loop and keeping
397          * it around over a fork(). Let's complain. */
398
399         return e->original_pid != getpid();
400 }
401
402 static int source_io_unregister(sd_event_source *s) {
403         int r;
404
405         assert(s);
406         assert(s->type == SOURCE_IO);
407
408         if (!s->io.registered)
409                 return 0;
410
411         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
412         if (r < 0)
413                 return -errno;
414
415         s->io.registered = false;
416         return 0;
417 }
418
419 static int source_io_register(sd_event_source *s, int mute, uint32_t events) {
420         struct epoll_event ev = {};
421         int r;
422
423         assert(s);
424         assert(s->type == SOURCE_IO);
425         assert(mute != SD_EVENT_MUTED);
426
427         ev.events = events;
428         ev.data.ptr = s;
429
430         if (mute == SD_EVENT_ONESHOT)
431                 ev.events |= EPOLLONESHOT;
432
433         if (s->io.registered)
434                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
435         else
436                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
437
438         if (r < 0)
439                 return -errno;
440
441         s->io.registered = true;
442
443         return 0;
444 }
445
446 static void source_free(sd_event_source *s) {
447         assert(s);
448
449         if (s->event) {
450                 switch (s->type) {
451
452                 case SOURCE_IO:
453                         if (s->io.fd >= 0)
454                                 source_io_unregister(s);
455
456                         break;
457
458                 case SOURCE_MONOTONIC:
459                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
460                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
461                         break;
462
463                 case SOURCE_REALTIME:
464                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
465                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
466                         break;
467
468                 case SOURCE_SIGNAL:
469                         if (s->signal.sig > 0) {
470                                 if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0)
471                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
472
473                                 if (s->event->signal_sources)
474                                         s->event->signal_sources[s->signal.sig] = NULL;
475                         }
476
477                         break;
478
479                 case SOURCE_CHILD:
480                         if (s->child.pid > 0) {
481                                 if (s->mute != SD_EVENT_MUTED) {
482                                         assert(s->event->n_unmuted_child_sources > 0);
483                                         s->event->n_unmuted_child_sources--;
484                                 }
485
486                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
487                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
488
489                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
490                         }
491
492                         break;
493
494                 case SOURCE_QUIT:
495                         prioq_remove(s->event->quit, s, &s->quit.prioq_index);
496                         break;
497                 }
498
499                 if (s->pending)
500                         prioq_remove(s->event->pending, s, &s->pending_index);
501
502                 if (s->prepare)
503                         prioq_remove(s->event->prepare, s, &s->prepare_index);
504
505                 sd_event_unref(s->event);
506         }
507
508         free(s);
509 }
510
511 static int source_set_pending(sd_event_source *s, bool b) {
512         int r;
513
514         assert(s);
515         assert(s->type != SOURCE_QUIT);
516
517         if (s->pending == b)
518                 return 0;
519
520         s->pending = b;
521
522         if (b) {
523                 s->pending_iteration = s->event->iteration;
524
525                 r = prioq_put(s->event->pending, s, &s->pending_index);
526                 if (r < 0) {
527                         s->pending = false;
528                         return r;
529                 }
530         } else
531                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
532
533         return 0;
534 }
535
536 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
537         sd_event_source *s;
538
539         assert(e);
540
541         s = new0(sd_event_source, 1);
542         if (!s)
543                 return NULL;
544
545         s->n_ref = 1;
546         s->event = sd_event_ref(e);
547         s->type = type;
548         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
549
550         return s;
551 }
552
553 int sd_event_add_io(
554                 sd_event *e,
555                 int fd,
556                 uint32_t events,
557                 sd_io_handler_t callback,
558                 void *userdata,
559                 sd_event_source **ret) {
560
561         sd_event_source *s;
562         int r;
563
564         if (!e)
565                 return -EINVAL;
566         if (fd < 0)
567                 return -EINVAL;
568         if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
569                 return -EINVAL;
570         if (!callback)
571                 return -EINVAL;
572         if (!ret)
573                 return -EINVAL;
574         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
575         if (event_pid_changed(e))
576                 return -ECHILD;
577
578         s = source_new(e, SOURCE_IO);
579         if (!s)
580                 return -ENOMEM;
581
582         s->io.fd = fd;
583         s->io.events = events;
584         s->io.callback = callback;
585         s->userdata = userdata;
586         s->mute = SD_EVENT_UNMUTED;
587
588         r = source_io_register(s, s->mute, events);
589         if (r < 0) {
590                 source_free(s);
591                 return -errno;
592         }
593
594         *ret = s;
595         return 0;
596 }
597
598 static int event_setup_timer_fd(
599                 sd_event *e,
600                 EventSourceType type,
601                 int *timer_fd,
602                 clockid_t id) {
603
604         struct epoll_event ev = {};
605         int r, fd;
606         sd_id128_t bootid;
607
608         assert(e);
609         assert(timer_fd);
610
611         if (_likely_(*timer_fd >= 0))
612                 return 0;
613
614         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
615         if (fd < 0)
616                 return -errno;
617
618         ev.events = EPOLLIN;
619         ev.data.ptr = INT_TO_PTR(type);
620
621         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
622         if (r < 0) {
623                 close_nointr_nofail(fd);
624                 return -errno;
625         }
626
627         /* When we sleep for longer, we try to realign the wakeup to
628            the same time wihtin each second, so that events all across
629            the system can be coalesced into a single CPU
630            wakeup. However, let's take some system-specific randomness
631            for this value, so that in a network of systems with synced
632            clocks timer events are distributed a bit. Here, we
633            calculate a perturbation usec offset from the boot ID. */
634
635         if (sd_id128_get_boot(&bootid) >= 0)
636                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
637
638         *timer_fd = fd;
639         return 0;
640 }
641
642 static int event_add_time_internal(
643                 sd_event *e,
644                 EventSourceType type,
645                 int *timer_fd,
646                 clockid_t id,
647                 Prioq **earliest,
648                 Prioq **latest,
649                 uint64_t usec,
650                 uint64_t accuracy,
651                 sd_time_handler_t callback,
652                 void *userdata,
653                 sd_event_source **ret) {
654
655         sd_event_source *s;
656         int r;
657
658         if (!e)
659                 return -EINVAL;
660         if (!callback)
661                 return -EINVAL;
662         if (!ret)
663                 return -EINVAL;
664         if (usec == (uint64_t) -1)
665                 return -EINVAL;
666         if (accuracy == (uint64_t) -1)
667                 return -EINVAL;
668         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
669         if (event_pid_changed(e))
670                 return -ECHILD;
671
672         assert(timer_fd);
673         assert(earliest);
674         assert(latest);
675
676         if (!*earliest) {
677                 *earliest = prioq_new(earliest_time_prioq_compare);
678                 if (!*earliest)
679                         return -ENOMEM;
680         }
681
682         if (!*latest) {
683                 *latest = prioq_new(latest_time_prioq_compare);
684                 if (!*latest)
685                         return -ENOMEM;
686         }
687
688         if (*timer_fd < 0) {
689                 r = event_setup_timer_fd(e, type, timer_fd, id);
690                 if (r < 0)
691                         return r;
692         }
693
694         s = source_new(e, type);
695         if (!s)
696                 return -ENOMEM;
697
698         s->time.next = usec;
699         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
700         s->time.callback = callback;
701         s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
702         s->userdata = userdata;
703         s->mute = SD_EVENT_ONESHOT;
704
705         r = prioq_put(*earliest, s, &s->time.earliest_index);
706         if (r < 0)
707                 goto fail;
708
709         r = prioq_put(*latest, s, &s->time.latest_index);
710         if (r < 0)
711                 goto fail;
712
713         *ret = s;
714         return 0;
715
716 fail:
717         source_free(s);
718         return r;
719 }
720
721 int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
722         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
723 }
724
725 int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
726         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
727 }
728
729 static int event_update_signal_fd(sd_event *e) {
730         struct epoll_event ev = {};
731         bool add_to_epoll;
732         int r;
733
734         assert(e);
735
736         add_to_epoll = e->signal_fd < 0;
737
738         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
739         if (r < 0)
740                 return -errno;
741
742         e->signal_fd = r;
743
744         if (!add_to_epoll)
745                 return 0;
746
747         ev.events = EPOLLIN;
748         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
749
750         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
751         if (r < 0) {
752                 close_nointr_nofail(e->signal_fd);
753                 e->signal_fd = -1;
754
755                 return -errno;
756         }
757
758         return 0;
759 }
760
761 int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) {
762         sd_event_source *s;
763         int r;
764
765         if (!e)
766                 return -EINVAL;
767         if (sig <= 0)
768                 return -EINVAL;
769         if (sig >= _NSIG)
770                 return -EINVAL;
771         if (!callback)
772                 return -EINVAL;
773         if (!ret)
774                 return -EINVAL;
775         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
776         if (event_pid_changed(e))
777                 return -ECHILD;
778
779         if (!e->signal_sources) {
780                 e->signal_sources = new0(sd_event_source*, _NSIG);
781                 if (!e->signal_sources)
782                         return -ENOMEM;
783         } else if (e->signal_sources[sig])
784                 return -EBUSY;
785
786         s = source_new(e, SOURCE_SIGNAL);
787         if (!s)
788                 return -ENOMEM;
789
790         s->signal.sig = sig;
791         s->signal.callback = callback;
792         s->userdata = userdata;
793         s->mute = SD_EVENT_UNMUTED;
794
795         e->signal_sources[sig] = s;
796         assert_se(sigaddset(&e->sigset, sig) == 0);
797
798         if (sig != SIGCHLD || e->n_unmuted_child_sources == 0) {
799                 r = event_update_signal_fd(e);
800                 if (r < 0) {
801                         source_free(s);
802                         return r;
803                 }
804         }
805
806         *ret = s;
807         return 0;
808 }
809
810 int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) {
811         sd_event_source *s;
812         int r;
813
814         if (!e)
815                 return -EINVAL;
816         if (pid <= 1)
817                 return -EINVAL;
818         if (options & ~(WEXITED|WSTOPPED|WCONTINUED))
819                 return -EINVAL;
820         if (!callback)
821                 return -EINVAL;
822         if (!ret)
823                 return -EINVAL;
824         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
825         if (event_pid_changed(e))
826                 return -ECHILD;
827
828         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
829         if (r < 0)
830                 return r;
831
832         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
833                 return -EBUSY;
834
835         s = source_new(e, SOURCE_CHILD);
836         if (!s)
837                 return -ENOMEM;
838
839         s->child.pid = pid;
840         s->child.options = options;
841         s->child.callback = callback;
842         s->userdata = userdata;
843         s->mute = SD_EVENT_ONESHOT;
844
845         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
846         if (r < 0) {
847                 source_free(s);
848                 return r;
849         }
850
851         e->n_unmuted_child_sources ++;
852
853         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
854
855         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
856                 r = event_update_signal_fd(e);
857                 if (r < 0) {
858                         source_free(s);
859                         return -errno;
860                 }
861         }
862
863         e->need_process_child = true;
864
865         *ret = s;
866         return 0;
867 }
868
869 int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) {
870         sd_event_source *s;
871         int r;
872
873         if (!e)
874                 return -EINVAL;
875         if (!ret)
876                 return -EINVAL;
877         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
878         if (event_pid_changed(e))
879                 return -ECHILD;
880
881         s = source_new(e, SOURCE_DEFER);
882         if (!s)
883                 return -ENOMEM;
884
885         s->defer.callback = callback;
886         s->userdata = userdata;
887         s->mute = SD_EVENT_ONESHOT;
888
889         r = source_set_pending(s, true);
890         if (r < 0) {
891                 source_free(s);
892                 return r;
893         }
894
895         *ret = s;
896         return 0;
897 }
898
899 int sd_event_add_quit(sd_event *e, sd_quit_handler_t callback, void *userdata, sd_event_source **ret) {
900         sd_event_source *s;
901         int r;
902
903         assert_return(e, -EINVAL);
904         assert_return(callback, -EINVAL);
905         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
906         assert_return(!event_pid_changed(e), -ECHILD);
907
908         if (!e->quit) {
909                 e->quit = prioq_new(quit_prioq_compare);
910                 if (!e->quit)
911                         return -ENOMEM;
912         }
913
914         s = source_new(e, SOURCE_QUIT);
915         if (!s)
916                 return -ENOMEM;
917
918         s->quit.callback = callback;
919         s->userdata = userdata;
920         s->quit.prioq_index = PRIOQ_IDX_NULL;
921         s->mute = SD_EVENT_ONESHOT;
922
923         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
924         if (r < 0) {
925                 source_free(s);
926                 return r;
927         }
928
929         *ret = s;
930         return 0;
931 }
932
933 sd_event_source* sd_event_source_ref(sd_event_source *s) {
934         assert_return(s, NULL);
935
936         assert(s->n_ref >= 1);
937         s->n_ref++;
938
939         return s;
940 }
941
942 sd_event_source* sd_event_source_unref(sd_event_source *s) {
943         assert_return(s, NULL);
944
945         assert(s->n_ref >= 1);
946         s->n_ref--;
947
948         if (s->n_ref <= 0)
949                 source_free(s);
950
951         return NULL;
952 }
953
954 sd_event *sd_event_get(sd_event_source *s) {
955         if (!s)
956                 return NULL;
957
958         return s->event;
959 }
960
961 int sd_event_source_get_pending(sd_event_source *s) {
962         if (!s)
963                 return -EINVAL;
964         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
965         if (event_pid_changed(s->event))
966                 return -ECHILD;
967
968         return s->pending;
969 }
970
971 int sd_event_source_get_io_fd(sd_event_source *s) {
972         if (!s)
973                 return -EINVAL;
974         if (s->type != SOURCE_IO)
975                 return -EDOM;
976         if (event_pid_changed(s->event))
977                 return -ECHILD;
978
979         return s->io.fd;
980 }
981
982 int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
983         if (!s)
984                 return -EINVAL;
985         if (s->type != SOURCE_IO)
986                 return -EDOM;
987         if (!events)
988                 return -EINVAL;
989         if (event_pid_changed(s->event))
990                 return -ECHILD;
991
992         *events = s->io.events;
993         return 0;
994 }
995
996 int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
997         int r;
998
999         if (!s)
1000                 return -EINVAL;
1001         if (!s->type != SOURCE_IO)
1002                 return -EDOM;
1003         if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
1004                 return -EINVAL;
1005         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1006         if (event_pid_changed(s->event))
1007                 return -ECHILD;
1008
1009         if (s->io.events == events)
1010                 return 0;
1011
1012         if (s->mute != SD_EVENT_MUTED) {
1013                 r = source_io_register(s, s->io.events, events);
1014                 if (r < 0)
1015                         return r;
1016         }
1017
1018         s->io.events = events;
1019
1020         return 0;
1021 }
1022
1023 int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1024         if (!s)
1025                 return -EINVAL;
1026         if (s->type != SOURCE_IO)
1027                 return -EDOM;
1028         if (!revents)
1029                 return -EINVAL;
1030         if (!s->pending)
1031                 return -ENODATA;
1032         if (event_pid_changed(s->event))
1033                 return -ECHILD;
1034
1035         *revents = s->io.revents;
1036         return 0;
1037 }
1038
1039 int sd_event_source_get_signal(sd_event_source *s) {
1040         if (!s)
1041                 return -EINVAL;
1042         if (s->type != SOURCE_SIGNAL)
1043                 return -EDOM;
1044         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1045         if (event_pid_changed(s->event))
1046                 return -ECHILD;
1047
1048         return s->signal.sig;
1049 }
1050
1051 int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1052         if (!s)
1053                 return -EINVAL;
1054         if (event_pid_changed(s->event))
1055                 return -ECHILD;
1056
1057         return s->priority;
1058 }
1059
1060 int sd_event_source_set_priority(sd_event_source *s, int priority) {
1061         if (!s)
1062                 return -EINVAL;
1063         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1064         if (event_pid_changed(s->event))
1065                 return -ECHILD;
1066
1067         if (s->priority == priority)
1068                 return 0;
1069
1070         s->priority = priority;
1071
1072         if (s->pending)
1073                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1074
1075         if (s->prepare)
1076                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1077
1078         return 0;
1079 }
1080
1081 int sd_event_source_get_mute(sd_event_source *s, int *m) {
1082         if (!s)
1083                 return -EINVAL;
1084         if (!m)
1085                 return -EINVAL;
1086         if (event_pid_changed(s->event))
1087                 return -ECHILD;
1088
1089         *m = s->mute;
1090         return 0;
1091 }
1092
1093 int sd_event_source_set_mute(sd_event_source *s, int m) {
1094         int r;
1095
1096         if (!s)
1097                 return -EINVAL;
1098         if (m != SD_EVENT_MUTED && m != SD_EVENT_UNMUTED && !SD_EVENT_ONESHOT)
1099                 return -EINVAL;
1100         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1101         if (event_pid_changed(s->event))
1102                 return -ECHILD;
1103
1104         if (s->mute == m)
1105                 return 0;
1106
1107         if (m == SD_EVENT_MUTED) {
1108
1109                 switch (s->type) {
1110
1111                 case SOURCE_IO:
1112                         r = source_io_unregister(s);
1113                         if (r < 0)
1114                                 return r;
1115
1116                         s->mute = m;
1117                         break;
1118
1119                 case SOURCE_MONOTONIC:
1120                         s->mute = m;
1121                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1122                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1123                         break;
1124
1125                 case SOURCE_REALTIME:
1126                         s->mute = m;
1127                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1128                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1129                         break;
1130
1131                 case SOURCE_SIGNAL:
1132                         s->mute = m;
1133                         if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) {
1134                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1135                                 event_update_signal_fd(s->event);
1136                         }
1137
1138                         break;
1139
1140                 case SOURCE_CHILD:
1141                         s->mute = m;
1142
1143                         assert(s->event->n_unmuted_child_sources > 0);
1144                         s->event->n_unmuted_child_sources--;
1145
1146                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1147                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1148                                 event_update_signal_fd(s->event);
1149                         }
1150
1151                         break;
1152
1153                 case SOURCE_DEFER:
1154                 case SOURCE_QUIT:
1155                         s->mute = m;
1156                         break;
1157                 }
1158
1159         } else {
1160                 switch (s->type) {
1161
1162                 case SOURCE_IO:
1163                         r = source_io_register(s, m, s->io.events);
1164                         if (r < 0)
1165                                 return r;
1166
1167                         s->mute = m;
1168                         break;
1169
1170                 case SOURCE_MONOTONIC:
1171                         s->mute = m;
1172                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1173                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1174                         break;
1175
1176                 case SOURCE_REALTIME:
1177                         s->mute = m;
1178                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1179                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1180                         break;
1181
1182                 case SOURCE_SIGNAL:
1183                         s->mute = m;
1184
1185                         if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0)  {
1186                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1187                                 event_update_signal_fd(s->event);
1188                         }
1189                         break;
1190
1191                 case SOURCE_CHILD:
1192                         s->mute = m;
1193
1194                         if (s->mute == SD_EVENT_MUTED) {
1195                                 s->event->n_unmuted_child_sources++;
1196
1197                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1198                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1199                                         event_update_signal_fd(s->event);
1200                                 }
1201                         }
1202                         break;
1203
1204                 case SOURCE_DEFER:
1205                 case SOURCE_QUIT:
1206                         s->mute = m;
1207                         break;
1208                 }
1209         }
1210
1211         if (s->pending)
1212                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1213
1214         if (s->prepare)
1215                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1216
1217         return 0;
1218 }
1219
1220 int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1221         if (!s)
1222                 return -EINVAL;
1223         if (!usec)
1224                 return -EINVAL;
1225         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1226                 return -EDOM;
1227         if (event_pid_changed(s->event))
1228                 return -ECHILD;
1229
1230         *usec = s->time.next;
1231         return 0;
1232 }
1233
1234 int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1235         if (!s)
1236                 return -EINVAL;
1237         if (usec == (uint64_t) -1)
1238                 return -EINVAL;
1239         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1240                 return -EDOM;
1241         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1242         if (event_pid_changed(s->event))
1243                 return -ECHILD;
1244
1245         if (s->time.next == usec)
1246                 return 0;
1247
1248         s->time.next = usec;
1249
1250         if (s->type == SOURCE_REALTIME) {
1251                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1252                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1253         } else {
1254                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1255                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1256         }
1257
1258         return 0;
1259 }
1260
1261 int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1262         if (!s)
1263                 return -EINVAL;
1264         if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
1265                 return -EDOM;
1266         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1267         if (event_pid_changed(s->event))
1268                 return -ECHILD;
1269
1270         if (usec == 0)
1271                 usec = DEFAULT_ACCURACY_USEC;
1272
1273         if (s->time.accuracy == usec)
1274                 return 0;
1275
1276         s->time.accuracy = usec;
1277
1278         if (s->type == SOURCE_REALTIME)
1279                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1280         else
1281                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1282
1283         return 0;
1284 }
1285
1286 int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1287         if (!s)
1288                 return -EINVAL;
1289         if (!usec)
1290                 return -EINVAL;
1291         if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
1292                 return -EDOM;
1293         if (event_pid_changed(s->event))
1294                 return -ECHILD;
1295
1296         *usec = s->time.accuracy;
1297         return 0;
1298 }
1299
1300 int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1301         int r;
1302
1303         assert_return(s, -EINVAL);
1304         assert_return(s->type != SOURCE_QUIT, -EDOM);
1305         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1306         assert_return(!event_pid_changed(s->event), -ECHILD);
1307
1308         if (s->prepare == callback)
1309                 return 0;
1310
1311         if (callback && s->prepare) {
1312                 s->prepare = callback;
1313                 return 0;
1314         }
1315
1316         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1317         if (r < 0)
1318                 return r;
1319
1320         s->prepare = callback;
1321
1322         if (callback) {
1323                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1324                 if (r < 0)
1325                         return r;
1326         } else
1327                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1328
1329         return 0;
1330 }
1331
1332 void* sd_event_source_get_userdata(sd_event_source *s) {
1333         assert_return(s, NULL);
1334
1335         return s->userdata;
1336 }
1337
1338 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1339         usec_t c;
1340         assert(e);
1341         assert(a <= b);
1342
1343         if (a <= 0)
1344                 return 0;
1345
1346         if (b <= a + 1)
1347                 return a;
1348
1349         /*
1350           Find a good time to wake up again between times a and b. We
1351           have two goals here:
1352
1353           a) We want to wake up as seldom as possible, hence prefer
1354              later times over earlier times.
1355
1356           b) But if we have to wake up, then let's make sure to
1357              dispatch as much as possible on the entire system.
1358
1359           We implement this by waking up everywhere at the same time
1360           within any given second if we can, synchronised via the
1361           perturbation value determined from the boot ID. If we can't,
1362           then we try to find the same spot in every a 250ms
1363           step. Otherwise, we pick the last possible time to wake up.
1364         */
1365
1366         c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1367         if (c >= b) {
1368                 if (_unlikely_(c < USEC_PER_SEC))
1369                         return b;
1370
1371                 c -= USEC_PER_SEC;
1372         }
1373
1374         if (c >= a)
1375                 return c;
1376
1377         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1378         if (c >= b) {
1379                 if (_unlikely_(c < USEC_PER_MSEC*250))
1380                         return b;
1381
1382                 c -= USEC_PER_MSEC*250;
1383         }
1384
1385         if (c >= a)
1386                 return c;
1387
1388         return b;
1389 }
1390
1391 static int event_arm_timer(
1392                 sd_event *e,
1393                 int timer_fd,
1394                 Prioq *earliest,
1395                 Prioq *latest,
1396                 usec_t *next) {
1397
1398         struct itimerspec its = {};
1399         sd_event_source *a, *b;
1400         usec_t t;
1401         int r;
1402
1403         assert_se(e);
1404         assert_se(next);
1405
1406         a = prioq_peek(earliest);
1407         if (!a || a->mute == SD_EVENT_MUTED)
1408                 return 0;
1409
1410         b = prioq_peek(latest);
1411         assert_se(b && b->mute != SD_EVENT_MUTED);
1412
1413         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1414         if (*next == t)
1415                 return 0;
1416
1417         assert_se(timer_fd >= 0);
1418
1419         if (t == 0) {
1420                 /* We don' want to disarm here, just mean some time looooong ago. */
1421                 its.it_value.tv_sec = 0;
1422                 its.it_value.tv_nsec = 1;
1423         } else
1424                 timespec_store(&its.it_value, t);
1425
1426         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1427         if (r < 0)
1428                 return r;
1429
1430         *next = t;
1431         return 0;
1432 }
1433
1434 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1435         assert(e);
1436         assert(s);
1437         assert(s->type == SOURCE_IO);
1438
1439         s->io.revents = events;
1440
1441         /*
1442            If this is a oneshot event source, then we added it to the
1443            epoll with EPOLLONESHOT, hence we know it's not registered
1444            anymore. We can save a syscall here...
1445         */
1446
1447         if (s->mute == SD_EVENT_ONESHOT)
1448                 s->io.registered = false;
1449
1450         return source_set_pending(s, true);
1451 }
1452
1453 static int flush_timer(sd_event *e, int fd, uint32_t events) {
1454         uint64_t x;
1455         ssize_t ss;
1456
1457         assert(e);
1458         assert(fd >= 0);
1459
1460         if (events != EPOLLIN)
1461                 return -EIO;
1462
1463         ss = read(fd, &x, sizeof(x));
1464         if (ss < 0) {
1465                 if (errno == EAGAIN || errno == EINTR)
1466                         return 0;
1467
1468                 return -errno;
1469         }
1470
1471         if (ss != sizeof(x))
1472                 return -EIO;
1473
1474         return 0;
1475 }
1476
1477 static int process_timer(sd_event *e, usec_t n, Prioq *earliest, Prioq *latest) {
1478         sd_event_source *s;
1479         int r;
1480
1481         assert(e);
1482
1483         for (;;) {
1484                 s = prioq_peek(earliest);
1485                 if (!s ||
1486                     s->time.next > n ||
1487                     s->mute == SD_EVENT_MUTED ||
1488                     s->pending)
1489                         break;
1490
1491                 r = source_set_pending(s, true);
1492                 if (r < 0)
1493                         return r;
1494
1495                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1496                 prioq_reshuffle(latest, s, &s->time.latest_index);
1497         }
1498
1499         return 0;
1500 }
1501
1502 static int process_child(sd_event *e) {
1503         sd_event_source *s;
1504         Iterator i;
1505         int r;
1506
1507         assert(e);
1508
1509         e->need_process_child = false;
1510
1511         /*
1512            So, this is ugly. We iteratively invoke waitid() with P_PID
1513            + WNOHANG for each PID we wait for, instead of using
1514            P_ALL. This is because we only want to get child
1515            information of very specific child processes, and not all
1516            of them. We might not have processed the SIGCHLD even of a
1517            previous invocation and we don't want to maintain a
1518            unbounded *per-child* event queue, hence we really don't
1519            want anything flushed out of the kernel's queue that we
1520            don't care about. Since this is O(n) this means that if you
1521            have a lot of processes you probably want to handle SIGCHLD
1522            yourself.
1523         */
1524
1525         HASHMAP_FOREACH(s, e->child_sources, i) {
1526                 assert(s->type == SOURCE_CHILD);
1527
1528                 if (s->pending)
1529                         continue;
1530
1531                 if (s->mute == SD_EVENT_MUTED)
1532                         continue;
1533
1534                 zero(s->child.siginfo);
1535                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1536                 if (r < 0)
1537                         return -errno;
1538
1539                 if (s->child.siginfo.si_pid != 0) {
1540                         r = source_set_pending(s, true);
1541                         if (r < 0)
1542                                 return r;
1543                 }
1544         }
1545
1546         return 0;
1547 }
1548
1549 static int process_signal(sd_event *e, uint32_t events) {
1550         struct signalfd_siginfo si;
1551         bool read_one = false;
1552         ssize_t ss;
1553         int r;
1554
1555         assert(e);
1556
1557         if (events != EPOLLIN)
1558                 return -EIO;
1559
1560         for (;;) {
1561                 sd_event_source *s;
1562
1563                 ss = read(e->signal_fd, &si, sizeof(si));
1564                 if (ss < 0) {
1565                         if (errno == EAGAIN || errno == EINTR)
1566                                 return read_one;
1567
1568                         return -errno;
1569                 }
1570
1571                 if (ss != sizeof(si))
1572                         return -EIO;
1573
1574                 read_one = true;
1575
1576                 if (si.ssi_signo == SIGCHLD) {
1577                         r = process_child(e);
1578                         if (r < 0)
1579                                 return r;
1580                         if (r > 0 || !e->signal_sources[si.ssi_signo])
1581                                 continue;
1582                 } else {
1583                         s = e->signal_sources[si.ssi_signo];
1584                         if (!s)
1585                                 return -EIO;
1586                 }
1587
1588                 s->signal.siginfo = si;
1589                 r = source_set_pending(s, true);
1590                 if (r < 0)
1591                         return r;
1592         }
1593
1594
1595         return 0;
1596 }
1597
1598 static int source_dispatch(sd_event_source *s) {
1599         int r;
1600
1601         assert(s);
1602         assert(s->pending || s->type == SOURCE_QUIT);
1603
1604         if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
1605                 r = source_set_pending(s, false);
1606                 if (r < 0)
1607                         return r;
1608         }
1609
1610         if (s->mute == SD_EVENT_ONESHOT) {
1611                 r = sd_event_source_set_mute(s, SD_EVENT_MUTED);
1612                 if (r < 0)
1613                         return r;
1614         }
1615
1616         switch (s->type) {
1617
1618         case SOURCE_IO:
1619                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1620                 break;
1621
1622         case SOURCE_MONOTONIC:
1623                 r = s->time.callback(s, s->time.next, s->userdata);
1624                 break;
1625
1626         case SOURCE_REALTIME:
1627                 r = s->time.callback(s, s->time.next, s->userdata);
1628                 break;
1629
1630         case SOURCE_SIGNAL:
1631                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1632                 break;
1633
1634         case SOURCE_CHILD:
1635                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1636                 break;
1637
1638         case SOURCE_DEFER:
1639                 r = s->defer.callback(s, s->userdata);
1640                 break;
1641
1642         case SOURCE_QUIT:
1643                 r = s->quit.callback(s, s->userdata);
1644                 break;
1645         }
1646
1647         return r;
1648 }
1649
1650 static int event_prepare(sd_event *e) {
1651         int r;
1652
1653         assert(e);
1654
1655         for (;;) {
1656                 sd_event_source *s;
1657
1658                 s = prioq_peek(e->prepare);
1659                 if (!s || s->prepare_iteration == e->iteration || s->mute == SD_EVENT_MUTED)
1660                         break;
1661
1662                 s->prepare_iteration = e->iteration;
1663                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1664                 if (r < 0)
1665                         return r;
1666
1667                 assert(s->prepare);
1668                 r = s->prepare(s, s->userdata);
1669                 if (r < 0)
1670                         return r;
1671
1672         }
1673
1674         return 0;
1675 }
1676
1677 static int dispatch_quit(sd_event *e) {
1678         sd_event_source *p;
1679         int r;
1680
1681         assert(e);
1682
1683         p = prioq_peek(e->quit);
1684         if (!p || p->mute == SD_EVENT_MUTED) {
1685                 e->state = SD_EVENT_FINISHED;
1686                 return 0;
1687         }
1688
1689         sd_event_ref(e);
1690         e->iteration++;
1691         e->state = SD_EVENT_QUITTING;
1692
1693         r = source_dispatch(p);
1694
1695         e->state = SD_EVENT_PASSIVE;
1696         sd_event_unref(e);
1697
1698         return r;
1699 }
1700
1701 static sd_event_source* event_next_pending(sd_event *e) {
1702         sd_event_source *p;
1703
1704         assert(e);
1705
1706         p = prioq_peek(e->pending);
1707         if (!p)
1708                 return NULL;
1709
1710         if (p->mute == SD_EVENT_MUTED)
1711                 return NULL;
1712
1713         return p;
1714 }
1715
1716 int sd_event_run(sd_event *e, uint64_t timeout) {
1717         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1718         sd_event_source *p;
1719         int r, i, m;
1720         dual_timestamp n;
1721
1722         assert_return(e, -EINVAL);
1723         assert_return(!event_pid_changed(e), -ECHILD);
1724         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1725         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1726
1727         if (e->quit_requested)
1728                 return dispatch_quit(e);
1729
1730         sd_event_ref(e);
1731         e->iteration++;
1732         e->state = SD_EVENT_RUNNING;
1733
1734         r = event_prepare(e);
1735         if (r < 0)
1736                 goto finish;
1737
1738         if (event_next_pending(e) || e->need_process_child)
1739                 timeout = 0;
1740
1741         if (timeout > 0) {
1742                 r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1743                 if (r < 0)
1744                         goto finish;
1745
1746                 r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1747                 if (r < 0)
1748                         goto finish;
1749         }
1750
1751         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1752                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1753         if (m < 0) {
1754                 r = m;
1755                 goto finish;
1756         }
1757
1758         dual_timestamp_get(&n);
1759
1760         for (i = 0; i < m; i++) {
1761
1762                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1763                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
1764                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1765                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
1766                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1767                         r = process_signal(e, ev_queue[i].events);
1768                 else
1769                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1770
1771                 if (r < 0)
1772                         goto finish;
1773         }
1774
1775         r = process_timer(e, n.monotonic, e->monotonic_earliest, e->monotonic_latest);
1776         if (r < 0)
1777                 goto finish;
1778
1779         r = process_timer(e, n.realtime, e->realtime_earliest, e->realtime_latest);
1780         if (r < 0)
1781                 goto finish;
1782
1783         if (e->need_process_child) {
1784                 r = process_child(e);
1785                 if (r < 0)
1786                         goto finish;
1787         }
1788
1789         p = event_next_pending(e);
1790         if (!p) {
1791                 r = 0;
1792                 goto finish;
1793         }
1794
1795         r = source_dispatch(p);
1796
1797 finish:
1798         e->state = SD_EVENT_PASSIVE;
1799         sd_event_unref(e);
1800
1801         return r;
1802 }
1803
1804 int sd_event_loop(sd_event *e) {
1805         int r;
1806
1807         assert_return(e, -EINVAL);
1808         assert_return(!event_pid_changed(e), -ECHILD);
1809         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1810
1811         sd_event_ref(e);
1812
1813         while (e->state != SD_EVENT_FINISHED) {
1814                 r = sd_event_run(e, (uint64_t) -1);
1815                 if (r < 0)
1816                         goto finish;
1817         }
1818
1819         r = 0;
1820
1821 finish:
1822         sd_event_unref(e);
1823         return r;
1824 }
1825
1826 int sd_event_get_state(sd_event *e) {
1827         assert_return(e, -EINVAL);
1828         assert_return(!event_pid_changed(e), -ECHILD);
1829
1830         return e->state;
1831 }
1832
1833 int sd_event_get_quit(sd_event *e) {
1834         assert_return(e, -EINVAL);
1835         assert_return(!event_pid_changed(e), -ECHILD);
1836
1837         return e->quit_requested;
1838 }
1839
1840 int sd_event_request_quit(sd_event *e) {
1841         assert_return(e, -EINVAL);
1842         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1843         assert_return(!event_pid_changed(e), -ECHILD);
1844
1845         e->quit_requested = true;
1846         return 0;
1847 }