chiark / gitweb /
bus: always pass valid timeout to kdbus
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "sd-id128.h"
27 #include "sd-daemon.h"
28 #include "macro.h"
29 #include "prioq.h"
30 #include "hashmap.h"
31 #include "util.h"
32 #include "time-util.h"
33 #include "missing.h"
34
35 #include "sd-event.h"
36
37 #define EPOLL_QUEUE_MAX 64
38 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
39
/* Discriminator for the per-type union inside sd_event_source. The
 * ordinal values matter: the clock/signal/watchdog fds are registered
 * in epoll with INT_TO_PTR(type) as their data pointer. */
typedef enum EventSourceType {
        SOURCE_IO,
        SOURCE_MONOTONIC,
        SOURCE_REALTIME,
        SOURCE_SIGNAL,
        SOURCE_CHILD,
        SOURCE_DEFER,
        SOURCE_QUIT,
        SOURCE_WATCHDOG
} EventSourceType;
50
/* One registered event source. Only the union member matching 'type'
 * is valid. Sources hold a reference on their event loop. */
struct sd_event_source {
        unsigned n_ref;

        sd_event *event;            /* the loop we are attached to (referenced) */
        void *userdata;
        sd_event_handler_t prepare; /* optional callback run before each poll */

        EventSourceType type:4;
        int enabled:3;              /* SD_EVENT_OFF / ON / ONESHOT */
        bool pending:1;             /* queued in event->pending? */

        int priority;               /* lower values dispatch first */
        unsigned pending_index;     /* prioq index in event->pending */
        unsigned prepare_index;     /* prioq index in event->prepare */
        unsigned pending_iteration; /* loop iteration when marked pending */
        unsigned prepare_iteration; /* loop iteration when last prepared */

        union {
                struct {
                        sd_event_io_handler_t callback;
                        int fd;
                        uint32_t events;   /* epoll events subscribed to */
                        uint32_t revents;  /* epoll events last seen */
                        bool registered:1; /* currently added to the epoll fd? */
                } io;
                struct {
                        sd_event_time_handler_t callback;
                        usec_t next, accuracy;   /* earliest wakeup + allowed slack */
                        unsigned earliest_index; /* index in the *_earliest prioq */
                        unsigned latest_index;   /* index in the *_latest prioq */
                } time;
                struct {
                        sd_event_signal_handler_t callback;
                        struct signalfd_siginfo siginfo; /* info for the last delivery */
                        int sig;
                } signal;
                struct {
                        sd_event_child_handler_t callback;
                        siginfo_t siginfo; /* result of the wait for this child */
                        pid_t pid;
                        int options;       /* WEXITED|WSTOPPED|WCONTINUED mask */
                } child;
                struct {
                        sd_event_handler_t callback;
                } defer;
                struct {
                        sd_event_handler_t callback;
                        unsigned prioq_index; /* index in event->quit */
                } quit;
        };
};
102
/* The event loop object. Reference counted; every attached source also
 * holds a reference (see source_new()/source_free()). */
struct sd_event {
        unsigned n_ref;

        int epoll_fd;     /* the central poll fd; everything hangs off it */
        int signal_fd;    /* signalfd for all watched signals, or -1 */
        int realtime_fd;  /* timerfd on CLOCK_REALTIME, or -1 */
        int monotonic_fd; /* timerfd on CLOCK_MONOTONIC, or -1 */
        int watchdog_fd;  /* timerfd for watchdog keep-alive, or -1 */

        Prioq *pending;   /* sources ready for dispatch */
        Prioq *prepare;   /* sources with a prepare callback */

        /* For both clocks we maintain two priority queues each, one
         * ordered for the earliest times the events may be
         * dispatched, and one ordered by the latest times they must
         * have been dispatched. The range between the top entries in
         * the two prioqs is the time window we can freely schedule
         * wakeups in */
        Prioq *monotonic_earliest;
        Prioq *monotonic_latest;
        Prioq *realtime_earliest;
        Prioq *realtime_latest;

        usec_t realtime_next, monotonic_next; /* deadlines the timerfds are armed for */
        usec_t perturb; /* boot-id derived offset, spreads coalesced wakeups across hosts */

        sigset_t sigset; /* signals currently routed through signal_fd */
        sd_event_source **signal_sources; /* indexed by signal number; _NSIG entries */

        Hashmap *child_sources;           /* pid -> SOURCE_CHILD source */
        unsigned n_enabled_child_sources; /* count of enabled child sources */

        Prioq *quit; /* SOURCE_QUIT sources */

        pid_t original_pid; /* PID at creation, to detect use across fork() */

        unsigned iteration;       /* increases once per loop iteration */
        dual_timestamp timestamp;
        int state;

        bool quit_requested:1;
        bool need_process_child:1; /* SIGCHLD-related state change may be pending */
        bool watchdog:1;

        pid_t tid;
        sd_event **default_event_ptr; /* back-pointer to the default-loop slot, cleared on free */

        usec_t watchdog_last, watchdog_period;
};
152
153 static int pending_prioq_compare(const void *a, const void *b) {
154         const sd_event_source *x = a, *y = b;
155
156         assert(x->pending);
157         assert(y->pending);
158
159         /* Enabled ones first */
160         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
161                 return -1;
162         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
163                 return 1;
164
165         /* Lower priority values first */
166         if (x->priority < y->priority)
167                 return -1;
168         if (x->priority > y->priority)
169                 return 1;
170
171         /* Older entries first */
172         if (x->pending_iteration < y->pending_iteration)
173                 return -1;
174         if (x->pending_iteration > y->pending_iteration)
175                 return 1;
176
177         /* Stability for the rest */
178         if (x < y)
179                 return -1;
180         if (x > y)
181                 return 1;
182
183         return 0;
184 }
185
186 static int prepare_prioq_compare(const void *a, const void *b) {
187         const sd_event_source *x = a, *y = b;
188
189         assert(x->prepare);
190         assert(y->prepare);
191
192         /* Move most recently prepared ones last, so that we can stop
193          * preparing as soon as we hit one that has already been
194          * prepared in the current iteration */
195         if (x->prepare_iteration < y->prepare_iteration)
196                 return -1;
197         if (x->prepare_iteration > y->prepare_iteration)
198                 return 1;
199
200         /* Enabled ones first */
201         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
202                 return -1;
203         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
204                 return 1;
205
206         /* Lower priority values first */
207         if (x->priority < y->priority)
208                 return -1;
209         if (x->priority > y->priority)
210                 return 1;
211
212         /* Stability for the rest */
213         if (x < y)
214                 return -1;
215         if (x > y)
216                 return 1;
217
218         return 0;
219 }
220
221 static int earliest_time_prioq_compare(const void *a, const void *b) {
222         const sd_event_source *x = a, *y = b;
223
224         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
225         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
226
227         /* Enabled ones first */
228         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
229                 return -1;
230         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
231                 return 1;
232
233         /* Move the pending ones to the end */
234         if (!x->pending && y->pending)
235                 return -1;
236         if (x->pending && !y->pending)
237                 return 1;
238
239         /* Order by time */
240         if (x->time.next < y->time.next)
241                 return -1;
242         if (x->time.next > y->time.next)
243                 return 1;
244
245         /* Stability for the rest */
246         if (x < y)
247                 return -1;
248         if (x > y)
249                 return 1;
250
251         return 0;
252 }
253
254 static int latest_time_prioq_compare(const void *a, const void *b) {
255         const sd_event_source *x = a, *y = b;
256
257         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
258                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
259
260         /* Enabled ones first */
261         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
262                 return -1;
263         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
264                 return 1;
265
266         /* Move the pending ones to the end */
267         if (!x->pending && y->pending)
268                 return -1;
269         if (x->pending && !y->pending)
270                 return 1;
271
272         /* Order by time */
273         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
274                 return -1;
275         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
276                 return 1;
277
278         /* Stability for the rest */
279         if (x < y)
280                 return -1;
281         if (x > y)
282                 return 1;
283
284         return 0;
285 }
286
287 static int quit_prioq_compare(const void *a, const void *b) {
288         const sd_event_source *x = a, *y = b;
289
290         assert(x->type == SOURCE_QUIT);
291         assert(y->type == SOURCE_QUIT);
292
293         /* Enabled ones first */
294         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
295                 return -1;
296         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
297                 return 1;
298
299         /* Lower priority values first */
300         if (x->priority < y->priority)
301                 return -1;
302         if (x->priority > y->priority)
303                 return 1;
304
305         /* Stability for the rest */
306         if (x < y)
307                 return -1;
308         if (x > y)
309                 return 1;
310
311         return 0;
312 }
313
/* Final teardown of an event loop once its refcount reaches zero:
 * closes every fd we own, frees the priority queues, the signal-source
 * table and the child-source map. Individual sources are not touched
 * here — a live source holds its own reference on the loop, so this
 * only runs after all of them are gone. */
static void event_free(sd_event *e) {
        assert(e);

        /* If we are installed as the per-thread default loop, clear the
         * back-pointer so it does not dangle */
        if (e->default_event_ptr)
                *(e->default_event_ptr) = NULL;

        if (e->epoll_fd >= 0)
                close_nointr_nofail(e->epoll_fd);

        if (e->signal_fd >= 0)
                close_nointr_nofail(e->signal_fd);

        if (e->realtime_fd >= 0)
                close_nointr_nofail(e->realtime_fd);

        if (e->monotonic_fd >= 0)
                close_nointr_nofail(e->monotonic_fd);

        if (e->watchdog_fd >= 0)
                close_nointr_nofail(e->watchdog_fd);

        prioq_free(e->pending);
        prioq_free(e->prepare);
        prioq_free(e->monotonic_earliest);
        prioq_free(e->monotonic_latest);
        prioq_free(e->realtime_earliest);
        prioq_free(e->realtime_latest);
        prioq_free(e->quit);

        free(e->signal_sources);

        hashmap_free(e->child_sources);
        free(e);
}
348
349 _public_ int sd_event_new(sd_event** ret) {
350         sd_event *e;
351         int r;
352
353         assert_return(ret, -EINVAL);
354
355         e = new0(sd_event, 1);
356         if (!e)
357                 return -ENOMEM;
358
359         e->n_ref = 1;
360         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->watchdog_fd = e->epoll_fd = -1;
361         e->realtime_next = e->monotonic_next = (usec_t) -1;
362         e->original_pid = getpid();
363
364         assert_se(sigemptyset(&e->sigset) == 0);
365
366         e->pending = prioq_new(pending_prioq_compare);
367         if (!e->pending) {
368                 r = -ENOMEM;
369                 goto fail;
370         }
371
372         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
373         if (e->epoll_fd < 0) {
374                 r = -errno;
375                 goto fail;
376         }
377
378         *ret = e;
379         return 0;
380
381 fail:
382         event_free(e);
383         return r;
384 }
385
386 _public_ sd_event* sd_event_ref(sd_event *e) {
387         assert_return(e, NULL);
388
389         assert(e->n_ref >= 1);
390         e->n_ref++;
391
392         return e;
393 }
394
395 _public_ sd_event* sd_event_unref(sd_event *e) {
396
397         if (!e)
398                 return NULL;
399
400         assert(e->n_ref >= 1);
401         e->n_ref--;
402
403         if (e->n_ref <= 0)
404                 event_free(e);
405
406         return NULL;
407 }
408
409 static bool event_pid_changed(sd_event *e) {
410         assert(e);
411
412         /* We don't support people creating am event loop and keeping
413          * it around over a fork(). Let's complain. */
414
415         return e->original_pid != getpid();
416 }
417
418 static int source_io_unregister(sd_event_source *s) {
419         int r;
420
421         assert(s);
422         assert(s->type == SOURCE_IO);
423
424         if (!s->io.registered)
425                 return 0;
426
427         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
428         if (r < 0)
429                 return -errno;
430
431         s->io.registered = false;
432         return 0;
433 }
434
435 static int source_io_register(
436                 sd_event_source *s,
437                 int enabled,
438                 uint32_t events) {
439
440         struct epoll_event ev = {};
441         int r;
442
443         assert(s);
444         assert(s->type == SOURCE_IO);
445         assert(enabled != SD_EVENT_OFF);
446
447         ev.events = events;
448         ev.data.ptr = s;
449
450         if (enabled == SD_EVENT_ONESHOT)
451                 ev.events |= EPOLLONESHOT;
452
453         if (s->io.registered)
454                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
455         else
456                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
457
458         if (r < 0)
459                 return -errno;
460
461         s->io.registered = true;
462
463         return 0;
464 }
465
/* Detaches a source from its loop (per-type bookkeeping plus the
 * generic pending/prepare queues), drops the loop reference, and frees
 * the source itself. Safe to call on partially-initialized sources
 * coming out of the various sd_event_add_*() error paths. */
static void source_free(sd_event_source *s) {
        assert(s);

        if (s->event) {
                switch (s->type) {

                case SOURCE_IO:
                        /* fd may still be unset if registration never happened */
                        if (s->io.fd >= 0)
                                source_io_unregister(s);

                        break;

                case SOURCE_MONOTONIC:
                        prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_REALTIME:
                        prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_SIGNAL:
                        if (s->signal.sig > 0) {
                                /* Keep SIGCHLD in the sigset if child sources still need it */
                                if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
                                        assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);

                                if (s->event->signal_sources)
                                        s->event->signal_sources[s->signal.sig] = NULL;
                        }

                        break;

                case SOURCE_CHILD:
                        if (s->child.pid > 0) {
                                if (s->enabled != SD_EVENT_OFF) {
                                        assert(s->event->n_enabled_child_sources > 0);
                                        s->event->n_enabled_child_sources--;
                                }

                                /* Keep SIGCHLD in the sigset if an explicit signal
                                 * source still watches it */
                                if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
                                        assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);

                                hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
                        }

                        break;

                case SOURCE_DEFER:
                        /* nothing */
                        break;

                case SOURCE_QUIT:
                        prioq_remove(s->event->quit, s, &s->quit.prioq_index);
                        break;
                }

                if (s->pending)
                        prioq_remove(s->event->pending, s, &s->pending_index);

                if (s->prepare)
                        prioq_remove(s->event->prepare, s, &s->prepare_index);

                sd_event_unref(s->event);
        }

        free(s);
}
534
/* Marks a source as pending (queued for dispatch) or clears that
 * state, keeping the event->pending prioq in sync. Time sources are
 * additionally reshuffled in their clock prioqs, because pending-ness
 * is part of those queues' comparison functions. Returns 0 or a
 * negative error from the prioq insertion. */
static int source_set_pending(sd_event_source *s, bool b) {
        int r;

        assert(s);
        assert(s->type != SOURCE_QUIT);

        if (s->pending == b)
                return 0;

        s->pending = b;

        if (b) {
                /* Record when this became pending, for dispatch ordering */
                s->pending_iteration = s->event->iteration;

                r = prioq_put(s->event->pending, s, &s->pending_index);
                if (r < 0) {
                        /* Roll back the flag so state stays consistent */
                        s->pending = false;
                        return r;
                }
        } else
                assert_se(prioq_remove(s->event->pending, s, &s->pending_index));

        if (s->type == SOURCE_REALTIME) {
                prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
        } else if (s->type == SOURCE_MONOTONIC) {
                prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
        }

        return 0;
}
567
568 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
569         sd_event_source *s;
570
571         assert(e);
572
573         s = new0(sd_event_source, 1);
574         if (!s)
575                 return NULL;
576
577         s->n_ref = 1;
578         s->event = sd_event_ref(e);
579         s->type = type;
580         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
581
582         return s;
583 }
584
585 _public_ int sd_event_add_io(
586                 sd_event *e,
587                 int fd,
588                 uint32_t events,
589                 sd_event_io_handler_t callback,
590                 void *userdata,
591                 sd_event_source **ret) {
592
593         sd_event_source *s;
594         int r;
595
596         assert_return(e, -EINVAL);
597         assert_return(fd >= 0, -EINVAL);
598         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
599         assert_return(callback, -EINVAL);
600         assert_return(ret, -EINVAL);
601         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
602         assert_return(!event_pid_changed(e), -ECHILD);
603
604         s = source_new(e, SOURCE_IO);
605         if (!s)
606                 return -ENOMEM;
607
608         s->io.fd = fd;
609         s->io.events = events;
610         s->io.callback = callback;
611         s->userdata = userdata;
612         s->enabled = SD_EVENT_ON;
613
614         r = source_io_register(s, s->enabled, events);
615         if (r < 0) {
616                 source_free(s);
617                 return -errno;
618         }
619
620         *ret = s;
621         return 0;
622 }
623
/* Lazily creates the timerfd for one clock and hooks it into the epoll
 * set, with INT_TO_PTR(type) as the epoll data so wakeups can be
 * attributed to the right clock. Also initializes the loop's 'perturb'
 * offset on the way. No-op if the fd already exists. Returns 0 or
 * negative errno; the fd is closed again if epoll registration fails. */
static int event_setup_timer_fd(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id) {

        struct epoll_event ev = {};
        int r, fd;
        sd_id128_t bootid;

        assert(e);
        assert(timer_fd);

        if (_likely_(*timer_fd >= 0))
                return 0;

        fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
        if (fd < 0)
                return -errno;

        ev.events = EPOLLIN;
        ev.data.ptr = INT_TO_PTR(type);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
        if (r < 0) {
                close_nointr_nofail(fd);
                return -errno;
        }

        /* When we sleep for longer, we try to realign the wakeup to
           the same time within each minute/second/250ms, so that
           events all across the system can be coalesced into a single
           CPU wakeup. However, let's take some system-specific
           randomness for this value, so that in a network of systems
           with synced clocks timer events are distributed a
           bit. Here, we calculate a perturbation usec offset from the
           boot ID. */

        if (sd_id128_get_boot(&bootid) >= 0)
                e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_MINUTE;

        *timer_fd = fd;
        return 0;
}
668
/* Shared implementation behind sd_event_add_monotonic() and
 * sd_event_add_realtime(): lazily allocates the per-clock earliest/
 * latest prioqs and the clock's timerfd, then creates a one-shot time
 * source and inserts it into both queues. 'usec' is the earliest
 * moment the event may fire, 'accuracy' the slack after it (0 selects
 * DEFAULT_ACCURACY_USEC). Returns 0 and stores the source in *ret, or
 * a negative error; on failure the half-built source is freed. */
static int event_add_time_internal(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id,
                Prioq **earliest,
                Prioq **latest,
                uint64_t usec,
                uint64_t accuracy,
                sd_event_time_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(usec != (uint64_t) -1, -EINVAL);
        assert_return(accuracy != (uint64_t) -1, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        assert(timer_fd);
        assert(earliest);
        assert(latest);

        /* Allocate the two clock prioqs on first use */
        if (!*earliest) {
                *earliest = prioq_new(earliest_time_prioq_compare);
                if (!*earliest)
                        return -ENOMEM;
        }

        if (!*latest) {
                *latest = prioq_new(latest_time_prioq_compare);
                if (!*latest)
                        return -ENOMEM;
        }

        /* And the timerfd, if this clock has none yet */
        if (*timer_fd < 0) {
                r = event_setup_timer_fd(e, type, timer_fd, id);
                if (r < 0)
                        return r;
        }

        s = source_new(e, type);
        if (!s)
                return -ENOMEM;

        s->time.next = usec;
        s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
        s->time.callback = callback;
        s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ONESHOT;

        r = prioq_put(*earliest, s, &s->time.earliest_index);
        if (r < 0)
                goto fail;

        r = prioq_put(*latest, s, &s->time.latest_index);
        if (r < 0)
                goto fail;

        *ret = s;
        return 0;

fail:
        source_free(s);
        return r;
}
741
742 _public_ int sd_event_add_monotonic(sd_event *e,
743                                     uint64_t usec,
744                                     uint64_t accuracy,
745                                     sd_event_time_handler_t callback,
746                                     void *userdata,
747                                     sd_event_source **ret) {
748
749         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
750 }
751
752 _public_ int sd_event_add_realtime(sd_event *e,
753                                    uint64_t usec,
754                                    uint64_t accuracy,
755                                    sd_event_time_handler_t callback,
756                                    void *userdata,
757                                    sd_event_source **ret) {
758
759         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
760 }
761
/* (Re)creates the loop's signalfd from the current e->sigset. When a
 * signalfd already exists, signalfd() just updates its mask in place;
 * otherwise a new fd is created and added to the epoll set, with
 * INT_TO_PTR(SOURCE_SIGNAL) as the epoll data. On epoll registration
 * failure the fresh fd is closed and reset to -1. Returns 0 or
 * negative errno. */
static int event_update_signal_fd(sd_event *e) {
        struct epoll_event ev = {};
        bool add_to_epoll;
        int r;

        assert(e);

        add_to_epoll = e->signal_fd < 0;

        r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
        if (r < 0)
                return -errno;

        e->signal_fd = r;

        /* An existing fd stays registered in epoll; nothing more to do */
        if (!add_to_epoll)
                return 0;

        ev.events = EPOLLIN;
        ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
        if (r < 0) {
                close_nointr_nofail(e->signal_fd);
                e->signal_fd = -1;

                return -errno;
        }

        return 0;
}
793
/* Creates a source dispatched when signal 'sig' arrives via the loop's
 * signalfd. Only one source per signal is allowed (-EBUSY otherwise).
 * The signal is added to the loop's sigset and the signalfd rebuilt —
 * unless it is SIGCHLD and child sources already routed it there.
 * NOTE(review): the signal presumably must be blocked by the caller
 * for signalfd delivery to work — not enforced here; confirm with
 * callers. */
_public_ int sd_event_add_signal(
                sd_event *e,
                int sig,
                sd_event_signal_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(sig > 0, -EINVAL);
        assert_return(sig < _NSIG, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        /* Lazily allocate the signal-number -> source lookup table */
        if (!e->signal_sources) {
                e->signal_sources = new0(sd_event_source*, _NSIG);
                if (!e->signal_sources)
                        return -ENOMEM;
        } else if (e->signal_sources[sig])
                return -EBUSY;

        s = source_new(e, SOURCE_SIGNAL);
        if (!s)
                return -ENOMEM;

        s->signal.sig = sig;
        s->signal.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ON;

        e->signal_sources[sig] = s;
        assert_se(sigaddset(&e->sigset, sig) == 0);

        /* SIGCHLD may already be routed through the signalfd on behalf
         * of child sources; skip the rebuild in that case */
        if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
                r = event_update_signal_fd(e);
                if (r < 0) {
                        /* source_free() undoes the table entry and sigset bit */
                        source_free(s);
                        return r;
                }
        }

        *ret = s;
        return 0;
}
842
843 _public_ int sd_event_add_child(
844                 sd_event *e,
845                 pid_t pid,
846                 int options,
847                 sd_event_child_handler_t callback,
848                 void *userdata,
849                 sd_event_source **ret) {
850
851         sd_event_source *s;
852         int r;
853
854         assert_return(e, -EINVAL);
855         assert_return(pid > 1, -EINVAL);
856         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
857         assert_return(options != 0, -EINVAL);
858         assert_return(callback, -EINVAL);
859         assert_return(ret, -EINVAL);
860         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
861         assert_return(!event_pid_changed(e), -ECHILD);
862
863         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
864         if (r < 0)
865                 return r;
866
867         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
868                 return -EBUSY;
869
870         s = source_new(e, SOURCE_CHILD);
871         if (!s)
872                 return -ENOMEM;
873
874         s->child.pid = pid;
875         s->child.options = options;
876         s->child.callback = callback;
877         s->userdata = userdata;
878         s->enabled = SD_EVENT_ONESHOT;
879
880         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
881         if (r < 0) {
882                 source_free(s);
883                 return r;
884         }
885
886         e->n_enabled_child_sources ++;
887
888         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
889
890         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
891                 r = event_update_signal_fd(e);
892                 if (r < 0) {
893                         source_free(s);
894                         return -errno;
895                 }
896         }
897
898         e->need_process_child = true;
899
900         *ret = s;
901         return 0;
902 }
903
904 _public_ int sd_event_add_defer(
905                 sd_event *e,
906                 sd_event_handler_t callback,
907                 void *userdata,
908                 sd_event_source **ret) {
909
910         sd_event_source *s;
911         int r;
912
913         assert_return(e, -EINVAL);
914         assert_return(callback, -EINVAL);
915         assert_return(ret, -EINVAL);
916         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
917         assert_return(!event_pid_changed(e), -ECHILD);
918
919         s = source_new(e, SOURCE_DEFER);
920         if (!s)
921                 return -ENOMEM;
922
923         s->defer.callback = callback;
924         s->userdata = userdata;
925         s->enabled = SD_EVENT_ONESHOT;
926
927         r = source_set_pending(s, true);
928         if (r < 0) {
929                 source_free(s);
930                 return r;
931         }
932
933         *ret = s;
934         return 0;
935 }
936
937 _public_ int sd_event_add_quit(
938                 sd_event *e,
939                 sd_event_handler_t callback,
940                 void *userdata,
941                 sd_event_source **ret) {
942
943         sd_event_source *s;
944         int r;
945
946         assert_return(e, -EINVAL);
947         assert_return(callback, -EINVAL);
948         assert_return(ret, -EINVAL);
949         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
950         assert_return(!event_pid_changed(e), -ECHILD);
951
952         if (!e->quit) {
953                 e->quit = prioq_new(quit_prioq_compare);
954                 if (!e->quit)
955                         return -ENOMEM;
956         }
957
958         s = source_new(e, SOURCE_QUIT);
959         if (!s)
960                 return -ENOMEM;
961
962         s->quit.callback = callback;
963         s->userdata = userdata;
964         s->quit.prioq_index = PRIOQ_IDX_NULL;
965         s->enabled = SD_EVENT_ONESHOT;
966
967         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
968         if (r < 0) {
969                 source_free(s);
970                 return r;
971         }
972
973         *ret = s;
974         return 0;
975 }
976
977 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
978         assert_return(s, NULL);
979
980         assert(s->n_ref >= 1);
981         s->n_ref++;
982
983         return s;
984 }
985
986 _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {
987
988         if (!s)
989                 return NULL;
990
991         assert(s->n_ref >= 1);
992         s->n_ref--;
993
994         if (s->n_ref <= 0)
995                 source_free(s);
996
997         return NULL;
998 }
999
1000 _public_ sd_event *sd_event_source_get_event(sd_event_source *s) {
1001         assert_return(s, NULL);
1002
1003         return s->event;
1004 }
1005
1006 _public_ int sd_event_source_get_pending(sd_event_source *s) {
1007         assert_return(s, -EINVAL);
1008         assert_return(s->type != SOURCE_QUIT, -EDOM);
1009         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1010         assert_return(!event_pid_changed(s->event), -ECHILD);
1011
1012         return s->pending;
1013 }
1014
1015 _public_ int sd_event_source_get_io_fd(sd_event_source *s) {
1016         assert_return(s, -EINVAL);
1017         assert_return(s->type == SOURCE_IO, -EDOM);
1018         assert_return(!event_pid_changed(s->event), -ECHILD);
1019
1020         return s->io.fd;
1021 }
1022
1023 _public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
1024         assert_return(s, -EINVAL);
1025         assert_return(events, -EINVAL);
1026         assert_return(s->type == SOURCE_IO, -EDOM);
1027         assert_return(!event_pid_changed(s->event), -ECHILD);
1028
1029         *events = s->io.events;
1030         return 0;
1031 }
1032
1033 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1034         int r;
1035
1036         assert_return(s, -EINVAL);
1037         assert_return(s->type == SOURCE_IO, -EDOM);
1038         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
1039         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1040         assert_return(!event_pid_changed(s->event), -ECHILD);
1041
1042         if (s->io.events == events)
1043                 return 0;
1044
1045         if (s->enabled != SD_EVENT_OFF) {
1046                 r = source_io_register(s, s->enabled, events);
1047                 if (r < 0)
1048                         return r;
1049         }
1050
1051         s->io.events = events;
1052         source_set_pending(s, false);
1053
1054         return 0;
1055 }
1056
1057 _public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1058         assert_return(s, -EINVAL);
1059         assert_return(revents, -EINVAL);
1060         assert_return(s->type == SOURCE_IO, -EDOM);
1061         assert_return(s->pending, -ENODATA);
1062         assert_return(!event_pid_changed(s->event), -ECHILD);
1063
1064         *revents = s->io.revents;
1065         return 0;
1066 }
1067
1068 _public_ int sd_event_source_get_signal(sd_event_source *s) {
1069         assert_return(s, -EINVAL);
1070         assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1071         assert_return(!event_pid_changed(s->event), -ECHILD);
1072
1073         return s->signal.sig;
1074 }
1075
1076 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1077         assert_return(s, -EINVAL);
1078         assert_return(!event_pid_changed(s->event), -ECHILD);
1079
1080         return s->priority;
1081 }
1082
1083 _public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
1084         assert_return(s, -EINVAL);
1085         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1086         assert_return(!event_pid_changed(s->event), -ECHILD);
1087
1088         if (s->priority == priority)
1089                 return 0;
1090
1091         s->priority = priority;
1092
1093         if (s->pending)
1094                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1095
1096         if (s->prepare)
1097                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1098
1099         if (s->type == SOURCE_QUIT)
1100                 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1101
1102         return 0;
1103 }
1104
1105 _public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1106         assert_return(s, -EINVAL);
1107         assert_return(m, -EINVAL);
1108         assert_return(!event_pid_changed(s->event), -ECHILD);
1109
1110         *m = s->enabled;
1111         return 0;
1112 }
1113
1114 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
1115         int r;
1116
1117         assert_return(s, -EINVAL);
1118         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1119         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1120         assert_return(!event_pid_changed(s->event), -ECHILD);
1121
1122         if (s->enabled == m)
1123                 return 0;
1124
1125         if (m == SD_EVENT_OFF) {
1126
1127                 switch (s->type) {
1128
1129                 case SOURCE_IO:
1130                         r = source_io_unregister(s);
1131                         if (r < 0)
1132                                 return r;
1133
1134                         s->enabled = m;
1135                         break;
1136
1137                 case SOURCE_MONOTONIC:
1138                         s->enabled = m;
1139                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1140                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1141                         break;
1142
1143                 case SOURCE_REALTIME:
1144                         s->enabled = m;
1145                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1146                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1147                         break;
1148
1149                 case SOURCE_SIGNAL:
1150                         s->enabled = m;
1151                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1152                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1153                                 event_update_signal_fd(s->event);
1154                         }
1155
1156                         break;
1157
1158                 case SOURCE_CHILD:
1159                         s->enabled = m;
1160
1161                         assert(s->event->n_enabled_child_sources > 0);
1162                         s->event->n_enabled_child_sources--;
1163
1164                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1165                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1166                                 event_update_signal_fd(s->event);
1167                         }
1168
1169                         break;
1170
1171                 case SOURCE_QUIT:
1172                         s->enabled = m;
1173                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1174                         break;
1175
1176                 case SOURCE_DEFER:
1177                         s->enabled = m;
1178                         break;
1179                 }
1180
1181         } else {
1182                 switch (s->type) {
1183
1184                 case SOURCE_IO:
1185                         r = source_io_register(s, m, s->io.events);
1186                         if (r < 0)
1187                                 return r;
1188
1189                         s->enabled = m;
1190                         break;
1191
1192                 case SOURCE_MONOTONIC:
1193                         s->enabled = m;
1194                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1195                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1196                         break;
1197
1198                 case SOURCE_REALTIME:
1199                         s->enabled = m;
1200                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1201                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1202                         break;
1203
1204                 case SOURCE_SIGNAL:
1205                         s->enabled = m;
1206
1207                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1208                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1209                                 event_update_signal_fd(s->event);
1210                         }
1211                         break;
1212
1213                 case SOURCE_CHILD:
1214                         s->enabled = m;
1215
1216                         if (s->enabled == SD_EVENT_OFF) {
1217                                 s->event->n_enabled_child_sources++;
1218
1219                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1220                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1221                                         event_update_signal_fd(s->event);
1222                                 }
1223                         }
1224                         break;
1225
1226                 case SOURCE_QUIT:
1227                         s->enabled = m;
1228                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1229                         break;
1230
1231                 case SOURCE_DEFER:
1232                         s->enabled = m;
1233                         break;
1234                 }
1235         }
1236
1237         if (s->pending)
1238                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1239
1240         if (s->prepare)
1241                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1242
1243         return 0;
1244 }
1245
1246 _public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1247         assert_return(s, -EINVAL);
1248         assert_return(usec, -EINVAL);
1249         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1250         assert_return(!event_pid_changed(s->event), -ECHILD);
1251
1252         *usec = s->time.next;
1253         return 0;
1254 }
1255
1256 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1257         assert_return(s, -EINVAL);
1258         assert_return(usec != (uint64_t) -1, -EINVAL);
1259         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1260         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1261         assert_return(!event_pid_changed(s->event), -ECHILD);
1262
1263         s->time.next = usec;
1264
1265         source_set_pending(s, false);
1266
1267         if (s->type == SOURCE_REALTIME) {
1268                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1269                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1270         } else {
1271                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1272                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1273         }
1274
1275         return 0;
1276 }
1277
1278 _public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1279         assert_return(s, -EINVAL);
1280         assert_return(usec, -EINVAL);
1281         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1282         assert_return(!event_pid_changed(s->event), -ECHILD);
1283
1284         *usec = s->time.accuracy;
1285         return 0;
1286 }
1287
1288 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1289         assert_return(s, -EINVAL);
1290         assert_return(usec != (uint64_t) -1, -EINVAL);
1291         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1292         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1293         assert_return(!event_pid_changed(s->event), -ECHILD);
1294
1295         if (usec == 0)
1296                 usec = DEFAULT_ACCURACY_USEC;
1297
1298         s->time.accuracy = usec;
1299
1300         source_set_pending(s, false);
1301
1302         if (s->type == SOURCE_REALTIME)
1303                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1304         else
1305                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1306
1307         return 0;
1308 }
1309
1310 _public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1311         assert_return(s, -EINVAL);
1312         assert_return(pid, -EINVAL);
1313         assert_return(s->type == SOURCE_CHILD, -EDOM);
1314         assert_return(!event_pid_changed(s->event), -ECHILD);
1315
1316         *pid = s->child.pid;
1317         return 0;
1318 }
1319
1320 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_event_handler_t callback) {
1321         int r;
1322
1323         assert_return(s, -EINVAL);
1324         assert_return(s->type != SOURCE_QUIT, -EDOM);
1325         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1326         assert_return(!event_pid_changed(s->event), -ECHILD);
1327
1328         if (s->prepare == callback)
1329                 return 0;
1330
1331         if (callback && s->prepare) {
1332                 s->prepare = callback;
1333                 return 0;
1334         }
1335
1336         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1337         if (r < 0)
1338                 return r;
1339
1340         s->prepare = callback;
1341
1342         if (callback) {
1343                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1344                 if (r < 0)
1345                         return r;
1346         } else
1347                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1348
1349         return 0;
1350 }
1351
1352 _public_ void* sd_event_source_get_userdata(sd_event_source *s) {
1353         assert_return(s, NULL);
1354
1355         return s->userdata;
1356 }
1357
/* Picks a wakeup time within the inclusive window [a, b] (absolute
 * times in µs). Prefers a system-wide synchronised spot inside the
 * window (see the big comment below); falls back to b, the latest
 * permissible time, if no such spot fits. */
static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
        usec_t c;
        assert(e);
        assert(a <= b);

        /* usec_t is unsigned, so this only catches a == 0 */
        if (a <= 0)
                return 0;

        /* Window too narrow to play games in: wake at its start */
        if (b <= a + 1)
                return a;

        /*
          Find a good time to wake up again between times a and b. We
          have two goals here:

          a) We want to wake up as seldom as possible, hence prefer
             later times over earlier times.

          b) But if we have to wake up, then let's make sure to
             dispatch as much as possible on the entire system.

          We implement this by waking up everywhere at the same time
          within any given minute if we can, synchronised via the
          perturbation value determined from the boot ID. If we can't,
          then we try to find the same spot in every 10s, then 1s and
          then 250ms step. Otherwise, we pick the last possible time
          to wake up.
        */

        /* Per-minute spot: last minute boundary at or before b, plus
         * the perturbation offset; step one minute back if that lands
         * past b (the wrap check guards the unsigned subtraction). */
        c = (b / USEC_PER_MINUTE) * USEC_PER_MINUTE + e->perturb;
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_MINUTE))
                        return b;

                c -= USEC_PER_MINUTE;
        }

        if (c >= a)
                return c;

        /* Same idea at 10s granularity */
        c = (b / (USEC_PER_SEC*10)) * (USEC_PER_SEC*10) + (e->perturb % (USEC_PER_SEC*10));
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_SEC*10))
                        return b;

                c -= USEC_PER_SEC*10;
        }

        if (c >= a)
                return c;

        /* Same idea at 1s granularity */
        c = (b / USEC_PER_SEC) * USEC_PER_SEC + (e->perturb % USEC_PER_SEC);
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_SEC))
                        return b;

                c -= USEC_PER_SEC;
        }

        if (c >= a)
                return c;

        /* Same idea at 250ms granularity */
        c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_MSEC*250))
                        return b;

                c -= USEC_PER_MSEC*250;
        }

        if (c >= a)
                return c;

        /* No synchronised spot fits the window: wake as late as possible */
        return b;
}
1433
1434 static int event_arm_timer(
1435                 sd_event *e,
1436                 int timer_fd,
1437                 Prioq *earliest,
1438                 Prioq *latest,
1439                 usec_t *next) {
1440
1441         struct itimerspec its = {};
1442         sd_event_source *a, *b;
1443         usec_t t;
1444         int r;
1445
1446         assert(e);
1447         assert(next);
1448
1449         a = prioq_peek(earliest);
1450         if (!a || a->enabled == SD_EVENT_OFF) {
1451
1452                 if (timer_fd < 0)
1453                         return 0;
1454
1455                 if (*next == (usec_t) -1)
1456                         return 0;
1457
1458                 /* disarm */
1459                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1460                 if (r < 0)
1461                         return r;
1462
1463                 *next = (usec_t) -1;
1464
1465                 return 0;
1466         }
1467
1468         b = prioq_peek(latest);
1469         assert_se(b && b->enabled != SD_EVENT_OFF);
1470
1471         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1472         if (*next == t)
1473                 return 0;
1474
1475         assert_se(timer_fd >= 0);
1476
1477         if (t == 0) {
1478                 /* We don' want to disarm here, just mean some time looooong ago. */
1479                 its.it_value.tv_sec = 0;
1480                 its.it_value.tv_nsec = 1;
1481         } else
1482                 timespec_store(&its.it_value, t);
1483
1484         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1485         if (r < 0)
1486                 return -errno;
1487
1488         *next = t;
1489         return 0;
1490 }
1491
1492 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1493         assert(e);
1494         assert(s);
1495         assert(s->type == SOURCE_IO);
1496
1497         s->io.revents = events;
1498
1499         return source_set_pending(s, true);
1500 }
1501
1502 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1503         uint64_t x;
1504         ssize_t ss;
1505
1506         assert(e);
1507         assert(fd >= 0);
1508
1509         assert_return(events == EPOLLIN, -EIO);
1510
1511         ss = read(fd, &x, sizeof(x));
1512         if (ss < 0) {
1513                 if (errno == EAGAIN || errno == EINTR)
1514                         return 0;
1515
1516                 return -errno;
1517         }
1518
1519         if (ss != sizeof(x))
1520                 return -EIO;
1521
1522         if (next)
1523                 *next = (usec_t) -1;
1524
1525         return 0;
1526 }
1527
1528 static int process_timer(
1529                 sd_event *e,
1530                 usec_t n,
1531                 Prioq *earliest,
1532                 Prioq *latest) {
1533
1534         sd_event_source *s;
1535         int r;
1536
1537         assert(e);
1538
1539         for (;;) {
1540                 s = prioq_peek(earliest);
1541                 if (!s ||
1542                     s->time.next > n ||
1543                     s->enabled == SD_EVENT_OFF ||
1544                     s->pending)
1545                         break;
1546
1547                 r = source_set_pending(s, true);
1548                 if (r < 0)
1549                         return r;
1550
1551                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1552                 prioq_reshuffle(latest, s, &s->time.latest_index);
1553         }
1554
1555         return 0;
1556 }
1557
/* Checks, via waitid() with WNOWAIT, which of the watched children
 * have a state change queued and marks the matching child sources
 * pending. Children are not reaped here; that only happens after the
 * source is dispatched (see source_dispatch()), so the callback still
 * sees the process as a zombie. Returns 0 on success, negative errno
 * on error. */
static int process_child(sd_event *e) {
        sd_event_source *s;
        Iterator i;
        int r;

        assert(e);

        e->need_process_child = false;

        /*
           So, this is ugly. We iteratively invoke waitid() with P_PID
           + WNOHANG for each PID we wait for, instead of using
           P_ALL. This is because we only want to get child
           information of very specific child processes, and not all
           of them. We might not have processed the SIGCHLD event of a
           previous invocation and we don't want to maintain a
           unbounded *per-child* event queue, hence we really don't
           want anything flushed out of the kernel's queue that we
           don't care about. Since this is O(n) this means that if you
           have a lot of processes you probably want to handle SIGCHLD
           yourself.

           We do not reap the children here (by using WNOWAIT), this
           is only done after the event source is dispatched so that
           the callback still sees the process as a zombie.
        */

        HASHMAP_FOREACH(s, e->child_sources, i) {
                assert(s->type == SOURCE_CHILD);

                /* Already queued? Nothing new to learn then. */
                if (s->pending)
                        continue;

                if (s->enabled == SD_EVENT_OFF)
                        continue;

                zero(s->child.siginfo);
                /* WNOWAIT only when the caller watches WEXITED, so the
                 * death notification stays queued until dispatch. */
                r = waitid(P_PID, s->child.pid, &s->child.siginfo,
                           WNOHANG | (s->child.options & WEXITED ? WNOWAIT : 0) | s->child.options);
                if (r < 0)
                        return -errno;

                /* si_pid != 0 indicates a state change was reported. */
                if (s->child.siginfo.si_pid != 0) {
                        bool zombie =
                                s->child.siginfo.si_code == CLD_EXITED ||
                                s->child.siginfo.si_code == CLD_KILLED ||
                                s->child.siginfo.si_code == CLD_DUMPED;

                        if (!zombie && (s->child.options & WEXITED)) {
                                /* If the child isn't dead then let's
                                 * immediately remove the state change
                                 * from the queue, since there's no
                                 * benefit in leaving it queued */

                                assert(s->child.options & (WSTOPPED|WCONTINUED));
                                waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|(s->child.options & (WSTOPPED|WCONTINUED)));
                        }

                        r = source_set_pending(s, true);
                        if (r < 0)
                                return r;
                }
        }

        return 0;
}
1624
/* Drains the signalfd and queues the matching signal sources for
 * dispatching. SIGCHLD additionally triggers per-child processing via
 * process_child(), even when no plain SIGCHLD source is registered.
 * Returns > 0 if at least one siginfo was read, 0 if none, negative
 * errno on error. */
static int process_signal(sd_event *e, uint32_t events) {
        bool read_one = false;
        int r;

        assert(e);
        assert(e->signal_sources);

        assert_return(events == EPOLLIN, -EIO);

        for (;;) {
                struct signalfd_siginfo si;
                ssize_t ss;
                sd_event_source *s;

                /* signalfd reads deliver whole siginfo structures only. */
                ss = read(e->signal_fd, &si, sizeof(si));
                if (ss < 0) {
                        if (errno == EAGAIN || errno == EINTR)
                                return read_one;

                        return -errno;
                }

                if (ss != sizeof(si))
                        return -EIO;

                read_one = true;

                s = e->signal_sources[si.ssi_signo];
                if (si.ssi_signo == SIGCHLD) {
                        r = process_child(e);
                        if (r < 0)
                                return r;
                        /* With no dedicated SIGCHLD source, the child
                         * handling above is all there is to do. */
                        if (r > 0 || !s)
                                continue;
                } else
                        /* A signal we never subscribed to? That's a bug. */
                        if (!s)
                                return -EIO;

                s->signal.siginfo = si;
                r = source_set_pending(s, true);
                if (r < 0)
                        return r;
        }

        return 0;
}
1671
/* Invokes the callback of a pending (or quit) event source. The
 * pending flag is cleared first — except for defer and quit sources,
 * whose pending state is managed elsewhere — and ONESHOT sources are
 * disabled before the callback runs. A reference is held across the
 * callback so the source may safely drop its own last user reference
 * from within it. Returns the callback's return value. */
static int source_dispatch(sd_event_source *s) {
        int r = 0;

        assert(s);
        assert(s->pending || s->type == SOURCE_QUIT);

        if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
                r = source_set_pending(s, false);
                if (r < 0)
                        return r;
        }

        /* Disable before dispatching, so the callback may re-enable. */
        if (s->enabled == SD_EVENT_ONESHOT) {
                r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
                if (r < 0)
                        return r;
        }

        sd_event_source_ref(s);

        switch (s->type) {

        case SOURCE_IO:
                r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
                break;

        case SOURCE_MONOTONIC:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_REALTIME:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_SIGNAL:
                r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
                break;

        case SOURCE_CHILD: {
                bool zombie;

                /* Remember before the callback whether the child died,
                 * since the callback may alter the siginfo. */
                zombie = s->child.siginfo.si_code == CLD_EXITED ||
                         s->child.siginfo.si_code == CLD_KILLED ||
                         s->child.siginfo.si_code == CLD_DUMPED;

                r = s->child.callback(s, &s->child.siginfo, s->userdata);

                /* Now, reap the PID for good. */
                if (zombie)
                        waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|WEXITED);

                break;
        }

        case SOURCE_DEFER:
                r = s->defer.callback(s, s->userdata);
                break;

        case SOURCE_QUIT:
                r = s->quit.callback(s, s->userdata);
                break;
        }

        sd_event_source_unref(s);

        return r;
}
1739
/* Runs the prepare callbacks of all enabled sources that have not been
 * prepared in this loop iteration yet. Stamping prepare_iteration and
 * reshuffling moves each handled source behind the unhandled ones in
 * the prepare prioq, so the loop terminates after one pass. */
static int event_prepare(sd_event *e) {
        int r;

        assert(e);

        for (;;) {
                sd_event_source *s;

                s = prioq_peek(e->prepare);
                if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
                        break;

                /* Mark as handled before invoking the callback, so a
                 * callback touching the prioq can't make us loop. */
                s->prepare_iteration = e->iteration;
                r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
                if (r < 0)
                        return r;

                assert(s->prepare);
                r = s->prepare(s, s->userdata);
                if (r < 0)
                        return r;

        }

        return 0;
}
1766
1767 static int dispatch_quit(sd_event *e) {
1768         sd_event_source *p;
1769         int r;
1770
1771         assert(e);
1772
1773         p = prioq_peek(e->quit);
1774         if (!p || p->enabled == SD_EVENT_OFF) {
1775                 e->state = SD_EVENT_FINISHED;
1776                 return 0;
1777         }
1778
1779         sd_event_ref(e);
1780         e->iteration++;
1781         e->state = SD_EVENT_QUITTING;
1782
1783         r = source_dispatch(p);
1784
1785         e->state = SD_EVENT_PASSIVE;
1786         sd_event_unref(e);
1787
1788         return r;
1789 }
1790
1791 static sd_event_source* event_next_pending(sd_event *e) {
1792         sd_event_source *p;
1793
1794         assert(e);
1795
1796         p = prioq_peek(e->pending);
1797         if (!p)
1798                 return NULL;
1799
1800         if (p->enabled == SD_EVENT_OFF)
1801                 return NULL;
1802
1803         return p;
1804 }
1805
1806 static int arm_watchdog(sd_event *e) {
1807         struct itimerspec its = {};
1808         usec_t t;
1809         int r;
1810
1811         assert(e);
1812         assert(e->watchdog_fd >= 0);
1813
1814         t = sleep_between(e,
1815                           e->watchdog_last + (e->watchdog_period / 2),
1816                           e->watchdog_last + (e->watchdog_period * 3 / 4));
1817
1818         timespec_store(&its.it_value, t);
1819
1820         r = timerfd_settime(e->watchdog_fd, TFD_TIMER_ABSTIME, &its, NULL);
1821         if (r < 0)
1822                 return -errno;
1823
1824         return 0;
1825 }
1826
1827 static int process_watchdog(sd_event *e) {
1828         assert(e);
1829
1830         if (!e->watchdog)
1831                 return 0;
1832
1833         /* Don't notify watchdog too often */
1834         if (e->watchdog_last + e->watchdog_period / 4 > e->timestamp.monotonic)
1835                 return 0;
1836
1837         sd_notify(false, "WATCHDOG=1");
1838         e->watchdog_last = e->timestamp.monotonic;
1839
1840         return arm_watchdog(e);
1841 }
1842
/* Runs one iteration of the event loop: invokes prepare callbacks,
 * arms the timer fds, waits for events for up to "timeout" µs (forever
 * if (uint64_t) -1), processes whatever came in, and dispatches at
 * most one pending event source. Returns the dispatch result, 0 if
 * nothing was dispatched, or a negative errno-style error. */
_public_ int sd_event_run(sd_event *e, uint64_t timeout) {
        struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
        sd_event_source *p;
        int r, i, m;

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);

        /* Once a quit was requested, only quit sources run. */
        if (e->quit_requested)
                return dispatch_quit(e);

        sd_event_ref(e);
        e->iteration++;
        e->state = SD_EVENT_RUNNING;

        r = event_prepare(e);
        if (r < 0)
                goto finish;

        /* Program the timer fds from the earliest/latest prioqs. */
        r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
        if (r < 0)
                goto finish;

        r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
        if (r < 0)
                goto finish;

        /* If something is already waiting to be dispatched, just poll. */
        if (event_next_pending(e) || e->need_process_child)
                timeout = 0;

        /* epoll takes ms; round the µs timeout up so we never wake early. */
        m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
                       timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
        if (m < 0) {
                r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
                goto finish;
        }

        dual_timestamp_get(&e->timestamp);

        /* The loop's internal fds are tagged with their SOURCE_* type
         * in data.ptr; everything else is an I/O source pointer. */
        for (i = 0; i < m; i++) {

                if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
                        r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
                        r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
                        r = process_signal(e, ev_queue[i].events);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_WATCHDOG))
                        r = flush_timer(e, e->watchdog_fd, ev_queue[i].events, NULL);
                else
                        r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);

                if (r < 0)
                        goto finish;
        }

        r = process_watchdog(e);
        if (r < 0)
                goto finish;

        /* Queue all timer and child sources that triggered. */
        r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
        if (r < 0)
                goto finish;

        r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
        if (r < 0)
                goto finish;

        if (e->need_process_child) {
                r = process_child(e);
                if (r < 0)
                        goto finish;
        }

        /* Dispatch exactly one source per iteration. */
        p = event_next_pending(e);
        if (!p) {
                r = 0;
                goto finish;
        }

        r = source_dispatch(p);

finish:
        e->state = SD_EVENT_PASSIVE;
        sd_event_unref(e);

        return r;
}
1933
1934 _public_ int sd_event_loop(sd_event *e) {
1935         int r;
1936
1937         assert_return(e, -EINVAL);
1938         assert_return(!event_pid_changed(e), -ECHILD);
1939         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1940
1941         sd_event_ref(e);
1942
1943         while (e->state != SD_EVENT_FINISHED) {
1944                 r = sd_event_run(e, (uint64_t) -1);
1945                 if (r < 0)
1946                         goto finish;
1947         }
1948
1949         r = 0;
1950
1951 finish:
1952         sd_event_unref(e);
1953         return r;
1954 }
1955
1956 _public_ int sd_event_get_state(sd_event *e) {
1957         assert_return(e, -EINVAL);
1958         assert_return(!event_pid_changed(e), -ECHILD);
1959
1960         return e->state;
1961 }
1962
1963 _public_ int sd_event_get_quit(sd_event *e) {
1964         assert_return(e, -EINVAL);
1965         assert_return(!event_pid_changed(e), -ECHILD);
1966
1967         return e->quit_requested;
1968 }
1969
1970 _public_ int sd_event_request_quit(sd_event *e) {
1971         assert_return(e, -EINVAL);
1972         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1973         assert_return(!event_pid_changed(e), -ECHILD);
1974
1975         e->quit_requested = true;
1976         return 0;
1977 }
1978
1979 _public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
1980         assert_return(e, -EINVAL);
1981         assert_return(usec, -EINVAL);
1982         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1983         assert_return(!event_pid_changed(e), -ECHILD);
1984
1985         *usec = e->timestamp.realtime;
1986         return 0;
1987 }
1988
1989 _public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
1990         assert_return(e, -EINVAL);
1991         assert_return(usec, -EINVAL);
1992         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1993         assert_return(!event_pid_changed(e), -ECHILD);
1994
1995         *usec = e->timestamp.monotonic;
1996         return 0;
1997 }
1998
1999 _public_ int sd_event_default(sd_event **ret) {
2000
2001         static __thread sd_event *default_event = NULL;
2002         sd_event *e;
2003         int r;
2004
2005         if (!ret)
2006                 return !!default_event;
2007
2008         if (default_event) {
2009                 *ret = sd_event_ref(default_event);
2010                 return 0;
2011         }
2012
2013         r = sd_event_new(&e);
2014         if (r < 0)
2015                 return r;
2016
2017         e->default_event_ptr = &default_event;
2018         e->tid = gettid();
2019         default_event = e;
2020
2021         *ret = e;
2022         return 1;
2023 }
2024
2025 _public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
2026         assert_return(e, -EINVAL);
2027         assert_return(tid, -EINVAL);
2028         assert_return(!event_pid_changed(e), -ECHILD);
2029
2030         if (e->tid != 0) {
2031                 *tid = e->tid;
2032                 return 0;
2033         }
2034
2035         return -ENXIO;
2036 }
2037
2038 _public_ int sd_event_set_watchdog(sd_event *e, int b) {
2039         int r;
2040
2041         assert_return(e, -EINVAL);
2042
2043         if (e->watchdog == !!b)
2044                 return e->watchdog;
2045
2046         if (b) {
2047                 struct epoll_event ev = {};
2048                 const char *env;
2049
2050                 env = getenv("WATCHDOG_USEC");
2051                 if (!env)
2052                         return false;
2053
2054                 r = safe_atou64(env, &e->watchdog_period);
2055                 if (r < 0)
2056                         return r;
2057                 if (e->watchdog_period <= 0)
2058                         return -EIO;
2059
2060                 /* Issue first ping immediately */
2061                 sd_notify(false, "WATCHDOG=1");
2062                 e->watchdog_last = now(CLOCK_MONOTONIC);
2063
2064                 e->watchdog_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
2065                 if (e->watchdog_fd < 0)
2066                         return -errno;
2067
2068                 r = arm_watchdog(e);
2069                 if (r < 0)
2070                         goto fail;
2071
2072                 ev.events = EPOLLIN;
2073                 ev.data.ptr = INT_TO_PTR(SOURCE_WATCHDOG);
2074
2075                 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->watchdog_fd, &ev);
2076                 if (r < 0) {
2077                         r = -errno;
2078                         goto fail;
2079                 }
2080
2081         } else {
2082                 if (e->watchdog_fd >= 0) {
2083                         epoll_ctl(e->epoll_fd, EPOLL_CTL_DEL, e->watchdog_fd, NULL);
2084                         close_nointr_nofail(e->watchdog_fd);
2085                         e->watchdog_fd = -1;
2086                 }
2087         }
2088
2089         e->watchdog = !!b;
2090         return e->watchdog;
2091
2092 fail:
2093         close_nointr_nofail(e->watchdog_fd);
2094         e->watchdog_fd = -1;
2095         return r;
2096 }