chiark / gitweb /
sd-event: EPOLLONESHOT only disables event reporting after an event. The fd is still...
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "macro.h"
27 #include "prioq.h"
28 #include "hashmap.h"
29 #include "util.h"
30 #include "time-util.h"
31 #include "sd-id128.h"
32
33 #include "sd-event.h"
34
/* NOTE(review): presumably the maximum number of epoll events fetched
 * per wait call; the use site is not visible in this part of the file
 * -- confirm. */
#define EPOLL_QUEUE_MAX 64

/* Accuracy applied to a timer event source when the caller passes an
 * accuracy of 0 (see event_add_time_internal()). */
#define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
37
/* Kind of an event source. It selects which member of the anonymous
 * union in struct sd_event_source is in use. The numeric values are
 * also stored in epoll user data for the timer and signal fds. */
typedef enum EventSourceType {
        SOURCE_IO = 0,          /* file descriptor registered with epoll */
        SOURCE_MONOTONIC,       /* timer based on CLOCK_MONOTONIC */
        SOURCE_REALTIME,        /* timer based on CLOCK_REALTIME */
        SOURCE_SIGNAL,          /* UNIX signal delivered through the signalfd */
        SOURCE_CHILD,           /* state change of a child process */
        SOURCE_DEFER,           /* deferred callback, pending right after creation */
        SOURCE_QUIT             /* callback kept in the event's quit queue */
} EventSourceType;
47
48 struct sd_event_source {
49         unsigned n_ref;
50
51         sd_event *event;
52         void *userdata;
53         sd_prepare_handler_t prepare;
54
55         EventSourceType type:4;
56         int enabled:3;
57         bool pending:1;
58
59         int priority;
60         unsigned pending_index;
61         unsigned prepare_index;
62         unsigned pending_iteration;
63         unsigned prepare_iteration;
64
65         union {
66                 struct {
67                         sd_io_handler_t callback;
68                         int fd;
69                         uint32_t events;
70                         uint32_t revents;
71                         bool registered:1;
72                 } io;
73                 struct {
74                         sd_time_handler_t callback;
75                         usec_t next, accuracy;
76                         unsigned earliest_index;
77                         unsigned latest_index;
78                 } time;
79                 struct {
80                         sd_signal_handler_t callback;
81                         struct signalfd_siginfo siginfo;
82                         int sig;
83                 } signal;
84                 struct {
85                         sd_child_handler_t callback;
86                         siginfo_t siginfo;
87                         pid_t pid;
88                         int options;
89                 } child;
90                 struct {
91                         sd_defer_handler_t callback;
92                 } defer;
93                 struct {
94                         sd_quit_handler_t callback;
95                         unsigned prioq_index;
96                 } quit;
97         };
98 };
99
100 struct sd_event {
101         unsigned n_ref;
102
103         int epoll_fd;
104         int signal_fd;
105         int realtime_fd;
106         int monotonic_fd;
107
108         Prioq *pending;
109         Prioq *prepare;
110
111         /* For both clocks we maintain two priority queues each, one
112          * ordered for the earliest times the events may be
113          * dispatched, and one ordered by the latest times they must
114          * have been dispatched. The range between the top entries in
115          * the two prioqs is the time window we can freely schedule
116          * wakeups in */
117         Prioq *monotonic_earliest;
118         Prioq *monotonic_latest;
119         Prioq *realtime_earliest;
120         Prioq *realtime_latest;
121
122         usec_t realtime_next, monotonic_next;
123         usec_t perturb;
124
125         sigset_t sigset;
126         sd_event_source **signal_sources;
127
128         Hashmap *child_sources;
129         unsigned n_enabled_child_sources;
130
131         Prioq *quit;
132
133         pid_t original_pid;
134
135         unsigned iteration;
136         dual_timestamp timestamp;
137         int state;
138
139         bool quit_requested:1;
140         bool need_process_child:1;
141 };
142
143 static int pending_prioq_compare(const void *a, const void *b) {
144         const sd_event_source *x = a, *y = b;
145
146         assert(x->pending);
147         assert(y->pending);
148
149         /* Enabled ones first */
150         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
151                 return -1;
152         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
153                 return 1;
154
155         /* Lower priority values first */
156         if (x->priority < y->priority)
157                 return -1;
158         if (x->priority > y->priority)
159                 return 1;
160
161         /* Older entries first */
162         if (x->pending_iteration < y->pending_iteration)
163                 return -1;
164         if (x->pending_iteration > y->pending_iteration)
165                 return 1;
166
167         /* Stability for the rest */
168         if (x < y)
169                 return -1;
170         if (x > y)
171                 return 1;
172
173         return 0;
174 }
175
176 static int prepare_prioq_compare(const void *a, const void *b) {
177         const sd_event_source *x = a, *y = b;
178
179         assert(x->prepare);
180         assert(y->prepare);
181
182         /* Move most recently prepared ones last, so that we can stop
183          * preparing as soon as we hit one that has already been
184          * prepared in the current iteration */
185         if (x->prepare_iteration < y->prepare_iteration)
186                 return -1;
187         if (x->prepare_iteration > y->prepare_iteration)
188                 return 1;
189
190         /* Enabled ones first */
191         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
192                 return -1;
193         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
194                 return 1;
195
196         /* Lower priority values first */
197         if (x->priority < y->priority)
198                 return -1;
199         if (x->priority > y->priority)
200                 return 1;
201
202         /* Stability for the rest */
203         if (x < y)
204                 return -1;
205         if (x > y)
206                 return 1;
207
208         return 0;
209 }
210
211 static int earliest_time_prioq_compare(const void *a, const void *b) {
212         const sd_event_source *x = a, *y = b;
213
214         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
215         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
216
217         /* Enabled ones first */
218         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
219                 return -1;
220         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
221                 return 1;
222
223         /* Move the pending ones to the end */
224         if (!x->pending && y->pending)
225                 return -1;
226         if (x->pending && !y->pending)
227                 return 1;
228
229         /* Order by time */
230         if (x->time.next < y->time.next)
231                 return -1;
232         if (x->time.next > y->time.next)
233                 return -1;
234
235         /* Stability for the rest */
236         if (x < y)
237                 return -1;
238         if (x > y)
239                 return 1;
240
241         return 0;
242 }
243
244 static int latest_time_prioq_compare(const void *a, const void *b) {
245         const sd_event_source *x = a, *y = b;
246
247         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
248                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
249
250         /* Enabled ones first */
251         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
252                 return -1;
253         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
254                 return 1;
255
256         /* Move the pending ones to the end */
257         if (!x->pending && y->pending)
258                 return -1;
259         if (x->pending && !y->pending)
260                 return 1;
261
262         /* Order by time */
263         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
264                 return -1;
265         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
266                 return -1;
267
268         /* Stability for the rest */
269         if (x < y)
270                 return -1;
271         if (x > y)
272                 return 1;
273
274         return 0;
275 }
276
277 static int quit_prioq_compare(const void *a, const void *b) {
278         const sd_event_source *x = a, *y = b;
279
280         assert(x->type == SOURCE_QUIT);
281         assert(y->type == SOURCE_QUIT);
282
283         /* Enabled ones first */
284         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
285                 return -1;
286         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
287                 return 1;
288
289         /* Lower priority values first */
290         if (x->priority < y->priority)
291                 return -1;
292         if (x->priority > y->priority)
293                 return 1;
294
295         /* Stability for the rest */
296         if (x < y)
297                 return -1;
298         if (x > y)
299                 return 1;
300
301         return 0;
302 }
303
304 static void event_free(sd_event *e) {
305         assert(e);
306
307         if (e->epoll_fd >= 0)
308                 close_nointr_nofail(e->epoll_fd);
309
310         if (e->signal_fd >= 0)
311                 close_nointr_nofail(e->signal_fd);
312
313         if (e->realtime_fd >= 0)
314                 close_nointr_nofail(e->realtime_fd);
315
316         if (e->monotonic_fd >= 0)
317                 close_nointr_nofail(e->monotonic_fd);
318
319         prioq_free(e->pending);
320         prioq_free(e->prepare);
321         prioq_free(e->monotonic_earliest);
322         prioq_free(e->monotonic_latest);
323         prioq_free(e->realtime_earliest);
324         prioq_free(e->realtime_latest);
325         prioq_free(e->quit);
326
327         free(e->signal_sources);
328
329         hashmap_free(e->child_sources);
330         free(e);
331 }
332
333 int sd_event_new(sd_event** ret) {
334         sd_event *e;
335         int r;
336
337         assert_return(ret, -EINVAL);
338
339         e = new0(sd_event, 1);
340         if (!e)
341                 return -ENOMEM;
342
343         e->n_ref = 1;
344         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
345         e->realtime_next = e->monotonic_next = (usec_t) -1;
346         e->original_pid = getpid();
347
348         assert_se(sigemptyset(&e->sigset) == 0);
349
350         e->pending = prioq_new(pending_prioq_compare);
351         if (!e->pending) {
352                 r = -ENOMEM;
353                 goto fail;
354         }
355
356         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
357         if (e->epoll_fd < 0) {
358                 r = -errno;
359                 goto fail;
360         }
361
362         *ret = e;
363         return 0;
364
365 fail:
366         event_free(e);
367         return r;
368 }
369
370 sd_event* sd_event_ref(sd_event *e) {
371         assert_return(e, NULL);
372
373         assert(e->n_ref >= 1);
374         e->n_ref++;
375
376         return e;
377 }
378
379 sd_event* sd_event_unref(sd_event *e) {
380         assert_return(e, NULL);
381
382         assert(e->n_ref >= 1);
383         e->n_ref--;
384
385         if (e->n_ref <= 0)
386                 event_free(e);
387
388         return NULL;
389 }
390
391 static bool event_pid_changed(sd_event *e) {
392         assert(e);
393
394         /* We don't support people creating am event loop and keeping
395          * it around over a fork(). Let's complain. */
396
397         return e->original_pid != getpid();
398 }
399
400 static int source_io_unregister(sd_event_source *s) {
401         int r;
402
403         assert(s);
404         assert(s->type == SOURCE_IO);
405
406         if (!s->io.registered)
407                 return 0;
408
409         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
410         if (r < 0)
411                 return -errno;
412
413         s->io.registered = false;
414         return 0;
415 }
416
417 static int source_io_register(
418                 sd_event_source *s,
419                 int enabled,
420                 uint32_t events) {
421
422         struct epoll_event ev = {};
423         int r;
424
425         assert(s);
426         assert(s->type == SOURCE_IO);
427         assert(enabled != SD_EVENT_OFF);
428
429         ev.events = events;
430         ev.data.ptr = s;
431
432         if (enabled == SD_EVENT_ONESHOT)
433                 ev.events |= EPOLLONESHOT;
434
435         if (s->io.registered)
436                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
437         else
438                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
439
440         if (r < 0)
441                 return -errno;
442
443         s->io.registered = true;
444
445         return 0;
446 }
447
448 static void source_free(sd_event_source *s) {
449         assert(s);
450
451         if (s->event) {
452                 switch (s->type) {
453
454                 case SOURCE_IO:
455                         if (s->io.fd >= 0)
456                                 source_io_unregister(s);
457
458                         break;
459
460                 case SOURCE_MONOTONIC:
461                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
462                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
463                         break;
464
465                 case SOURCE_REALTIME:
466                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
467                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
468                         break;
469
470                 case SOURCE_SIGNAL:
471                         if (s->signal.sig > 0) {
472                                 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
473                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
474
475                                 if (s->event->signal_sources)
476                                         s->event->signal_sources[s->signal.sig] = NULL;
477                         }
478
479                         break;
480
481                 case SOURCE_CHILD:
482                         if (s->child.pid > 0) {
483                                 if (s->enabled != SD_EVENT_OFF) {
484                                         assert(s->event->n_enabled_child_sources > 0);
485                                         s->event->n_enabled_child_sources--;
486                                 }
487
488                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
489                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
490
491                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
492                         }
493
494                         break;
495
496                 case SOURCE_DEFER:
497                         /* nothing */
498                         break;
499
500                 case SOURCE_QUIT:
501                         prioq_remove(s->event->quit, s, &s->quit.prioq_index);
502                         break;
503                 }
504
505                 if (s->pending)
506                         prioq_remove(s->event->pending, s, &s->pending_index);
507
508                 if (s->prepare)
509                         prioq_remove(s->event->prepare, s, &s->prepare_index);
510
511                 sd_event_unref(s->event);
512         }
513
514         free(s);
515 }
516
517 static int source_set_pending(sd_event_source *s, bool b) {
518         int r;
519
520         assert(s);
521         assert(s->type != SOURCE_QUIT);
522
523         if (s->pending == b)
524                 return 0;
525
526         s->pending = b;
527
528         if (b) {
529                 s->pending_iteration = s->event->iteration;
530
531                 r = prioq_put(s->event->pending, s, &s->pending_index);
532                 if (r < 0) {
533                         s->pending = false;
534                         return r;
535                 }
536         } else
537                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
538
539         return 0;
540 }
541
542 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
543         sd_event_source *s;
544
545         assert(e);
546
547         s = new0(sd_event_source, 1);
548         if (!s)
549                 return NULL;
550
551         s->n_ref = 1;
552         s->event = sd_event_ref(e);
553         s->type = type;
554         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
555
556         return s;
557 }
558
559 int sd_event_add_io(
560                 sd_event *e,
561                 int fd,
562                 uint32_t events,
563                 sd_io_handler_t callback,
564                 void *userdata,
565                 sd_event_source **ret) {
566
567         sd_event_source *s;
568         int r;
569
570         assert_return(e, -EINVAL);
571         assert_return(fd >= 0, -EINVAL);
572         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
573         assert_return(callback, -EINVAL);
574         assert_return(ret, -EINVAL);
575         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
576         assert_return(!event_pid_changed(e), -ECHILD);
577
578         s = source_new(e, SOURCE_IO);
579         if (!s)
580                 return -ENOMEM;
581
582         s->io.fd = fd;
583         s->io.events = events;
584         s->io.callback = callback;
585         s->userdata = userdata;
586         s->enabled = SD_EVENT_ON;
587
588         r = source_io_register(s, s->enabled, events);
589         if (r < 0) {
590                 source_free(s);
591                 return -errno;
592         }
593
594         *ret = s;
595         return 0;
596 }
597
598 static int event_setup_timer_fd(
599                 sd_event *e,
600                 EventSourceType type,
601                 int *timer_fd,
602                 clockid_t id) {
603
604         struct epoll_event ev = {};
605         int r, fd;
606         sd_id128_t bootid;
607
608         assert(e);
609         assert(timer_fd);
610
611         if (_likely_(*timer_fd >= 0))
612                 return 0;
613
614         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
615         if (fd < 0)
616                 return -errno;
617
618         ev.events = EPOLLIN;
619         ev.data.ptr = INT_TO_PTR(type);
620
621         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
622         if (r < 0) {
623                 close_nointr_nofail(fd);
624                 return -errno;
625         }
626
627         /* When we sleep for longer, we try to realign the wakeup to
628            the same time wihtin each second, so that events all across
629            the system can be coalesced into a single CPU
630            wakeup. However, let's take some system-specific randomness
631            for this value, so that in a network of systems with synced
632            clocks timer events are distributed a bit. Here, we
633            calculate a perturbation usec offset from the boot ID. */
634
635         if (sd_id128_get_boot(&bootid) >= 0)
636                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
637
638         *timer_fd = fd;
639         return 0;
640 }
641
642 static int event_add_time_internal(
643                 sd_event *e,
644                 EventSourceType type,
645                 int *timer_fd,
646                 clockid_t id,
647                 Prioq **earliest,
648                 Prioq **latest,
649                 uint64_t usec,
650                 uint64_t accuracy,
651                 sd_time_handler_t callback,
652                 void *userdata,
653                 sd_event_source **ret) {
654
655         sd_event_source *s;
656         int r;
657
658         assert_return(e, -EINVAL);
659         assert_return(callback, -EINVAL);
660         assert_return(ret, -EINVAL);
661         assert_return(usec != (uint64_t) -1, -EINVAL);
662         assert_return(accuracy != (uint64_t) -1, -EINVAL);
663         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
664         assert_return(!event_pid_changed(e), -ECHILD);
665
666         assert(timer_fd);
667         assert(earliest);
668         assert(latest);
669
670         if (!*earliest) {
671                 *earliest = prioq_new(earliest_time_prioq_compare);
672                 if (!*earliest)
673                         return -ENOMEM;
674         }
675
676         if (!*latest) {
677                 *latest = prioq_new(latest_time_prioq_compare);
678                 if (!*latest)
679                         return -ENOMEM;
680         }
681
682         if (*timer_fd < 0) {
683                 r = event_setup_timer_fd(e, type, timer_fd, id);
684                 if (r < 0)
685                         return r;
686         }
687
688         s = source_new(e, type);
689         if (!s)
690                 return -ENOMEM;
691
692         s->time.next = usec;
693         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
694         s->time.callback = callback;
695         s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
696         s->userdata = userdata;
697         s->enabled = SD_EVENT_ONESHOT;
698
699         r = prioq_put(*earliest, s, &s->time.earliest_index);
700         if (r < 0)
701                 goto fail;
702
703         r = prioq_put(*latest, s, &s->time.latest_index);
704         if (r < 0)
705                 goto fail;
706
707         *ret = s;
708         return 0;
709
710 fail:
711         source_free(s);
712         return r;
713 }
714
715 int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
716         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
717 }
718
719 int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
720         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
721 }
722
723 static int event_update_signal_fd(sd_event *e) {
724         struct epoll_event ev = {};
725         bool add_to_epoll;
726         int r;
727
728         assert(e);
729
730         add_to_epoll = e->signal_fd < 0;
731
732         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
733         if (r < 0)
734                 return -errno;
735
736         e->signal_fd = r;
737
738         if (!add_to_epoll)
739                 return 0;
740
741         ev.events = EPOLLIN;
742         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
743
744         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
745         if (r < 0) {
746                 close_nointr_nofail(e->signal_fd);
747                 e->signal_fd = -1;
748
749                 return -errno;
750         }
751
752         return 0;
753 }
754
755 int sd_event_add_signal(
756                 sd_event *e,
757                 int sig,
758                 sd_signal_handler_t callback,
759                 void *userdata,
760                 sd_event_source **ret) {
761
762         sd_event_source *s;
763         int r;
764
765         assert_return(e, -EINVAL);
766         assert_return(sig > 0, -EINVAL);
767         assert_return(sig < _NSIG, -EINVAL);
768         assert_return(callback, -EINVAL);
769         assert_return(ret, -EINVAL);
770         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
771         assert_return(!event_pid_changed(e), -ECHILD);
772
773         if (!e->signal_sources) {
774                 e->signal_sources = new0(sd_event_source*, _NSIG);
775                 if (!e->signal_sources)
776                         return -ENOMEM;
777         } else if (e->signal_sources[sig])
778                 return -EBUSY;
779
780         s = source_new(e, SOURCE_SIGNAL);
781         if (!s)
782                 return -ENOMEM;
783
784         s->signal.sig = sig;
785         s->signal.callback = callback;
786         s->userdata = userdata;
787         s->enabled = SD_EVENT_ON;
788
789         e->signal_sources[sig] = s;
790         assert_se(sigaddset(&e->sigset, sig) == 0);
791
792         if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
793                 r = event_update_signal_fd(e);
794                 if (r < 0) {
795                         source_free(s);
796                         return r;
797                 }
798         }
799
800         *ret = s;
801         return 0;
802 }
803
804 int sd_event_add_child(
805                 sd_event *e,
806                 pid_t pid,
807                 int options,
808                 sd_child_handler_t callback,
809                 void *userdata,
810                 sd_event_source **ret) {
811
812         sd_event_source *s;
813         int r;
814
815         assert_return(e, -EINVAL);
816         assert_return(pid > 1, -EINVAL);
817         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
818         assert_return(options != 0, -EINVAL);
819         assert_return(callback, -EINVAL);
820         assert_return(ret, -EINVAL);
821         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
822         assert_return(!event_pid_changed(e), -ECHILD);
823
824         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
825         if (r < 0)
826                 return r;
827
828         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
829                 return -EBUSY;
830
831         s = source_new(e, SOURCE_CHILD);
832         if (!s)
833                 return -ENOMEM;
834
835         s->child.pid = pid;
836         s->child.options = options;
837         s->child.callback = callback;
838         s->userdata = userdata;
839         s->enabled = SD_EVENT_ONESHOT;
840
841         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
842         if (r < 0) {
843                 source_free(s);
844                 return r;
845         }
846
847         e->n_enabled_child_sources ++;
848
849         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
850
851         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
852                 r = event_update_signal_fd(e);
853                 if (r < 0) {
854                         source_free(s);
855                         return -errno;
856                 }
857         }
858
859         e->need_process_child = true;
860
861         *ret = s;
862         return 0;
863 }
864
865 int sd_event_add_defer(
866                 sd_event *e,
867                 sd_defer_handler_t callback,
868                 void *userdata,
869                 sd_event_source **ret) {
870
871         sd_event_source *s;
872         int r;
873
874         assert_return(e, -EINVAL);
875         assert_return(callback, -EINVAL);
876         assert_return(ret, -EINVAL);
877         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
878         assert_return(!event_pid_changed(e), -ECHILD);
879
880         s = source_new(e, SOURCE_DEFER);
881         if (!s)
882                 return -ENOMEM;
883
884         s->defer.callback = callback;
885         s->userdata = userdata;
886         s->enabled = SD_EVENT_ONESHOT;
887
888         r = source_set_pending(s, true);
889         if (r < 0) {
890                 source_free(s);
891                 return r;
892         }
893
894         *ret = s;
895         return 0;
896 }
897
898 int sd_event_add_quit(
899                 sd_event *e,
900                 sd_quit_handler_t callback,
901                 void *userdata,
902                 sd_event_source **ret) {
903
904         sd_event_source *s;
905         int r;
906
907         assert_return(e, -EINVAL);
908         assert_return(callback, -EINVAL);
909         assert_return(ret, -EINVAL);
910         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
911         assert_return(!event_pid_changed(e), -ECHILD);
912
913         if (!e->quit) {
914                 e->quit = prioq_new(quit_prioq_compare);
915                 if (!e->quit)
916                         return -ENOMEM;
917         }
918
919         s = source_new(e, SOURCE_QUIT);
920         if (!s)
921                 return -ENOMEM;
922
923         s->quit.callback = callback;
924         s->userdata = userdata;
925         s->quit.prioq_index = PRIOQ_IDX_NULL;
926         s->enabled = SD_EVENT_ONESHOT;
927
928         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
929         if (r < 0) {
930                 source_free(s);
931                 return r;
932         }
933
934         *ret = s;
935         return 0;
936 }
937
938 sd_event_source* sd_event_source_ref(sd_event_source *s) {
939         assert_return(s, NULL);
940
941         assert(s->n_ref >= 1);
942         s->n_ref++;
943
944         return s;
945 }
946
947 sd_event_source* sd_event_source_unref(sd_event_source *s) {
948         assert_return(s, NULL);
949
950         assert(s->n_ref >= 1);
951         s->n_ref--;
952
953         if (s->n_ref <= 0)
954                 source_free(s);
955
956         return NULL;
957 }
958
959 sd_event *sd_event_get(sd_event_source *s) {
960         assert_return(s, NULL);
961
962         return s->event;
963 }
964
965 int sd_event_source_get_pending(sd_event_source *s) {
966         assert_return(s, -EINVAL);
967         assert_return(s->type != SOURCE_QUIT, -EDOM);
968         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
969         assert_return(!event_pid_changed(s->event), -ECHILD);
970
971         return s->pending;
972 }
973
974 int sd_event_source_get_io_fd(sd_event_source *s) {
975         assert_return(s, -EINVAL);
976         assert_return(s->type == SOURCE_IO, -EDOM);
977         assert_return(!event_pid_changed(s->event), -ECHILD);
978
979         return s->io.fd;
980 }
981
982 int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
983         assert_return(s, -EINVAL);
984         assert_return(events, -EINVAL);
985         assert_return(s->type == SOURCE_IO, -EDOM);
986         assert_return(!event_pid_changed(s->event), -ECHILD);
987
988         *events = s->io.events;
989         return 0;
990 }
991
992 int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
993         int r;
994
995         assert_return(s, -EINVAL);
996         assert_return(s->type == SOURCE_IO, -EDOM);
997         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
998         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
999         assert_return(!event_pid_changed(s->event), -ECHILD);
1000
1001         if (s->io.events == events)
1002                 return 0;
1003
1004         if (s->enabled != SD_EVENT_OFF) {
1005                 r = source_io_register(s, s->enabled, events);
1006                 if (r < 0)
1007                         return r;
1008         }
1009
1010         s->io.events = events;
1011
1012         return 0;
1013 }
1014
1015 int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1016         assert_return(s, -EINVAL);
1017         assert_return(revents, -EINVAL);
1018         assert_return(s->type == SOURCE_IO, -EDOM);
1019         assert_return(s->pending, -ENODATA);
1020         assert_return(!event_pid_changed(s->event), -ECHILD);
1021
1022         *revents = s->io.revents;
1023         return 0;
1024 }
1025
1026 int sd_event_source_get_signal(sd_event_source *s) {
1027         assert_return(s, -EINVAL);
1028         assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1029         assert_return(!event_pid_changed(s->event), -ECHILD);
1030
1031         return s->signal.sig;
1032 }
1033
1034 int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1035         assert_return(s, -EINVAL);
1036         assert_return(!event_pid_changed(s->event), -ECHILD);
1037
1038         return s->priority;
1039 }
1040
1041 int sd_event_source_set_priority(sd_event_source *s, int priority) {
1042         assert_return(s, -EINVAL);
1043         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1044         assert_return(!event_pid_changed(s->event), -ECHILD);
1045
1046         if (s->priority == priority)
1047                 return 0;
1048
1049         s->priority = priority;
1050
1051         if (s->pending)
1052                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1053
1054         if (s->prepare)
1055                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1056
1057         if (s->type == SOURCE_QUIT)
1058                 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1059
1060         return 0;
1061 }
1062
1063 int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1064         assert_return(s, -EINVAL);
1065         assert_return(m, -EINVAL);
1066         assert_return(!event_pid_changed(s->event), -ECHILD);
1067
1068         *m = s->enabled;
1069         return 0;
1070 }
1071
1072 int sd_event_source_set_enabled(sd_event_source *s, int m) {
1073         int r;
1074
1075         assert_return(s, -EINVAL);
1076         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1077         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1078         assert_return(!event_pid_changed(s->event), -ECHILD);
1079
1080         if (s->enabled == m)
1081                 return 0;
1082
1083         if (m == SD_EVENT_OFF) {
1084
1085                 switch (s->type) {
1086
1087                 case SOURCE_IO:
1088                         r = source_io_unregister(s);
1089                         if (r < 0)
1090                                 return r;
1091
1092                         s->enabled = m;
1093                         break;
1094
1095                 case SOURCE_MONOTONIC:
1096                         s->enabled = m;
1097                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1098                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1099                         break;
1100
1101                 case SOURCE_REALTIME:
1102                         s->enabled = m;
1103                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1104                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1105                         break;
1106
1107                 case SOURCE_SIGNAL:
1108                         s->enabled = m;
1109                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1110                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1111                                 event_update_signal_fd(s->event);
1112                         }
1113
1114                         break;
1115
1116                 case SOURCE_CHILD:
1117                         s->enabled = m;
1118
1119                         assert(s->event->n_enabled_child_sources > 0);
1120                         s->event->n_enabled_child_sources--;
1121
1122                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1123                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1124                                 event_update_signal_fd(s->event);
1125                         }
1126
1127                         break;
1128
1129                 case SOURCE_QUIT:
1130                         s->enabled = m;
1131                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1132                         break;
1133
1134                 case SOURCE_DEFER:
1135                         s->enabled = m;
1136                         break;
1137                 }
1138
1139         } else {
1140                 switch (s->type) {
1141
1142                 case SOURCE_IO:
1143                         r = source_io_register(s, m, s->io.events);
1144                         if (r < 0)
1145                                 return r;
1146
1147                         s->enabled = m;
1148                         break;
1149
1150                 case SOURCE_MONOTONIC:
1151                         s->enabled = m;
1152                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1153                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1154                         break;
1155
1156                 case SOURCE_REALTIME:
1157                         s->enabled = m;
1158                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1159                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1160                         break;
1161
1162                 case SOURCE_SIGNAL:
1163                         s->enabled = m;
1164
1165                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1166                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1167                                 event_update_signal_fd(s->event);
1168                         }
1169                         break;
1170
1171                 case SOURCE_CHILD:
1172                         s->enabled = m;
1173
1174                         if (s->enabled == SD_EVENT_OFF) {
1175                                 s->event->n_enabled_child_sources++;
1176
1177                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1178                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1179                                         event_update_signal_fd(s->event);
1180                                 }
1181                         }
1182                         break;
1183
1184                 case SOURCE_QUIT:
1185                         s->enabled = m;
1186                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1187                         break;
1188
1189                 case SOURCE_DEFER:
1190                         s->enabled = m;
1191                         break;
1192                 }
1193         }
1194
1195         if (s->pending)
1196                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1197
1198         if (s->prepare)
1199                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1200
1201         return 0;
1202 }
1203
1204 int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1205         assert_return(s, -EINVAL);
1206         assert_return(usec, -EINVAL);
1207         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1208         assert_return(!event_pid_changed(s->event), -ECHILD);
1209
1210         *usec = s->time.next;
1211         return 0;
1212 }
1213
1214 int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1215         assert_return(s, -EINVAL);
1216         assert_return(usec != (uint64_t) -1, -EINVAL);
1217         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1218         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1219         assert_return(!event_pid_changed(s->event), -ECHILD);
1220
1221         if (s->time.next == usec)
1222                 return 0;
1223
1224         s->time.next = usec;
1225
1226         if (s->type == SOURCE_REALTIME) {
1227                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1228                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1229         } else {
1230                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1231                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1232         }
1233
1234         return 0;
1235 }
1236
1237 int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1238         assert_return(s, -EINVAL);
1239         assert_return(usec, -EINVAL);
1240         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1241         assert_return(!event_pid_changed(s->event), -ECHILD);
1242
1243         *usec = s->time.accuracy;
1244         return 0;
1245 }
1246
1247 int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1248         assert_return(s, -EINVAL);
1249         assert_return(usec != (uint64_t) -1, -EINVAL);
1250         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1251         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1252         assert_return(!event_pid_changed(s->event), -ECHILD);
1253
1254         if (usec == 0)
1255                 usec = DEFAULT_ACCURACY_USEC;
1256
1257         if (s->time.accuracy == usec)
1258                 return 0;
1259
1260         s->time.accuracy = usec;
1261
1262         if (s->type == SOURCE_REALTIME)
1263                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1264         else
1265                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1266
1267         return 0;
1268 }
1269
1270 int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1271         assert_return(s, -EINVAL);
1272         assert_return(pid, -EINVAL);
1273         assert_return(s->type == SOURCE_CHILD, -EDOM);
1274         assert_return(!event_pid_changed(s->event), -ECHILD);
1275
1276         *pid = s->child.pid;
1277         return 0;
1278 }
1279
1280 int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1281         int r;
1282
1283         assert_return(s, -EINVAL);
1284         assert_return(s->type != SOURCE_QUIT, -EDOM);
1285         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1286         assert_return(!event_pid_changed(s->event), -ECHILD);
1287
1288         if (s->prepare == callback)
1289                 return 0;
1290
1291         if (callback && s->prepare) {
1292                 s->prepare = callback;
1293                 return 0;
1294         }
1295
1296         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1297         if (r < 0)
1298                 return r;
1299
1300         s->prepare = callback;
1301
1302         if (callback) {
1303                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1304                 if (r < 0)
1305                         return r;
1306         } else
1307                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1308
1309         return 0;
1310 }
1311
1312 void* sd_event_source_get_userdata(sd_event_source *s) {
1313         assert_return(s, NULL);
1314
1315         return s->userdata;
1316 }
1317
1318 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1319         usec_t c;
1320         assert(e);
1321         assert(a <= b);
1322
1323         if (a <= 0)
1324                 return 0;
1325
1326         if (b <= a + 1)
1327                 return a;
1328
1329         /*
1330           Find a good time to wake up again between times a and b. We
1331           have two goals here:
1332
1333           a) We want to wake up as seldom as possible, hence prefer
1334              later times over earlier times.
1335
1336           b) But if we have to wake up, then let's make sure to
1337              dispatch as much as possible on the entire system.
1338
1339           We implement this by waking up everywhere at the same time
1340           within any given second if we can, synchronised via the
1341           perturbation value determined from the boot ID. If we can't,
1342           then we try to find the same spot in every a 250ms
1343           step. Otherwise, we pick the last possible time to wake up.
1344         */
1345
1346         c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1347         if (c >= b) {
1348                 if (_unlikely_(c < USEC_PER_SEC))
1349                         return b;
1350
1351                 c -= USEC_PER_SEC;
1352         }
1353
1354         if (c >= a)
1355                 return c;
1356
1357         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1358         if (c >= b) {
1359                 if (_unlikely_(c < USEC_PER_MSEC*250))
1360                         return b;
1361
1362                 c -= USEC_PER_MSEC*250;
1363         }
1364
1365         if (c >= a)
1366                 return c;
1367
1368         return b;
1369 }
1370
1371 static int event_arm_timer(
1372                 sd_event *e,
1373                 int timer_fd,
1374                 Prioq *earliest,
1375                 Prioq *latest,
1376                 usec_t *next) {
1377
1378         struct itimerspec its = {};
1379         sd_event_source *a, *b;
1380         usec_t t;
1381         int r;
1382
1383         assert_se(e);
1384         assert_se(next);
1385
1386         a = prioq_peek(earliest);
1387         if (!a || a->enabled == SD_EVENT_OFF) {
1388
1389                 if (*next == (usec_t) -1)
1390                         return 0;
1391
1392                 /* disarm */
1393                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1394                 if (r < 0)
1395                         return r;
1396
1397                 *next = (usec_t) -1;
1398
1399                 return 0;
1400         }
1401
1402         b = prioq_peek(latest);
1403         assert_se(b && b->enabled != SD_EVENT_OFF);
1404
1405         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1406         if (*next == t)
1407                 return 0;
1408
1409         assert_se(timer_fd >= 0);
1410
1411         if (t == 0) {
1412                 /* We don' want to disarm here, just mean some time looooong ago. */
1413                 its.it_value.tv_sec = 0;
1414                 its.it_value.tv_nsec = 1;
1415         } else
1416                 timespec_store(&its.it_value, t);
1417
1418         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1419         if (r < 0)
1420                 return r;
1421
1422         *next = t;
1423         return 0;
1424 }
1425
1426 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1427         assert(e);
1428         assert(s);
1429         assert(s->type == SOURCE_IO);
1430
1431         s->io.revents = events;
1432
1433         return source_set_pending(s, true);
1434 }
1435
1436 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1437         uint64_t x;
1438         ssize_t ss;
1439
1440         assert(e);
1441         assert(fd >= 0);
1442         assert(next);
1443
1444         assert_return(events == EPOLLIN, -EIO);
1445
1446         ss = read(fd, &x, sizeof(x));
1447         if (ss < 0) {
1448                 if (errno == EAGAIN || errno == EINTR)
1449                         return 0;
1450
1451                 return -errno;
1452         }
1453
1454         if (ss != sizeof(x))
1455                 return -EIO;
1456
1457         *next = (usec_t) -1;
1458
1459         return 0;
1460 }
1461
1462 static int process_timer(
1463                 sd_event *e,
1464                 usec_t n,
1465                 Prioq *earliest,
1466                 Prioq *latest) {
1467
1468         sd_event_source *s;
1469         int r;
1470
1471         assert(e);
1472
1473         for (;;) {
1474                 s = prioq_peek(earliest);
1475                 if (!s ||
1476                     s->time.next > n ||
1477                     s->enabled == SD_EVENT_OFF ||
1478                     s->pending)
1479                         break;
1480
1481                 r = source_set_pending(s, true);
1482                 if (r < 0)
1483                         return r;
1484
1485                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1486                 prioq_reshuffle(latest, s, &s->time.latest_index);
1487         }
1488
1489         return 0;
1490 }
1491
1492 static int process_child(sd_event *e) {
1493         sd_event_source *s;
1494         Iterator i;
1495         int r;
1496
1497         assert(e);
1498
1499         e->need_process_child = false;
1500
1501         /*
1502            So, this is ugly. We iteratively invoke waitid() with P_PID
1503            + WNOHANG for each PID we wait for, instead of using
1504            P_ALL. This is because we only want to get child
1505            information of very specific child processes, and not all
1506            of them. We might not have processed the SIGCHLD even of a
1507            previous invocation and we don't want to maintain a
1508            unbounded *per-child* event queue, hence we really don't
1509            want anything flushed out of the kernel's queue that we
1510            don't care about. Since this is O(n) this means that if you
1511            have a lot of processes you probably want to handle SIGCHLD
1512            yourself.
1513         */
1514
1515         HASHMAP_FOREACH(s, e->child_sources, i) {
1516                 assert(s->type == SOURCE_CHILD);
1517
1518                 if (s->pending)
1519                         continue;
1520
1521                 if (s->enabled == SD_EVENT_OFF)
1522                         continue;
1523
1524                 zero(s->child.siginfo);
1525                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1526                 if (r < 0)
1527                         return -errno;
1528
1529                 if (s->child.siginfo.si_pid != 0) {
1530                         r = source_set_pending(s, true);
1531                         if (r < 0)
1532                                 return r;
1533                 }
1534         }
1535
1536         return 0;
1537 }
1538
1539 static int process_signal(sd_event *e, uint32_t events) {
1540         bool read_one = false;
1541         int r;
1542
1543         assert(e);
1544         assert(e->signal_sources);
1545
1546         assert_return(events == EPOLLIN, -EIO);
1547
1548         for (;;) {
1549                 struct signalfd_siginfo si;
1550                 ssize_t ss;
1551                 sd_event_source *s;
1552
1553                 ss = read(e->signal_fd, &si, sizeof(si));
1554                 if (ss < 0) {
1555                         if (errno == EAGAIN || errno == EINTR)
1556                                 return read_one;
1557
1558                         return -errno;
1559                 }
1560
1561                 if (ss != sizeof(si))
1562                         return -EIO;
1563
1564                 read_one = true;
1565
1566                 s = e->signal_sources[si.ssi_signo];
1567                 if (si.ssi_signo == SIGCHLD) {
1568                         r = process_child(e);
1569                         if (r < 0)
1570                                 return r;
1571                         if (r > 0 || !s)
1572                                 continue;
1573                 } else
1574                         if (!s)
1575                                 return -EIO;
1576
1577                 s->signal.siginfo = si;
1578                 r = source_set_pending(s, true);
1579                 if (r < 0)
1580                         return r;
1581         }
1582
1583
1584         return 0;
1585 }
1586
1587 static int source_dispatch(sd_event_source *s) {
1588         int r = 0;
1589
1590         assert(s);
1591         assert(s->pending || s->type == SOURCE_QUIT);
1592
1593         if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
1594                 r = source_set_pending(s, false);
1595                 if (r < 0)
1596                         return r;
1597         }
1598
1599         if (s->enabled == SD_EVENT_ONESHOT) {
1600                 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1601                 if (r < 0)
1602                         return r;
1603         }
1604
1605         switch (s->type) {
1606
1607         case SOURCE_IO:
1608                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1609                 break;
1610
1611         case SOURCE_MONOTONIC:
1612                 r = s->time.callback(s, s->time.next, s->userdata);
1613                 break;
1614
1615         case SOURCE_REALTIME:
1616                 r = s->time.callback(s, s->time.next, s->userdata);
1617                 break;
1618
1619         case SOURCE_SIGNAL:
1620                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1621                 break;
1622
1623         case SOURCE_CHILD:
1624                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1625                 break;
1626
1627         case SOURCE_DEFER:
1628                 r = s->defer.callback(s, s->userdata);
1629                 break;
1630
1631         case SOURCE_QUIT:
1632                 r = s->quit.callback(s, s->userdata);
1633                 break;
1634         }
1635
1636         return r;
1637 }
1638
1639 static int event_prepare(sd_event *e) {
1640         int r;
1641
1642         assert(e);
1643
1644         for (;;) {
1645                 sd_event_source *s;
1646
1647                 s = prioq_peek(e->prepare);
1648                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1649                         break;
1650
1651                 s->prepare_iteration = e->iteration;
1652                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1653                 if (r < 0)
1654                         return r;
1655
1656                 assert(s->prepare);
1657                 r = s->prepare(s, s->userdata);
1658                 if (r < 0)
1659                         return r;
1660
1661         }
1662
1663         return 0;
1664 }
1665
1666 static int dispatch_quit(sd_event *e) {
1667         sd_event_source *p;
1668         int r;
1669
1670         assert(e);
1671
1672         p = prioq_peek(e->quit);
1673         if (!p || p->enabled == SD_EVENT_OFF) {
1674                 e->state = SD_EVENT_FINISHED;
1675                 return 0;
1676         }
1677
1678         sd_event_ref(e);
1679         e->iteration++;
1680         e->state = SD_EVENT_QUITTING;
1681
1682         r = source_dispatch(p);
1683
1684         e->state = SD_EVENT_PASSIVE;
1685         sd_event_unref(e);
1686
1687         return r;
1688 }
1689
1690 static sd_event_source* event_next_pending(sd_event *e) {
1691         sd_event_source *p;
1692
1693         assert(e);
1694
1695         p = prioq_peek(e->pending);
1696         if (!p)
1697                 return NULL;
1698
1699         if (p->enabled == SD_EVENT_OFF)
1700                 return NULL;
1701
1702         return p;
1703 }
1704
/* Runs a single iteration of the event loop: runs prepare handlers, waits
 * for events on the epoll fd, turns what happened into pending sources and
 * dispatches at most one pending source.
 *
 * timeout is given in microseconds and rounded up to milliseconds for
 * epoll_wait(); (uint64_t) -1 means "wait indefinitely".
 *
 * Returns -EINVAL for a NULL loop, -ECHILD after a PID change, -ESTALE if
 * the loop has finished, -EBUSY if it is not in the SD_EVENT_PASSIVE state.
 * If a quit was requested, the result of dispatch_quit() is returned.
 * Otherwise the result of the dispatched callback is returned, 0 if nothing
 * was dispatched, or a negative error from one of the processing steps. */
int sd_event_run(sd_event *e, uint64_t timeout) {
        struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
        sd_event_source *p;
        int r, i, m;

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);

        /* Once a quit was requested only quit sources are dispatched */
        if (e->quit_requested)
                return dispatch_quit(e);

        /* Hold a reference on the loop for the duration of the iteration;
         * it is dropped again at "finish" */
        sd_event_ref(e);
        e->iteration++;
        e->state = SD_EVENT_RUNNING;

        r = event_prepare(e);
        if (r < 0)
                goto finish;

        /* There is work queued already: poll, but do not sleep */
        if (event_next_pending(e) || e->need_process_child)
                timeout = 0;

        /* The timer fds are only (re)programmed if we may actually sleep */
        if (timeout > 0) {
                r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
                if (r < 0)
                        goto finish;

                r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
                if (r < 0)
                        goto finish;
        }

        /* usec -> msec, rounded up; (uint64_t) -1 maps to epoll's "no timeout" */
        m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
                       timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
        if (m < 0) {
                /* EAGAIN and EINTR are not treated as errors */
                r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
                goto finish;
        }

        /* Timestamp of this wake-up; used below to decide which timers are due */
        dual_timestamp_get(&e->timestamp);

        for (i = 0; i < m; i++) {

                /* The internal fds are identified by a source type constant
                 * stored in place of a pointer; everything else carries a
                 * pointer to an I/O event source */
                if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
                        r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
                        r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
                        r = process_signal(e, ev_queue[i].events);
                else
                        r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);

                if (r < 0)
                        goto finish;
        }

        /* Mark all timer sources that are due by now as pending */
        r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
        if (r < 0)
                goto finish;

        r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
        if (r < 0)
                goto finish;

        if (e->need_process_child) {
                r = process_child(e);
                if (r < 0)
                        goto finish;
        }

        /* Dispatch at most one source per iteration */
        p = event_next_pending(e);
        if (!p) {
                r = 0;
                goto finish;
        }

        r = source_dispatch(p);

finish:
        e->state = SD_EVENT_PASSIVE;
        sd_event_unref(e);

        return r;
}
1791
1792 int sd_event_loop(sd_event *e) {
1793         int r;
1794
1795         assert_return(e, -EINVAL);
1796         assert_return(!event_pid_changed(e), -ECHILD);
1797         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1798
1799         sd_event_ref(e);
1800
1801         while (e->state != SD_EVENT_FINISHED) {
1802                 r = sd_event_run(e, (uint64_t) -1);
1803                 if (r < 0)
1804                         goto finish;
1805         }
1806
1807         r = 0;
1808
1809 finish:
1810         sd_event_unref(e);
1811         return r;
1812 }
1813
1814 int sd_event_get_state(sd_event *e) {
1815         assert_return(e, -EINVAL);
1816         assert_return(!event_pid_changed(e), -ECHILD);
1817
1818         return e->state;
1819 }
1820
1821 int sd_event_get_quit(sd_event *e) {
1822         assert_return(e, -EINVAL);
1823         assert_return(!event_pid_changed(e), -ECHILD);
1824
1825         return e->quit_requested;
1826 }
1827
1828 int sd_event_request_quit(sd_event *e) {
1829         assert_return(e, -EINVAL);
1830         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1831         assert_return(!event_pid_changed(e), -ECHILD);
1832
1833         e->quit_requested = true;
1834         return 0;
1835 }
1836
1837 int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
1838         assert_return(e, -EINVAL);
1839         assert_return(usec, -EINVAL);
1840         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1841         assert_return(!event_pid_changed(e), -ECHILD);
1842
1843         *usec = e->timestamp.realtime;
1844         return 0;
1845 }
1846
1847 int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
1848         assert_return(e, -EINVAL);
1849         assert_return(usec, -EINVAL);
1850         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1851         assert_return(!event_pid_changed(e), -ECHILD);
1852
1853         *usec = e->timestamp.monotonic;
1854         return 0;
1855 }