chiark / gitweb /
event: move all library calls over to new assert_return() macro
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "macro.h"
27 #include "prioq.h"
28 #include "hashmap.h"
29 #include "util.h"
30 #include "time-util.h"
31 #include "sd-id128.h"
32
33 #include "sd-event.h"
34
35 #define EPOLL_QUEUE_MAX 64
36 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
37
38 typedef enum EventSourceType {
39         SOURCE_IO,
40         SOURCE_MONOTONIC,
41         SOURCE_REALTIME,
42         SOURCE_SIGNAL,
43         SOURCE_CHILD,
44         SOURCE_DEFER,
45         SOURCE_QUIT
46 } EventSourceType;
47
48 struct sd_event_source {
49         unsigned n_ref;
50
51         sd_event *event;
52         void *userdata;
53         sd_prepare_handler_t prepare;
54
55         EventSourceType type:4;
56         int enabled:3;
57         bool pending:1;
58
59         int priority;
60         unsigned pending_index;
61         unsigned prepare_index;
62         unsigned pending_iteration;
63         unsigned prepare_iteration;
64
65         union {
66                 struct {
67                         sd_io_handler_t callback;
68                         int fd;
69                         uint32_t events;
70                         uint32_t revents;
71                         bool registered:1;
72                 } io;
73                 struct {
74                         sd_time_handler_t callback;
75                         usec_t next, accuracy;
76                         unsigned earliest_index;
77                         unsigned latest_index;
78                 } time;
79                 struct {
80                         sd_signal_handler_t callback;
81                         struct signalfd_siginfo siginfo;
82                         int sig;
83                 } signal;
84                 struct {
85                         sd_child_handler_t callback;
86                         siginfo_t siginfo;
87                         pid_t pid;
88                         int options;
89                 } child;
90                 struct {
91                         sd_defer_handler_t callback;
92                 } defer;
93                 struct {
94                         sd_quit_handler_t callback;
95                         unsigned prioq_index;
96                 } quit;
97         };
98 };
99
100 struct sd_event {
101         unsigned n_ref;
102
103         int epoll_fd;
104         int signal_fd;
105         int realtime_fd;
106         int monotonic_fd;
107
108         Prioq *pending;
109         Prioq *prepare;
110
111         /* For both clocks we maintain two priority queues each, one
112          * ordered for the earliest times the events may be
113          * dispatched, and one ordered by the latest times they must
114          * have been dispatched. The range between the top entries in
115          * the two prioqs is the time window we can freely schedule
116          * wakeups in */
117         Prioq *monotonic_earliest;
118         Prioq *monotonic_latest;
119         Prioq *realtime_earliest;
120         Prioq *realtime_latest;
121
122         usec_t realtime_next, monotonic_next;
123         usec_t perturb;
124
125         sigset_t sigset;
126         sd_event_source **signal_sources;
127
128         Hashmap *child_sources;
129         unsigned n_enabled_child_sources;
130
131         Prioq *quit;
132
133         pid_t original_pid;
134
135         unsigned iteration;
136         int state;
137
138         bool quit_requested:1;
139         bool need_process_child:1;
140 };
141
142 static int pending_prioq_compare(const void *a, const void *b) {
143         const sd_event_source *x = a, *y = b;
144
145         assert(x->pending);
146         assert(y->pending);
147
148         /* Enabled ones first */
149         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
150                 return -1;
151         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
152                 return 1;
153
154         /* Lower priority values first */
155         if (x->priority < y->priority)
156                 return -1;
157         if (x->priority > y->priority)
158                 return 1;
159
160         /* Older entries first */
161         if (x->pending_iteration < y->pending_iteration)
162                 return -1;
163         if (x->pending_iteration > y->pending_iteration)
164                 return 1;
165
166         /* Stability for the rest */
167         if (x < y)
168                 return -1;
169         if (x > y)
170                 return 1;
171
172         return 0;
173 }
174
175 static int prepare_prioq_compare(const void *a, const void *b) {
176         const sd_event_source *x = a, *y = b;
177
178         assert(x->prepare);
179         assert(y->prepare);
180
181         /* Move most recently prepared ones last, so that we can stop
182          * preparing as soon as we hit one that has already been
183          * prepared in the current iteration */
184         if (x->prepare_iteration < y->prepare_iteration)
185                 return -1;
186         if (x->prepare_iteration > y->prepare_iteration)
187                 return 1;
188
189         /* Enabled ones first */
190         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
191                 return -1;
192         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
193                 return 1;
194
195         /* Lower priority values first */
196         if (x->priority < y->priority)
197                 return -1;
198         if (x->priority > y->priority)
199                 return 1;
200
201         /* Stability for the rest */
202         if (x < y)
203                 return -1;
204         if (x > y)
205                 return 1;
206
207         return 0;
208 }
209
210 static int earliest_time_prioq_compare(const void *a, const void *b) {
211         const sd_event_source *x = a, *y = b;
212
213         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
214         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
215
216         /* Enabled ones first */
217         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
218                 return -1;
219         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
220                 return 1;
221
222         /* Move the pending ones to the end */
223         if (!x->pending && y->pending)
224                 return -1;
225         if (x->pending && !y->pending)
226                 return 1;
227
228         /* Order by time */
229         if (x->time.next < y->time.next)
230                 return -1;
231         if (x->time.next > y->time.next)
232                 return -1;
233
234         /* Stability for the rest */
235         if (x < y)
236                 return -1;
237         if (x > y)
238                 return 1;
239
240         return 0;
241 }
242
243 static int latest_time_prioq_compare(const void *a, const void *b) {
244         const sd_event_source *x = a, *y = b;
245
246         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
247                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
248
249         /* Enabled ones first */
250         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
251                 return -1;
252         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
253                 return 1;
254
255         /* Move the pending ones to the end */
256         if (!x->pending && y->pending)
257                 return -1;
258         if (x->pending && !y->pending)
259                 return 1;
260
261         /* Order by time */
262         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
263                 return -1;
264         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
265                 return -1;
266
267         /* Stability for the rest */
268         if (x < y)
269                 return -1;
270         if (x > y)
271                 return 1;
272
273         return 0;
274 }
275
276 static int quit_prioq_compare(const void *a, const void *b) {
277         const sd_event_source *x = a, *y = b;
278
279         assert(x->type == SOURCE_QUIT);
280         assert(y->type == SOURCE_QUIT);
281
282         /* Enabled ones first */
283         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
284                 return -1;
285         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
286                 return 1;
287
288         /* Lower priority values first */
289         if (x->priority < y->priority)
290                 return -1;
291         if (x->priority > y->priority)
292                 return 1;
293
294         /* Stability for the rest */
295         if (x < y)
296                 return -1;
297         if (x > y)
298                 return 1;
299
300         return 0;
301 }
302
303 static void event_free(sd_event *e) {
304         assert(e);
305
306         if (e->epoll_fd >= 0)
307                 close_nointr_nofail(e->epoll_fd);
308
309         if (e->signal_fd >= 0)
310                 close_nointr_nofail(e->signal_fd);
311
312         if (e->realtime_fd >= 0)
313                 close_nointr_nofail(e->realtime_fd);
314
315         if (e->monotonic_fd >= 0)
316                 close_nointr_nofail(e->monotonic_fd);
317
318         prioq_free(e->pending);
319         prioq_free(e->prepare);
320         prioq_free(e->monotonic_earliest);
321         prioq_free(e->monotonic_latest);
322         prioq_free(e->realtime_earliest);
323         prioq_free(e->realtime_latest);
324         prioq_free(e->quit);
325
326         free(e->signal_sources);
327
328         hashmap_free(e->child_sources);
329         free(e);
330 }
331
332 int sd_event_new(sd_event** ret) {
333         sd_event *e;
334         int r;
335
336         assert_return(ret, -EINVAL);
337
338         e = new0(sd_event, 1);
339         if (!e)
340                 return -ENOMEM;
341
342         e->n_ref = 1;
343         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
344         e->realtime_next = e->monotonic_next = (usec_t) -1;
345         e->original_pid = getpid();
346
347         assert_se(sigemptyset(&e->sigset) == 0);
348
349         e->pending = prioq_new(pending_prioq_compare);
350         if (!e->pending) {
351                 r = -ENOMEM;
352                 goto fail;
353         }
354
355         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
356         if (e->epoll_fd < 0) {
357                 r = -errno;
358                 goto fail;
359         }
360
361         *ret = e;
362         return 0;
363
364 fail:
365         event_free(e);
366         return r;
367 }
368
369 sd_event* sd_event_ref(sd_event *e) {
370         assert_return(e, NULL);
371
372         assert(e->n_ref >= 1);
373         e->n_ref++;
374
375         return e;
376 }
377
378 sd_event* sd_event_unref(sd_event *e) {
379         assert_return(e, NULL);
380
381         assert(e->n_ref >= 1);
382         e->n_ref--;
383
384         if (e->n_ref <= 0)
385                 event_free(e);
386
387         return NULL;
388 }
389
390 static bool event_pid_changed(sd_event *e) {
391         assert(e);
392
393         /* We don't support people creating am event loop and keeping
394          * it around over a fork(). Let's complain. */
395
396         return e->original_pid != getpid();
397 }
398
399 static int source_io_unregister(sd_event_source *s) {
400         int r;
401
402         assert(s);
403         assert(s->type == SOURCE_IO);
404
405         if (!s->io.registered)
406                 return 0;
407
408         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
409         if (r < 0)
410                 return -errno;
411
412         s->io.registered = false;
413         return 0;
414 }
415
416 static int source_io_register(
417                 sd_event_source *s,
418                 int enabled,
419                 uint32_t events) {
420
421         struct epoll_event ev = {};
422         int r;
423
424         assert(s);
425         assert(s->type == SOURCE_IO);
426         assert(enabled != SD_EVENT_OFF);
427
428         ev.events = events;
429         ev.data.ptr = s;
430
431         if (enabled == SD_EVENT_ONESHOT)
432                 ev.events |= EPOLLONESHOT;
433
434         if (s->io.registered)
435                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
436         else
437                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
438
439         if (r < 0)
440                 return -errno;
441
442         s->io.registered = true;
443
444         return 0;
445 }
446
447 static void source_free(sd_event_source *s) {
448         assert(s);
449
450         if (s->event) {
451                 switch (s->type) {
452
453                 case SOURCE_IO:
454                         if (s->io.fd >= 0)
455                                 source_io_unregister(s);
456
457                         break;
458
459                 case SOURCE_MONOTONIC:
460                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
461                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
462                         break;
463
464                 case SOURCE_REALTIME:
465                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
466                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
467                         break;
468
469                 case SOURCE_SIGNAL:
470                         if (s->signal.sig > 0) {
471                                 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
472                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
473
474                                 if (s->event->signal_sources)
475                                         s->event->signal_sources[s->signal.sig] = NULL;
476                         }
477
478                         break;
479
480                 case SOURCE_CHILD:
481                         if (s->child.pid > 0) {
482                                 if (s->enabled != SD_EVENT_OFF) {
483                                         assert(s->event->n_enabled_child_sources > 0);
484                                         s->event->n_enabled_child_sources--;
485                                 }
486
487                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
488                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
489
490                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
491                         }
492
493                         break;
494
495                 case SOURCE_QUIT:
496                         prioq_remove(s->event->quit, s, &s->quit.prioq_index);
497                         break;
498                 }
499
500                 if (s->pending)
501                         prioq_remove(s->event->pending, s, &s->pending_index);
502
503                 if (s->prepare)
504                         prioq_remove(s->event->prepare, s, &s->prepare_index);
505
506                 sd_event_unref(s->event);
507         }
508
509         free(s);
510 }
511
512 static int source_set_pending(sd_event_source *s, bool b) {
513         int r;
514
515         assert(s);
516         assert(s->type != SOURCE_QUIT);
517
518         if (s->pending == b)
519                 return 0;
520
521         s->pending = b;
522
523         if (b) {
524                 s->pending_iteration = s->event->iteration;
525
526                 r = prioq_put(s->event->pending, s, &s->pending_index);
527                 if (r < 0) {
528                         s->pending = false;
529                         return r;
530                 }
531         } else
532                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
533
534         return 0;
535 }
536
537 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
538         sd_event_source *s;
539
540         assert(e);
541
542         s = new0(sd_event_source, 1);
543         if (!s)
544                 return NULL;
545
546         s->n_ref = 1;
547         s->event = sd_event_ref(e);
548         s->type = type;
549         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
550
551         return s;
552 }
553
554 int sd_event_add_io(
555                 sd_event *e,
556                 int fd,
557                 uint32_t events,
558                 sd_io_handler_t callback,
559                 void *userdata,
560                 sd_event_source **ret) {
561
562         sd_event_source *s;
563         int r;
564
565         assert_return(e, -EINVAL);
566         assert_return(fd >= 0, -EINVAL);
567         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
568         assert_return(callback, -EINVAL);
569         assert_return(ret, -EINVAL);
570         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
571         assert_return(!event_pid_changed(e), -ECHILD);
572
573         s = source_new(e, SOURCE_IO);
574         if (!s)
575                 return -ENOMEM;
576
577         s->io.fd = fd;
578         s->io.events = events;
579         s->io.callback = callback;
580         s->userdata = userdata;
581         s->enabled = SD_EVENT_ON;
582
583         r = source_io_register(s, s->enabled, events);
584         if (r < 0) {
585                 source_free(s);
586                 return -errno;
587         }
588
589         *ret = s;
590         return 0;
591 }
592
593 static int event_setup_timer_fd(
594                 sd_event *e,
595                 EventSourceType type,
596                 int *timer_fd,
597                 clockid_t id) {
598
599         struct epoll_event ev = {};
600         int r, fd;
601         sd_id128_t bootid;
602
603         assert(e);
604         assert(timer_fd);
605
606         if (_likely_(*timer_fd >= 0))
607                 return 0;
608
609         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
610         if (fd < 0)
611                 return -errno;
612
613         ev.events = EPOLLIN;
614         ev.data.ptr = INT_TO_PTR(type);
615
616         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
617         if (r < 0) {
618                 close_nointr_nofail(fd);
619                 return -errno;
620         }
621
622         /* When we sleep for longer, we try to realign the wakeup to
623            the same time wihtin each second, so that events all across
624            the system can be coalesced into a single CPU
625            wakeup. However, let's take some system-specific randomness
626            for this value, so that in a network of systems with synced
627            clocks timer events are distributed a bit. Here, we
628            calculate a perturbation usec offset from the boot ID. */
629
630         if (sd_id128_get_boot(&bootid) >= 0)
631                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
632
633         *timer_fd = fd;
634         return 0;
635 }
636
637 static int event_add_time_internal(
638                 sd_event *e,
639                 EventSourceType type,
640                 int *timer_fd,
641                 clockid_t id,
642                 Prioq **earliest,
643                 Prioq **latest,
644                 uint64_t usec,
645                 uint64_t accuracy,
646                 sd_time_handler_t callback,
647                 void *userdata,
648                 sd_event_source **ret) {
649
650         sd_event_source *s;
651         int r;
652
653         assert_return(e, -EINVAL);
654         assert_return(callback, -EINVAL);
655         assert_return(ret, -EINVAL);
656         assert_return(usec != (uint64_t) -1, -EINVAL);
657         assert_return(accuracy != (uint64_t) -1, -EINVAL);
658         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
659         assert_return(!event_pid_changed(e), -ECHILD);
660
661         assert(timer_fd);
662         assert(earliest);
663         assert(latest);
664
665         if (!*earliest) {
666                 *earliest = prioq_new(earliest_time_prioq_compare);
667                 if (!*earliest)
668                         return -ENOMEM;
669         }
670
671         if (!*latest) {
672                 *latest = prioq_new(latest_time_prioq_compare);
673                 if (!*latest)
674                         return -ENOMEM;
675         }
676
677         if (*timer_fd < 0) {
678                 r = event_setup_timer_fd(e, type, timer_fd, id);
679                 if (r < 0)
680                         return r;
681         }
682
683         s = source_new(e, type);
684         if (!s)
685                 return -ENOMEM;
686
687         s->time.next = usec;
688         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
689         s->time.callback = callback;
690         s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
691         s->userdata = userdata;
692         s->enabled = SD_EVENT_ONESHOT;
693
694         r = prioq_put(*earliest, s, &s->time.earliest_index);
695         if (r < 0)
696                 goto fail;
697
698         r = prioq_put(*latest, s, &s->time.latest_index);
699         if (r < 0)
700                 goto fail;
701
702         *ret = s;
703         return 0;
704
705 fail:
706         source_free(s);
707         return r;
708 }
709
710 int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
711         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
712 }
713
714 int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
715         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
716 }
717
718 static int event_update_signal_fd(sd_event *e) {
719         struct epoll_event ev = {};
720         bool add_to_epoll;
721         int r;
722
723         assert(e);
724
725         add_to_epoll = e->signal_fd < 0;
726
727         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
728         if (r < 0)
729                 return -errno;
730
731         e->signal_fd = r;
732
733         if (!add_to_epoll)
734                 return 0;
735
736         ev.events = EPOLLIN;
737         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
738
739         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
740         if (r < 0) {
741                 close_nointr_nofail(e->signal_fd);
742                 e->signal_fd = -1;
743
744                 return -errno;
745         }
746
747         return 0;
748 }
749
750 int sd_event_add_signal(
751                 sd_event *e,
752                 int sig,
753                 sd_signal_handler_t callback,
754                 void *userdata,
755                 sd_event_source **ret) {
756
757         sd_event_source *s;
758         int r;
759
760         assert_return(e, -EINVAL);
761         assert_return(sig > 0, -EINVAL);
762         assert_return(sig < _NSIG, -EINVAL);
763         assert_return(callback, -EINVAL);
764         assert_return(ret, -EINVAL);
765         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
766         assert_return(!event_pid_changed(e), -ECHILD);
767
768         if (!e->signal_sources) {
769                 e->signal_sources = new0(sd_event_source*, _NSIG);
770                 if (!e->signal_sources)
771                         return -ENOMEM;
772         } else if (e->signal_sources[sig])
773                 return -EBUSY;
774
775         s = source_new(e, SOURCE_SIGNAL);
776         if (!s)
777                 return -ENOMEM;
778
779         s->signal.sig = sig;
780         s->signal.callback = callback;
781         s->userdata = userdata;
782         s->enabled = SD_EVENT_ON;
783
784         e->signal_sources[sig] = s;
785         assert_se(sigaddset(&e->sigset, sig) == 0);
786
787         if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
788                 r = event_update_signal_fd(e);
789                 if (r < 0) {
790                         source_free(s);
791                         return r;
792                 }
793         }
794
795         *ret = s;
796         return 0;
797 }
798
799 int sd_event_add_child(
800                 sd_event *e,
801                 pid_t pid,
802                 int options,
803                 sd_child_handler_t callback,
804                 void *userdata,
805                 sd_event_source **ret) {
806
807         sd_event_source *s;
808         int r;
809
810         assert_return(e, -EINVAL);
811         assert_return(pid > 1, -EINVAL);
812         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
813         assert_return(options != 0, -EINVAL);
814         assert_return(callback, -EINVAL);
815         assert_return(ret, -EINVAL);
816         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
817         assert_return(!event_pid_changed(e), -ECHILD);
818
819         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
820         if (r < 0)
821                 return r;
822
823         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
824                 return -EBUSY;
825
826         s = source_new(e, SOURCE_CHILD);
827         if (!s)
828                 return -ENOMEM;
829
830         s->child.pid = pid;
831         s->child.options = options;
832         s->child.callback = callback;
833         s->userdata = userdata;
834         s->enabled = SD_EVENT_ONESHOT;
835
836         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
837         if (r < 0) {
838                 source_free(s);
839                 return r;
840         }
841
842         e->n_enabled_child_sources ++;
843
844         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
845
846         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
847                 r = event_update_signal_fd(e);
848                 if (r < 0) {
849                         source_free(s);
850                         return -errno;
851                 }
852         }
853
854         e->need_process_child = true;
855
856         *ret = s;
857         return 0;
858 }
859
860 int sd_event_add_defer(
861                 sd_event *e,
862                 sd_defer_handler_t callback,
863                 void *userdata,
864                 sd_event_source **ret) {
865
866         sd_event_source *s;
867         int r;
868
869         assert_return(e, -EINVAL);
870         assert_return(callback, -EINVAL);
871         assert_return(ret, -EINVAL);
872         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
873         assert_return(!event_pid_changed(e), -ECHILD);
874
875         s = source_new(e, SOURCE_DEFER);
876         if (!s)
877                 return -ENOMEM;
878
879         s->defer.callback = callback;
880         s->userdata = userdata;
881         s->enabled = SD_EVENT_ONESHOT;
882
883         r = source_set_pending(s, true);
884         if (r < 0) {
885                 source_free(s);
886                 return r;
887         }
888
889         *ret = s;
890         return 0;
891 }
892
893 int sd_event_add_quit(
894                 sd_event *e,
895                 sd_quit_handler_t callback,
896                 void *userdata,
897                 sd_event_source **ret) {
898
899         sd_event_source *s;
900         int r;
901
902         assert_return(e, -EINVAL);
903         assert_return(callback, -EINVAL);
904         assert_return(ret, -EINVAL);
905         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
906         assert_return(!event_pid_changed(e), -ECHILD);
907
908         if (!e->quit) {
909                 e->quit = prioq_new(quit_prioq_compare);
910                 if (!e->quit)
911                         return -ENOMEM;
912         }
913
914         s = source_new(e, SOURCE_QUIT);
915         if (!s)
916                 return -ENOMEM;
917
918         s->quit.callback = callback;
919         s->userdata = userdata;
920         s->quit.prioq_index = PRIOQ_IDX_NULL;
921         s->enabled = SD_EVENT_ONESHOT;
922
923         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
924         if (r < 0) {
925                 source_free(s);
926                 return r;
927         }
928
929         *ret = s;
930         return 0;
931 }
932
933 sd_event_source* sd_event_source_ref(sd_event_source *s) {
934         assert_return(s, NULL);
935
936         assert(s->n_ref >= 1);
937         s->n_ref++;
938
939         return s;
940 }
941
942 sd_event_source* sd_event_source_unref(sd_event_source *s) {
943         assert_return(s, NULL);
944
945         assert(s->n_ref >= 1);
946         s->n_ref--;
947
948         if (s->n_ref <= 0)
949                 source_free(s);
950
951         return NULL;
952 }
953
954 sd_event *sd_event_get(sd_event_source *s) {
955         assert_return(s, NULL);
956
957         return s->event;
958 }
959
960 int sd_event_source_get_pending(sd_event_source *s) {
961         assert_return(s, -EINVAL);
962         assert_return(s->type != SOURCE_QUIT, -EDOM);
963         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
964         assert_return(!event_pid_changed(s->event), -ECHILD);
965
966         return s->pending;
967 }
968
969 int sd_event_source_get_io_fd(sd_event_source *s) {
970         assert_return(s, -EINVAL);
971         assert_return(s->type == SOURCE_IO, -EDOM);
972         assert_return(!event_pid_changed(s->event), -ECHILD);
973
974         return s->io.fd;
975 }
976
977 int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
978         assert_return(s, -EINVAL);
979         assert_return(events, -EINVAL);
980         assert_return(s->type == SOURCE_IO, -EDOM);
981         assert_return(!event_pid_changed(s->event), -ECHILD);
982
983         *events = s->io.events;
984         return 0;
985 }
986
987 int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
988         int r;
989
990         assert_return(s, -EINVAL);
991         assert_return(s->type == SOURCE_IO, -EDOM);
992         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
993         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
994         assert_return(!event_pid_changed(s->event), -ECHILD);
995
996         if (s->io.events == events)
997                 return 0;
998
999         if (s->enabled != SD_EVENT_OFF) {
1000                 r = source_io_register(s, s->io.events, events);
1001                 if (r < 0)
1002                         return r;
1003         }
1004
1005         s->io.events = events;
1006
1007         return 0;
1008 }
1009
1010 int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1011         assert_return(s, -EINVAL);
1012         assert_return(revents, -EINVAL);
1013         assert_return(s->type == SOURCE_IO, -EDOM);
1014         assert_return(s->pending, -ENODATA);
1015         assert_return(!event_pid_changed(s->event), -ECHILD);
1016
1017         *revents = s->io.revents;
1018         return 0;
1019 }
1020
1021 int sd_event_source_get_signal(sd_event_source *s) {
1022         assert_return(s, -EINVAL);
1023         assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1024         assert_return(!event_pid_changed(s->event), -ECHILD);
1025
1026         return s->signal.sig;
1027 }
1028
1029 int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1030         assert_return(s, -EINVAL);
1031         assert_return(!event_pid_changed(s->event), -ECHILD);
1032
1033         return s->priority;
1034 }
1035
1036 int sd_event_source_set_priority(sd_event_source *s, int priority) {
1037         assert_return(s, -EINVAL);
1038         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1039         assert_return(!event_pid_changed(s->event), -ECHILD);
1040
1041         if (s->priority == priority)
1042                 return 0;
1043
1044         s->priority = priority;
1045
1046         if (s->pending)
1047                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1048
1049         if (s->prepare)
1050                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1051
1052         if (s->type == SOURCE_QUIT)
1053                 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1054
1055         return 0;
1056 }
1057
1058 int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1059         assert_return(s, -EINVAL);
1060         assert_return(m, -EINVAL);
1061         assert_return(!event_pid_changed(s->event), -ECHILD);
1062
1063         *m = s->enabled;
1064         return 0;
1065 }
1066
1067 int sd_event_source_set_enabled(sd_event_source *s, int m) {
1068         int r;
1069
1070         assert_return(s, -EINVAL);
1071         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1072         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1073         assert_return(!event_pid_changed(s->event), -ECHILD);
1074
1075         if (s->enabled == m)
1076                 return 0;
1077
1078         if (m == SD_EVENT_OFF) {
1079
1080                 switch (s->type) {
1081
1082                 case SOURCE_IO:
1083                         r = source_io_unregister(s);
1084                         if (r < 0)
1085                                 return r;
1086
1087                         s->enabled = m;
1088                         break;
1089
1090                 case SOURCE_MONOTONIC:
1091                         s->enabled = m;
1092                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1093                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1094                         break;
1095
1096                 case SOURCE_REALTIME:
1097                         s->enabled = m;
1098                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1099                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1100                         break;
1101
1102                 case SOURCE_SIGNAL:
1103                         s->enabled = m;
1104                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1105                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1106                                 event_update_signal_fd(s->event);
1107                         }
1108
1109                         break;
1110
1111                 case SOURCE_CHILD:
1112                         s->enabled = m;
1113
1114                         assert(s->event->n_enabled_child_sources > 0);
1115                         s->event->n_enabled_child_sources--;
1116
1117                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1118                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1119                                 event_update_signal_fd(s->event);
1120                         }
1121
1122                         break;
1123
1124                 case SOURCE_QUIT:
1125                         s->enabled = m;
1126                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1127                         break;
1128
1129                 case SOURCE_DEFER:
1130                         s->enabled = m;
1131                         break;
1132                 }
1133
1134         } else {
1135                 switch (s->type) {
1136
1137                 case SOURCE_IO:
1138                         r = source_io_register(s, m, s->io.events);
1139                         if (r < 0)
1140                                 return r;
1141
1142                         s->enabled = m;
1143                         break;
1144
1145                 case SOURCE_MONOTONIC:
1146                         s->enabled = m;
1147                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1148                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1149                         break;
1150
1151                 case SOURCE_REALTIME:
1152                         s->enabled = m;
1153                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1154                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1155                         break;
1156
1157                 case SOURCE_SIGNAL:
1158                         s->enabled = m;
1159
1160                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1161                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1162                                 event_update_signal_fd(s->event);
1163                         }
1164                         break;
1165
1166                 case SOURCE_CHILD:
1167                         s->enabled = m;
1168
1169                         if (s->enabled == SD_EVENT_OFF) {
1170                                 s->event->n_enabled_child_sources++;
1171
1172                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1173                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1174                                         event_update_signal_fd(s->event);
1175                                 }
1176                         }
1177                         break;
1178
1179                 case SOURCE_QUIT:
1180                         s->enabled = m;
1181                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1182                         break;
1183
1184                 case SOURCE_DEFER:
1185                         s->enabled = m;
1186                         break;
1187                 }
1188         }
1189
1190         if (s->pending)
1191                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1192
1193         if (s->prepare)
1194                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1195
1196         return 0;
1197 }
1198
1199 int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1200         assert_return(s, -EINVAL);
1201         assert_return(usec, -EINVAL);
1202         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1203         assert_return(!event_pid_changed(s->event), -ECHILD);
1204
1205         *usec = s->time.next;
1206         return 0;
1207 }
1208
1209 int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1210         assert_return(s, -EINVAL);
1211         assert_return(usec != (uint64_t) -1, -EINVAL);
1212         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1213         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1214         assert_return(!event_pid_changed(s->event), -ECHILD);
1215
1216         if (s->time.next == usec)
1217                 return 0;
1218
1219         s->time.next = usec;
1220
1221         if (s->type == SOURCE_REALTIME) {
1222                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1223                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1224         } else {
1225                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1226                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1227         }
1228
1229         return 0;
1230 }
1231
1232 int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1233         assert_return(s, -EINVAL);
1234         assert_return(usec, -EINVAL);
1235         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1236         assert_return(!event_pid_changed(s->event), -ECHILD);
1237
1238         *usec = s->time.accuracy;
1239         return 0;
1240 }
1241
1242 int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1243         assert_return(s, -EINVAL);
1244         assert_return(usec != (uint64_t) -1, -EINVAL);
1245         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1246         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1247         assert_return(!event_pid_changed(s->event), -ECHILD);
1248
1249         if (usec == 0)
1250                 usec = DEFAULT_ACCURACY_USEC;
1251
1252         if (s->time.accuracy == usec)
1253                 return 0;
1254
1255         s->time.accuracy = usec;
1256
1257         if (s->type == SOURCE_REALTIME)
1258                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1259         else
1260                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1261
1262         return 0;
1263 }
1264
1265 int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1266         assert_return(s, -EINVAL);
1267         assert_return(pid, -EINVAL);
1268         assert_return(s->type == SOURCE_CHILD, -EDOM);
1269         assert_return(!event_pid_changed(s->event), -ECHILD);
1270
1271         *pid = s->child.pid;
1272         return 0;
1273 }
1274
1275 int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1276         int r;
1277
1278         assert_return(s, -EINVAL);
1279         assert_return(s->type != SOURCE_QUIT, -EDOM);
1280         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1281         assert_return(!event_pid_changed(s->event), -ECHILD);
1282
1283         if (s->prepare == callback)
1284                 return 0;
1285
1286         if (callback && s->prepare) {
1287                 s->prepare = callback;
1288                 return 0;
1289         }
1290
1291         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1292         if (r < 0)
1293                 return r;
1294
1295         s->prepare = callback;
1296
1297         if (callback) {
1298                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1299                 if (r < 0)
1300                         return r;
1301         } else
1302                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1303
1304         return 0;
1305 }
1306
1307 void* sd_event_source_get_userdata(sd_event_source *s) {
1308         assert_return(s, NULL);
1309
1310         return s->userdata;
1311 }
1312
1313 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1314         usec_t c;
1315         assert(e);
1316         assert(a <= b);
1317
1318         if (a <= 0)
1319                 return 0;
1320
1321         if (b <= a + 1)
1322                 return a;
1323
1324         /*
1325           Find a good time to wake up again between times a and b. We
1326           have two goals here:
1327
1328           a) We want to wake up as seldom as possible, hence prefer
1329              later times over earlier times.
1330
1331           b) But if we have to wake up, then let's make sure to
1332              dispatch as much as possible on the entire system.
1333
1334           We implement this by waking up everywhere at the same time
1335           within any given second if we can, synchronised via the
1336           perturbation value determined from the boot ID. If we can't,
1337           then we try to find the same spot in every a 250ms
1338           step. Otherwise, we pick the last possible time to wake up.
1339         */
1340
1341         c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1342         if (c >= b) {
1343                 if (_unlikely_(c < USEC_PER_SEC))
1344                         return b;
1345
1346                 c -= USEC_PER_SEC;
1347         }
1348
1349         if (c >= a)
1350                 return c;
1351
1352         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1353         if (c >= b) {
1354                 if (_unlikely_(c < USEC_PER_MSEC*250))
1355                         return b;
1356
1357                 c -= USEC_PER_MSEC*250;
1358         }
1359
1360         if (c >= a)
1361                 return c;
1362
1363         return b;
1364 }
1365
1366 static int event_arm_timer(
1367                 sd_event *e,
1368                 int timer_fd,
1369                 Prioq *earliest,
1370                 Prioq *latest,
1371                 usec_t *next) {
1372
1373         struct itimerspec its = {};
1374         sd_event_source *a, *b;
1375         usec_t t;
1376         int r;
1377
1378         assert_se(e);
1379         assert_se(next);
1380
1381         a = prioq_peek(earliest);
1382         if (!a || a->enabled == SD_EVENT_OFF)
1383                 return 0;
1384
1385         b = prioq_peek(latest);
1386         assert_se(b && b->enabled != SD_EVENT_OFF);
1387
1388         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1389         if (*next == t)
1390                 return 0;
1391
1392         assert_se(timer_fd >= 0);
1393
1394         if (t == 0) {
1395                 /* We don' want to disarm here, just mean some time looooong ago. */
1396                 its.it_value.tv_sec = 0;
1397                 its.it_value.tv_nsec = 1;
1398         } else
1399                 timespec_store(&its.it_value, t);
1400
1401         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1402         if (r < 0)
1403                 return r;
1404
1405         *next = t;
1406         return 0;
1407 }
1408
1409 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1410         assert(e);
1411         assert(s);
1412         assert(s->type == SOURCE_IO);
1413
1414         s->io.revents = events;
1415
1416         /*
1417            If this is a oneshot event source, then we added it to the
1418            epoll with EPOLLONESHOT, hence we know it's not registered
1419            anymore. We can save a syscall here...
1420         */
1421
1422         if (s->enabled == SD_EVENT_ONESHOT)
1423                 s->io.registered = false;
1424
1425         return source_set_pending(s, true);
1426 }
1427
1428 static int flush_timer(sd_event *e, int fd, uint32_t events) {
1429         uint64_t x;
1430         ssize_t ss;
1431
1432         assert(e);
1433         assert(fd >= 0);
1434         assert_return(events == EPOLLIN, -EIO);
1435
1436         ss = read(fd, &x, sizeof(x));
1437         if (ss < 0) {
1438                 if (errno == EAGAIN || errno == EINTR)
1439                         return 0;
1440
1441                 return -errno;
1442         }
1443
1444         if (ss != sizeof(x))
1445                 return -EIO;
1446
1447         return 0;
1448 }
1449
1450 static int process_timer(
1451                 sd_event *e,
1452                 usec_t n,
1453                 Prioq *earliest,
1454                 Prioq *latest) {
1455
1456         sd_event_source *s;
1457         int r;
1458
1459         assert(e);
1460
1461         for (;;) {
1462                 s = prioq_peek(earliest);
1463                 if (!s ||
1464                     s->time.next > n ||
1465                     s->enabled == SD_EVENT_OFF ||
1466                     s->pending)
1467                         break;
1468
1469                 r = source_set_pending(s, true);
1470                 if (r < 0)
1471                         return r;
1472
1473                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1474                 prioq_reshuffle(latest, s, &s->time.latest_index);
1475         }
1476
1477         return 0;
1478 }
1479
1480 static int process_child(sd_event *e) {
1481         sd_event_source *s;
1482         Iterator i;
1483         int r;
1484
1485         assert(e);
1486
1487         e->need_process_child = false;
1488
1489         /*
1490            So, this is ugly. We iteratively invoke waitid() with P_PID
1491            + WNOHANG for each PID we wait for, instead of using
1492            P_ALL. This is because we only want to get child
1493            information of very specific child processes, and not all
1494            of them. We might not have processed the SIGCHLD even of a
1495            previous invocation and we don't want to maintain a
1496            unbounded *per-child* event queue, hence we really don't
1497            want anything flushed out of the kernel's queue that we
1498            don't care about. Since this is O(n) this means that if you
1499            have a lot of processes you probably want to handle SIGCHLD
1500            yourself.
1501         */
1502
1503         HASHMAP_FOREACH(s, e->child_sources, i) {
1504                 assert(s->type == SOURCE_CHILD);
1505
1506                 if (s->pending)
1507                         continue;
1508
1509                 if (s->enabled == SD_EVENT_OFF)
1510                         continue;
1511
1512                 zero(s->child.siginfo);
1513                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1514                 if (r < 0)
1515                         return -errno;
1516
1517                 if (s->child.siginfo.si_pid != 0) {
1518                         r = source_set_pending(s, true);
1519                         if (r < 0)
1520                                 return r;
1521                 }
1522         }
1523
1524         return 0;
1525 }
1526
1527 static int process_signal(sd_event *e, uint32_t events) {
1528         struct signalfd_siginfo si;
1529         bool read_one = false;
1530         ssize_t ss;
1531         int r;
1532
1533         assert(e);
1534         assert_return(events == EPOLLIN, -EIO);
1535
1536         for (;;) {
1537                 sd_event_source *s;
1538
1539                 ss = read(e->signal_fd, &si, sizeof(si));
1540                 if (ss < 0) {
1541                         if (errno == EAGAIN || errno == EINTR)
1542                                 return read_one;
1543
1544                         return -errno;
1545                 }
1546
1547                 if (ss != sizeof(si))
1548                         return -EIO;
1549
1550                 read_one = true;
1551
1552                 if (si.ssi_signo == SIGCHLD) {
1553                         r = process_child(e);
1554                         if (r < 0)
1555                                 return r;
1556                         if (r > 0 || !e->signal_sources[si.ssi_signo])
1557                                 continue;
1558                 } else {
1559                         s = e->signal_sources[si.ssi_signo];
1560                         if (!s)
1561                                 return -EIO;
1562                 }
1563
1564                 s->signal.siginfo = si;
1565                 r = source_set_pending(s, true);
1566                 if (r < 0)
1567                         return r;
1568         }
1569
1570
1571         return 0;
1572 }
1573
1574 static int source_dispatch(sd_event_source *s) {
1575         int r;
1576
1577         assert(s);
1578         assert(s->pending || s->type == SOURCE_QUIT);
1579
1580         if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
1581                 r = source_set_pending(s, false);
1582                 if (r < 0)
1583                         return r;
1584         }
1585
1586         if (s->enabled == SD_EVENT_ONESHOT) {
1587                 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1588                 if (r < 0)
1589                         return r;
1590         }
1591
1592         switch (s->type) {
1593
1594         case SOURCE_IO:
1595                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1596                 break;
1597
1598         case SOURCE_MONOTONIC:
1599                 r = s->time.callback(s, s->time.next, s->userdata);
1600                 break;
1601
1602         case SOURCE_REALTIME:
1603                 r = s->time.callback(s, s->time.next, s->userdata);
1604                 break;
1605
1606         case SOURCE_SIGNAL:
1607                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1608                 break;
1609
1610         case SOURCE_CHILD:
1611                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1612                 break;
1613
1614         case SOURCE_DEFER:
1615                 r = s->defer.callback(s, s->userdata);
1616                 break;
1617
1618         case SOURCE_QUIT:
1619                 r = s->quit.callback(s, s->userdata);
1620                 break;
1621         }
1622
1623         return r;
1624 }
1625
1626 static int event_prepare(sd_event *e) {
1627         int r;
1628
1629         assert(e);
1630
1631         for (;;) {
1632                 sd_event_source *s;
1633
1634                 s = prioq_peek(e->prepare);
1635                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1636                         break;
1637
1638                 s->prepare_iteration = e->iteration;
1639                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1640                 if (r < 0)
1641                         return r;
1642
1643                 assert(s->prepare);
1644                 r = s->prepare(s, s->userdata);
1645                 if (r < 0)
1646                         return r;
1647
1648         }
1649
1650         return 0;
1651 }
1652
1653 static int dispatch_quit(sd_event *e) {
1654         sd_event_source *p;
1655         int r;
1656
1657         assert(e);
1658
1659         p = prioq_peek(e->quit);
1660         if (!p || p->enabled == SD_EVENT_OFF) {
1661                 e->state = SD_EVENT_FINISHED;
1662                 return 0;
1663         }
1664
1665         sd_event_ref(e);
1666         e->iteration++;
1667         e->state = SD_EVENT_QUITTING;
1668
1669         r = source_dispatch(p);
1670
1671         e->state = SD_EVENT_PASSIVE;
1672         sd_event_unref(e);
1673
1674         return r;
1675 }
1676
1677 static sd_event_source* event_next_pending(sd_event *e) {
1678         sd_event_source *p;
1679
1680         assert(e);
1681
1682         p = prioq_peek(e->pending);
1683         if (!p)
1684                 return NULL;
1685
1686         if (p->enabled == SD_EVENT_OFF)
1687                 return NULL;
1688
1689         return p;
1690 }
1691
1692 int sd_event_run(sd_event *e, uint64_t timeout) {
1693         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1694         sd_event_source *p;
1695         dual_timestamp n;
1696         int r, i, m;
1697
1698         assert_return(e, -EINVAL);
1699         assert_return(!event_pid_changed(e), -ECHILD);
1700         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1701         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1702
1703         if (e->quit_requested)
1704                 return dispatch_quit(e);
1705
1706         sd_event_ref(e);
1707         e->iteration++;
1708         e->state = SD_EVENT_RUNNING;
1709
1710         r = event_prepare(e);
1711         if (r < 0)
1712                 goto finish;
1713
1714         if (event_next_pending(e) || e->need_process_child)
1715                 timeout = 0;
1716
1717         if (timeout > 0) {
1718                 r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1719                 if (r < 0)
1720                         goto finish;
1721
1722                 r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1723                 if (r < 0)
1724                         goto finish;
1725         }
1726
1727         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1728                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1729         if (m < 0) {
1730                 r = m;
1731                 goto finish;
1732         }
1733
1734         dual_timestamp_get(&n);
1735
1736         for (i = 0; i < m; i++) {
1737
1738                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1739                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
1740                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1741                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
1742                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1743                         r = process_signal(e, ev_queue[i].events);
1744                 else
1745                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1746
1747                 if (r < 0)
1748                         goto finish;
1749         }
1750
1751         r = process_timer(e, n.monotonic, e->monotonic_earliest, e->monotonic_latest);
1752         if (r < 0)
1753                 goto finish;
1754
1755         r = process_timer(e, n.realtime, e->realtime_earliest, e->realtime_latest);
1756         if (r < 0)
1757                 goto finish;
1758
1759         if (e->need_process_child) {
1760                 r = process_child(e);
1761                 if (r < 0)
1762                         goto finish;
1763         }
1764
1765         p = event_next_pending(e);
1766         if (!p) {
1767                 r = 0;
1768                 goto finish;
1769         }
1770
1771         r = source_dispatch(p);
1772
1773 finish:
1774         e->state = SD_EVENT_PASSIVE;
1775         sd_event_unref(e);
1776
1777         return r;
1778 }
1779
1780 int sd_event_loop(sd_event *e) {
1781         int r;
1782
1783         assert_return(e, -EINVAL);
1784         assert_return(!event_pid_changed(e), -ECHILD);
1785         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1786
1787         sd_event_ref(e);
1788
1789         while (e->state != SD_EVENT_FINISHED) {
1790                 r = sd_event_run(e, (uint64_t) -1);
1791                 if (r < 0)
1792                         goto finish;
1793         }
1794
1795         r = 0;
1796
1797 finish:
1798         sd_event_unref(e);
1799         return r;
1800 }
1801
1802 int sd_event_get_state(sd_event *e) {
1803         assert_return(e, -EINVAL);
1804         assert_return(!event_pid_changed(e), -ECHILD);
1805
1806         return e->state;
1807 }
1808
1809 int sd_event_get_quit(sd_event *e) {
1810         assert_return(e, -EINVAL);
1811         assert_return(!event_pid_changed(e), -ECHILD);
1812
1813         return e->quit_requested;
1814 }
1815
1816 int sd_event_request_quit(sd_event *e) {
1817         assert_return(e, -EINVAL);
1818         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1819         assert_return(!event_pid_changed(e), -ECHILD);
1820
1821         e->quit_requested = true;
1822         return 0;
1823 }