chiark / gitweb /
bus: in GetManagedObjects() only return each object once.
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "macro.h"
27 #include "prioq.h"
28 #include "hashmap.h"
29 #include "util.h"
30 #include "time-util.h"
31 #include "sd-id128.h"
32
33 #include "sd-event.h"
34
35 #define EPOLL_QUEUE_MAX 64
36 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
37
38 typedef enum EventSourceType {
39         SOURCE_IO,
40         SOURCE_MONOTONIC,
41         SOURCE_REALTIME,
42         SOURCE_SIGNAL,
43         SOURCE_CHILD,
44         SOURCE_DEFER,
45         SOURCE_QUIT
46 } EventSourceType;
47
48 struct sd_event_source {
49         unsigned n_ref;
50
51         sd_event *event;
52         void *userdata;
53         sd_prepare_handler_t prepare;
54
55         EventSourceType type:4;
56         int enabled:3;
57         bool pending:1;
58
59         int priority;
60         unsigned pending_index;
61         unsigned prepare_index;
62         unsigned pending_iteration;
63         unsigned prepare_iteration;
64
65         union {
66                 struct {
67                         sd_io_handler_t callback;
68                         int fd;
69                         uint32_t events;
70                         uint32_t revents;
71                         bool registered:1;
72                 } io;
73                 struct {
74                         sd_time_handler_t callback;
75                         usec_t next, accuracy;
76                         unsigned earliest_index;
77                         unsigned latest_index;
78                 } time;
79                 struct {
80                         sd_signal_handler_t callback;
81                         struct signalfd_siginfo siginfo;
82                         int sig;
83                 } signal;
84                 struct {
85                         sd_child_handler_t callback;
86                         siginfo_t siginfo;
87                         pid_t pid;
88                         int options;
89                 } child;
90                 struct {
91                         sd_defer_handler_t callback;
92                 } defer;
93                 struct {
94                         sd_quit_handler_t callback;
95                         unsigned prioq_index;
96                 } quit;
97         };
98 };
99
100 struct sd_event {
101         unsigned n_ref;
102
103         int epoll_fd;
104         int signal_fd;
105         int realtime_fd;
106         int monotonic_fd;
107
108         Prioq *pending;
109         Prioq *prepare;
110
111         /* For both clocks we maintain two priority queues each, one
112          * ordered for the earliest times the events may be
113          * dispatched, and one ordered by the latest times they must
114          * have been dispatched. The range between the top entries in
115          * the two prioqs is the time window we can freely schedule
116          * wakeups in */
117         Prioq *monotonic_earliest;
118         Prioq *monotonic_latest;
119         Prioq *realtime_earliest;
120         Prioq *realtime_latest;
121
122         usec_t realtime_next, monotonic_next;
123         usec_t perturb;
124
125         sigset_t sigset;
126         sd_event_source **signal_sources;
127
128         Hashmap *child_sources;
129         unsigned n_enabled_child_sources;
130
131         Prioq *quit;
132
133         pid_t original_pid;
134
135         unsigned iteration;
136         dual_timestamp timestamp;
137         int state;
138
139         bool quit_requested:1;
140         bool need_process_child:1;
141 };
142
143 static int pending_prioq_compare(const void *a, const void *b) {
144         const sd_event_source *x = a, *y = b;
145
146         assert(x->pending);
147         assert(y->pending);
148
149         /* Enabled ones first */
150         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
151                 return -1;
152         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
153                 return 1;
154
155         /* Lower priority values first */
156         if (x->priority < y->priority)
157                 return -1;
158         if (x->priority > y->priority)
159                 return 1;
160
161         /* Older entries first */
162         if (x->pending_iteration < y->pending_iteration)
163                 return -1;
164         if (x->pending_iteration > y->pending_iteration)
165                 return 1;
166
167         /* Stability for the rest */
168         if (x < y)
169                 return -1;
170         if (x > y)
171                 return 1;
172
173         return 0;
174 }
175
176 static int prepare_prioq_compare(const void *a, const void *b) {
177         const sd_event_source *x = a, *y = b;
178
179         assert(x->prepare);
180         assert(y->prepare);
181
182         /* Move most recently prepared ones last, so that we can stop
183          * preparing as soon as we hit one that has already been
184          * prepared in the current iteration */
185         if (x->prepare_iteration < y->prepare_iteration)
186                 return -1;
187         if (x->prepare_iteration > y->prepare_iteration)
188                 return 1;
189
190         /* Enabled ones first */
191         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
192                 return -1;
193         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
194                 return 1;
195
196         /* Lower priority values first */
197         if (x->priority < y->priority)
198                 return -1;
199         if (x->priority > y->priority)
200                 return 1;
201
202         /* Stability for the rest */
203         if (x < y)
204                 return -1;
205         if (x > y)
206                 return 1;
207
208         return 0;
209 }
210
211 static int earliest_time_prioq_compare(const void *a, const void *b) {
212         const sd_event_source *x = a, *y = b;
213
214         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
215         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
216
217         /* Enabled ones first */
218         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
219                 return -1;
220         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
221                 return 1;
222
223         /* Move the pending ones to the end */
224         if (!x->pending && y->pending)
225                 return -1;
226         if (x->pending && !y->pending)
227                 return 1;
228
229         /* Order by time */
230         if (x->time.next < y->time.next)
231                 return -1;
232         if (x->time.next > y->time.next)
233                 return -1;
234
235         /* Stability for the rest */
236         if (x < y)
237                 return -1;
238         if (x > y)
239                 return 1;
240
241         return 0;
242 }
243
244 static int latest_time_prioq_compare(const void *a, const void *b) {
245         const sd_event_source *x = a, *y = b;
246
247         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
248                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
249
250         /* Enabled ones first */
251         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
252                 return -1;
253         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
254                 return 1;
255
256         /* Move the pending ones to the end */
257         if (!x->pending && y->pending)
258                 return -1;
259         if (x->pending && !y->pending)
260                 return 1;
261
262         /* Order by time */
263         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
264                 return -1;
265         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
266                 return -1;
267
268         /* Stability for the rest */
269         if (x < y)
270                 return -1;
271         if (x > y)
272                 return 1;
273
274         return 0;
275 }
276
277 static int quit_prioq_compare(const void *a, const void *b) {
278         const sd_event_source *x = a, *y = b;
279
280         assert(x->type == SOURCE_QUIT);
281         assert(y->type == SOURCE_QUIT);
282
283         /* Enabled ones first */
284         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
285                 return -1;
286         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
287                 return 1;
288
289         /* Lower priority values first */
290         if (x->priority < y->priority)
291                 return -1;
292         if (x->priority > y->priority)
293                 return 1;
294
295         /* Stability for the rest */
296         if (x < y)
297                 return -1;
298         if (x > y)
299                 return 1;
300
301         return 0;
302 }
303
304 static void event_free(sd_event *e) {
305         assert(e);
306
307         if (e->epoll_fd >= 0)
308                 close_nointr_nofail(e->epoll_fd);
309
310         if (e->signal_fd >= 0)
311                 close_nointr_nofail(e->signal_fd);
312
313         if (e->realtime_fd >= 0)
314                 close_nointr_nofail(e->realtime_fd);
315
316         if (e->monotonic_fd >= 0)
317                 close_nointr_nofail(e->monotonic_fd);
318
319         prioq_free(e->pending);
320         prioq_free(e->prepare);
321         prioq_free(e->monotonic_earliest);
322         prioq_free(e->monotonic_latest);
323         prioq_free(e->realtime_earliest);
324         prioq_free(e->realtime_latest);
325         prioq_free(e->quit);
326
327         free(e->signal_sources);
328
329         hashmap_free(e->child_sources);
330         free(e);
331 }
332
333 int sd_event_new(sd_event** ret) {
334         sd_event *e;
335         int r;
336
337         assert_return(ret, -EINVAL);
338
339         e = new0(sd_event, 1);
340         if (!e)
341                 return -ENOMEM;
342
343         e->n_ref = 1;
344         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
345         e->realtime_next = e->monotonic_next = (usec_t) -1;
346         e->original_pid = getpid();
347
348         assert_se(sigemptyset(&e->sigset) == 0);
349
350         e->pending = prioq_new(pending_prioq_compare);
351         if (!e->pending) {
352                 r = -ENOMEM;
353                 goto fail;
354         }
355
356         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
357         if (e->epoll_fd < 0) {
358                 r = -errno;
359                 goto fail;
360         }
361
362         *ret = e;
363         return 0;
364
365 fail:
366         event_free(e);
367         return r;
368 }
369
370 sd_event* sd_event_ref(sd_event *e) {
371         assert_return(e, NULL);
372
373         assert(e->n_ref >= 1);
374         e->n_ref++;
375
376         return e;
377 }
378
379 sd_event* sd_event_unref(sd_event *e) {
380         assert_return(e, NULL);
381
382         assert(e->n_ref >= 1);
383         e->n_ref--;
384
385         if (e->n_ref <= 0)
386                 event_free(e);
387
388         return NULL;
389 }
390
391 static bool event_pid_changed(sd_event *e) {
392         assert(e);
393
394         /* We don't support people creating am event loop and keeping
395          * it around over a fork(). Let's complain. */
396
397         return e->original_pid != getpid();
398 }
399
400 static int source_io_unregister(sd_event_source *s) {
401         int r;
402
403         assert(s);
404         assert(s->type == SOURCE_IO);
405
406         if (!s->io.registered)
407                 return 0;
408
409         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
410         if (r < 0)
411                 return -errno;
412
413         s->io.registered = false;
414         return 0;
415 }
416
417 static int source_io_register(
418                 sd_event_source *s,
419                 int enabled,
420                 uint32_t events) {
421
422         struct epoll_event ev = {};
423         int r;
424
425         assert(s);
426         assert(s->type == SOURCE_IO);
427         assert(enabled != SD_EVENT_OFF);
428
429         ev.events = events;
430         ev.data.ptr = s;
431
432         if (enabled == SD_EVENT_ONESHOT)
433                 ev.events |= EPOLLONESHOT;
434
435         if (s->io.registered)
436                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
437         else
438                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
439
440         if (r < 0)
441                 return -errno;
442
443         s->io.registered = true;
444
445         return 0;
446 }
447
448 static void source_free(sd_event_source *s) {
449         assert(s);
450
451         if (s->event) {
452                 switch (s->type) {
453
454                 case SOURCE_IO:
455                         if (s->io.fd >= 0)
456                                 source_io_unregister(s);
457
458                         break;
459
460                 case SOURCE_MONOTONIC:
461                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
462                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
463                         break;
464
465                 case SOURCE_REALTIME:
466                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
467                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
468                         break;
469
470                 case SOURCE_SIGNAL:
471                         if (s->signal.sig > 0) {
472                                 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
473                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
474
475                                 if (s->event->signal_sources)
476                                         s->event->signal_sources[s->signal.sig] = NULL;
477                         }
478
479                         break;
480
481                 case SOURCE_CHILD:
482                         if (s->child.pid > 0) {
483                                 if (s->enabled != SD_EVENT_OFF) {
484                                         assert(s->event->n_enabled_child_sources > 0);
485                                         s->event->n_enabled_child_sources--;
486                                 }
487
488                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
489                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
490
491                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
492                         }
493
494                         break;
495
496                 case SOURCE_QUIT:
497                         prioq_remove(s->event->quit, s, &s->quit.prioq_index);
498                         break;
499                 }
500
501                 if (s->pending)
502                         prioq_remove(s->event->pending, s, &s->pending_index);
503
504                 if (s->prepare)
505                         prioq_remove(s->event->prepare, s, &s->prepare_index);
506
507                 sd_event_unref(s->event);
508         }
509
510         free(s);
511 }
512
513 static int source_set_pending(sd_event_source *s, bool b) {
514         int r;
515
516         assert(s);
517         assert(s->type != SOURCE_QUIT);
518
519         if (s->pending == b)
520                 return 0;
521
522         s->pending = b;
523
524         if (b) {
525                 s->pending_iteration = s->event->iteration;
526
527                 r = prioq_put(s->event->pending, s, &s->pending_index);
528                 if (r < 0) {
529                         s->pending = false;
530                         return r;
531                 }
532         } else
533                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
534
535         return 0;
536 }
537
538 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
539         sd_event_source *s;
540
541         assert(e);
542
543         s = new0(sd_event_source, 1);
544         if (!s)
545                 return NULL;
546
547         s->n_ref = 1;
548         s->event = sd_event_ref(e);
549         s->type = type;
550         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
551
552         return s;
553 }
554
555 int sd_event_add_io(
556                 sd_event *e,
557                 int fd,
558                 uint32_t events,
559                 sd_io_handler_t callback,
560                 void *userdata,
561                 sd_event_source **ret) {
562
563         sd_event_source *s;
564         int r;
565
566         assert_return(e, -EINVAL);
567         assert_return(fd >= 0, -EINVAL);
568         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
569         assert_return(callback, -EINVAL);
570         assert_return(ret, -EINVAL);
571         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
572         assert_return(!event_pid_changed(e), -ECHILD);
573
574         s = source_new(e, SOURCE_IO);
575         if (!s)
576                 return -ENOMEM;
577
578         s->io.fd = fd;
579         s->io.events = events;
580         s->io.callback = callback;
581         s->userdata = userdata;
582         s->enabled = SD_EVENT_ON;
583
584         r = source_io_register(s, s->enabled, events);
585         if (r < 0) {
586                 source_free(s);
587                 return -errno;
588         }
589
590         *ret = s;
591         return 0;
592 }
593
594 static int event_setup_timer_fd(
595                 sd_event *e,
596                 EventSourceType type,
597                 int *timer_fd,
598                 clockid_t id) {
599
600         struct epoll_event ev = {};
601         int r, fd;
602         sd_id128_t bootid;
603
604         assert(e);
605         assert(timer_fd);
606
607         if (_likely_(*timer_fd >= 0))
608                 return 0;
609
610         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
611         if (fd < 0)
612                 return -errno;
613
614         ev.events = EPOLLIN;
615         ev.data.ptr = INT_TO_PTR(type);
616
617         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
618         if (r < 0) {
619                 close_nointr_nofail(fd);
620                 return -errno;
621         }
622
623         /* When we sleep for longer, we try to realign the wakeup to
624            the same time wihtin each second, so that events all across
625            the system can be coalesced into a single CPU
626            wakeup. However, let's take some system-specific randomness
627            for this value, so that in a network of systems with synced
628            clocks timer events are distributed a bit. Here, we
629            calculate a perturbation usec offset from the boot ID. */
630
631         if (sd_id128_get_boot(&bootid) >= 0)
632                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
633
634         *timer_fd = fd;
635         return 0;
636 }
637
638 static int event_add_time_internal(
639                 sd_event *e,
640                 EventSourceType type,
641                 int *timer_fd,
642                 clockid_t id,
643                 Prioq **earliest,
644                 Prioq **latest,
645                 uint64_t usec,
646                 uint64_t accuracy,
647                 sd_time_handler_t callback,
648                 void *userdata,
649                 sd_event_source **ret) {
650
651         sd_event_source *s;
652         int r;
653
654         assert_return(e, -EINVAL);
655         assert_return(callback, -EINVAL);
656         assert_return(ret, -EINVAL);
657         assert_return(usec != (uint64_t) -1, -EINVAL);
658         assert_return(accuracy != (uint64_t) -1, -EINVAL);
659         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
660         assert_return(!event_pid_changed(e), -ECHILD);
661
662         assert(timer_fd);
663         assert(earliest);
664         assert(latest);
665
666         if (!*earliest) {
667                 *earliest = prioq_new(earliest_time_prioq_compare);
668                 if (!*earliest)
669                         return -ENOMEM;
670         }
671
672         if (!*latest) {
673                 *latest = prioq_new(latest_time_prioq_compare);
674                 if (!*latest)
675                         return -ENOMEM;
676         }
677
678         if (*timer_fd < 0) {
679                 r = event_setup_timer_fd(e, type, timer_fd, id);
680                 if (r < 0)
681                         return r;
682         }
683
684         s = source_new(e, type);
685         if (!s)
686                 return -ENOMEM;
687
688         s->time.next = usec;
689         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
690         s->time.callback = callback;
691         s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
692         s->userdata = userdata;
693         s->enabled = SD_EVENT_ONESHOT;
694
695         r = prioq_put(*earliest, s, &s->time.earliest_index);
696         if (r < 0)
697                 goto fail;
698
699         r = prioq_put(*latest, s, &s->time.latest_index);
700         if (r < 0)
701                 goto fail;
702
703         *ret = s;
704         return 0;
705
706 fail:
707         source_free(s);
708         return r;
709 }
710
711 int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
712         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
713 }
714
715 int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
716         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
717 }
718
719 static int event_update_signal_fd(sd_event *e) {
720         struct epoll_event ev = {};
721         bool add_to_epoll;
722         int r;
723
724         assert(e);
725
726         add_to_epoll = e->signal_fd < 0;
727
728         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
729         if (r < 0)
730                 return -errno;
731
732         e->signal_fd = r;
733
734         if (!add_to_epoll)
735                 return 0;
736
737         ev.events = EPOLLIN;
738         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
739
740         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
741         if (r < 0) {
742                 close_nointr_nofail(e->signal_fd);
743                 e->signal_fd = -1;
744
745                 return -errno;
746         }
747
748         return 0;
749 }
750
751 int sd_event_add_signal(
752                 sd_event *e,
753                 int sig,
754                 sd_signal_handler_t callback,
755                 void *userdata,
756                 sd_event_source **ret) {
757
758         sd_event_source *s;
759         int r;
760
761         assert_return(e, -EINVAL);
762         assert_return(sig > 0, -EINVAL);
763         assert_return(sig < _NSIG, -EINVAL);
764         assert_return(callback, -EINVAL);
765         assert_return(ret, -EINVAL);
766         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
767         assert_return(!event_pid_changed(e), -ECHILD);
768
769         if (!e->signal_sources) {
770                 e->signal_sources = new0(sd_event_source*, _NSIG);
771                 if (!e->signal_sources)
772                         return -ENOMEM;
773         } else if (e->signal_sources[sig])
774                 return -EBUSY;
775
776         s = source_new(e, SOURCE_SIGNAL);
777         if (!s)
778                 return -ENOMEM;
779
780         s->signal.sig = sig;
781         s->signal.callback = callback;
782         s->userdata = userdata;
783         s->enabled = SD_EVENT_ON;
784
785         e->signal_sources[sig] = s;
786         assert_se(sigaddset(&e->sigset, sig) == 0);
787
788         if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
789                 r = event_update_signal_fd(e);
790                 if (r < 0) {
791                         source_free(s);
792                         return r;
793                 }
794         }
795
796         *ret = s;
797         return 0;
798 }
799
800 int sd_event_add_child(
801                 sd_event *e,
802                 pid_t pid,
803                 int options,
804                 sd_child_handler_t callback,
805                 void *userdata,
806                 sd_event_source **ret) {
807
808         sd_event_source *s;
809         int r;
810
811         assert_return(e, -EINVAL);
812         assert_return(pid > 1, -EINVAL);
813         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
814         assert_return(options != 0, -EINVAL);
815         assert_return(callback, -EINVAL);
816         assert_return(ret, -EINVAL);
817         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
818         assert_return(!event_pid_changed(e), -ECHILD);
819
820         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
821         if (r < 0)
822                 return r;
823
824         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
825                 return -EBUSY;
826
827         s = source_new(e, SOURCE_CHILD);
828         if (!s)
829                 return -ENOMEM;
830
831         s->child.pid = pid;
832         s->child.options = options;
833         s->child.callback = callback;
834         s->userdata = userdata;
835         s->enabled = SD_EVENT_ONESHOT;
836
837         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
838         if (r < 0) {
839                 source_free(s);
840                 return r;
841         }
842
843         e->n_enabled_child_sources ++;
844
845         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
846
847         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
848                 r = event_update_signal_fd(e);
849                 if (r < 0) {
850                         source_free(s);
851                         return -errno;
852                 }
853         }
854
855         e->need_process_child = true;
856
857         *ret = s;
858         return 0;
859 }
860
861 int sd_event_add_defer(
862                 sd_event *e,
863                 sd_defer_handler_t callback,
864                 void *userdata,
865                 sd_event_source **ret) {
866
867         sd_event_source *s;
868         int r;
869
870         assert_return(e, -EINVAL);
871         assert_return(callback, -EINVAL);
872         assert_return(ret, -EINVAL);
873         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
874         assert_return(!event_pid_changed(e), -ECHILD);
875
876         s = source_new(e, SOURCE_DEFER);
877         if (!s)
878                 return -ENOMEM;
879
880         s->defer.callback = callback;
881         s->userdata = userdata;
882         s->enabled = SD_EVENT_ONESHOT;
883
884         r = source_set_pending(s, true);
885         if (r < 0) {
886                 source_free(s);
887                 return r;
888         }
889
890         *ret = s;
891         return 0;
892 }
893
894 int sd_event_add_quit(
895                 sd_event *e,
896                 sd_quit_handler_t callback,
897                 void *userdata,
898                 sd_event_source **ret) {
899
900         sd_event_source *s;
901         int r;
902
903         assert_return(e, -EINVAL);
904         assert_return(callback, -EINVAL);
905         assert_return(ret, -EINVAL);
906         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
907         assert_return(!event_pid_changed(e), -ECHILD);
908
909         if (!e->quit) {
910                 e->quit = prioq_new(quit_prioq_compare);
911                 if (!e->quit)
912                         return -ENOMEM;
913         }
914
915         s = source_new(e, SOURCE_QUIT);
916         if (!s)
917                 return -ENOMEM;
918
919         s->quit.callback = callback;
920         s->userdata = userdata;
921         s->quit.prioq_index = PRIOQ_IDX_NULL;
922         s->enabled = SD_EVENT_ONESHOT;
923
924         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
925         if (r < 0) {
926                 source_free(s);
927                 return r;
928         }
929
930         *ret = s;
931         return 0;
932 }
933
934 sd_event_source* sd_event_source_ref(sd_event_source *s) {
935         assert_return(s, NULL);
936
937         assert(s->n_ref >= 1);
938         s->n_ref++;
939
940         return s;
941 }
942
943 sd_event_source* sd_event_source_unref(sd_event_source *s) {
944         assert_return(s, NULL);
945
946         assert(s->n_ref >= 1);
947         s->n_ref--;
948
949         if (s->n_ref <= 0)
950                 source_free(s);
951
952         return NULL;
953 }
954
955 sd_event *sd_event_get(sd_event_source *s) {
956         assert_return(s, NULL);
957
958         return s->event;
959 }
960
961 int sd_event_source_get_pending(sd_event_source *s) {
962         assert_return(s, -EINVAL);
963         assert_return(s->type != SOURCE_QUIT, -EDOM);
964         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
965         assert_return(!event_pid_changed(s->event), -ECHILD);
966
967         return s->pending;
968 }
969
970 int sd_event_source_get_io_fd(sd_event_source *s) {
971         assert_return(s, -EINVAL);
972         assert_return(s->type == SOURCE_IO, -EDOM);
973         assert_return(!event_pid_changed(s->event), -ECHILD);
974
975         return s->io.fd;
976 }
977
978 int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
979         assert_return(s, -EINVAL);
980         assert_return(events, -EINVAL);
981         assert_return(s->type == SOURCE_IO, -EDOM);
982         assert_return(!event_pid_changed(s->event), -ECHILD);
983
984         *events = s->io.events;
985         return 0;
986 }
987
988 int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
989         int r;
990
991         assert_return(s, -EINVAL);
992         assert_return(s->type == SOURCE_IO, -EDOM);
993         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
994         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
995         assert_return(!event_pid_changed(s->event), -ECHILD);
996
997         if (s->io.events == events)
998                 return 0;
999
1000         if (s->enabled != SD_EVENT_OFF) {
1001                 r = source_io_register(s, s->io.events, events);
1002                 if (r < 0)
1003                         return r;
1004         }
1005
1006         s->io.events = events;
1007
1008         return 0;
1009 }
1010
1011 int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1012         assert_return(s, -EINVAL);
1013         assert_return(revents, -EINVAL);
1014         assert_return(s->type == SOURCE_IO, -EDOM);
1015         assert_return(s->pending, -ENODATA);
1016         assert_return(!event_pid_changed(s->event), -ECHILD);
1017
1018         *revents = s->io.revents;
1019         return 0;
1020 }
1021
1022 int sd_event_source_get_signal(sd_event_source *s) {
1023         assert_return(s, -EINVAL);
1024         assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1025         assert_return(!event_pid_changed(s->event), -ECHILD);
1026
1027         return s->signal.sig;
1028 }
1029
1030 int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1031         assert_return(s, -EINVAL);
1032         assert_return(!event_pid_changed(s->event), -ECHILD);
1033
1034         return s->priority;
1035 }
1036
1037 int sd_event_source_set_priority(sd_event_source *s, int priority) {
1038         assert_return(s, -EINVAL);
1039         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1040         assert_return(!event_pid_changed(s->event), -ECHILD);
1041
1042         if (s->priority == priority)
1043                 return 0;
1044
1045         s->priority = priority;
1046
1047         if (s->pending)
1048                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1049
1050         if (s->prepare)
1051                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1052
1053         if (s->type == SOURCE_QUIT)
1054                 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1055
1056         return 0;
1057 }
1058
1059 int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1060         assert_return(s, -EINVAL);
1061         assert_return(m, -EINVAL);
1062         assert_return(!event_pid_changed(s->event), -ECHILD);
1063
1064         *m = s->enabled;
1065         return 0;
1066 }
1067
1068 int sd_event_source_set_enabled(sd_event_source *s, int m) {
1069         int r;
1070
1071         assert_return(s, -EINVAL);
1072         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1073         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1074         assert_return(!event_pid_changed(s->event), -ECHILD);
1075
1076         if (s->enabled == m)
1077                 return 0;
1078
1079         if (m == SD_EVENT_OFF) {
1080
1081                 switch (s->type) {
1082
1083                 case SOURCE_IO:
1084                         r = source_io_unregister(s);
1085                         if (r < 0)
1086                                 return r;
1087
1088                         s->enabled = m;
1089                         break;
1090
1091                 case SOURCE_MONOTONIC:
1092                         s->enabled = m;
1093                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1094                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1095                         break;
1096
1097                 case SOURCE_REALTIME:
1098                         s->enabled = m;
1099                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1100                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1101                         break;
1102
1103                 case SOURCE_SIGNAL:
1104                         s->enabled = m;
1105                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1106                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1107                                 event_update_signal_fd(s->event);
1108                         }
1109
1110                         break;
1111
1112                 case SOURCE_CHILD:
1113                         s->enabled = m;
1114
1115                         assert(s->event->n_enabled_child_sources > 0);
1116                         s->event->n_enabled_child_sources--;
1117
1118                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1119                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1120                                 event_update_signal_fd(s->event);
1121                         }
1122
1123                         break;
1124
1125                 case SOURCE_QUIT:
1126                         s->enabled = m;
1127                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1128                         break;
1129
1130                 case SOURCE_DEFER:
1131                         s->enabled = m;
1132                         break;
1133                 }
1134
1135         } else {
1136                 switch (s->type) {
1137
1138                 case SOURCE_IO:
1139                         r = source_io_register(s, m, s->io.events);
1140                         if (r < 0)
1141                                 return r;
1142
1143                         s->enabled = m;
1144                         break;
1145
1146                 case SOURCE_MONOTONIC:
1147                         s->enabled = m;
1148                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1149                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1150                         break;
1151
1152                 case SOURCE_REALTIME:
1153                         s->enabled = m;
1154                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1155                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1156                         break;
1157
1158                 case SOURCE_SIGNAL:
1159                         s->enabled = m;
1160
1161                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1162                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1163                                 event_update_signal_fd(s->event);
1164                         }
1165                         break;
1166
1167                 case SOURCE_CHILD:
1168                         s->enabled = m;
1169
1170                         if (s->enabled == SD_EVENT_OFF) {
1171                                 s->event->n_enabled_child_sources++;
1172
1173                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1174                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1175                                         event_update_signal_fd(s->event);
1176                                 }
1177                         }
1178                         break;
1179
1180                 case SOURCE_QUIT:
1181                         s->enabled = m;
1182                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1183                         break;
1184
1185                 case SOURCE_DEFER:
1186                         s->enabled = m;
1187                         break;
1188                 }
1189         }
1190
1191         if (s->pending)
1192                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1193
1194         if (s->prepare)
1195                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1196
1197         return 0;
1198 }
1199
1200 int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1201         assert_return(s, -EINVAL);
1202         assert_return(usec, -EINVAL);
1203         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1204         assert_return(!event_pid_changed(s->event), -ECHILD);
1205
1206         *usec = s->time.next;
1207         return 0;
1208 }
1209
1210 int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1211         assert_return(s, -EINVAL);
1212         assert_return(usec != (uint64_t) -1, -EINVAL);
1213         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1214         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1215         assert_return(!event_pid_changed(s->event), -ECHILD);
1216
1217         if (s->time.next == usec)
1218                 return 0;
1219
1220         s->time.next = usec;
1221
1222         if (s->type == SOURCE_REALTIME) {
1223                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1224                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1225         } else {
1226                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1227                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1228         }
1229
1230         return 0;
1231 }
1232
1233 int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1234         assert_return(s, -EINVAL);
1235         assert_return(usec, -EINVAL);
1236         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1237         assert_return(!event_pid_changed(s->event), -ECHILD);
1238
1239         *usec = s->time.accuracy;
1240         return 0;
1241 }
1242
1243 int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1244         assert_return(s, -EINVAL);
1245         assert_return(usec != (uint64_t) -1, -EINVAL);
1246         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1247         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1248         assert_return(!event_pid_changed(s->event), -ECHILD);
1249
1250         if (usec == 0)
1251                 usec = DEFAULT_ACCURACY_USEC;
1252
1253         if (s->time.accuracy == usec)
1254                 return 0;
1255
1256         s->time.accuracy = usec;
1257
1258         if (s->type == SOURCE_REALTIME)
1259                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1260         else
1261                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1262
1263         return 0;
1264 }
1265
1266 int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1267         assert_return(s, -EINVAL);
1268         assert_return(pid, -EINVAL);
1269         assert_return(s->type == SOURCE_CHILD, -EDOM);
1270         assert_return(!event_pid_changed(s->event), -ECHILD);
1271
1272         *pid = s->child.pid;
1273         return 0;
1274 }
1275
1276 int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1277         int r;
1278
1279         assert_return(s, -EINVAL);
1280         assert_return(s->type != SOURCE_QUIT, -EDOM);
1281         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1282         assert_return(!event_pid_changed(s->event), -ECHILD);
1283
1284         if (s->prepare == callback)
1285                 return 0;
1286
1287         if (callback && s->prepare) {
1288                 s->prepare = callback;
1289                 return 0;
1290         }
1291
1292         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1293         if (r < 0)
1294                 return r;
1295
1296         s->prepare = callback;
1297
1298         if (callback) {
1299                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1300                 if (r < 0)
1301                         return r;
1302         } else
1303                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1304
1305         return 0;
1306 }
1307
1308 void* sd_event_source_get_userdata(sd_event_source *s) {
1309         assert_return(s, NULL);
1310
1311         return s->userdata;
1312 }
1313
1314 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1315         usec_t c;
1316         assert(e);
1317         assert(a <= b);
1318
1319         if (a <= 0)
1320                 return 0;
1321
1322         if (b <= a + 1)
1323                 return a;
1324
1325         /*
1326           Find a good time to wake up again between times a and b. We
1327           have two goals here:
1328
1329           a) We want to wake up as seldom as possible, hence prefer
1330              later times over earlier times.
1331
1332           b) But if we have to wake up, then let's make sure to
1333              dispatch as much as possible on the entire system.
1334
1335           We implement this by waking up everywhere at the same time
1336           within any given second if we can, synchronised via the
1337           perturbation value determined from the boot ID. If we can't,
1338           then we try to find the same spot in every a 250ms
1339           step. Otherwise, we pick the last possible time to wake up.
1340         */
1341
1342         c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1343         if (c >= b) {
1344                 if (_unlikely_(c < USEC_PER_SEC))
1345                         return b;
1346
1347                 c -= USEC_PER_SEC;
1348         }
1349
1350         if (c >= a)
1351                 return c;
1352
1353         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1354         if (c >= b) {
1355                 if (_unlikely_(c < USEC_PER_MSEC*250))
1356                         return b;
1357
1358                 c -= USEC_PER_MSEC*250;
1359         }
1360
1361         if (c >= a)
1362                 return c;
1363
1364         return b;
1365 }
1366
1367 static int event_arm_timer(
1368                 sd_event *e,
1369                 int timer_fd,
1370                 Prioq *earliest,
1371                 Prioq *latest,
1372                 usec_t *next) {
1373
1374         struct itimerspec its = {};
1375         sd_event_source *a, *b;
1376         usec_t t;
1377         int r;
1378
1379         assert_se(e);
1380         assert_se(next);
1381
1382         a = prioq_peek(earliest);
1383         if (!a || a->enabled == SD_EVENT_OFF)
1384                 return 0;
1385
1386         b = prioq_peek(latest);
1387         assert_se(b && b->enabled != SD_EVENT_OFF);
1388
1389         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1390         if (*next == t)
1391                 return 0;
1392
1393         assert_se(timer_fd >= 0);
1394
1395         if (t == 0) {
1396                 /* We don' want to disarm here, just mean some time looooong ago. */
1397                 its.it_value.tv_sec = 0;
1398                 its.it_value.tv_nsec = 1;
1399         } else
1400                 timespec_store(&its.it_value, t);
1401
1402         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1403         if (r < 0)
1404                 return r;
1405
1406         *next = t;
1407         return 0;
1408 }
1409
1410 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1411         assert(e);
1412         assert(s);
1413         assert(s->type == SOURCE_IO);
1414
1415         s->io.revents = events;
1416
1417         /*
1418            If this is a oneshot event source, then we added it to the
1419            epoll with EPOLLONESHOT, hence we know it's not registered
1420            anymore. We can save a syscall here...
1421         */
1422
1423         if (s->enabled == SD_EVENT_ONESHOT)
1424                 s->io.registered = false;
1425
1426         return source_set_pending(s, true);
1427 }
1428
1429 static int flush_timer(sd_event *e, int fd, uint32_t events) {
1430         uint64_t x;
1431         ssize_t ss;
1432
1433         assert(e);
1434         assert(fd >= 0);
1435         assert_return(events == EPOLLIN, -EIO);
1436
1437         ss = read(fd, &x, sizeof(x));
1438         if (ss < 0) {
1439                 if (errno == EAGAIN || errno == EINTR)
1440                         return 0;
1441
1442                 return -errno;
1443         }
1444
1445         if (ss != sizeof(x))
1446                 return -EIO;
1447
1448         return 0;
1449 }
1450
1451 static int process_timer(
1452                 sd_event *e,
1453                 usec_t n,
1454                 Prioq *earliest,
1455                 Prioq *latest) {
1456
1457         sd_event_source *s;
1458         int r;
1459
1460         assert(e);
1461
1462         for (;;) {
1463                 s = prioq_peek(earliest);
1464                 if (!s ||
1465                     s->time.next > n ||
1466                     s->enabled == SD_EVENT_OFF ||
1467                     s->pending)
1468                         break;
1469
1470                 r = source_set_pending(s, true);
1471                 if (r < 0)
1472                         return r;
1473
1474                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1475                 prioq_reshuffle(latest, s, &s->time.latest_index);
1476         }
1477
1478         return 0;
1479 }
1480
1481 static int process_child(sd_event *e) {
1482         sd_event_source *s;
1483         Iterator i;
1484         int r;
1485
1486         assert(e);
1487
1488         e->need_process_child = false;
1489
1490         /*
1491            So, this is ugly. We iteratively invoke waitid() with P_PID
1492            + WNOHANG for each PID we wait for, instead of using
1493            P_ALL. This is because we only want to get child
1494            information of very specific child processes, and not all
1495            of them. We might not have processed the SIGCHLD even of a
1496            previous invocation and we don't want to maintain a
1497            unbounded *per-child* event queue, hence we really don't
1498            want anything flushed out of the kernel's queue that we
1499            don't care about. Since this is O(n) this means that if you
1500            have a lot of processes you probably want to handle SIGCHLD
1501            yourself.
1502         */
1503
1504         HASHMAP_FOREACH(s, e->child_sources, i) {
1505                 assert(s->type == SOURCE_CHILD);
1506
1507                 if (s->pending)
1508                         continue;
1509
1510                 if (s->enabled == SD_EVENT_OFF)
1511                         continue;
1512
1513                 zero(s->child.siginfo);
1514                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1515                 if (r < 0)
1516                         return -errno;
1517
1518                 if (s->child.siginfo.si_pid != 0) {
1519                         r = source_set_pending(s, true);
1520                         if (r < 0)
1521                                 return r;
1522                 }
1523         }
1524
1525         return 0;
1526 }
1527
1528 static int process_signal(sd_event *e, uint32_t events) {
1529         struct signalfd_siginfo si;
1530         bool read_one = false;
1531         ssize_t ss;
1532         int r;
1533
1534         assert(e);
1535         assert_return(events == EPOLLIN, -EIO);
1536
1537         for (;;) {
1538                 sd_event_source *s;
1539
1540                 ss = read(e->signal_fd, &si, sizeof(si));
1541                 if (ss < 0) {
1542                         if (errno == EAGAIN || errno == EINTR)
1543                                 return read_one;
1544
1545                         return -errno;
1546                 }
1547
1548                 if (ss != sizeof(si))
1549                         return -EIO;
1550
1551                 read_one = true;
1552
1553                 if (si.ssi_signo == SIGCHLD) {
1554                         r = process_child(e);
1555                         if (r < 0)
1556                                 return r;
1557                         if (r > 0 || !e->signal_sources[si.ssi_signo])
1558                                 continue;
1559                 } else {
1560                         s = e->signal_sources[si.ssi_signo];
1561                         if (!s)
1562                                 return -EIO;
1563                 }
1564
1565                 s->signal.siginfo = si;
1566                 r = source_set_pending(s, true);
1567                 if (r < 0)
1568                         return r;
1569         }
1570
1571
1572         return 0;
1573 }
1574
1575 static int source_dispatch(sd_event_source *s) {
1576         int r;
1577
1578         assert(s);
1579         assert(s->pending || s->type == SOURCE_QUIT);
1580
1581         if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
1582                 r = source_set_pending(s, false);
1583                 if (r < 0)
1584                         return r;
1585         }
1586
1587         if (s->enabled == SD_EVENT_ONESHOT) {
1588                 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1589                 if (r < 0)
1590                         return r;
1591         }
1592
1593         switch (s->type) {
1594
1595         case SOURCE_IO:
1596                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1597                 break;
1598
1599         case SOURCE_MONOTONIC:
1600                 r = s->time.callback(s, s->time.next, s->userdata);
1601                 break;
1602
1603         case SOURCE_REALTIME:
1604                 r = s->time.callback(s, s->time.next, s->userdata);
1605                 break;
1606
1607         case SOURCE_SIGNAL:
1608                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1609                 break;
1610
1611         case SOURCE_CHILD:
1612                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1613                 break;
1614
1615         case SOURCE_DEFER:
1616                 r = s->defer.callback(s, s->userdata);
1617                 break;
1618
1619         case SOURCE_QUIT:
1620                 r = s->quit.callback(s, s->userdata);
1621                 break;
1622         }
1623
1624         return r;
1625 }
1626
1627 static int event_prepare(sd_event *e) {
1628         int r;
1629
1630         assert(e);
1631
1632         for (;;) {
1633                 sd_event_source *s;
1634
1635                 s = prioq_peek(e->prepare);
1636                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1637                         break;
1638
1639                 s->prepare_iteration = e->iteration;
1640                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1641                 if (r < 0)
1642                         return r;
1643
1644                 assert(s->prepare);
1645                 r = s->prepare(s, s->userdata);
1646                 if (r < 0)
1647                         return r;
1648
1649         }
1650
1651         return 0;
1652 }
1653
1654 static int dispatch_quit(sd_event *e) {
1655         sd_event_source *p;
1656         int r;
1657
1658         assert(e);
1659
1660         p = prioq_peek(e->quit);
1661         if (!p || p->enabled == SD_EVENT_OFF) {
1662                 e->state = SD_EVENT_FINISHED;
1663                 return 0;
1664         }
1665
1666         sd_event_ref(e);
1667         e->iteration++;
1668         e->state = SD_EVENT_QUITTING;
1669
1670         r = source_dispatch(p);
1671
1672         e->state = SD_EVENT_PASSIVE;
1673         sd_event_unref(e);
1674
1675         return r;
1676 }
1677
1678 static sd_event_source* event_next_pending(sd_event *e) {
1679         sd_event_source *p;
1680
1681         assert(e);
1682
1683         p = prioq_peek(e->pending);
1684         if (!p)
1685                 return NULL;
1686
1687         if (p->enabled == SD_EVENT_OFF)
1688                 return NULL;
1689
1690         return p;
1691 }
1692
1693 int sd_event_run(sd_event *e, uint64_t timeout) {
1694         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1695         sd_event_source *p;
1696         int r, i, m;
1697
1698         assert_return(e, -EINVAL);
1699         assert_return(!event_pid_changed(e), -ECHILD);
1700         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1701         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1702
1703         if (e->quit_requested)
1704                 return dispatch_quit(e);
1705
1706         sd_event_ref(e);
1707         e->iteration++;
1708         e->state = SD_EVENT_RUNNING;
1709
1710         r = event_prepare(e);
1711         if (r < 0)
1712                 goto finish;
1713
1714         if (event_next_pending(e) || e->need_process_child)
1715                 timeout = 0;
1716
1717         if (timeout > 0) {
1718                 r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1719                 if (r < 0)
1720                         goto finish;
1721
1722                 r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1723                 if (r < 0)
1724                         goto finish;
1725         }
1726
1727         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1728                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1729         if (m < 0) {
1730                 r = m;
1731                 goto finish;
1732         }
1733
1734         dual_timestamp_get(&e->timestamp);
1735
1736         for (i = 0; i < m; i++) {
1737
1738                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1739                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
1740                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1741                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
1742                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1743                         r = process_signal(e, ev_queue[i].events);
1744                 else
1745                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1746
1747                 if (r < 0)
1748                         goto finish;
1749         }
1750
1751         r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
1752         if (r < 0)
1753                 goto finish;
1754
1755         r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
1756         if (r < 0)
1757                 goto finish;
1758
1759         if (e->need_process_child) {
1760                 r = process_child(e);
1761                 if (r < 0)
1762                         goto finish;
1763         }
1764
1765         p = event_next_pending(e);
1766         if (!p) {
1767                 r = 0;
1768                 goto finish;
1769         }
1770
1771         r = source_dispatch(p);
1772
1773 finish:
1774         e->state = SD_EVENT_PASSIVE;
1775         sd_event_unref(e);
1776
1777         return r;
1778 }
1779
1780 int sd_event_loop(sd_event *e) {
1781         int r;
1782
1783         assert_return(e, -EINVAL);
1784         assert_return(!event_pid_changed(e), -ECHILD);
1785         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1786
1787         sd_event_ref(e);
1788
1789         while (e->state != SD_EVENT_FINISHED) {
1790                 r = sd_event_run(e, (uint64_t) -1);
1791                 if (r < 0)
1792                         goto finish;
1793         }
1794
1795         r = 0;
1796
1797 finish:
1798         sd_event_unref(e);
1799         return r;
1800 }
1801
1802 int sd_event_get_state(sd_event *e) {
1803         assert_return(e, -EINVAL);
1804         assert_return(!event_pid_changed(e), -ECHILD);
1805
1806         return e->state;
1807 }
1808
1809 int sd_event_get_quit(sd_event *e) {
1810         assert_return(e, -EINVAL);
1811         assert_return(!event_pid_changed(e), -ECHILD);
1812
1813         return e->quit_requested;
1814 }
1815
1816 int sd_event_request_quit(sd_event *e) {
1817         assert_return(e, -EINVAL);
1818         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1819         assert_return(!event_pid_changed(e), -ECHILD);
1820
1821         e->quit_requested = true;
1822         return 0;
1823 }
1824
1825 int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
1826         assert_return(e, -EINVAL);
1827         assert_return(usec, -EINVAL);
1828         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1829         assert_return(!event_pid_changed(e), -ECHILD);
1830
1831         *usec = e->timestamp.realtime;
1832         return 0;
1833 }
1834
1835 int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
1836         assert_return(e, -EINVAL);
1837         assert_return(usec, -EINVAL);
1838         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1839         assert_return(!event_pid_changed(e), -ECHILD);
1840
1841         *usec = e->timestamp.monotonic;
1842         return 0;
1843 }