chiark / gitweb /
event: rework sd-event exit logic
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "sd-id128.h"
27 #include "sd-daemon.h"
28 #include "macro.h"
29 #include "prioq.h"
30 #include "hashmap.h"
31 #include "util.h"
32 #include "time-util.h"
33 #include "missing.h"
34
35 #include "sd-event.h"
36
37 #define EPOLL_QUEUE_MAX 64
38 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
39
/* Discriminator for the per-type union in sd_event_source. */
typedef enum EventSourceType {
        SOURCE_IO,        /* file descriptor watched via epoll */
        SOURCE_MONOTONIC, /* CLOCK_MONOTONIC timer */
        SOURCE_REALTIME,  /* CLOCK_REALTIME timer */
        SOURCE_SIGNAL,    /* UNIX signal, delivered via signalfd */
        SOURCE_CHILD,     /* child process state change (SIGCHLD + waitid) */
        SOURCE_DEFER,     /* dispatched on every iteration while enabled */
        SOURCE_EXIT,      /* dispatched when the loop is asked to exit */
        SOURCE_WATCHDOG   /* internal sd_notify() watchdog timer */
} EventSourceType;
50
struct sd_event_source {
        unsigned n_ref;              /* reference count, see sd_event_source_ref()/unref() */

        sd_event *event;             /* owning loop; holds a reference on it */
        void *userdata;              /* opaque pointer handed back to the callbacks */
        sd_event_handler_t prepare;  /* optional callback invoked before polling */

        EventSourceType type:4;      /* selects the valid union member below */
        int enabled:3;               /* SD_EVENT_OFF, SD_EVENT_ON or SD_EVENT_ONESHOT */
        bool pending:1;              /* true while queued in event->pending */

        int priority;                /* lower values are dispatched first */
        unsigned pending_index;      /* index in event->pending prioq */
        unsigned prepare_index;      /* index in event->prepare prioq */
        unsigned pending_iteration;  /* loop iteration the source became pending in */
        unsigned prepare_iteration;  /* loop iteration the source was last prepared in */

        /* Per-type state; only the member matching 'type' is meaningful. */
        union {
                struct {
                        sd_event_io_handler_t callback;
                        int fd;
                        uint32_t events;     /* EPOLL* mask we asked for */
                        uint32_t revents;    /* EPOLL* mask last reported */
                        bool registered:1;   /* true while added to the epoll fd */
                } io;
                struct {
                        sd_event_time_handler_t callback;
                        usec_t next, accuracy; /* earliest wakeup and allowed slack */
                        unsigned earliest_index; /* index in *_earliest prioq */
                        unsigned latest_index;   /* index in *_latest prioq */
                } time;
                struct {
                        sd_event_signal_handler_t callback;
                        struct signalfd_siginfo siginfo; /* filled when the signal fires */
                        int sig;
                } signal;
                struct {
                        sd_event_child_handler_t callback;
                        siginfo_t siginfo;   /* filled by waitid() when the child changes state */
                        pid_t pid;
                        int options;         /* WEXITED|WSTOPPED|WCONTINUED subset */
                } child;
                struct {
                        sd_event_handler_t callback;
                } defer;
                struct {
                        sd_event_handler_t callback;
                        unsigned prioq_index; /* index in event->exit prioq */
                } exit;
        };
};
102
struct sd_event {
        unsigned n_ref;       /* reference count, see sd_event_ref()/unref() */

        int epoll_fd;         /* the one fd we actually poll on */
        int signal_fd;        /* signalfd carrying e->sigset, -1 until needed */
        int realtime_fd;      /* timerfd for CLOCK_REALTIME sources, -1 until needed */
        int monotonic_fd;     /* timerfd for CLOCK_MONOTONIC sources, -1 until needed */
        int watchdog_fd;      /* timerfd driving sd_notify() watchdog pings, -1 until needed */

        Prioq *pending;       /* sources waiting to be dispatched */
        Prioq *prepare;       /* sources with a prepare callback set */

        /* For both clocks we maintain two priority queues each, one
         * ordered for the earliest times the events may be
         * dispatched, and one ordered by the latest times they must
         * have been dispatched. The range between the top entries in
         * the two prioqs is the time window we can freely schedule
         * wakeups in */
        Prioq *monotonic_earliest;
        Prioq *monotonic_latest;
        Prioq *realtime_earliest;
        Prioq *realtime_latest;

        usec_t realtime_next, monotonic_next; /* times the timerfds are currently armed for */
        usec_t perturb;       /* per-machine wakeup offset, derived from the boot ID */

        sigset_t sigset;      /* signals currently routed through signal_fd */
        sd_event_source **signal_sources; /* indexed by signal number, _NSIG entries */

        Hashmap *child_sources;           /* pid -> SOURCE_CHILD source */
        unsigned n_enabled_child_sources; /* how many of those are not SD_EVENT_OFF */

        Prioq *exit;          /* SOURCE_EXIT sources, by priority */

        pid_t original_pid;   /* to detect use across fork(), see event_pid_changed() */

        unsigned iteration;   /* incremented once per loop iteration */
        dual_timestamp timestamp; /* when the current iteration woke up */
        int state;            /* SD_EVENT_PASSIVE/RUNNING/... loop state machine */

        bool exit_requested:1;    /* sd_event_exit() was called */
        bool need_process_child:1;/* SIGCHLD seen, waitid() sweep still owed */
        bool watchdog:1;          /* watchdog pinging enabled */

        int exit_code;        /* value passed to sd_event_exit() */

        pid_t tid;            /* thread this loop is the default loop of, if any */
        sd_event **default_event_ptr; /* cached default-loop pointer to clear on free */

        usec_t watchdog_last, watchdog_period; /* last ping and configured interval */
};
154
155 static int pending_prioq_compare(const void *a, const void *b) {
156         const sd_event_source *x = a, *y = b;
157
158         assert(x->pending);
159         assert(y->pending);
160
161         /* Enabled ones first */
162         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
163                 return -1;
164         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
165                 return 1;
166
167         /* Lower priority values first */
168         if (x->priority < y->priority)
169                 return -1;
170         if (x->priority > y->priority)
171                 return 1;
172
173         /* Older entries first */
174         if (x->pending_iteration < y->pending_iteration)
175                 return -1;
176         if (x->pending_iteration > y->pending_iteration)
177                 return 1;
178
179         /* Stability for the rest */
180         if (x < y)
181                 return -1;
182         if (x > y)
183                 return 1;
184
185         return 0;
186 }
187
188 static int prepare_prioq_compare(const void *a, const void *b) {
189         const sd_event_source *x = a, *y = b;
190
191         assert(x->prepare);
192         assert(y->prepare);
193
194         /* Move most recently prepared ones last, so that we can stop
195          * preparing as soon as we hit one that has already been
196          * prepared in the current iteration */
197         if (x->prepare_iteration < y->prepare_iteration)
198                 return -1;
199         if (x->prepare_iteration > y->prepare_iteration)
200                 return 1;
201
202         /* Enabled ones first */
203         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
204                 return -1;
205         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
206                 return 1;
207
208         /* Lower priority values first */
209         if (x->priority < y->priority)
210                 return -1;
211         if (x->priority > y->priority)
212                 return 1;
213
214         /* Stability for the rest */
215         if (x < y)
216                 return -1;
217         if (x > y)
218                 return 1;
219
220         return 0;
221 }
222
223 static int earliest_time_prioq_compare(const void *a, const void *b) {
224         const sd_event_source *x = a, *y = b;
225
226         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
227         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
228
229         /* Enabled ones first */
230         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
231                 return -1;
232         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
233                 return 1;
234
235         /* Move the pending ones to the end */
236         if (!x->pending && y->pending)
237                 return -1;
238         if (x->pending && !y->pending)
239                 return 1;
240
241         /* Order by time */
242         if (x->time.next < y->time.next)
243                 return -1;
244         if (x->time.next > y->time.next)
245                 return 1;
246
247         /* Stability for the rest */
248         if (x < y)
249                 return -1;
250         if (x > y)
251                 return 1;
252
253         return 0;
254 }
255
256 static int latest_time_prioq_compare(const void *a, const void *b) {
257         const sd_event_source *x = a, *y = b;
258
259         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
260                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
261
262         /* Enabled ones first */
263         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
264                 return -1;
265         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
266                 return 1;
267
268         /* Move the pending ones to the end */
269         if (!x->pending && y->pending)
270                 return -1;
271         if (x->pending && !y->pending)
272                 return 1;
273
274         /* Order by time */
275         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
276                 return -1;
277         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
278                 return 1;
279
280         /* Stability for the rest */
281         if (x < y)
282                 return -1;
283         if (x > y)
284                 return 1;
285
286         return 0;
287 }
288
289 static int exit_prioq_compare(const void *a, const void *b) {
290         const sd_event_source *x = a, *y = b;
291
292         assert(x->type == SOURCE_EXIT);
293         assert(y->type == SOURCE_EXIT);
294
295         /* Enabled ones first */
296         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
297                 return -1;
298         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
299                 return 1;
300
301         /* Lower priority values first */
302         if (x->priority < y->priority)
303                 return -1;
304         if (x->priority > y->priority)
305                 return 1;
306
307         /* Stability for the rest */
308         if (x < y)
309                 return -1;
310         if (x > y)
311                 return 1;
312
313         return 0;
314 }
315
/* Tear down an event loop object: close all owned fds, free all queues
 * and the object itself. Called when the last reference is dropped or
 * from the sd_event_new() failure path (which relies on all fds having
 * been pre-initialized to -1). */
static void event_free(sd_event *e) {
        assert(e);

        /* If this loop is cached as the thread's default loop, drop
         * the cached pointer so a later lookup allocates afresh. */
        if (e->default_event_ptr)
                *(e->default_event_ptr) = NULL;

        if (e->epoll_fd >= 0)
                close_nointr_nofail(e->epoll_fd);

        if (e->signal_fd >= 0)
                close_nointr_nofail(e->signal_fd);

        if (e->realtime_fd >= 0)
                close_nointr_nofail(e->realtime_fd);

        if (e->monotonic_fd >= 0)
                close_nointr_nofail(e->monotonic_fd);

        if (e->watchdog_fd >= 0)
                close_nointr_nofail(e->watchdog_fd);

        /* prioq_free()/hashmap_free() only free the containers; the
         * sources themselves each hold a loop reference, so none can
         * still be alive when the refcount reaches zero. */
        prioq_free(e->pending);
        prioq_free(e->prepare);
        prioq_free(e->monotonic_earliest);
        prioq_free(e->monotonic_latest);
        prioq_free(e->realtime_earliest);
        prioq_free(e->realtime_latest);
        prioq_free(e->exit);

        free(e->signal_sources);

        hashmap_free(e->child_sources);
        free(e);
}
350
/* Allocate a new event loop object with a fresh epoll fd and an empty
 * pending queue. Returns 0 and stores the loop in *ret on success, or a
 * negative errno-style error. */
_public_ int sd_event_new(sd_event** ret) {
        sd_event *e;
        int r;

        assert_return(ret, -EINVAL);

        e = new0(sd_event, 1);
        if (!e)
                return -ENOMEM;

        e->n_ref = 1;
        /* Pre-initialize all fds to -1 so that event_free() is safe to
         * call from the failure path below. */
        e->signal_fd = e->realtime_fd = e->monotonic_fd = e->watchdog_fd = e->epoll_fd = -1;
        e->realtime_next = e->monotonic_next = (usec_t) -1;
        e->original_pid = getpid();

        assert_se(sigemptyset(&e->sigset) == 0);

        e->pending = prioq_new(pending_prioq_compare);
        if (!e->pending) {
                r = -ENOMEM;
                goto fail;
        }

        e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
        if (e->epoll_fd < 0) {
                r = -errno;
                goto fail;
        }

        *ret = e;
        return 0;

fail:
        event_free(e);
        return r;
}
387
388 _public_ sd_event* sd_event_ref(sd_event *e) {
389         assert_return(e, NULL);
390
391         assert(e->n_ref >= 1);
392         e->n_ref++;
393
394         return e;
395 }
396
397 _public_ sd_event* sd_event_unref(sd_event *e) {
398
399         if (!e)
400                 return NULL;
401
402         assert(e->n_ref >= 1);
403         e->n_ref--;
404
405         if (e->n_ref <= 0)
406                 event_free(e);
407
408         return NULL;
409 }
410
/* Returns true if the calling process is not the one that created the
 * loop, i.e. the loop was carried across a fork(). */
static bool event_pid_changed(sd_event *e) {
        assert(e);

        /* We don't support people creating an event loop and keeping
         * it around over a fork(). Let's complain. */

        return e->original_pid != getpid();
}
419
420 static int source_io_unregister(sd_event_source *s) {
421         int r;
422
423         assert(s);
424         assert(s->type == SOURCE_IO);
425
426         if (!s->io.registered)
427                 return 0;
428
429         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
430         if (r < 0)
431                 return -errno;
432
433         s->io.registered = false;
434         return 0;
435 }
436
437 static int source_io_register(
438                 sd_event_source *s,
439                 int enabled,
440                 uint32_t events) {
441
442         struct epoll_event ev = {};
443         int r;
444
445         assert(s);
446         assert(s->type == SOURCE_IO);
447         assert(enabled != SD_EVENT_OFF);
448
449         ev.events = events;
450         ev.data.ptr = s;
451
452         if (enabled == SD_EVENT_ONESHOT)
453                 ev.events |= EPOLLONESHOT;
454
455         if (s->io.registered)
456                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
457         else
458                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
459
460         if (r < 0)
461                 return -errno;
462
463         s->io.registered = true;
464
465         return 0;
466 }
467
/* Destroy an event source: detach all per-type state from the owning
 * loop, remove it from the pending/prepare queues, drop the loop
 * reference and free the memory. */
static void source_free(sd_event_source *s) {
        assert(s);

        if (s->event) {
                switch (s->type) {

                case SOURCE_IO:
                        if (s->io.fd >= 0)
                                source_io_unregister(s);

                        break;

                case SOURCE_MONOTONIC:
                        prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_REALTIME:
                        prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_SIGNAL:
                        if (s->signal.sig > 0) {
                                /* Keep SIGCHLD in the watched set if enabled
                                 * child sources still depend on it. NOTE(review):
                                 * the kernel-side signalfd mask is not refreshed
                                 * here — presumably handled elsewhere; verify. */
                                if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
                                        assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);

                                if (s->event->signal_sources)
                                        s->event->signal_sources[s->signal.sig] = NULL;
                        }

                        break;

                case SOURCE_CHILD:
                        if (s->child.pid > 0) {
                                if (s->enabled != SD_EVENT_OFF) {
                                        assert(s->event->n_enabled_child_sources > 0);
                                        s->event->n_enabled_child_sources--;
                                }

                                /* Keep SIGCHLD watched if a dedicated SIGCHLD
                                 * signal source still exists. */
                                if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
                                        assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);

                                hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
                        }

                        break;

                case SOURCE_DEFER:
                        /* nothing */
                        break;

                case SOURCE_EXIT:
                        prioq_remove(s->event->exit, s, &s->exit.prioq_index);
                        break;
                }

                if (s->pending)
                        prioq_remove(s->event->pending, s, &s->pending_index);

                if (s->prepare)
                        prioq_remove(s->event->prepare, s, &s->prepare_index);

                sd_event_unref(s->event);
        }

        free(s);
}
536
/* Mark a source as pending (queued for dispatch) or not, keeping the
 * pending prioq and, for time sources, the per-clock prioqs in sync.
 * EXIT sources never go through the pending queue, hence the assert.
 * Returns 0 or a negative errno-style error. */
static int source_set_pending(sd_event_source *s, bool b) {
        int r;

        assert(s);
        assert(s->type != SOURCE_EXIT);

        if (s->pending == b)
                return 0;

        s->pending = b;

        if (b) {
                /* Record the iteration for the "older entries first"
                 * ordering in pending_prioq_compare(). */
                s->pending_iteration = s->event->iteration;

                r = prioq_put(s->event->pending, s, &s->pending_index);
                if (r < 0) {
                        s->pending = false;
                        return r;
                }
        } else
                assert_se(prioq_remove(s->event->pending, s, &s->pending_index));

        /* The time comparators sort pending entries last, so a change
         * in pending state must reshuffle the time prioqs. */
        if (s->type == SOURCE_REALTIME) {
                prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
        } else if (s->type == SOURCE_MONOTONIC) {
                prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
        }

        return 0;
}
569
570 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
571         sd_event_source *s;
572
573         assert(e);
574
575         s = new0(sd_event_source, 1);
576         if (!s)
577                 return NULL;
578
579         s->n_ref = 1;
580         s->event = sd_event_ref(e);
581         s->type = type;
582         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
583
584         return s;
585 }
586
587 _public_ int sd_event_add_io(
588                 sd_event *e,
589                 int fd,
590                 uint32_t events,
591                 sd_event_io_handler_t callback,
592                 void *userdata,
593                 sd_event_source **ret) {
594
595         sd_event_source *s;
596         int r;
597
598         assert_return(e, -EINVAL);
599         assert_return(fd >= 0, -EINVAL);
600         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
601         assert_return(callback, -EINVAL);
602         assert_return(ret, -EINVAL);
603         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
604         assert_return(!event_pid_changed(e), -ECHILD);
605
606         s = source_new(e, SOURCE_IO);
607         if (!s)
608                 return -ENOMEM;
609
610         s->io.fd = fd;
611         s->io.events = events;
612         s->io.callback = callback;
613         s->userdata = userdata;
614         s->enabled = SD_EVENT_ON;
615
616         r = source_io_register(s, s->enabled, events);
617         if (r < 0) {
618                 source_free(s);
619                 return -errno;
620         }
621
622         *ret = s;
623         return 0;
624 }
625
/* Lazily create the timerfd for one clock, register it with the epoll
 * instance (tagged with the source type so wakeups can be attributed),
 * and initialize the per-machine wakeup perturbation. No-op if the fd
 * already exists. Returns 0 or a negative errno-style error. */
static int event_setup_timer_fd(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id) {

        struct epoll_event ev = {};
        int r, fd;
        sd_id128_t bootid;

        assert(e);
        assert(timer_fd);

        if (_likely_(*timer_fd >= 0))
                return 0;

        fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
        if (fd < 0)
                return -errno;

        ev.events = EPOLLIN;
        /* Store the source type (not a pointer) so the dispatcher can
         * tell timer wakeups apart from regular IO sources. */
        ev.data.ptr = INT_TO_PTR(type);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
        if (r < 0) {
                close_nointr_nofail(fd);
                return -errno;
        }

        /* When we sleep for longer, we try to realign the wakeup to
           the same time within each minute/second/250ms, so that
           events all across the system can be coalesced into a single
           CPU wakeup. However, let's take some system-specific
           randomness for this value, so that in a network of systems
           with synced clocks timer events are distributed a
           bit. Here, we calculate a perturbation usec offset from the
           boot ID. */

        if (sd_id128_get_boot(&bootid) >= 0)
                e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_MINUTE;

        *timer_fd = fd;
        return 0;
}
670
/* Common implementation for sd_event_add_monotonic()/_realtime():
 * lazily allocates the clock's earliest/latest prioqs and timerfd, then
 * creates a ONESHOT time source firing at 'usec' with the given
 * 'accuracy' (0 selects DEFAULT_ACCURACY_USEC) and inserts it into both
 * prioqs. On success stores a new source reference in *ret. Returns 0
 * or a negative errno-style error. */
static int event_add_time_internal(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id,
                Prioq **earliest,
                Prioq **latest,
                uint64_t usec,
                uint64_t accuracy,
                sd_event_time_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(usec != (uint64_t) -1, -EINVAL);
        assert_return(accuracy != (uint64_t) -1, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        assert(timer_fd);
        assert(earliest);
        assert(latest);

        /* Allocate the two per-clock scheduling queues on first use */
        if (!*earliest) {
                *earliest = prioq_new(earliest_time_prioq_compare);
                if (!*earliest)
                        return -ENOMEM;
        }

        if (!*latest) {
                *latest = prioq_new(latest_time_prioq_compare);
                if (!*latest)
                        return -ENOMEM;
        }

        if (*timer_fd < 0) {
                r = event_setup_timer_fd(e, type, timer_fd, id);
                if (r < 0)
                        return r;
        }

        s = source_new(e, type);
        if (!s)
                return -ENOMEM;

        s->time.next = usec;
        s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
        s->time.callback = callback;
        s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ONESHOT;

        r = prioq_put(*earliest, s, &s->time.earliest_index);
        if (r < 0)
                goto fail;

        r = prioq_put(*latest, s, &s->time.latest_index);
        if (r < 0)
                goto fail;

        *ret = s;
        return 0;

fail:
        /* source_free() removes the source from whichever prioqs it
         * already joined */
        source_free(s);
        return r;
}
743
744 _public_ int sd_event_add_monotonic(sd_event *e,
745                                     uint64_t usec,
746                                     uint64_t accuracy,
747                                     sd_event_time_handler_t callback,
748                                     void *userdata,
749                                     sd_event_source **ret) {
750
751         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
752 }
753
754 _public_ int sd_event_add_realtime(sd_event *e,
755                                    uint64_t usec,
756                                    uint64_t accuracy,
757                                    sd_event_time_handler_t callback,
758                                    void *userdata,
759                                    sd_event_source **ret) {
760
761         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
762 }
763
/* (Re)apply e->sigset to the loop's signalfd. signalfd() with an
 * existing fd updates the mask in place; with -1 it allocates a new fd,
 * which then also needs to be added to the epoll instance. Returns 0 or
 * a negative errno-style error. */
static int event_update_signal_fd(sd_event *e) {
        struct epoll_event ev = {};
        bool add_to_epoll;
        int r;

        assert(e);

        /* Only a freshly created fd must be registered with epoll */
        add_to_epoll = e->signal_fd < 0;

        r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
        if (r < 0)
                return -errno;

        e->signal_fd = r;

        if (!add_to_epoll)
                return 0;

        ev.events = EPOLLIN;
        /* Tag with the source type so wakeups can be attributed */
        ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
        if (r < 0) {
                close_nointr_nofail(e->signal_fd);
                e->signal_fd = -1;

                return -errno;
        }

        return 0;
}
795
/* Add a signal event source for 'sig', delivered through the loop's
 * signalfd. Only one source per signal is allowed (-EBUSY otherwise);
 * the caller is expected to have blocked the signal. The source starts
 * enabled; on success a new source reference is stored in *ret.
 * Returns 0 or a negative errno-style error. */
_public_ int sd_event_add_signal(
                sd_event *e,
                int sig,
                sd_event_signal_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(sig > 0, -EINVAL);
        assert_return(sig < _NSIG, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        /* Lazily allocate the signal-number -> source lookup table */
        if (!e->signal_sources) {
                e->signal_sources = new0(sd_event_source*, _NSIG);
                if (!e->signal_sources)
                        return -ENOMEM;
        } else if (e->signal_sources[sig])
                return -EBUSY;

        s = source_new(e, SOURCE_SIGNAL);
        if (!s)
                return -ENOMEM;

        s->signal.sig = sig;
        s->signal.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ON;

        e->signal_sources[sig] = s;
        assert_se(sigaddset(&e->sigset, sig) == 0);

        /* If child sources already route SIGCHLD through the signalfd,
         * its mask needs no update. On failure source_free() undoes the
         * sigset/table changes made above. */
        if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
                r = event_update_signal_fd(e);
                if (r < 0) {
                        source_free(s);
                        return r;
                }
        }

        *ret = s;
        return 0;
}
844
845 _public_ int sd_event_add_child(
846                 sd_event *e,
847                 pid_t pid,
848                 int options,
849                 sd_event_child_handler_t callback,
850                 void *userdata,
851                 sd_event_source **ret) {
852
853         sd_event_source *s;
854         int r;
855
856         assert_return(e, -EINVAL);
857         assert_return(pid > 1, -EINVAL);
858         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
859         assert_return(options != 0, -EINVAL);
860         assert_return(callback, -EINVAL);
861         assert_return(ret, -EINVAL);
862         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
863         assert_return(!event_pid_changed(e), -ECHILD);
864
865         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
866         if (r < 0)
867                 return r;
868
869         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
870                 return -EBUSY;
871
872         s = source_new(e, SOURCE_CHILD);
873         if (!s)
874                 return -ENOMEM;
875
876         s->child.pid = pid;
877         s->child.options = options;
878         s->child.callback = callback;
879         s->userdata = userdata;
880         s->enabled = SD_EVENT_ONESHOT;
881
882         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
883         if (r < 0) {
884                 source_free(s);
885                 return r;
886         }
887
888         e->n_enabled_child_sources ++;
889
890         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
891
892         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
893                 r = event_update_signal_fd(e);
894                 if (r < 0) {
895                         source_free(s);
896                         return -errno;
897                 }
898         }
899
900         e->need_process_child = true;
901
902         *ret = s;
903         return 0;
904 }
905
906 _public_ int sd_event_add_defer(
907                 sd_event *e,
908                 sd_event_handler_t callback,
909                 void *userdata,
910                 sd_event_source **ret) {
911
912         sd_event_source *s;
913         int r;
914
915         assert_return(e, -EINVAL);
916         assert_return(callback, -EINVAL);
917         assert_return(ret, -EINVAL);
918         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
919         assert_return(!event_pid_changed(e), -ECHILD);
920
921         s = source_new(e, SOURCE_DEFER);
922         if (!s)
923                 return -ENOMEM;
924
925         s->defer.callback = callback;
926         s->userdata = userdata;
927         s->enabled = SD_EVENT_ONESHOT;
928
929         r = source_set_pending(s, true);
930         if (r < 0) {
931                 source_free(s);
932                 return r;
933         }
934
935         *ret = s;
936         return 0;
937 }
938
939 _public_ int sd_event_add_exit(
940                 sd_event *e,
941                 sd_event_handler_t callback,
942                 void *userdata,
943                 sd_event_source **ret) {
944
945         sd_event_source *s;
946         int r;
947
948         assert_return(e, -EINVAL);
949         assert_return(callback, -EINVAL);
950         assert_return(ret, -EINVAL);
951         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
952         assert_return(!event_pid_changed(e), -ECHILD);
953
954         if (!e->exit) {
955                 e->exit = prioq_new(exit_prioq_compare);
956                 if (!e->exit)
957                         return -ENOMEM;
958         }
959
960         s = source_new(e, SOURCE_EXIT);
961         if (!s)
962                 return -ENOMEM;
963
964         s->exit.callback = callback;
965         s->userdata = userdata;
966         s->exit.prioq_index = PRIOQ_IDX_NULL;
967         s->enabled = SD_EVENT_ONESHOT;
968
969         r = prioq_put(s->event->exit, s, &s->exit.prioq_index);
970         if (r < 0) {
971                 source_free(s);
972                 return r;
973         }
974
975         *ret = s;
976         return 0;
977 }
978
979 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
980         assert_return(s, NULL);
981
982         assert(s->n_ref >= 1);
983         s->n_ref++;
984
985         return s;
986 }
987
988 _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {
989
990         if (!s)
991                 return NULL;
992
993         assert(s->n_ref >= 1);
994         s->n_ref--;
995
996         if (s->n_ref <= 0)
997                 source_free(s);
998
999         return NULL;
1000 }
1001
1002 _public_ sd_event *sd_event_source_get_event(sd_event_source *s) {
1003         assert_return(s, NULL);
1004
1005         return s->event;
1006 }
1007
1008 _public_ int sd_event_source_get_pending(sd_event_source *s) {
1009         assert_return(s, -EINVAL);
1010         assert_return(s->type != SOURCE_EXIT, -EDOM);
1011         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1012         assert_return(!event_pid_changed(s->event), -ECHILD);
1013
1014         return s->pending;
1015 }
1016
1017 _public_ int sd_event_source_get_io_fd(sd_event_source *s) {
1018         assert_return(s, -EINVAL);
1019         assert_return(s->type == SOURCE_IO, -EDOM);
1020         assert_return(!event_pid_changed(s->event), -ECHILD);
1021
1022         return s->io.fd;
1023 }
1024
1025 _public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
1026         assert_return(s, -EINVAL);
1027         assert_return(events, -EINVAL);
1028         assert_return(s->type == SOURCE_IO, -EDOM);
1029         assert_return(!event_pid_changed(s->event), -ECHILD);
1030
1031         *events = s->io.events;
1032         return 0;
1033 }
1034
1035 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1036         int r;
1037
1038         assert_return(s, -EINVAL);
1039         assert_return(s->type == SOURCE_IO, -EDOM);
1040         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
1041         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1042         assert_return(!event_pid_changed(s->event), -ECHILD);
1043
1044         if (s->io.events == events)
1045                 return 0;
1046
1047         if (s->enabled != SD_EVENT_OFF) {
1048                 r = source_io_register(s, s->enabled, events);
1049                 if (r < 0)
1050                         return r;
1051         }
1052
1053         s->io.events = events;
1054         source_set_pending(s, false);
1055
1056         return 0;
1057 }
1058
1059 _public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1060         assert_return(s, -EINVAL);
1061         assert_return(revents, -EINVAL);
1062         assert_return(s->type == SOURCE_IO, -EDOM);
1063         assert_return(s->pending, -ENODATA);
1064         assert_return(!event_pid_changed(s->event), -ECHILD);
1065
1066         *revents = s->io.revents;
1067         return 0;
1068 }
1069
1070 _public_ int sd_event_source_get_signal(sd_event_source *s) {
1071         assert_return(s, -EINVAL);
1072         assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1073         assert_return(!event_pid_changed(s->event), -ECHILD);
1074
1075         return s->signal.sig;
1076 }
1077
1078 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1079         assert_return(s, -EINVAL);
1080         assert_return(!event_pid_changed(s->event), -ECHILD);
1081
1082         return s->priority;
1083 }
1084
1085 _public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
1086         assert_return(s, -EINVAL);
1087         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1088         assert_return(!event_pid_changed(s->event), -ECHILD);
1089
1090         if (s->priority == priority)
1091                 return 0;
1092
1093         s->priority = priority;
1094
1095         if (s->pending)
1096                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1097
1098         if (s->prepare)
1099                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1100
1101         if (s->type == SOURCE_EXIT)
1102                 prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
1103
1104         return 0;
1105 }
1106
1107 _public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1108         assert_return(s, -EINVAL);
1109         assert_return(m, -EINVAL);
1110         assert_return(!event_pid_changed(s->event), -ECHILD);
1111
1112         *m = s->enabled;
1113         return 0;
1114 }
1115
1116 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
1117         int r;
1118
1119         assert_return(s, -EINVAL);
1120         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1121         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1122         assert_return(!event_pid_changed(s->event), -ECHILD);
1123
1124         if (s->enabled == m)
1125                 return 0;
1126
1127         if (m == SD_EVENT_OFF) {
1128
1129                 switch (s->type) {
1130
1131                 case SOURCE_IO:
1132                         r = source_io_unregister(s);
1133                         if (r < 0)
1134                                 return r;
1135
1136                         s->enabled = m;
1137                         break;
1138
1139                 case SOURCE_MONOTONIC:
1140                         s->enabled = m;
1141                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1142                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1143                         break;
1144
1145                 case SOURCE_REALTIME:
1146                         s->enabled = m;
1147                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1148                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1149                         break;
1150
1151                 case SOURCE_SIGNAL:
1152                         s->enabled = m;
1153                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1154                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1155                                 event_update_signal_fd(s->event);
1156                         }
1157
1158                         break;
1159
1160                 case SOURCE_CHILD:
1161                         s->enabled = m;
1162
1163                         assert(s->event->n_enabled_child_sources > 0);
1164                         s->event->n_enabled_child_sources--;
1165
1166                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1167                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1168                                 event_update_signal_fd(s->event);
1169                         }
1170
1171                         break;
1172
1173                 case SOURCE_EXIT:
1174                         s->enabled = m;
1175                         prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
1176                         break;
1177
1178                 case SOURCE_DEFER:
1179                         s->enabled = m;
1180                         break;
1181                 }
1182
1183         } else {
1184                 switch (s->type) {
1185
1186                 case SOURCE_IO:
1187                         r = source_io_register(s, m, s->io.events);
1188                         if (r < 0)
1189                                 return r;
1190
1191                         s->enabled = m;
1192                         break;
1193
1194                 case SOURCE_MONOTONIC:
1195                         s->enabled = m;
1196                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1197                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1198                         break;
1199
1200                 case SOURCE_REALTIME:
1201                         s->enabled = m;
1202                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1203                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1204                         break;
1205
1206                 case SOURCE_SIGNAL:
1207                         s->enabled = m;
1208
1209                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1210                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1211                                 event_update_signal_fd(s->event);
1212                         }
1213                         break;
1214
1215                 case SOURCE_CHILD:
1216                         s->enabled = m;
1217
1218                         if (s->enabled == SD_EVENT_OFF) {
1219                                 s->event->n_enabled_child_sources++;
1220
1221                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1222                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1223                                         event_update_signal_fd(s->event);
1224                                 }
1225                         }
1226                         break;
1227
1228                 case SOURCE_EXIT:
1229                         s->enabled = m;
1230                         prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
1231                         break;
1232
1233                 case SOURCE_DEFER:
1234                         s->enabled = m;
1235                         break;
1236                 }
1237         }
1238
1239         if (s->pending)
1240                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1241
1242         if (s->prepare)
1243                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1244
1245         return 0;
1246 }
1247
1248 _public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1249         assert_return(s, -EINVAL);
1250         assert_return(usec, -EINVAL);
1251         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1252         assert_return(!event_pid_changed(s->event), -ECHILD);
1253
1254         *usec = s->time.next;
1255         return 0;
1256 }
1257
1258 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1259         assert_return(s, -EINVAL);
1260         assert_return(usec != (uint64_t) -1, -EINVAL);
1261         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1262         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1263         assert_return(!event_pid_changed(s->event), -ECHILD);
1264
1265         s->time.next = usec;
1266
1267         source_set_pending(s, false);
1268
1269         if (s->type == SOURCE_REALTIME) {
1270                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1271                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1272         } else {
1273                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1274                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1275         }
1276
1277         return 0;
1278 }
1279
1280 _public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1281         assert_return(s, -EINVAL);
1282         assert_return(usec, -EINVAL);
1283         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1284         assert_return(!event_pid_changed(s->event), -ECHILD);
1285
1286         *usec = s->time.accuracy;
1287         return 0;
1288 }
1289
1290 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1291         assert_return(s, -EINVAL);
1292         assert_return(usec != (uint64_t) -1, -EINVAL);
1293         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1294         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1295         assert_return(!event_pid_changed(s->event), -ECHILD);
1296
1297         if (usec == 0)
1298                 usec = DEFAULT_ACCURACY_USEC;
1299
1300         s->time.accuracy = usec;
1301
1302         source_set_pending(s, false);
1303
1304         if (s->type == SOURCE_REALTIME)
1305                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1306         else
1307                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1308
1309         return 0;
1310 }
1311
1312 _public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1313         assert_return(s, -EINVAL);
1314         assert_return(pid, -EINVAL);
1315         assert_return(s->type == SOURCE_CHILD, -EDOM);
1316         assert_return(!event_pid_changed(s->event), -ECHILD);
1317
1318         *pid = s->child.pid;
1319         return 0;
1320 }
1321
1322 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_event_handler_t callback) {
1323         int r;
1324
1325         assert_return(s, -EINVAL);
1326         assert_return(s->type != SOURCE_EXIT, -EDOM);
1327         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1328         assert_return(!event_pid_changed(s->event), -ECHILD);
1329
1330         if (s->prepare == callback)
1331                 return 0;
1332
1333         if (callback && s->prepare) {
1334                 s->prepare = callback;
1335                 return 0;
1336         }
1337
1338         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1339         if (r < 0)
1340                 return r;
1341
1342         s->prepare = callback;
1343
1344         if (callback) {
1345                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1346                 if (r < 0)
1347                         return r;
1348         } else
1349                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1350
1351         return 0;
1352 }
1353
1354 _public_ void* sd_event_source_get_userdata(sd_event_source *s) {
1355         assert_return(s, NULL);
1356
1357         return s->userdata;
1358 }
1359
/* Pick a wake-up time within [a, b]; returns 0 when a == 0 (wake up
 * immediately) and otherwise a value in the inclusive range. The return
 * value is aligned to a system-wide "spot" where possible, so that
 * independent event loops on the machine tend to wake up together. */
static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
        usec_t c;
        assert(e);
        assert(a <= b);

        /* usec_t is unsigned, so this effectively tests a == 0: the
         * earliest admissible wake-up is "now". */
        if (a <= 0)
                return 0;

        /* Window too narrow to do anything clever. */
        if (b <= a + 1)
                return a;

        /*
          Find a good time to wake up again between times a and b. We
          have two goals here:

          a) We want to wake up as seldom as possible, hence prefer
             later times over earlier times.

          b) But if we have to wake up, then let's make sure to
             dispatch as much as possible on the entire system.

          We implement this by waking up everywhere at the same time
          within any given minute if we can, synchronised via the
          perturbation value determined from the boot ID. If we can't,
          then we try to find the same spot in every 10s, then 1s and
          then 250ms step. Otherwise, we pick the last possible time
          to wake up.
        */

        /* Per-minute spot, perturbed by the boot-ID-derived offset. */
        c = (b / USEC_PER_MINUTE) * USEC_PER_MINUTE + e->perturb;
        if (c >= b) {
                /* Guard against underflow before stepping back one interval. */
                if (_unlikely_(c < USEC_PER_MINUTE))
                        return b;

                c -= USEC_PER_MINUTE;
        }

        if (c >= a)
                return c;

        /* Fall back to the per-10s spot. */
        c = (b / (USEC_PER_SEC*10)) * (USEC_PER_SEC*10) + (e->perturb % (USEC_PER_SEC*10));
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_SEC*10))
                        return b;

                c -= USEC_PER_SEC*10;
        }

        if (c >= a)
                return c;

        /* Then the per-second spot. */
        c = (b / USEC_PER_SEC) * USEC_PER_SEC + (e->perturb % USEC_PER_SEC);
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_SEC))
                        return b;

                c -= USEC_PER_SEC;
        }

        if (c >= a)
                return c;

        /* Then the per-250ms spot. */
        c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_MSEC*250))
                        return b;

                c -= USEC_PER_MSEC*250;
        }

        if (c >= a)
                return c;

        /* No synchronised spot fits the window; wake as late as allowed. */
        return b;
}
1435
1436 static int event_arm_timer(
1437                 sd_event *e,
1438                 int timer_fd,
1439                 Prioq *earliest,
1440                 Prioq *latest,
1441                 usec_t *next) {
1442
1443         struct itimerspec its = {};
1444         sd_event_source *a, *b;
1445         usec_t t;
1446         int r;
1447
1448         assert(e);
1449         assert(next);
1450
1451         a = prioq_peek(earliest);
1452         if (!a || a->enabled == SD_EVENT_OFF) {
1453
1454                 if (timer_fd < 0)
1455                         return 0;
1456
1457                 if (*next == (usec_t) -1)
1458                         return 0;
1459
1460                 /* disarm */
1461                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1462                 if (r < 0)
1463                         return r;
1464
1465                 *next = (usec_t) -1;
1466
1467                 return 0;
1468         }
1469
1470         b = prioq_peek(latest);
1471         assert_se(b && b->enabled != SD_EVENT_OFF);
1472
1473         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1474         if (*next == t)
1475                 return 0;
1476
1477         assert_se(timer_fd >= 0);
1478
1479         if (t == 0) {
1480                 /* We don' want to disarm here, just mean some time looooong ago. */
1481                 its.it_value.tv_sec = 0;
1482                 its.it_value.tv_nsec = 1;
1483         } else
1484                 timespec_store(&its.it_value, t);
1485
1486         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1487         if (r < 0)
1488                 return -errno;
1489
1490         *next = t;
1491         return 0;
1492 }
1493
1494 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1495         assert(e);
1496         assert(s);
1497         assert(s->type == SOURCE_IO);
1498
1499         s->io.revents = events;
1500
1501         return source_set_pending(s, true);
1502 }
1503
1504 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1505         uint64_t x;
1506         ssize_t ss;
1507
1508         assert(e);
1509         assert(fd >= 0);
1510
1511         assert_return(events == EPOLLIN, -EIO);
1512
1513         ss = read(fd, &x, sizeof(x));
1514         if (ss < 0) {
1515                 if (errno == EAGAIN || errno == EINTR)
1516                         return 0;
1517
1518                 return -errno;
1519         }
1520
1521         if (ss != sizeof(x))
1522                 return -EIO;
1523
1524         if (next)
1525                 *next = (usec_t) -1;
1526
1527         return 0;
1528 }
1529
1530 static int process_timer(
1531                 sd_event *e,
1532                 usec_t n,
1533                 Prioq *earliest,
1534                 Prioq *latest) {
1535
1536         sd_event_source *s;
1537         int r;
1538
1539         assert(e);
1540
1541         for (;;) {
1542                 s = prioq_peek(earliest);
1543                 if (!s ||
1544                     s->time.next > n ||
1545                     s->enabled == SD_EVENT_OFF ||
1546                     s->pending)
1547                         break;
1548
1549                 r = source_set_pending(s, true);
1550                 if (r < 0)
1551                         return r;
1552
1553                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1554                 prioq_reshuffle(latest, s, &s->time.latest_index);
1555         }
1556
1557         return 0;
1558 }
1559
/* Poll every watched child with waitid(WNOHANG) and mark sources whose
 * child changed state as pending. Children are not reaped here (WNOWAIT),
 * so the dispatch callback still sees the zombie. Returns 0 on success,
 * negative errno on failure. */
static int process_child(sd_event *e) {
        sd_event_source *s;
        Iterator i;
        int r;

        assert(e);

        e->need_process_child = false;

        /*
           So, this is ugly. We iteratively invoke waitid() with P_PID
           + WNOHANG for each PID we wait for, instead of using
           P_ALL. This is because we only want to get child
           information of very specific child processes, and not all
           of them. We might not have processed the SIGCHLD even of a
           previous invocation and we don't want to maintain a
           unbounded *per-child* event queue, hence we really don't
           want anything flushed out of the kernel's queue that we
           don't care about. Since this is O(n) this means that if you
           have a lot of processes you probably want to handle SIGCHLD
           yourself.

           We do not reap the children here (by using WNOWAIT), this
           is only done after the event source is dispatched so that
           the callback still sees the process as a zombie.
        */

        HASHMAP_FOREACH(s, e->child_sources, i) {
                assert(s->type == SOURCE_CHILD);

                /* Already queued; don't overwrite the stored siginfo. */
                if (s->pending)
                        continue;

                if (s->enabled == SD_EVENT_OFF)
                        continue;

                /* WNOWAIT keeps the state change queued if the child actually exited. */
                zero(s->child.siginfo);
                r = waitid(P_PID, s->child.pid, &s->child.siginfo,
                           WNOHANG | (s->child.options & WEXITED ? WNOWAIT : 0) | s->child.options);
                if (r < 0)
                        return -errno;

                /* si_pid == 0 means no state change was available. */
                if (s->child.siginfo.si_pid != 0) {
                        bool zombie =
                                s->child.siginfo.si_code == CLD_EXITED ||
                                s->child.siginfo.si_code == CLD_KILLED ||
                                s->child.siginfo.si_code == CLD_DUMPED;

                        if (!zombie && (s->child.options & WEXITED)) {
                                /* If the child isn't dead then let's
                                 * immediately remove the state change
                                 * from the queue, since there's no
                                 * benefit in leaving it queued */

                                assert(s->child.options & (WSTOPPED|WCONTINUED));
                                waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|(s->child.options & (WSTOPPED|WCONTINUED)));
                        }

                        r = source_set_pending(s, true);
                        if (r < 0)
                                return r;
                }
        }

        return 0;
}
1626
/* Drain the loop's signalfd and mark matching signal sources pending.
 * SIGCHLD additionally triggers child-source processing. Returns 1 if at
 * least one siginfo was read before the fd ran dry, 0 if none, negative
 * errno on failure. */
static int process_signal(sd_event *e, uint32_t events) {
        bool read_one = false;
        int r;

        assert(e);
        assert(e->signal_sources);

        assert_return(events == EPOLLIN, -EIO);

        for (;;) {
                struct signalfd_siginfo si;
                ssize_t ss;
                sd_event_source *s;

                ss = read(e->signal_fd, &si, sizeof(si));
                if (ss < 0) {
                        /* EAGAIN: queue fully drained (non-blocking fd). */
                        if (errno == EAGAIN || errno == EINTR)
                                return read_one;

                        return -errno;
                }

                /* signalfd reads are always a whole signalfd_siginfo. */
                if (ss != sizeof(si))
                        return -EIO;

                read_one = true;

                s = e->signal_sources[si.ssi_signo];
                if (si.ssi_signo == SIGCHLD) {
                        /* Child sources take precedence: only fall through to a
                         * plain SIGCHLD source if one exists and process_child()
                         * queued nothing. */
                        r = process_child(e);
                        if (r < 0)
                                return r;
                        if (r > 0 || !s)
                                continue;
                } else
                        /* A signal we never subscribed to should not arrive here. */
                        if (!s)
                                return -EIO;

                s->signal.siginfo = si;
                r = source_set_pending(s, true);
                if (r < 0)
                        return r;
        }

        return 0;
}
1673
1674 static int source_dispatch(sd_event_source *s) {
1675         int r = 0;
1676
1677         assert(s);
1678         assert(s->pending || s->type == SOURCE_EXIT);
1679
1680         if (s->type != SOURCE_DEFER && s->type != SOURCE_EXIT) {
1681                 r = source_set_pending(s, false);
1682                 if (r < 0)
1683                         return r;
1684         }
1685
1686         if (s->enabled == SD_EVENT_ONESHOT) {
1687                 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1688                 if (r < 0)
1689                         return r;
1690         }
1691
1692         sd_event_source_ref(s);
1693
1694         switch (s->type) {
1695
1696         case SOURCE_IO:
1697                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1698                 break;
1699
1700         case SOURCE_MONOTONIC:
1701                 r = s->time.callback(s, s->time.next, s->userdata);
1702                 break;
1703
1704         case SOURCE_REALTIME:
1705                 r = s->time.callback(s, s->time.next, s->userdata);
1706                 break;
1707
1708         case SOURCE_SIGNAL:
1709                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1710                 break;
1711
1712         case SOURCE_CHILD: {
1713                 bool zombie;
1714
1715                 zombie = s->child.siginfo.si_code == CLD_EXITED ||
1716                          s->child.siginfo.si_code == CLD_KILLED ||
1717                          s->child.siginfo.si_code == CLD_DUMPED;
1718
1719                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1720
1721                 /* Now, reap the PID for good. */
1722                 if (zombie)
1723                         waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|WEXITED);
1724
1725                 break;
1726         }
1727
1728         case SOURCE_DEFER:
1729                 r = s->defer.callback(s, s->userdata);
1730                 break;
1731
1732         case SOURCE_EXIT:
1733                 r = s->exit.callback(s, s->userdata);
1734                 break;
1735         }
1736
1737         if (r < 0) {
1738                 log_debug("Event source %p returned error, disabling: %s", s, strerror(-r));
1739                 sd_event_source_set_enabled(s, SD_EVENT_OFF);
1740         }
1741
1742         sd_event_source_unref(s);
1743         return 1;
1744 }
1745
1746 static int event_prepare(sd_event *e) {
1747         int r;
1748
1749         assert(e);
1750
1751         for (;;) {
1752                 sd_event_source *s;
1753
1754                 s = prioq_peek(e->prepare);
1755                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1756                         break;
1757
1758                 s->prepare_iteration = e->iteration;
1759                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1760                 if (r < 0)
1761                         return r;
1762
1763                 assert(s->prepare);
1764                 r = s->prepare(s, s->userdata);
1765                 if (r < 0)
1766                         return r;
1767
1768         }
1769
1770         return 0;
1771 }
1772
1773 static int dispatch_exit(sd_event *e) {
1774         sd_event_source *p;
1775         int r;
1776
1777         assert(e);
1778
1779         p = prioq_peek(e->exit);
1780         if (!p || p->enabled == SD_EVENT_OFF) {
1781                 e->state = SD_EVENT_FINISHED;
1782                 return 0;
1783         }
1784
1785         sd_event_ref(e);
1786         e->iteration++;
1787         e->state = SD_EVENT_EXITING;
1788
1789         r = source_dispatch(p);
1790
1791         e->state = SD_EVENT_PASSIVE;
1792         sd_event_unref(e);
1793
1794         return r;
1795 }
1796
1797 static sd_event_source* event_next_pending(sd_event *e) {
1798         sd_event_source *p;
1799
1800         assert(e);
1801
1802         p = prioq_peek(e->pending);
1803         if (!p)
1804                 return NULL;
1805
1806         if (p->enabled == SD_EVENT_OFF)
1807                 return NULL;
1808
1809         return p;
1810 }
1811
1812 static int arm_watchdog(sd_event *e) {
1813         struct itimerspec its = {};
1814         usec_t t;
1815         int r;
1816
1817         assert(e);
1818         assert(e->watchdog_fd >= 0);
1819
1820         t = sleep_between(e,
1821                           e->watchdog_last + (e->watchdog_period / 2),
1822                           e->watchdog_last + (e->watchdog_period * 3 / 4));
1823
1824         timespec_store(&its.it_value, t);
1825
1826         r = timerfd_settime(e->watchdog_fd, TFD_TIMER_ABSTIME, &its, NULL);
1827         if (r < 0)
1828                 return -errno;
1829
1830         return 0;
1831 }
1832
1833 static int process_watchdog(sd_event *e) {
1834         assert(e);
1835
1836         if (!e->watchdog)
1837                 return 0;
1838
1839         /* Don't notify watchdog too often */
1840         if (e->watchdog_last + e->watchdog_period / 4 > e->timestamp.monotonic)
1841                 return 0;
1842
1843         sd_notify(false, "WATCHDOG=1");
1844         e->watchdog_last = e->timestamp.monotonic;
1845
1846         return arm_watchdog(e);
1847 }
1848
/* Run a single event loop iteration: prepare callbacks, arm the timerfds,
 * wait in epoll (up to 'timeout' usec, (uint64_t) -1 == forever), process
 * whatever fired, then dispatch at most one pending source. Returns the
 * result of the dispatch (1 if a source ran, 0 if nothing was ready) or a
 * negative errno. Once exit has been requested, iterations dispatch exit
 * sources instead. */
_public_ int sd_event_run(sd_event *e, uint64_t timeout) {
        struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
        sd_event_source *p;
        int r, i, m;

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);

        /* In exit mode, only exit sources are run, one per call. */
        if (e->exit_requested)
                return dispatch_exit(e);

        /* Pin the loop: callbacks may drop the last external reference. */
        sd_event_ref(e);
        e->iteration++;
        e->state = SD_EVENT_RUNNING;

        r = event_prepare(e);
        if (r < 0)
                goto finish;

        /* Program both clocks' timerfds from their timer queues. */
        r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
        if (r < 0)
                goto finish;

        r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
        if (r < 0)
                goto finish;

        /* If work is already queued, poll without blocking. */
        if (event_next_pending(e) || e->need_process_child)
                timeout = 0;

        /* Round the usec timeout up to whole milliseconds for epoll. */
        m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
                       timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
        if (m < 0) {
                r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
                goto finish;
        }

        dual_timestamp_get(&e->timestamp);

        /* Internal fds are tagged with their SOURCE_* type as data.ptr;
         * anything else is a user I/O source. */
        for (i = 0; i < m; i++) {

                if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
                        r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
                        r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
                        r = process_signal(e, ev_queue[i].events);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_WATCHDOG))
                        r = flush_timer(e, e->watchdog_fd, ev_queue[i].events, NULL);
                else
                        r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);

                if (r < 0)
                        goto finish;
        }

        r = process_watchdog(e);
        if (r < 0)
                goto finish;

        /* Queue elapsed timer sources for both clocks. */
        r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
        if (r < 0)
                goto finish;

        r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
        if (r < 0)
                goto finish;

        if (e->need_process_child) {
                r = process_child(e);
                if (r < 0)
                        goto finish;
        }

        /* Dispatch exactly one source per iteration. */
        p = event_next_pending(e);
        if (!p) {
                r = 0;
                goto finish;
        }

        r = source_dispatch(p);

finish:
        e->state = SD_EVENT_PASSIVE;
        sd_event_unref(e);

        return r;
}
1939
1940 _public_ int sd_event_loop(sd_event *e) {
1941         int r;
1942
1943         assert_return(e, -EINVAL);
1944         assert_return(!event_pid_changed(e), -ECHILD);
1945         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1946
1947         sd_event_ref(e);
1948
1949         while (e->state != SD_EVENT_FINISHED) {
1950                 r = sd_event_run(e, (uint64_t) -1);
1951                 if (r < 0)
1952                         goto finish;
1953         }
1954
1955         r = e->exit_code;
1956
1957 finish:
1958         sd_event_unref(e);
1959         return r;
1960 }
1961
/* Returns the loop's current lifecycle state (SD_EVENT_PASSIVE,
 * SD_EVENT_RUNNING, SD_EVENT_FINISHED, ...), or a negative errno-style
 * error: -EINVAL on NULL, -ECHILD after a fork. */
_public_ int sd_event_get_state(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        return e->state;
}
1968
1969 _public_ int sd_event_get_exit_code(sd_event *e, int *code) {
1970         assert_return(e, -EINVAL);
1971         assert_return(code, -EINVAL);
1972         assert_return(!event_pid_changed(e), -ECHILD);
1973
1974         if (!e->exit_requested)
1975                 return -ENODATA;
1976
1977         *code = e->exit_code;
1978         return 0;
1979 }
1980
1981 _public_ int sd_event_exit(sd_event *e, int code) {
1982         assert_return(e, -EINVAL);
1983         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1984         assert_return(!event_pid_changed(e), -ECHILD);
1985
1986         e->exit_requested = true;
1987         e->exit_code = code;
1988
1989         return 0;
1990 }
1991
/* Returns the CLOCK_REALTIME timestamp (in usec) cached by the event
 * loop — sampled via dual_timestamp_get() right after epoll_wait()
 * returns. -ENODATA until the loop has run at least once. */
_public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
        assert_return(e, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
        assert_return(!event_pid_changed(e), -ECHILD);

        *usec = e->timestamp.realtime;
        return 0;
}
2001
/* Returns the CLOCK_MONOTONIC timestamp (in usec) cached by the event
 * loop — sampled via dual_timestamp_get() right after epoll_wait()
 * returns. -ENODATA until the loop has run at least once. */
_public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
        assert_return(e, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
        assert_return(!event_pid_changed(e), -ECHILD);

        *usec = e->timestamp.monotonic;
        return 0;
}
2011
2012 _public_ int sd_event_default(sd_event **ret) {
2013
2014         static __thread sd_event *default_event = NULL;
2015         sd_event *e;
2016         int r;
2017
2018         if (!ret)
2019                 return !!default_event;
2020
2021         if (default_event) {
2022                 *ret = sd_event_ref(default_event);
2023                 return 0;
2024         }
2025
2026         r = sd_event_new(&e);
2027         if (r < 0)
2028                 return r;
2029
2030         e->default_event_ptr = &default_event;
2031         e->tid = gettid();
2032         default_event = e;
2033
2034         *ret = e;
2035         return 1;
2036 }
2037
2038 _public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
2039         assert_return(e, -EINVAL);
2040         assert_return(tid, -EINVAL);
2041         assert_return(!event_pid_changed(e), -ECHILD);
2042
2043         if (e->tid != 0) {
2044                 *tid = e->tid;
2045                 return 0;
2046         }
2047
2048         return -ENXIO;
2049 }
2050
2051 _public_ int sd_event_set_watchdog(sd_event *e, int b) {
2052         int r;
2053
2054         assert_return(e, -EINVAL);
2055
2056         if (e->watchdog == !!b)
2057                 return e->watchdog;
2058
2059         if (b) {
2060                 struct epoll_event ev = {};
2061                 const char *env;
2062
2063                 env = getenv("WATCHDOG_USEC");
2064                 if (!env)
2065                         return false;
2066
2067                 r = safe_atou64(env, &e->watchdog_period);
2068                 if (r < 0)
2069                         return r;
2070                 if (e->watchdog_period <= 0)
2071                         return -EIO;
2072
2073                 /* Issue first ping immediately */
2074                 sd_notify(false, "WATCHDOG=1");
2075                 e->watchdog_last = now(CLOCK_MONOTONIC);
2076
2077                 e->watchdog_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
2078                 if (e->watchdog_fd < 0)
2079                         return -errno;
2080
2081                 r = arm_watchdog(e);
2082                 if (r < 0)
2083                         goto fail;
2084
2085                 ev.events = EPOLLIN;
2086                 ev.data.ptr = INT_TO_PTR(SOURCE_WATCHDOG);
2087
2088                 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->watchdog_fd, &ev);
2089                 if (r < 0) {
2090                         r = -errno;
2091                         goto fail;
2092                 }
2093
2094         } else {
2095                 if (e->watchdog_fd >= 0) {
2096                         epoll_ctl(e->epoll_fd, EPOLL_CTL_DEL, e->watchdog_fd, NULL);
2097                         close_nointr_nofail(e->watchdog_fd);
2098                         e->watchdog_fd = -1;
2099                 }
2100         }
2101
2102         e->watchdog = !!b;
2103         return e->watchdog;
2104
2105 fail:
2106         close_nointr_nofail(e->watchdog_fd);
2107         e->watchdog_fd = -1;
2108         return r;
2109 }