event: clear pending-state when re-arming timers
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "sd-id128.h"
27 #include "macro.h"
28 #include "prioq.h"
29 #include "hashmap.h"
30 #include "util.h"
31 #include "time-util.h"
32 #include "missing.h"
33
34 #include "sd-event.h"
35
36 #define EPOLL_QUEUE_MAX 64
37 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
38
39 typedef enum EventSourceType {
40         SOURCE_IO,
41         SOURCE_MONOTONIC,
42         SOURCE_REALTIME,
43         SOURCE_SIGNAL,
44         SOURCE_CHILD,
45         SOURCE_DEFER,
46         SOURCE_QUIT
47 } EventSourceType;
48
49 struct sd_event_source {
50         unsigned n_ref;
51
52         sd_event *event;
53         void *userdata;
54         sd_prepare_handler_t prepare;
55
56         EventSourceType type:4;
57         int enabled:3;
58         bool pending:1;
59
60         int priority;
61         unsigned pending_index;
62         unsigned prepare_index;
63         unsigned pending_iteration;
64         unsigned prepare_iteration;
65
66         union {
67                 struct {
68                         sd_io_handler_t callback;
69                         int fd;
70                         uint32_t events;
71                         uint32_t revents;
72                         bool registered:1;
73                 } io;
74                 struct {
75                         sd_time_handler_t callback;
76                         usec_t next, accuracy;
77                         unsigned earliest_index;
78                         unsigned latest_index;
79                 } time;
80                 struct {
81                         sd_signal_handler_t callback;
82                         struct signalfd_siginfo siginfo;
83                         int sig;
84                 } signal;
85                 struct {
86                         sd_child_handler_t callback;
87                         siginfo_t siginfo;
88                         pid_t pid;
89                         int options;
90                 } child;
91                 struct {
92                         sd_defer_handler_t callback;
93                 } defer;
94                 struct {
95                         sd_quit_handler_t callback;
96                         unsigned prioq_index;
97                 } quit;
98         };
99 };
100
101 struct sd_event {
102         unsigned n_ref;
103
104         int epoll_fd;
105         int signal_fd;
106         int realtime_fd;
107         int monotonic_fd;
108
109         Prioq *pending;
110         Prioq *prepare;
111
112         /* For both clocks we maintain two priority queues each, one
113          * ordered for the earliest times the events may be
114          * dispatched, and one ordered by the latest times they must
115          * have been dispatched. The range between the top entries in
116          * the two prioqs is the time window we can freely schedule
117          * wakeups in */
118         Prioq *monotonic_earliest;
119         Prioq *monotonic_latest;
120         Prioq *realtime_earliest;
121         Prioq *realtime_latest;
122
123         usec_t realtime_next, monotonic_next;
124         usec_t perturb;
125
126         sigset_t sigset;
127         sd_event_source **signal_sources;
128
129         Hashmap *child_sources;
130         unsigned n_enabled_child_sources;
131
132         Prioq *quit;
133
134         pid_t original_pid;
135
136         unsigned iteration;
137         dual_timestamp timestamp;
138         int state;
139
140         bool quit_requested:1;
141         bool need_process_child:1;
142
143         pid_t tid;
144         sd_event **default_event_ptr;
145 };
146
147 static int pending_prioq_compare(const void *a, const void *b) {
148         const sd_event_source *x = a, *y = b;
149
150         assert(x->pending);
151         assert(y->pending);
152
153         /* Enabled ones first */
154         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
155                 return -1;
156         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
157                 return 1;
158
159         /* Lower priority values first */
160         if (x->priority < y->priority)
161                 return -1;
162         if (x->priority > y->priority)
163                 return 1;
164
165         /* Older entries first */
166         if (x->pending_iteration < y->pending_iteration)
167                 return -1;
168         if (x->pending_iteration > y->pending_iteration)
169                 return 1;
170
171         /* Stability for the rest */
172         if (x < y)
173                 return -1;
174         if (x > y)
175                 return 1;
176
177         return 0;
178 }
179
180 static int prepare_prioq_compare(const void *a, const void *b) {
181         const sd_event_source *x = a, *y = b;
182
183         assert(x->prepare);
184         assert(y->prepare);
185
186         /* Move most recently prepared ones last, so that we can stop
187          * preparing as soon as we hit one that has already been
188          * prepared in the current iteration */
189         if (x->prepare_iteration < y->prepare_iteration)
190                 return -1;
191         if (x->prepare_iteration > y->prepare_iteration)
192                 return 1;
193
194         /* Enabled ones first */
195         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
196                 return -1;
197         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
198                 return 1;
199
200         /* Lower priority values first */
201         if (x->priority < y->priority)
202                 return -1;
203         if (x->priority > y->priority)
204                 return 1;
205
206         /* Stability for the rest */
207         if (x < y)
208                 return -1;
209         if (x > y)
210                 return 1;
211
212         return 0;
213 }
214
215 static int earliest_time_prioq_compare(const void *a, const void *b) {
216         const sd_event_source *x = a, *y = b;
217
218         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
219         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
220
221         /* Enabled ones first */
222         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
223                 return -1;
224         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
225                 return 1;
226
227         /* Move the pending ones to the end */
228         if (!x->pending && y->pending)
229                 return -1;
230         if (x->pending && !y->pending)
231                 return 1;
232
233         /* Order by time */
234         if (x->time.next < y->time.next)
235                 return -1;
236         if (x->time.next > y->time.next)
237                 return 1;
238
239         /* Stability for the rest */
240         if (x < y)
241                 return -1;
242         if (x > y)
243                 return 1;
244
245         return 0;
246 }
247
248 static int latest_time_prioq_compare(const void *a, const void *b) {
249         const sd_event_source *x = a, *y = b;
250
251         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
252                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
253
254         /* Enabled ones first */
255         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
256                 return -1;
257         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
258                 return 1;
259
260         /* Move the pending ones to the end */
261         if (!x->pending && y->pending)
262                 return -1;
263         if (x->pending && !y->pending)
264                 return 1;
265
266         /* Order by time */
267         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
268                 return -1;
269         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
270                 return 1;
271
272         /* Stability for the rest */
273         if (x < y)
274                 return -1;
275         if (x > y)
276                 return 1;
277
278         return 0;
279 }
280
281 static int quit_prioq_compare(const void *a, const void *b) {
282         const sd_event_source *x = a, *y = b;
283
284         assert(x->type == SOURCE_QUIT);
285         assert(y->type == SOURCE_QUIT);
286
287         /* Enabled ones first */
288         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
289                 return -1;
290         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
291                 return 1;
292
293         /* Lower priority values first */
294         if (x->priority < y->priority)
295                 return -1;
296         if (x->priority > y->priority)
297                 return 1;
298
299         /* Stability for the rest */
300         if (x < y)
301                 return -1;
302         if (x > y)
303                 return 1;
304
305         return 0;
306 }
307
308 static void event_free(sd_event *e) {
309         assert(e);
310
311         if (e->default_event_ptr)
312                 *(e->default_event_ptr) = NULL;
313
314         if (e->epoll_fd >= 0)
315                 close_nointr_nofail(e->epoll_fd);
316
317         if (e->signal_fd >= 0)
318                 close_nointr_nofail(e->signal_fd);
319
320         if (e->realtime_fd >= 0)
321                 close_nointr_nofail(e->realtime_fd);
322
323         if (e->monotonic_fd >= 0)
324                 close_nointr_nofail(e->monotonic_fd);
325
326         prioq_free(e->pending);
327         prioq_free(e->prepare);
328         prioq_free(e->monotonic_earliest);
329         prioq_free(e->monotonic_latest);
330         prioq_free(e->realtime_earliest);
331         prioq_free(e->realtime_latest);
332         prioq_free(e->quit);
333
334         free(e->signal_sources);
335
336         hashmap_free(e->child_sources);
337         free(e);
338 }
339
340 _public_ int sd_event_new(sd_event** ret) {
341         sd_event *e;
342         int r;
343
344         assert_return(ret, -EINVAL);
345
346         e = new0(sd_event, 1);
347         if (!e)
348                 return -ENOMEM;
349
350         e->n_ref = 1;
351         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
352         e->realtime_next = e->monotonic_next = (usec_t) -1;
353         e->original_pid = getpid();
354
355         assert_se(sigemptyset(&e->sigset) == 0);
356
357         e->pending = prioq_new(pending_prioq_compare);
358         if (!e->pending) {
359                 r = -ENOMEM;
360                 goto fail;
361         }
362
363         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
364         if (e->epoll_fd < 0) {
365                 r = -errno;
366                 goto fail;
367         }
368
369         *ret = e;
370         return 0;
371
372 fail:
373         event_free(e);
374         return r;
375 }
376
377 _public_ sd_event* sd_event_ref(sd_event *e) {
378         assert_return(e, NULL);
379
380         assert(e->n_ref >= 1);
381         e->n_ref++;
382
383         return e;
384 }
385
386 _public_ sd_event* sd_event_unref(sd_event *e) {
387         assert_return(e, NULL);
388
389         assert(e->n_ref >= 1);
390         e->n_ref--;
391
392         if (e->n_ref <= 0)
393                 event_free(e);
394
395         return NULL;
396 }
397
398 static bool event_pid_changed(sd_event *e) {
399         assert(e);
400
401         /* We don't support people creating an event loop and keeping
402          * it around over a fork(). Let's complain. */
403
404         return e->original_pid != getpid();
405 }
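
/* A minimal caller-side sketch (hypothetical code, not part of this file):
 * the -ECHILD checks built on event_pid_changed() mean an event loop must not
 * be reused across fork(); a child process is expected to allocate its own:
 *
 *     #include <unistd.h>
 *     #include "sd-event.h"
 *
 *     pid_t pid = fork();
 *     if (pid == 0) {
 *             sd_event *child_loop = NULL;
 *
 *             // Calls on the loop inherited from the parent would fail with
 *             // -ECHILD here, so the child sets up a fresh loop instead.
 *             if (sd_event_new(&child_loop) < 0)
 *                     _exit(1);
 *     }
 */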
406
407 static int source_io_unregister(sd_event_source *s) {
408         int r;
409
410         assert(s);
411         assert(s->type == SOURCE_IO);
412
413         if (!s->io.registered)
414                 return 0;
415
416         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
417         if (r < 0)
418                 return -errno;
419
420         s->io.registered = false;
421         return 0;
422 }
423
424 static int source_io_register(
425                 sd_event_source *s,
426                 int enabled,
427                 uint32_t events) {
428
429         struct epoll_event ev = {};
430         int r;
431
432         assert(s);
433         assert(s->type == SOURCE_IO);
434         assert(enabled != SD_EVENT_OFF);
435
436         ev.events = events;
437         ev.data.ptr = s;
438
439         if (enabled == SD_EVENT_ONESHOT)
440                 ev.events |= EPOLLONESHOT;
441
442         if (s->io.registered)
443                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
444         else
445                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
446
447         if (r < 0)
448                 return -errno;
449
450         s->io.registered = true;
451
452         return 0;
453 }
454
455 static void source_free(sd_event_source *s) {
456         assert(s);
457
458         if (s->event) {
459                 switch (s->type) {
460
461                 case SOURCE_IO:
462                         if (s->io.fd >= 0)
463                                 source_io_unregister(s);
464
465                         break;
466
467                 case SOURCE_MONOTONIC:
468                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
469                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
470                         break;
471
472                 case SOURCE_REALTIME:
473                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
474                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
475                         break;
476
477                 case SOURCE_SIGNAL:
478                         if (s->signal.sig > 0) {
479                                 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
480                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
481
482                                 if (s->event->signal_sources)
483                                         s->event->signal_sources[s->signal.sig] = NULL;
484                         }
485
486                         break;
487
488                 case SOURCE_CHILD:
489                         if (s->child.pid > 0) {
490                                 if (s->enabled != SD_EVENT_OFF) {
491                                         assert(s->event->n_enabled_child_sources > 0);
492                                         s->event->n_enabled_child_sources--;
493                                 }
494
495                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
496                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
497
498                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
499                         }
500
501                         break;
502
503                 case SOURCE_DEFER:
504                         /* nothing */
505                         break;
506
507                 case SOURCE_QUIT:
508                         prioq_remove(s->event->quit, s, &s->quit.prioq_index);
509                         break;
510                 }
511
512                 if (s->pending)
513                         prioq_remove(s->event->pending, s, &s->pending_index);
514
515                 if (s->prepare)
516                         prioq_remove(s->event->prepare, s, &s->prepare_index);
517
518                 sd_event_unref(s->event);
519         }
520
521         free(s);
522 }
523
524 static int source_set_pending(sd_event_source *s, bool b) {
525         int r;
526
527         assert(s);
528         assert(s->type != SOURCE_QUIT);
529
530         if (s->pending == b)
531                 return 0;
532
533         s->pending = b;
534
535         if (b) {
536                 s->pending_iteration = s->event->iteration;
537
538                 r = prioq_put(s->event->pending, s, &s->pending_index);
539                 if (r < 0) {
540                         s->pending = false;
541                         return r;
542                 }
543         } else
544                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
545
546         return 0;
547 }
548
549 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
550         sd_event_source *s;
551
552         assert(e);
553
554         s = new0(sd_event_source, 1);
555         if (!s)
556                 return NULL;
557
558         s->n_ref = 1;
559         s->event = sd_event_ref(e);
560         s->type = type;
561         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
562
563         return s;
564 }
565
566 _public_ int sd_event_add_io(
567                 sd_event *e,
568                 int fd,
569                 uint32_t events,
570                 sd_io_handler_t callback,
571                 void *userdata,
572                 sd_event_source **ret) {
573
574         sd_event_source *s;
575         int r;
576
577         assert_return(e, -EINVAL);
578         assert_return(fd >= 0, -EINVAL);
579         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
580         assert_return(callback, -EINVAL);
581         assert_return(ret, -EINVAL);
582         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
583         assert_return(!event_pid_changed(e), -ECHILD);
584
585         s = source_new(e, SOURCE_IO);
586         if (!s)
587                 return -ENOMEM;
588
589         s->io.fd = fd;
590         s->io.events = events;
591         s->io.callback = callback;
592         s->userdata = userdata;
593         s->enabled = SD_EVENT_ON;
594
595         r = source_io_register(s, s->enabled, events);
596         if (r < 0) {
597                 source_free(s);
598                 return r;
599         }
600
601         *ret = s;
602         return 0;
603 }
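
/* A minimal usage sketch (hypothetical caller code, not part of this file),
 * assuming "some_fd" is a non-blocking file descriptor the caller owns. It
 * shows the handler signature this file dispatches to and the usual
 * register/run/unref sequence:
 *
 *     static int my_io_handler(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
 *             char buf[256];
 *             ssize_t n = read(fd, buf, sizeof(buf));   // drain the fd
 *
 *             // A negative return value propagates out of sd_event_run()/sd_event_loop().
 *             return n < 0 && errno != EAGAIN ? -errno : 0;
 *     }
 *
 *     sd_event *e = NULL;
 *     sd_event_source *src = NULL;
 *
 *     sd_event_new(&e);
 *     sd_event_add_io(e, some_fd, EPOLLIN, my_io_handler, NULL, &src);
 *     sd_event_loop(e);
 *
 *     sd_event_source_unref(src);
 *     sd_event_unref(e);
 */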
604
605 static int event_setup_timer_fd(
606                 sd_event *e,
607                 EventSourceType type,
608                 int *timer_fd,
609                 clockid_t id) {
610
611         struct epoll_event ev = {};
612         int r, fd;
613         sd_id128_t bootid;
614
615         assert(e);
616         assert(timer_fd);
617
618         if (_likely_(*timer_fd >= 0))
619                 return 0;
620
621         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
622         if (fd < 0)
623                 return -errno;
624
625         ev.events = EPOLLIN;
626         ev.data.ptr = INT_TO_PTR(type);
627
628         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
629         if (r < 0) {
630                 close_nointr_nofail(fd);
631                 return -errno;
632         }
633
634         /* When we sleep for longer, we try to realign the wakeup to
635            the same time within each second, so that events all across
636            the system can be coalesced into a single CPU
637            wakeup. However, let's take some system-specific randomness
638            for this value, so that in a network of systems with synced
639            clocks timer events are distributed a bit. Here, we
640            calculate a perturbation usec offset from the boot ID. */
641
642         if (sd_id128_get_boot(&bootid) >= 0)
643                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
644
645         *timer_fd = fd;
646         return 0;
647 }
648
649 static int event_add_time_internal(
650                 sd_event *e,
651                 EventSourceType type,
652                 int *timer_fd,
653                 clockid_t id,
654                 Prioq **earliest,
655                 Prioq **latest,
656                 uint64_t usec,
657                 uint64_t accuracy,
658                 sd_time_handler_t callback,
659                 void *userdata,
660                 sd_event_source **ret) {
661
662         sd_event_source *s;
663         int r;
664
665         assert_return(e, -EINVAL);
666         assert_return(callback, -EINVAL);
667         assert_return(ret, -EINVAL);
668         assert_return(usec != (uint64_t) -1, -EINVAL);
669         assert_return(accuracy != (uint64_t) -1, -EINVAL);
670         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
671         assert_return(!event_pid_changed(e), -ECHILD);
672
673         assert(timer_fd);
674         assert(earliest);
675         assert(latest);
676
677         if (!*earliest) {
678                 *earliest = prioq_new(earliest_time_prioq_compare);
679                 if (!*earliest)
680                         return -ENOMEM;
681         }
682
683         if (!*latest) {
684                 *latest = prioq_new(latest_time_prioq_compare);
685                 if (!*latest)
686                         return -ENOMEM;
687         }
688
689         if (*timer_fd < 0) {
690                 r = event_setup_timer_fd(e, type, timer_fd, id);
691                 if (r < 0)
692                         return r;
693         }
694
695         s = source_new(e, type);
696         if (!s)
697                 return -ENOMEM;
698
699         s->time.next = usec;
700         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
701         s->time.callback = callback;
702         s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
703         s->userdata = userdata;
704         s->enabled = SD_EVENT_ONESHOT;
705
706         r = prioq_put(*earliest, s, &s->time.earliest_index);
707         if (r < 0)
708                 goto fail;
709
710         r = prioq_put(*latest, s, &s->time.latest_index);
711         if (r < 0)
712                 goto fail;
713
714         *ret = s;
715         return 0;
716
717 fail:
718         source_free(s);
719         return r;
720 }
721
722 _public_ int sd_event_add_monotonic(sd_event *e,
723                                     uint64_t usec,
724                                     uint64_t accuracy,
725                                     sd_time_handler_t callback,
726                                     void *userdata,
727                                     sd_event_source **ret) {
728
729         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
730 }
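
/* A brief usage sketch (hypothetical caller code, not part of this file),
 * assuming "e" is an existing loop: arm a one-shot timer five seconds from
 * now with 100ms of coalescing accuracy. now() and the USEC_* constants are
 * available through the includes at the top of this file.
 *
 *     static int my_time_handler(sd_event_source *s, uint64_t usec, void *userdata) {
 *             // "usec" is the deadline the source was armed for
 *             return 0;
 *     }
 *
 *     sd_event_source *t = NULL;
 *     sd_event_add_monotonic(e, now(CLOCK_MONOTONIC) + 5 * USEC_PER_SEC,
 *                            100 * USEC_PER_MSEC, my_time_handler, NULL, &t);
 */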
731
732 _public_ int sd_event_add_realtime(sd_event *e,
733                                    uint64_t usec,
734                                    uint64_t accuracy,
735                                    sd_time_handler_t callback,
736                                    void *userdata,
737                                    sd_event_source **ret) {
738
739         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->realtime_latest, usec, accuracy, callback, userdata, ret);
740 }
741
742 static int event_update_signal_fd(sd_event *e) {
743         struct epoll_event ev = {};
744         bool add_to_epoll;
745         int r;
746
747         assert(e);
748
749         add_to_epoll = e->signal_fd < 0;
750
751         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
752         if (r < 0)
753                 return -errno;
754
755         e->signal_fd = r;
756
757         if (!add_to_epoll)
758                 return 0;
759
760         ev.events = EPOLLIN;
761         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
762
763         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
764         if (r < 0) {
765                 close_nointr_nofail(e->signal_fd);
766                 e->signal_fd = -1;
767
768                 return -errno;
769         }
770
771         return 0;
772 }
773
774 _public_ int sd_event_add_signal(
775                 sd_event *e,
776                 int sig,
777                 sd_signal_handler_t callback,
778                 void *userdata,
779                 sd_event_source **ret) {
780
781         sd_event_source *s;
782         int r;
783
784         assert_return(e, -EINVAL);
785         assert_return(sig > 0, -EINVAL);
786         assert_return(sig < _NSIG, -EINVAL);
787         assert_return(callback, -EINVAL);
788         assert_return(ret, -EINVAL);
789         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
790         assert_return(!event_pid_changed(e), -ECHILD);
791
792         if (!e->signal_sources) {
793                 e->signal_sources = new0(sd_event_source*, _NSIG);
794                 if (!e->signal_sources)
795                         return -ENOMEM;
796         } else if (e->signal_sources[sig])
797                 return -EBUSY;
798
799         s = source_new(e, SOURCE_SIGNAL);
800         if (!s)
801                 return -ENOMEM;
802
803         s->signal.sig = sig;
804         s->signal.callback = callback;
805         s->userdata = userdata;
806         s->enabled = SD_EVENT_ON;
807
808         e->signal_sources[sig] = s;
809         assert_se(sigaddset(&e->sigset, sig) == 0);
810
811         if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
812                 r = event_update_signal_fd(e);
813                 if (r < 0) {
814                         source_free(s);
815                         return r;
816                 }
817         }
818
819         *ret = s;
820         return 0;
821 }
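
/* A short caller sketch (hypothetical, not part of this file), assuming "e"
 * is an existing loop. The sigset above is only handed to signalfd(); nothing
 * here blocks the signal, and a signal that is not blocked is delivered via
 * its default disposition rather than the signalfd, so callers are expected
 * to block it themselves before adding the source:
 *
 *     static int my_signal_handler(sd_event_source *s, const struct signalfd_siginfo *si, void *userdata) {
 *             // si->ssi_signo tells which signal fired
 *             return 0;
 *     }
 *
 *     sigset_t mask;
 *     sigemptyset(&mask);
 *     sigaddset(&mask, SIGTERM);
 *     sigprocmask(SIG_BLOCK, &mask, NULL);
 *
 *     sd_event_source *sig = NULL;
 *     sd_event_add_signal(e, SIGTERM, my_signal_handler, NULL, &sig);
 */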
822
823 _public_ int sd_event_add_child(
824                 sd_event *e,
825                 pid_t pid,
826                 int options,
827                 sd_child_handler_t callback,
828                 void *userdata,
829                 sd_event_source **ret) {
830
831         sd_event_source *s;
832         int r;
833
834         assert_return(e, -EINVAL);
835         assert_return(pid > 1, -EINVAL);
836         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
837         assert_return(options != 0, -EINVAL);
838         assert_return(callback, -EINVAL);
839         assert_return(ret, -EINVAL);
840         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
841         assert_return(!event_pid_changed(e), -ECHILD);
842
843         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
844         if (r < 0)
845                 return r;
846
847         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
848                 return -EBUSY;
849
850         s = source_new(e, SOURCE_CHILD);
851         if (!s)
852                 return -ENOMEM;
853
854         s->child.pid = pid;
855         s->child.options = options;
856         s->child.callback = callback;
857         s->userdata = userdata;
858         s->enabled = SD_EVENT_ONESHOT;
859
860         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
861         if (r < 0) {
862                 source_free(s);
863                 return r;
864         }
865
866         e->n_enabled_child_sources++;
867
868         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
869
870         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
871                 r = event_update_signal_fd(e);
872                 if (r < 0) {
873                         source_free(s);
874                         return r;
875                 }
876         }
877
878         e->need_process_child = true;
879
880         *ret = s;
881         return 0;
882 }
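
/* A brief caller sketch (hypothetical, not part of this file), assuming "e"
 * is an existing loop and SIGCHLD is already blocked in the caller, for the
 * same signalfd reason as with signal sources. The handler receives the
 * waitid() result collected by process_child() further down:
 *
 *     static int my_child_handler(sd_event_source *s, const siginfo_t *si, void *userdata) {
 *             // si->si_pid, si->si_code and si->si_status describe the state change
 *             return 0;
 *     }
 *
 *     pid_t pid = fork();
 *     if (pid == 0)
 *             _exit(EXIT_SUCCESS);
 *
 *     sd_event_source *child = NULL;
 *     sd_event_add_child(e, pid, WEXITED, my_child_handler, NULL, &child);
 */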
883
884 _public_ int sd_event_add_defer(
885                 sd_event *e,
886                 sd_defer_handler_t callback,
887                 void *userdata,
888                 sd_event_source **ret) {
889
890         sd_event_source *s;
891         int r;
892
893         assert_return(e, -EINVAL);
894         assert_return(callback, -EINVAL);
895         assert_return(ret, -EINVAL);
896         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
897         assert_return(!event_pid_changed(e), -ECHILD);
898
899         s = source_new(e, SOURCE_DEFER);
900         if (!s)
901                 return -ENOMEM;
902
903         s->defer.callback = callback;
904         s->userdata = userdata;
905         s->enabled = SD_EVENT_ONESHOT;
906
907         r = source_set_pending(s, true);
908         if (r < 0) {
909                 source_free(s);
910                 return r;
911         }
912
913         *ret = s;
914         return 0;
915 }
916
917 _public_ int sd_event_add_quit(
918                 sd_event *e,
919                 sd_quit_handler_t callback,
920                 void *userdata,
921                 sd_event_source **ret) {
922
923         sd_event_source *s;
924         int r;
925
926         assert_return(e, -EINVAL);
927         assert_return(callback, -EINVAL);
928         assert_return(ret, -EINVAL);
929         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
930         assert_return(!event_pid_changed(e), -ECHILD);
931
932         if (!e->quit) {
933                 e->quit = prioq_new(quit_prioq_compare);
934                 if (!e->quit)
935                         return -ENOMEM;
936         }
937
938         s = source_new(e, SOURCE_QUIT);
939         if (!s)
940                 return -ENOMEM;
941
942         s->quit.callback = callback;
943         s->userdata = userdata;
944         s->quit.prioq_index = PRIOQ_IDX_NULL;
945         s->enabled = SD_EVENT_ONESHOT;
946
947         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
948         if (r < 0) {
949                 source_free(s);
950                 return r;
951         }
952
953         *ret = s;
954         return 0;
955 }
956
957 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
958         assert_return(s, NULL);
959
960         assert(s->n_ref >= 1);
961         s->n_ref++;
962
963         return s;
964 }
965
966 _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {
967         assert_return(s, NULL);
968
969         assert(s->n_ref >= 1);
970         s->n_ref--;
971
972         if (s->n_ref <= 0)
973                 source_free(s);
974
975         return NULL;
976 }
977
978 _public_ sd_event *sd_event_get(sd_event_source *s) {
979         assert_return(s, NULL);
980
981         return s->event;
982 }
983
984 _public_ int sd_event_source_get_pending(sd_event_source *s) {
985         assert_return(s, -EINVAL);
986         assert_return(s->type != SOURCE_QUIT, -EDOM);
987         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
988         assert_return(!event_pid_changed(s->event), -ECHILD);
989
990         return s->pending;
991 }
992
993 _public_ int sd_event_source_get_io_fd(sd_event_source *s) {
994         assert_return(s, -EINVAL);
995         assert_return(s->type == SOURCE_IO, -EDOM);
996         assert_return(!event_pid_changed(s->event), -ECHILD);
997
998         return s->io.fd;
999 }
1000
1001 _public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
1002         assert_return(s, -EINVAL);
1003         assert_return(events, -EINVAL);
1004         assert_return(s->type == SOURCE_IO, -EDOM);
1005         assert_return(!event_pid_changed(s->event), -ECHILD);
1006
1007         *events = s->io.events;
1008         return 0;
1009 }
1010
1011 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1012         int r;
1013
1014         assert_return(s, -EINVAL);
1015         assert_return(s->type == SOURCE_IO, -EDOM);
1016         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)), -EINVAL);
1017         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1018         assert_return(!event_pid_changed(s->event), -ECHILD);
1019
1020         if (s->io.events == events)
1021                 return 0;
1022
1023         if (s->enabled != SD_EVENT_OFF) {
1024                 r = source_io_register(s, s->enabled, events);
1025                 if (r < 0)
1026                         return r;
1027         }
1028
1029         s->io.events = events;
1030
1031         return 0;
1032 }
1033
1034 _public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1035         assert_return(s, -EINVAL);
1036         assert_return(revents, -EINVAL);
1037         assert_return(s->type == SOURCE_IO, -EDOM);
1038         assert_return(s->pending, -ENODATA);
1039         assert_return(!event_pid_changed(s->event), -ECHILD);
1040
1041         *revents = s->io.revents;
1042         return 0;
1043 }
1044
1045 _public_ int sd_event_source_get_signal(sd_event_source *s) {
1046         assert_return(s, -EINVAL);
1047         assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1048         assert_return(!event_pid_changed(s->event), -ECHILD);
1049
1050         return s->signal.sig;
1051 }
1052
1053 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1054         assert_return(s, -EINVAL);
1055         assert_return(!event_pid_changed(s->event), -ECHILD);
1056         *priority = s->priority;
1057         return 0;
1058 }
1059
1060 _public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
1061         assert_return(s, -EINVAL);
1062         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1063         assert_return(!event_pid_changed(s->event), -ECHILD);
1064
1065         if (s->priority == priority)
1066                 return 0;
1067
1068         s->priority = priority;
1069
1070         if (s->pending)
1071                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1072
1073         if (s->prepare)
1074                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1075
1076         if (s->type == SOURCE_QUIT)
1077                 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1078
1079         return 0;
1080 }
1081
1082 _public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1083         assert_return(s, -EINVAL);
1084         assert_return(m, -EINVAL);
1085         assert_return(!event_pid_changed(s->event), -ECHILD);
1086
1087         *m = s->enabled;
1088         return 0;
1089 }
1090
1091 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
1092         int r;
1093
1094         assert_return(s, -EINVAL);
1095         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1096         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1097         assert_return(!event_pid_changed(s->event), -ECHILD);
1098
1099         if (s->enabled == m)
1100                 return 0;
1101
1102         if (m == SD_EVENT_OFF) {
1103
1104                 switch (s->type) {
1105
1106                 case SOURCE_IO:
1107                         r = source_io_unregister(s);
1108                         if (r < 0)
1109                                 return r;
1110
1111                         s->enabled = m;
1112                         break;
1113
1114                 case SOURCE_MONOTONIC:
1115                         s->enabled = m;
1116                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1117                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1118                         break;
1119
1120                 case SOURCE_REALTIME:
1121                         s->enabled = m;
1122                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1123                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1124                         break;
1125
1126                 case SOURCE_SIGNAL:
1127                         s->enabled = m;
1128                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1129                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1130                                 event_update_signal_fd(s->event);
1131                         }
1132
1133                         break;
1134
1135                 case SOURCE_CHILD:
1136                         s->enabled = m;
1137
1138                         assert(s->event->n_enabled_child_sources > 0);
1139                         s->event->n_enabled_child_sources--;
1140
1141                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1142                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1143                                 event_update_signal_fd(s->event);
1144                         }
1145
1146                         break;
1147
1148                 case SOURCE_QUIT:
1149                         s->enabled = m;
1150                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1151                         break;
1152
1153                 case SOURCE_DEFER:
1154                         s->enabled = m;
1155                         break;
1156                 }
1157
1158         } else {
1159                 switch (s->type) {
1160
1161                 case SOURCE_IO:
1162                         r = source_io_register(s, m, s->io.events);
1163                         if (r < 0)
1164                                 return r;
1165
1166                         s->enabled = m;
1167                         break;
1168
1169                 case SOURCE_MONOTONIC:
1170                         s->enabled = m;
1171                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1172                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1173                         break;
1174
1175                 case SOURCE_REALTIME:
1176                         s->enabled = m;
1177                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1178                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1179                         break;
1180
1181                 case SOURCE_SIGNAL:
1182                         s->enabled = m;
1183
1184                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1185                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1186                                 event_update_signal_fd(s->event);
1187                         }
1188                         break;
1189
1190                 case SOURCE_CHILD:
1191                         if (s->enabled == SD_EVENT_OFF) {
1192                                 s->event->n_enabled_child_sources++;
1193
1194                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1195                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1196                                         event_update_signal_fd(s->event);
1197                                 }
1198                         }
1199
1200                         s->enabled = m;
1201                         break;
1202
1203                 case SOURCE_QUIT:
1204                         s->enabled = m;
1205                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1206                         break;
1207
1208                 case SOURCE_DEFER:
1209                         s->enabled = m;
1210                         break;
1211                 }
1212         }
1213
1214         if (s->pending)
1215                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1216
1217         if (s->prepare)
1218                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1219
1220         return 0;
1221 }
1222
1223 _public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1224         assert_return(s, -EINVAL);
1225         assert_return(usec, -EINVAL);
1226         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1227         assert_return(!event_pid_changed(s->event), -ECHILD);
1228
1229         *usec = s->time.next;
1230         return 0;
1231 }
1232
1233 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1234         assert_return(s, -EINVAL);
1235         assert_return(usec != (uint64_t) -1, -EINVAL);
1236         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1237         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1238         assert_return(!event_pid_changed(s->event), -ECHILD);
1239
1240         if (s->time.next == usec)
1241                 return 0;
1242
1243         s->time.next = usec;
1244         source_set_pending(s, false);
1245
1246         if (s->type == SOURCE_REALTIME) {
1247                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1248                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1249         } else {
1250                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1251                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1252         }
1253
1254         return 0;
1255 }
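
/* An illustrative re-arming sketch (hypothetical caller code, not part of
 * this file): a repeating timer can be built from a one-shot source by
 * pushing its deadline forward and re-enabling it from its own callback.
 * sd_event_source_set_time() clears the pending state above, so the source
 * gets scheduled against the new deadline instead of being dispatched again
 * for the old one:
 *
 *     static int my_tick(sd_event_source *s, uint64_t usec, void *userdata) {
 *             // re-arm one second after the deadline we were just called for
 *             sd_event_source_set_time(s, usec + USEC_PER_SEC);
 *             return sd_event_source_set_enabled(s, SD_EVENT_ONESHOT);
 *     }
 */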
1256
1257 _public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1258         assert_return(s, -EINVAL);
1259         assert_return(usec, -EINVAL);
1260         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1261         assert_return(!event_pid_changed(s->event), -ECHILD);
1262
1263         *usec = s->time.accuracy;
1264         return 0;
1265 }
1266
1267 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1268         assert_return(s, -EINVAL);
1269         assert_return(usec != (uint64_t) -1, -EINVAL);
1270         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1271         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1272         assert_return(!event_pid_changed(s->event), -ECHILD);
1273
1274         if (usec == 0)
1275                 usec = DEFAULT_ACCURACY_USEC;
1276
1277         if (s->time.accuracy == usec)
1278                 return 0;
1279
1280         s->time.accuracy = usec;
1281
1282         if (s->type == SOURCE_REALTIME)
1283                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1284         else
1285                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1286
1287         return 0;
1288 }
1289
1290 _public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1291         assert_return(s, -EINVAL);
1292         assert_return(pid, -EINVAL);
1293         assert_return(s->type == SOURCE_CHILD, -EDOM);
1294         assert_return(!event_pid_changed(s->event), -ECHILD);
1295
1296         *pid = s->child.pid;
1297         return 0;
1298 }
1299
1300 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1301         int r;
1302
1303         assert_return(s, -EINVAL);
1304         assert_return(s->type != SOURCE_QUIT, -EDOM);
1305         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1306         assert_return(!event_pid_changed(s->event), -ECHILD);
1307
1308         if (s->prepare == callback)
1309                 return 0;
1310
1311         if (callback && s->prepare) {
1312                 s->prepare = callback;
1313                 return 0;
1314         }
1315
1316         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1317         if (r < 0)
1318                 return r;
1319
1320         s->prepare = callback;
1321
1322         if (callback) {
1323                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1324                 if (r < 0)
1325                         return r;
1326         } else
1327                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1328
1329         return 0;
1330 }
1331
1332 _public_ void* sd_event_source_get_userdata(sd_event_source *s) {
1333         assert_return(s, NULL);
1334
1335         return s->userdata;
1336 }
1337
1338 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1339         usec_t c;
1340         assert(e);
1341         assert(a <= b);
1342
1343         if (a <= 0)
1344                 return 0;
1345
1346         if (b <= a + 1)
1347                 return a;
1348
1349         /*
1350           Find a good time to wake up again between times a and b. We
1351           have two goals here:
1352
1353           a) We want to wake up as seldom as possible, hence prefer
1354              later times over earlier times.
1355
1356           b) But if we have to wake up, then let's make sure to
1357              dispatch as much as possible on the entire system.
1358
1359           We implement this by waking up everywhere at the same time
1360           within any given second if we can, synchronised via the
1361           perturbation value determined from the boot ID. If we can't,
1362           then we try to find the same spot in every 250ms
1363           step. Otherwise, we pick the last possible time to wake up.
1364         */
1365
1366         c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1367         if (c >= b) {
1368                 if (_unlikely_(c < USEC_PER_SEC))
1369                         return b;
1370
1371                 c -= USEC_PER_SEC;
1372         }
1373
1374         if (c >= a)
1375                 return c;
1376
1377         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1378         if (c >= b) {
1379                 if (_unlikely_(c < USEC_PER_MSEC*250))
1380                         return b;
1381
1382                 c -= USEC_PER_MSEC*250;
1383         }
1384
1385         if (c >= a)
1386                 return c;
1387
1388         return b;
1389 }
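
/* A worked example of the scheme above (illustrative numbers only): with
 * e->perturb = 300ms, a = 5.25s and b = 6.10s, the first step yields
 * c = 6s + 0.3s = 6.3s, which is >= b, so one second is subtracted and
 * c = 5.3s; that lies within [a, b], so we wake at 5.3s, the same x.3s spot
 * every wakeup on this boot gets aligned to. If a were 5.35s instead, 5.3s
 * would be too early and the 250ms fallback would yield
 * c = 6.0s + (0.3s % 0.25s) = 6.05s, which again falls inside [a, b]. */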
1390
1391 static int event_arm_timer(
1392                 sd_event *e,
1393                 int timer_fd,
1394                 Prioq *earliest,
1395                 Prioq *latest,
1396                 usec_t *next) {
1397
1398         struct itimerspec its = {};
1399         sd_event_source *a, *b;
1400         usec_t t;
1401         int r;
1402
1403         assert_se(e);
1404         assert_se(next);
1405
1406         a = prioq_peek(earliest);
1407         if (!a || a->enabled == SD_EVENT_OFF) {
1408
1409                 if (*next == (usec_t) -1)
1410                         return 0;
1411
1412                 /* disarm */
1413                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1414                 if (r < 0)
1415                         return r;
1416
1417                 *next = (usec_t) -1;
1418
1419                 return 0;
1420         }
1421
1422         b = prioq_peek(latest);
1423         assert_se(b && b->enabled != SD_EVENT_OFF);
1424
1425         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1426         if (*next == t)
1427                 return 0;
1428
1429         assert_se(timer_fd >= 0);
1430
1431         if (t == 0) {
1432                 /* We don't want to disarm here, just set the timer to some time long ago. */
1433                 its.it_value.tv_sec = 0;
1434                 its.it_value.tv_nsec = 1;
1435         } else
1436                 timespec_store(&its.it_value, t);
1437
1438         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1439         if (r < 0)
1440                 return r;
1441
1442         *next = t;
1443         return 0;
1444 }
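
/* A worked example of the window computation above (illustrative numbers
 * only): with two armed monotonic sources X (next = 5.00s, accuracy = 0.25s)
 * and Y (next = 5.10s, accuracy = 1.00s), the earliest prioq peeks X, since
 * it has the smallest .next, and the latest prioq also peeks X, since
 * 5.25s < 6.10s for .next + .accuracy. The window handed to sleep_between()
 * is therefore [5.00s, 5.25s], and the timerfd is armed somewhere inside it. */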
1445
1446 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1447         assert(e);
1448         assert(s);
1449         assert(s->type == SOURCE_IO);
1450
1451         s->io.revents = events;
1452
1453         return source_set_pending(s, true);
1454 }
1455
1456 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1457         uint64_t x;
1458         ssize_t ss;
1459
1460         assert(e);
1461         assert(fd >= 0);
1462         assert(next);
1463
1464         assert_return(events == EPOLLIN, -EIO);
1465
1466         ss = read(fd, &x, sizeof(x));
1467         if (ss < 0) {
1468                 if (errno == EAGAIN || errno == EINTR)
1469                         return 0;
1470
1471                 return -errno;
1472         }
1473
1474         if (ss != sizeof(x))
1475                 return -EIO;
1476
1477         *next = (usec_t) -1;
1478
1479         return 0;
1480 }
1481
1482 static int process_timer(
1483                 sd_event *e,
1484                 usec_t n,
1485                 Prioq *earliest,
1486                 Prioq *latest) {
1487
1488         sd_event_source *s;
1489         int r;
1490
1491         assert(e);
1492
1493         for (;;) {
1494                 s = prioq_peek(earliest);
1495                 if (!s ||
1496                     s->time.next > n ||
1497                     s->enabled == SD_EVENT_OFF ||
1498                     s->pending)
1499                         break;
1500
1501                 r = source_set_pending(s, true);
1502                 if (r < 0)
1503                         return r;
1504
1505                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1506                 prioq_reshuffle(latest, s, &s->time.latest_index);
1507         }
1508
1509         return 0;
1510 }
1511
1512 static int process_child(sd_event *e) {
1513         sd_event_source *s;
1514         Iterator i;
1515         int r;
1516
1517         assert(e);
1518
1519         e->need_process_child = false;
1520
1521         /*
1522            So, this is ugly. We iteratively invoke waitid() with P_PID
1523            + WNOHANG for each PID we wait for, instead of using
1524            P_ALL. This is because we only want to get child
1525            information of very specific child processes, and not all
1526            of them. We might not have processed the SIGCHLD event of a
1527            previous invocation and we don't want to maintain an
1528            unbounded *per-child* event queue, hence we really don't
1529            want anything flushed out of the kernel's queue that we
1530            don't care about. Since this is O(n) this means that if you
1531            have a lot of processes you probably want to handle SIGCHLD
1532            yourself.
1533         */
1534
1535         HASHMAP_FOREACH(s, e->child_sources, i) {
1536                 assert(s->type == SOURCE_CHILD);
1537
1538                 if (s->pending)
1539                         continue;
1540
1541                 if (s->enabled == SD_EVENT_OFF)
1542                         continue;
1543
1544                 zero(s->child.siginfo);
1545                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1546                 if (r < 0)
1547                         return -errno;
1548
1549                 if (s->child.siginfo.si_pid != 0) {
1550                         r = source_set_pending(s, true);
1551                         if (r < 0)
1552                                 return r;
1553                 }
1554         }
1555
1556         return 0;
1557 }
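
/* A sketch of the "handle SIGCHLD yourself" alternative mentioned in the
 * comment above (hypothetical caller code, not part of this file): register a
 * single SIGCHLD source instead of one child source per PID, and reap with
 * P_ALL so the per-PID waitid() loop is avoided:
 *
 *     static int reap_children(sd_event_source *s, const struct signalfd_siginfo *si, void *userdata) {
 *             for (;;) {
 *                     siginfo_t info = {};
 *
 *                     if (waitid(P_ALL, 0, &info, WEXITED|WNOHANG) < 0)
 *                             return errno == ECHILD ? 0 : -errno;
 *                     if (info.si_pid == 0)
 *                             return 0;   // nothing further pending right now
 *
 *                     // info.si_pid and info.si_status describe the reaped child
 *             }
 *     }
 *
 *     sd_event_add_signal(e, SIGCHLD, reap_children, NULL, &sig_source);
 */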
1558
1559 static int process_signal(sd_event *e, uint32_t events) {
1560         bool read_one = false;
1561         int r;
1562
1563         assert(e);
1564         assert(e->signal_sources);
1565
1566         assert_return(events == EPOLLIN, -EIO);
1567
1568         for (;;) {
1569                 struct signalfd_siginfo si;
1570                 ssize_t ss;
1571                 sd_event_source *s;
1572
1573                 ss = read(e->signal_fd, &si, sizeof(si));
1574                 if (ss < 0) {
1575                         if (errno == EAGAIN || errno == EINTR)
1576                                 return read_one;
1577
1578                         return -errno;
1579                 }
1580
1581                 if (ss != sizeof(si))
1582                         return -EIO;
1583
1584                 read_one = true;
1585
1586                 s = e->signal_sources[si.ssi_signo];
1587                 if (si.ssi_signo == SIGCHLD) {
1588                         r = process_child(e);
1589                         if (r < 0)
1590                                 return r;
1591                         if (r > 0 || !s)
1592                                 continue;
1593                 } else
1594                         if (!s)
1595                                 return -EIO;
1596
1597                 s->signal.siginfo = si;
1598                 r = source_set_pending(s, true);
1599                 if (r < 0)
1600                         return r;
1601         }
1602
1603
1604         return 0;
1605 }
1606
1607 static int source_dispatch(sd_event_source *s) {
1608         int r = 0;
1609
1610         assert(s);
1611         assert(s->pending || s->type == SOURCE_QUIT);
1612
1613         if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
1614                 r = source_set_pending(s, false);
1615                 if (r < 0)
1616                         return r;
1617         }
1618
1619         if (s->enabled == SD_EVENT_ONESHOT) {
1620                 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1621                 if (r < 0)
1622                         return r;
1623         }
1624
1625         sd_event_source_ref(s);
1626
1627         switch (s->type) {
1628
1629         case SOURCE_IO:
1630                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1631                 break;
1632
1633         case SOURCE_MONOTONIC:
1634                 r = s->time.callback(s, s->time.next, s->userdata);
1635                 break;
1636
1637         case SOURCE_REALTIME:
1638                 r = s->time.callback(s, s->time.next, s->userdata);
1639                 break;
1640
1641         case SOURCE_SIGNAL:
1642                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1643                 break;
1644
1645         case SOURCE_CHILD:
1646                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1647                 break;
1648
1649         case SOURCE_DEFER:
1650                 r = s->defer.callback(s, s->userdata);
1651                 break;
1652
1653         case SOURCE_QUIT:
1654                 r = s->quit.callback(s, s->userdata);
1655                 break;
1656         }
1657
1658         sd_event_source_unref(s);
1659
1660         return r;
1661 }
1662
1663 static int event_prepare(sd_event *e) {
1664         int r;
1665
1666         assert(e);
1667
1668         for (;;) {
1669                 sd_event_source *s;
1670
1671                 s = prioq_peek(e->prepare);
1672                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1673                         break;
1674
1675                 s->prepare_iteration = e->iteration;
1676                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1677                 if (r < 0)
1678                         return r;
1679
1680                 assert(s->prepare);
1681                 r = s->prepare(s, s->userdata);
1682                 if (r < 0)
1683                         return r;
1684
1685         }
1686
1687         return 0;
1688 }
1689
1690 static int dispatch_quit(sd_event *e) {
1691         sd_event_source *p;
1692         int r;
1693
1694         assert(e);
1695
1696         p = prioq_peek(e->quit);
1697         if (!p || p->enabled == SD_EVENT_OFF) {
1698                 e->state = SD_EVENT_FINISHED;
1699                 return 0;
1700         }
1701
1702         sd_event_ref(e);
1703         e->iteration++;
1704         e->state = SD_EVENT_QUITTING;
1705
1706         r = source_dispatch(p);
1707
1708         e->state = SD_EVENT_PASSIVE;
1709         sd_event_unref(e);
1710
1711         return r;
1712 }
1713
1714 static sd_event_source* event_next_pending(sd_event *e) {
1715         sd_event_source *p;
1716
1717         assert(e);
1718
1719         p = prioq_peek(e->pending);
1720         if (!p)
1721                 return NULL;
1722
1723         if (p->enabled == SD_EVENT_OFF)
1724                 return NULL;
1725
1726         return p;
1727 }
1728
1729 _public_ int sd_event_run(sd_event *e, uint64_t timeout) {
1730         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1731         sd_event_source *p;
1732         int r, i, m;
1733
1734         assert_return(e, -EINVAL);
1735         assert_return(!event_pid_changed(e), -ECHILD);
1736         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1737         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1738
1739         if (e->quit_requested)
1740                 return dispatch_quit(e);
1741
1742         sd_event_ref(e);
1743         e->iteration++;
1744         e->state = SD_EVENT_RUNNING;
1745
1746         r = event_prepare(e);
1747         if (r < 0)
1748                 goto finish;
1749
1750         if (event_next_pending(e) || e->need_process_child)
1751                 timeout = 0;
1752
1753         if (timeout > 0) {
1754                 r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1755                 if (r < 0)
1756                         goto finish;
1757
1758                 r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1759                 if (r < 0)
1760                         goto finish;
1761         }
1762
1763         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1764                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1765         if (m < 0) {
1766                 r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
1767                 goto finish;
1768         }
1769
1770         dual_timestamp_get(&e->timestamp);
1771
1772         for (i = 0; i < m; i++) {
1773
1774                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1775                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
1776                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1777                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
1778                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1779                         r = process_signal(e, ev_queue[i].events);
1780                 else
1781                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1782
1783                 if (r < 0)
1784                         goto finish;
1785         }
1786
1787         r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
1788         if (r < 0)
1789                 goto finish;
1790
1791         r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
1792         if (r < 0)
1793                 goto finish;
1794
1795         if (e->need_process_child) {
1796                 r = process_child(e);
1797                 if (r < 0)
1798                         goto finish;
1799         }
1800
1801         p = event_next_pending(e);
1802         if (!p) {
1803                 r = 0;
1804                 goto finish;
1805         }
1806
1807         r = source_dispatch(p);
1808
1809 finish:
1810         e->state = SD_EVENT_PASSIVE;
1811         sd_event_unref(e);
1812
1813         return r;
1814 }
1815
1816 _public_ int sd_event_loop(sd_event *e) {
1817         int r;
1818
1819         assert_return(e, -EINVAL);
1820         assert_return(!event_pid_changed(e), -ECHILD);
1821         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1822
1823         sd_event_ref(e);
1824
1825         while (e->state != SD_EVENT_FINISHED) {
1826                 r = sd_event_run(e, (uint64_t) -1);
1827                 if (r < 0)
1828                         goto finish;
1829         }
1830
1831         r = 0;
1832
1833 finish:
1834         sd_event_unref(e);
1835         return r;
1836 }
1837
1838 _public_ int sd_event_get_state(sd_event *e) {
1839         assert_return(e, -EINVAL);
1840         assert_return(!event_pid_changed(e), -ECHILD);
1841
1842         return e->state;
1843 }
1844
1845 _public_ int sd_event_get_quit(sd_event *e) {
1846         assert_return(e, -EINVAL);
1847         assert_return(!event_pid_changed(e), -ECHILD);
1848
1849         return e->quit_requested;
1850 }
1851
1852 _public_ int sd_event_request_quit(sd_event *e) {
1853         assert_return(e, -EINVAL);
1854         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1855         assert_return(!event_pid_changed(e), -ECHILD);
1856
1857         e->quit_requested = true;
1858         return 0;
1859 }
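
/* A brief sketch of how the quit machinery fits together (hypothetical caller
 * code, not part of this file): a SIGTERM source requests the quit, the next
 * sd_event_run() iteration then dispatches the registered quit sources via
 * dispatch_quit() and the loop finishes:
 *
 *     static int on_quit(sd_event_source *s, void *userdata) {
 *             // release resources before the loop returns
 *             return 0;
 *     }
 *
 *     static int on_sigterm(sd_event_source *s, const struct signalfd_siginfo *si, void *userdata) {
 *             return sd_event_request_quit(sd_event_get(s));
 *     }
 *
 *     sd_event_add_quit(e, on_quit, NULL, &quit_source);
 *     sd_event_add_signal(e, SIGTERM, on_sigterm, NULL, &term_source);
 *     sd_event_loop(e);   // returns once the state reaches SD_EVENT_FINISHED
 */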
1860
1861 _public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
1862         assert_return(e, -EINVAL);
1863         assert_return(usec, -EINVAL);
1864         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1865         assert_return(!event_pid_changed(e), -ECHILD);
1866
1867         *usec = e->timestamp.realtime;
1868         return 0;
1869 }
1870
1871 _public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
1872         assert_return(e, -EINVAL);
1873         assert_return(usec, -EINVAL);
1874         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1875         assert_return(!event_pid_changed(e), -ECHILD);
1876
1877         *usec = e->timestamp.monotonic;
1878         return 0;
1879 }
1880
1881 _public_ int sd_event_default(sd_event **ret) {
1882
1883         static __thread sd_event *default_event = NULL;
1884         sd_event *e;
1885         int r;
1886
1887         if (!ret)
1888                 return !!default_event;
1889
1890         if (default_event) {
1891                 *ret = sd_event_ref(default_event);
1892                 return 0;
1893         }
1894
1895         r = sd_event_new(&e);
1896         if (r < 0)
1897                 return r;
1898
1899         e->default_event_ptr = &default_event;
1900         e->tid = gettid();
1901         default_event = e;
1902
1903         *ret = e;
1904         return 1;
1905 }
1906
1907 _public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
1908         assert_return(e, -EINVAL);
1909         assert_return(tid, -EINVAL);
1910         assert_return(!event_pid_changed(e), -ECHILD);
1911
1912         if (e->tid != 0) {
1913                 *tid = e->tid;
1914                 return 0;
1915         }
1916
1917         return -ENXIO;
1918 }