chiark / gitweb /
9fceb7b13edd794b99655e0d644244047678d71b
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "sd-id128.h"
27 #include "sd-daemon.h"
28 #include "macro.h"
29 #include "prioq.h"
30 #include "hashmap.h"
31 #include "util.h"
32 #include "time-util.h"
33 #include "missing.h"
34
35 #include "sd-event.h"
36
37 #define EPOLL_QUEUE_MAX 64
38 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
39
/* Discriminates which member of the handler union in sd_event_source is
 * valid, and which of the event loop's queues/maps track the source. */
typedef enum EventSourceType {
        SOURCE_IO,
        SOURCE_MONOTONIC,
        SOURCE_REALTIME,
        SOURCE_SIGNAL,
        SOURCE_CHILD,
        SOURCE_DEFER,
        SOURCE_QUIT,
        SOURCE_WATCHDOG
} EventSourceType;
50
struct sd_event_source {
        unsigned n_ref;                 /* reference count, see sd_event_source_ref()/unref() */

        sd_event *event;                /* owning loop; holds a reference (taken in source_new()) */
        void *userdata;                 /* opaque pointer handed back to the callback */
        sd_event_handler_t prepare;     /* optional pre-poll callback; non-NULL ⇒ queued in event->prepare */

        EventSourceType type:4;         /* selects the union member below */
        int enabled:3;                  /* SD_EVENT_OFF / SD_EVENT_ON / SD_EVENT_ONESHOT */
        bool pending:1;                 /* queued in event->pending, waiting to be dispatched */

        int priority;                   /* lower value dispatches first, see pending_prioq_compare() */
        unsigned pending_index;         /* prioq slot in event->pending (PRIOQ_IDX_NULL if absent) */
        unsigned prepare_index;         /* prioq slot in event->prepare (PRIOQ_IDX_NULL if absent) */
        unsigned pending_iteration;     /* loop iteration at which the source became pending */
        unsigned prepare_iteration;     /* loop iteration at which prepare() last ran */

        union {
                struct {
                        sd_event_io_handler_t callback;
                        int fd;
                        uint32_t events;        /* epoll event mask asked for */
                        uint32_t revents;       /* epoll event mask last received */
                        bool registered:1;      /* fd currently added to the epoll instance */
                } io;
                struct {
                        sd_event_time_handler_t callback;
                        usec_t next, accuracy;  /* earliest wakeup and allowed slack */
                        unsigned earliest_index;        /* slot in the per-clock "earliest" prioq */
                        unsigned latest_index;          /* slot in the per-clock "latest" prioq */
                } time;
                struct {
                        sd_event_signal_handler_t callback;
                        struct signalfd_siginfo siginfo;
                        int sig;
                } signal;
                struct {
                        sd_event_child_handler_t callback;
                        siginfo_t siginfo;
                        pid_t pid;
                        int options;            /* waitid() options: WEXITED/WSTOPPED/WCONTINUED */
                } child;
                struct {
                        sd_event_handler_t callback;
                } defer;
                struct {
                        sd_event_handler_t callback;
                        unsigned prioq_index;   /* slot in event->quit */
                } quit;
        };
};
102
struct sd_event {
        unsigned n_ref;         /* reference count, see sd_event_ref()/unref() */

        /* Lazily created kernel fds; -1 until first needed */
        int epoll_fd;
        int signal_fd;
        int realtime_fd;
        int monotonic_fd;
        int watchdog_fd;

        Prioq *pending;         /* sources ready for dispatch, ordered by pending_prioq_compare() */
        Prioq *prepare;         /* sources with a prepare() callback, ordered by prepare_prioq_compare() */

        /* For both clocks we maintain two priority queues each, one
         * ordered for the earliest times the events may be
         * dispatched, and one ordered by the latest times they must
         * have been dispatched. The range between the top entries in
         * the two prioqs is the time window we can freely schedule
         * wakeups in */
        Prioq *monotonic_earliest;
        Prioq *monotonic_latest;
        Prioq *realtime_earliest;
        Prioq *realtime_latest;

        usec_t realtime_next, monotonic_next;   /* currently armed timerfd deadlines; (usec_t)-1 = unarmed */
        usec_t perturb;         /* boot-id derived offset used to spread coalesced wakeups across machines */

        sigset_t sigset;        /* signals currently routed through signal_fd */
        sd_event_source **signal_sources;       /* _NSIG-sized array indexed by signal number, or NULL */

        Hashmap *child_sources; /* pid (as pointer key) → SOURCE_CHILD source */
        unsigned n_enabled_child_sources;

        Prioq *quit;            /* SOURCE_QUIT sources, ordered by quit_prioq_compare() */

        pid_t original_pid;     /* pid at creation; used to detect use across fork() */

        unsigned iteration;     /* monotonically increasing loop iteration counter */
        dual_timestamp timestamp;
        int state;

        bool quit_requested:1;
        bool need_process_child:1;      /* SIGCHLD seen / child source added; reap on next iteration */
        bool watchdog:1;

        pid_t tid;
        sd_event **default_event_ptr;   /* points at the per-thread default slot; cleared in event_free() */

        usec_t watchdog_last, watchdog_period;
};
152
153 static int pending_prioq_compare(const void *a, const void *b) {
154         const sd_event_source *x = a, *y = b;
155
156         assert(x->pending);
157         assert(y->pending);
158
159         /* Enabled ones first */
160         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
161                 return -1;
162         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
163                 return 1;
164
165         /* Lower priority values first */
166         if (x->priority < y->priority)
167                 return -1;
168         if (x->priority > y->priority)
169                 return 1;
170
171         /* Older entries first */
172         if (x->pending_iteration < y->pending_iteration)
173                 return -1;
174         if (x->pending_iteration > y->pending_iteration)
175                 return 1;
176
177         /* Stability for the rest */
178         if (x < y)
179                 return -1;
180         if (x > y)
181                 return 1;
182
183         return 0;
184 }
185
186 static int prepare_prioq_compare(const void *a, const void *b) {
187         const sd_event_source *x = a, *y = b;
188
189         assert(x->prepare);
190         assert(y->prepare);
191
192         /* Move most recently prepared ones last, so that we can stop
193          * preparing as soon as we hit one that has already been
194          * prepared in the current iteration */
195         if (x->prepare_iteration < y->prepare_iteration)
196                 return -1;
197         if (x->prepare_iteration > y->prepare_iteration)
198                 return 1;
199
200         /* Enabled ones first */
201         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
202                 return -1;
203         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
204                 return 1;
205
206         /* Lower priority values first */
207         if (x->priority < y->priority)
208                 return -1;
209         if (x->priority > y->priority)
210                 return 1;
211
212         /* Stability for the rest */
213         if (x < y)
214                 return -1;
215         if (x > y)
216                 return 1;
217
218         return 0;
219 }
220
221 static int earliest_time_prioq_compare(const void *a, const void *b) {
222         const sd_event_source *x = a, *y = b;
223
224         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
225         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
226
227         /* Enabled ones first */
228         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
229                 return -1;
230         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
231                 return 1;
232
233         /* Move the pending ones to the end */
234         if (!x->pending && y->pending)
235                 return -1;
236         if (x->pending && !y->pending)
237                 return 1;
238
239         /* Order by time */
240         if (x->time.next < y->time.next)
241                 return -1;
242         if (x->time.next > y->time.next)
243                 return 1;
244
245         /* Stability for the rest */
246         if (x < y)
247                 return -1;
248         if (x > y)
249                 return 1;
250
251         return 0;
252 }
253
254 static int latest_time_prioq_compare(const void *a, const void *b) {
255         const sd_event_source *x = a, *y = b;
256
257         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
258                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
259
260         /* Enabled ones first */
261         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
262                 return -1;
263         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
264                 return 1;
265
266         /* Move the pending ones to the end */
267         if (!x->pending && y->pending)
268                 return -1;
269         if (x->pending && !y->pending)
270                 return 1;
271
272         /* Order by time */
273         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
274                 return -1;
275         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
276                 return 1;
277
278         /* Stability for the rest */
279         if (x < y)
280                 return -1;
281         if (x > y)
282                 return 1;
283
284         return 0;
285 }
286
287 static int quit_prioq_compare(const void *a, const void *b) {
288         const sd_event_source *x = a, *y = b;
289
290         assert(x->type == SOURCE_QUIT);
291         assert(y->type == SOURCE_QUIT);
292
293         /* Enabled ones first */
294         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
295                 return -1;
296         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
297                 return 1;
298
299         /* Lower priority values first */
300         if (x->priority < y->priority)
301                 return -1;
302         if (x->priority > y->priority)
303                 return 1;
304
305         /* Stability for the rest */
306         if (x < y)
307                 return -1;
308         if (x > y)
309                 return 1;
310
311         return 0;
312 }
313
/* Release all resources owned by an event loop object. Invoked once the
 * reference count hits zero, and also from sd_event_new()'s failure
 * path — hence every fd is checked against -1 and every container may
 * be NULL. */
static void event_free(sd_event *e) {
        assert(e);

        /* Detach from the default-event slot so later lookups don't
         * hand out a dangling pointer. */
        if (e->default_event_ptr)
                *(e->default_event_ptr) = NULL;

        if (e->epoll_fd >= 0)
                close_nointr_nofail(e->epoll_fd);

        if (e->signal_fd >= 0)
                close_nointr_nofail(e->signal_fd);

        if (e->realtime_fd >= 0)
                close_nointr_nofail(e->realtime_fd);

        if (e->monotonic_fd >= 0)
                close_nointr_nofail(e->monotonic_fd);

        if (e->watchdog_fd >= 0)
                close_nointr_nofail(e->watchdog_fd);

        prioq_free(e->pending);
        prioq_free(e->prepare);
        prioq_free(e->monotonic_earliest);
        prioq_free(e->monotonic_latest);
        prioq_free(e->realtime_earliest);
        prioq_free(e->realtime_latest);
        prioq_free(e->quit);

        free(e->signal_sources);

        hashmap_free(e->child_sources);
        free(e);
}
348
349 _public_ int sd_event_new(sd_event** ret) {
350         sd_event *e;
351         int r;
352
353         assert_return(ret, -EINVAL);
354
355         e = new0(sd_event, 1);
356         if (!e)
357                 return -ENOMEM;
358
359         e->n_ref = 1;
360         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->watchdog_fd = e->epoll_fd = -1;
361         e->realtime_next = e->monotonic_next = (usec_t) -1;
362         e->original_pid = getpid();
363
364         assert_se(sigemptyset(&e->sigset) == 0);
365
366         e->pending = prioq_new(pending_prioq_compare);
367         if (!e->pending) {
368                 r = -ENOMEM;
369                 goto fail;
370         }
371
372         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
373         if (e->epoll_fd < 0) {
374                 r = -errno;
375                 goto fail;
376         }
377
378         *ret = e;
379         return 0;
380
381 fail:
382         event_free(e);
383         return r;
384 }
385
386 _public_ sd_event* sd_event_ref(sd_event *e) {
387         assert_return(e, NULL);
388
389         assert(e->n_ref >= 1);
390         e->n_ref++;
391
392         return e;
393 }
394
395 _public_ sd_event* sd_event_unref(sd_event *e) {
396
397         if (!e)
398                 return NULL;
399
400         assert(e->n_ref >= 1);
401         e->n_ref--;
402
403         if (e->n_ref <= 0)
404                 event_free(e);
405
406         return NULL;
407 }
408
/* Returns true when the calling process is not the one that created the
 * loop, i.e. the object crossed a fork(). */
static bool event_pid_changed(sd_event *e) {
        assert(e);

        /* We don't support people creating an event loop and keeping
         * it around over a fork(). Let's complain. */

        return e->original_pid != getpid();
}
417
418 static int source_io_unregister(sd_event_source *s) {
419         int r;
420
421         assert(s);
422         assert(s->type == SOURCE_IO);
423
424         if (!s->io.registered)
425                 return 0;
426
427         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
428         if (r < 0)
429                 return -errno;
430
431         s->io.registered = false;
432         return 0;
433 }
434
435 static int source_io_register(
436                 sd_event_source *s,
437                 int enabled,
438                 uint32_t events) {
439
440         struct epoll_event ev = {};
441         int r;
442
443         assert(s);
444         assert(s->type == SOURCE_IO);
445         assert(enabled != SD_EVENT_OFF);
446
447         ev.events = events;
448         ev.data.ptr = s;
449
450         if (enabled == SD_EVENT_ONESHOT)
451                 ev.events |= EPOLLONESHOT;
452
453         if (s->io.registered)
454                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
455         else
456                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
457
458         if (r < 0)
459                 return -errno;
460
461         s->io.registered = true;
462
463         return 0;
464 }
465
/* Tear down an event source: detach it from every loop structure its
 * type participates in, then drop the loop reference and free. Also
 * used on half-constructed sources from the sd_event_add_*() failure
 * paths, so each step must tolerate missing state. */
static void source_free(sd_event_source *s) {
        assert(s);

        if (s->event) {
                switch (s->type) {

                case SOURCE_IO:
                        if (s->io.fd >= 0)
                                source_io_unregister(s);

                        break;

                case SOURCE_MONOTONIC:
                        prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_REALTIME:
                        prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_SIGNAL:
                        if (s->signal.sig > 0) {
                                /* Keep SIGCHLD in the sigset if child
                                 * sources still rely on it. */
                                if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
                                        assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);

                                if (s->event->signal_sources)
                                        s->event->signal_sources[s->signal.sig] = NULL;
                        }

                        break;

                case SOURCE_CHILD:
                        if (s->child.pid > 0) {
                                if (s->enabled != SD_EVENT_OFF) {
                                        assert(s->event->n_enabled_child_sources > 0);
                                        s->event->n_enabled_child_sources--;
                                }

                                /* Keep SIGCHLD in the sigset if an
                                 * explicit signal source still wants it. */
                                if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
                                        assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);

                                hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
                        }

                        break;

                case SOURCE_DEFER:
                        /* nothing */
                        break;

                case SOURCE_QUIT:
                        prioq_remove(s->event->quit, s, &s->quit.prioq_index);
                        break;
                }

                if (s->pending)
                        prioq_remove(s->event->pending, s, &s->pending_index);

                if (s->prepare)
                        prioq_remove(s->event->prepare, s, &s->prepare_index);

                sd_event_unref(s->event);
        }

        free(s);
}
534
/* Mark a source as pending (queued for dispatch) or not. Adds to or
 * removes from the loop's pending prioq, and reshuffles the time prioqs
 * for timer sources, since pending-ness affects their sort order (see
 * the time prioq comparators). Returns 0 or negative errno. */
static int source_set_pending(sd_event_source *s, bool b) {
        int r;

        assert(s);
        assert(s->type != SOURCE_QUIT);

        if (s->pending == b)
                return 0;

        s->pending = b;

        if (b) {
                /* Record when the source became pending, so older
                 * entries dispatch first at equal priority. */
                s->pending_iteration = s->event->iteration;

                r = prioq_put(s->event->pending, s, &s->pending_index);
                if (r < 0) {
                        s->pending = false;
                        return r;
                }
        } else
                assert_se(prioq_remove(s->event->pending, s, &s->pending_index));

        if (s->type == SOURCE_REALTIME) {
                prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
        } else if (s->type == SOURCE_MONOTONIC) {
                prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
        }

        return 0;
}
567
568 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
569         sd_event_source *s;
570
571         assert(e);
572
573         s = new0(sd_event_source, 1);
574         if (!s)
575                 return NULL;
576
577         s->n_ref = 1;
578         s->event = sd_event_ref(e);
579         s->type = type;
580         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
581
582         return s;
583 }
584
585 _public_ int sd_event_add_io(
586                 sd_event *e,
587                 int fd,
588                 uint32_t events,
589                 sd_event_io_handler_t callback,
590                 void *userdata,
591                 sd_event_source **ret) {
592
593         sd_event_source *s;
594         int r;
595
596         assert_return(e, -EINVAL);
597         assert_return(fd >= 0, -EINVAL);
598         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
599         assert_return(callback, -EINVAL);
600         assert_return(ret, -EINVAL);
601         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
602         assert_return(!event_pid_changed(e), -ECHILD);
603
604         s = source_new(e, SOURCE_IO);
605         if (!s)
606                 return -ENOMEM;
607
608         s->io.fd = fd;
609         s->io.events = events;
610         s->io.callback = callback;
611         s->userdata = userdata;
612         s->enabled = SD_EVENT_ON;
613
614         r = source_io_register(s, s->enabled, events);
615         if (r < 0) {
616                 source_free(s);
617                 return -errno;
618         }
619
620         *ret = s;
621         return 0;
622 }
623
/* Lazily create the timerfd for one clock and hook it into the epoll
 * instance. The epoll data pointer carries the EventSourceType (not a
 * source pointer) so the dispatcher can tell timer wakeups apart.
 * Returns 0 (also when the fd already exists), negative errno on
 * failure. */
static int event_setup_timer_fd(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id) {

        struct epoll_event ev = {};
        int r, fd;
        sd_id128_t bootid;

        assert(e);
        assert(timer_fd);

        if (_likely_(*timer_fd >= 0))
                return 0;

        fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
        if (fd < 0)
                return -errno;

        ev.events = EPOLLIN;
        ev.data.ptr = INT_TO_PTR(type);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
        if (r < 0) {
                close_nointr_nofail(fd);
                return -errno;
        }

        /* When we sleep for longer, we try to realign the wakeup to
           the same time within each minute/second/250ms, so that
           events all across the system can be coalesced into a single
           CPU wakeup. However, let's take some system-specific
           randomness for this value, so that in a network of systems
           with synced clocks timer events are distributed a
           bit. Here, we calculate a perturbation usec offset from the
           boot ID. */

        if (sd_id128_get_boot(&bootid) >= 0)
                e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_MINUTE;

        *timer_fd = fd;
        return 0;
}
668
/* Common implementation behind sd_event_add_monotonic() and
 * sd_event_add_realtime(): lazily creates the per-clock earliest/latest
 * prioqs and timerfd, then allocates a ONESHOT timer source firing in
 * the window [usec, usec+accuracy] and queues it in both prioqs.
 * Returns 0 on success, negative errno-style code on failure. */
static int event_add_time_internal(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id,
                Prioq **earliest,
                Prioq **latest,
                uint64_t usec,
                uint64_t accuracy,
                sd_event_time_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(usec != (uint64_t) -1, -EINVAL);
        assert_return(accuracy != (uint64_t) -1, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        assert(timer_fd);
        assert(earliest);
        assert(latest);

        if (!*earliest) {
                *earliest = prioq_new(earliest_time_prioq_compare);
                if (!*earliest)
                        return -ENOMEM;
        }

        if (!*latest) {
                *latest = prioq_new(latest_time_prioq_compare);
                if (!*latest)
                        return -ENOMEM;
        }

        if (*timer_fd < 0) {
                r = event_setup_timer_fd(e, type, timer_fd, id);
                if (r < 0)
                        return r;
        }

        s = source_new(e, type);
        if (!s)
                return -ENOMEM;

        s->time.next = usec;
        /* accuracy == 0 means "pick a sensible default slack" */
        s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
        s->time.callback = callback;
        s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ONESHOT;

        r = prioq_put(*earliest, s, &s->time.earliest_index);
        if (r < 0)
                goto fail;

        r = prioq_put(*latest, s, &s->time.latest_index);
        if (r < 0)
                goto fail;

        *ret = s;
        return 0;

fail:
        source_free(s);
        return r;
}
741
742 _public_ int sd_event_add_monotonic(sd_event *e,
743                                     uint64_t usec,
744                                     uint64_t accuracy,
745                                     sd_event_time_handler_t callback,
746                                     void *userdata,
747                                     sd_event_source **ret) {
748
749         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
750 }
751
752 _public_ int sd_event_add_realtime(sd_event *e,
753                                    uint64_t usec,
754                                    uint64_t accuracy,
755                                    sd_event_time_handler_t callback,
756                                    void *userdata,
757                                    sd_event_source **ret) {
758
759         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
760 }
761
/* (Re)create the signalfd to match the loop's current sigset. When the
 * fd already exists, signalfd() just updates its mask in place; when it
 * is created for the first time it is also added to the epoll instance
 * with the SOURCE_SIGNAL marker as data pointer. Returns 0 or negative
 * errno. */
static int event_update_signal_fd(sd_event *e) {
        struct epoll_event ev = {};
        bool add_to_epoll;
        int r;

        assert(e);

        add_to_epoll = e->signal_fd < 0;

        /* Passing the existing fd updates its mask; passing -1 makes a
         * new one. */
        r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
        if (r < 0)
                return -errno;

        e->signal_fd = r;

        if (!add_to_epoll)
                return 0;

        ev.events = EPOLLIN;
        ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
        if (r < 0) {
                close_nointr_nofail(e->signal_fd);
                e->signal_fd = -1;

                return -errno;
        }

        return 0;
}
793
/* Add a signal event source for "sig". Only one source per signal is
 * allowed (-EBUSY otherwise). The signal is added to the loop's sigset
 * and the signalfd is refreshed — except for SIGCHLD when child sources
 * already route it, in which case the fd is current already. The caller
 * is expected to have the signal blocked. Returns 0 on success,
 * negative errno-style code on failure. */
_public_ int sd_event_add_signal(
                sd_event *e,
                int sig,
                sd_event_signal_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(sig > 0, -EINVAL);
        assert_return(sig < _NSIG, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        /* Lazily allocate the signal-number → source lookup array */
        if (!e->signal_sources) {
                e->signal_sources = new0(sd_event_source*, _NSIG);
                if (!e->signal_sources)
                        return -ENOMEM;
        } else if (e->signal_sources[sig])
                return -EBUSY;

        s = source_new(e, SOURCE_SIGNAL);
        if (!s)
                return -ENOMEM;

        s->signal.sig = sig;
        s->signal.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ON;

        e->signal_sources[sig] = s;
        assert_se(sigaddset(&e->sigset, sig) == 0);

        /* For SIGCHLD with enabled child sources the signalfd already
         * carries the signal; no refresh needed. On failure
         * source_free() undoes the registration above. */
        if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
                r = event_update_signal_fd(e);
                if (r < 0) {
                        source_free(s);
                        return r;
                }
        }

        *ret = s;
        return 0;
}
842
843 _public_ int sd_event_add_child(
844                 sd_event *e,
845                 pid_t pid,
846                 int options,
847                 sd_event_child_handler_t callback,
848                 void *userdata,
849                 sd_event_source **ret) {
850
851         sd_event_source *s;
852         int r;
853
854         assert_return(e, -EINVAL);
855         assert_return(pid > 1, -EINVAL);
856         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
857         assert_return(options != 0, -EINVAL);
858         assert_return(callback, -EINVAL);
859         assert_return(ret, -EINVAL);
860         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
861         assert_return(!event_pid_changed(e), -ECHILD);
862
863         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
864         if (r < 0)
865                 return r;
866
867         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
868                 return -EBUSY;
869
870         s = source_new(e, SOURCE_CHILD);
871         if (!s)
872                 return -ENOMEM;
873
874         s->child.pid = pid;
875         s->child.options = options;
876         s->child.callback = callback;
877         s->userdata = userdata;
878         s->enabled = SD_EVENT_ONESHOT;
879
880         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
881         if (r < 0) {
882                 source_free(s);
883                 return r;
884         }
885
886         e->n_enabled_child_sources ++;
887
888         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
889
890         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
891                 r = event_update_signal_fd(e);
892                 if (r < 0) {
893                         source_free(s);
894                         return -errno;
895                 }
896         }
897
898         e->need_process_child = true;
899
900         *ret = s;
901         return 0;
902 }
903
904 _public_ int sd_event_add_defer(
905                 sd_event *e,
906                 sd_event_handler_t callback,
907                 void *userdata,
908                 sd_event_source **ret) {
909
910         sd_event_source *s;
911         int r;
912
913         assert_return(e, -EINVAL);
914         assert_return(callback, -EINVAL);
915         assert_return(ret, -EINVAL);
916         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
917         assert_return(!event_pid_changed(e), -ECHILD);
918
919         s = source_new(e, SOURCE_DEFER);
920         if (!s)
921                 return -ENOMEM;
922
923         s->defer.callback = callback;
924         s->userdata = userdata;
925         s->enabled = SD_EVENT_ONESHOT;
926
927         r = source_set_pending(s, true);
928         if (r < 0) {
929                 source_free(s);
930                 return r;
931         }
932
933         *ret = s;
934         return 0;
935 }
936
937 _public_ int sd_event_add_quit(
938                 sd_event *e,
939                 sd_event_handler_t callback,
940                 void *userdata,
941                 sd_event_source **ret) {
942
943         sd_event_source *s;
944         int r;
945
946         assert_return(e, -EINVAL);
947         assert_return(callback, -EINVAL);
948         assert_return(ret, -EINVAL);
949         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
950         assert_return(!event_pid_changed(e), -ECHILD);
951
952         if (!e->quit) {
953                 e->quit = prioq_new(quit_prioq_compare);
954                 if (!e->quit)
955                         return -ENOMEM;
956         }
957
958         s = source_new(e, SOURCE_QUIT);
959         if (!s)
960                 return -ENOMEM;
961
962         s->quit.callback = callback;
963         s->userdata = userdata;
964         s->quit.prioq_index = PRIOQ_IDX_NULL;
965         s->enabled = SD_EVENT_ONESHOT;
966
967         r = prioq_put(s->event->quit, s, &s->quit.prioq_index);
968         if (r < 0) {
969                 source_free(s);
970                 return r;
971         }
972
973         *ret = s;
974         return 0;
975 }
976
977 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
978         assert_return(s, NULL);
979
980         assert(s->n_ref >= 1);
981         s->n_ref++;
982
983         return s;
984 }
985
986 _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {
987
988         if (!s)
989                 return NULL;
990
991         assert(s->n_ref >= 1);
992         s->n_ref--;
993
994         if (s->n_ref <= 0)
995                 source_free(s);
996
997         return NULL;
998 }
999
/* Returns the event loop object this source is attached to, or NULL if s is NULL. */
_public_ sd_event *sd_event_source_get_event(sd_event_source *s) {
        assert_return(s, NULL);

        return s->event;
}
1005
/* Returns whether the source is currently queued for dispatch (boolean),
 * or a negative error. Not meaningful for quit sources, hence -EDOM. */
_public_ int sd_event_source_get_pending(sd_event_source *s) {
        assert_return(s, -EINVAL);
        assert_return(s->type != SOURCE_QUIT, -EDOM);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->pending;
}
1014
/* Returns the file descriptor an I/O source watches; -EDOM for non-I/O sources. */
_public_ int sd_event_source_get_io_fd(sd_event_source *s) {
        assert_return(s, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->io.fd;
}
1022
/* Stores the epoll event mask the I/O source is registered for in *events. */
_public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
        assert_return(s, -EINVAL);
        assert_return(events, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *events = s->io.events;
        return 0;
}
1032
1033 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1034         int r;
1035
1036         assert_return(s, -EINVAL);
1037         assert_return(s->type == SOURCE_IO, -EDOM);
1038         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
1039         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1040         assert_return(!event_pid_changed(s->event), -ECHILD);
1041
1042         if (s->io.events == events)
1043                 return 0;
1044
1045         if (s->enabled != SD_EVENT_OFF) {
1046                 r = source_io_register(s, s->enabled, events);
1047                 if (r < 0)
1048                         return r;
1049         }
1050
1051         s->io.events = events;
1052         source_set_pending(s, false);
1053
1054         return 0;
1055 }
1056
/* Stores the events that actually fired for a pending I/O source in
 * *revents; -ENODATA if the source is not currently pending. */
_public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
        assert_return(s, -EINVAL);
        assert_return(revents, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(s->pending, -ENODATA);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *revents = s->io.revents;
        return 0;
}
1067
/* Returns the signal number a signal source watches; -EDOM otherwise. */
_public_ int sd_event_source_get_signal(sd_event_source *s) {
        assert_return(s, -EINVAL);
        assert_return(s->type == SOURCE_SIGNAL, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->signal.sig;
}
1075
1076 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1077         assert_return(s, -EINVAL);
1078         assert_return(!event_pid_changed(s->event), -ECHILD);
1079
1080         return s->priority;
1081 }
1082
1083 _public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
1084         assert_return(s, -EINVAL);
1085         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1086         assert_return(!event_pid_changed(s->event), -ECHILD);
1087
1088         if (s->priority == priority)
1089                 return 0;
1090
1091         s->priority = priority;
1092
1093         if (s->pending)
1094                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1095
1096         if (s->prepare)
1097                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1098
1099         if (s->type == SOURCE_QUIT)
1100                 prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1101
1102         return 0;
1103 }
1104
/* Stores the source's enable state (SD_EVENT_OFF/ON/ONESHOT) in *m. */
_public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
        assert_return(s, -EINVAL);
        assert_return(m, -EINVAL);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *m = s->enabled;
        return 0;
}
1113
1114 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
1115         int r;
1116
1117         assert_return(s, -EINVAL);
1118         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1119         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1120         assert_return(!event_pid_changed(s->event), -ECHILD);
1121
1122         if (s->enabled == m)
1123                 return 0;
1124
1125         if (m == SD_EVENT_OFF) {
1126
1127                 switch (s->type) {
1128
1129                 case SOURCE_IO:
1130                         r = source_io_unregister(s);
1131                         if (r < 0)
1132                                 return r;
1133
1134                         s->enabled = m;
1135                         break;
1136
1137                 case SOURCE_MONOTONIC:
1138                         s->enabled = m;
1139                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1140                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1141                         break;
1142
1143                 case SOURCE_REALTIME:
1144                         s->enabled = m;
1145                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1146                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1147                         break;
1148
1149                 case SOURCE_SIGNAL:
1150                         s->enabled = m;
1151                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1152                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1153                                 event_update_signal_fd(s->event);
1154                         }
1155
1156                         break;
1157
1158                 case SOURCE_CHILD:
1159                         s->enabled = m;
1160
1161                         assert(s->event->n_enabled_child_sources > 0);
1162                         s->event->n_enabled_child_sources--;
1163
1164                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1165                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1166                                 event_update_signal_fd(s->event);
1167                         }
1168
1169                         break;
1170
1171                 case SOURCE_QUIT:
1172                         s->enabled = m;
1173                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1174                         break;
1175
1176                 case SOURCE_DEFER:
1177                         s->enabled = m;
1178                         break;
1179                 }
1180
1181         } else {
1182                 switch (s->type) {
1183
1184                 case SOURCE_IO:
1185                         r = source_io_register(s, m, s->io.events);
1186                         if (r < 0)
1187                                 return r;
1188
1189                         s->enabled = m;
1190                         break;
1191
1192                 case SOURCE_MONOTONIC:
1193                         s->enabled = m;
1194                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1195                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1196                         break;
1197
1198                 case SOURCE_REALTIME:
1199                         s->enabled = m;
1200                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1201                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1202                         break;
1203
1204                 case SOURCE_SIGNAL:
1205                         s->enabled = m;
1206
1207                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1208                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1209                                 event_update_signal_fd(s->event);
1210                         }
1211                         break;
1212
1213                 case SOURCE_CHILD:
1214                         s->enabled = m;
1215
1216                         if (s->enabled == SD_EVENT_OFF) {
1217                                 s->event->n_enabled_child_sources++;
1218
1219                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1220                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1221                                         event_update_signal_fd(s->event);
1222                                 }
1223                         }
1224                         break;
1225
1226                 case SOURCE_QUIT:
1227                         s->enabled = m;
1228                         prioq_reshuffle(s->event->quit, s, &s->quit.prioq_index);
1229                         break;
1230
1231                 case SOURCE_DEFER:
1232                         s->enabled = m;
1233                         break;
1234                 }
1235         }
1236
1237         if (s->pending)
1238                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1239
1240         if (s->prepare)
1241                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1242
1243         return 0;
1244 }
1245
/* Stores the timer source's next elapse time (usec, on its own clock) in *usec. */
_public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
        assert_return(s, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *usec = s->time.next;
        return 0;
}
1255
1256 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1257         assert_return(s, -EINVAL);
1258         assert_return(usec != (uint64_t) -1, -EINVAL);
1259         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1260         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1261         assert_return(!event_pid_changed(s->event), -ECHILD);
1262
1263         s->time.next = usec;
1264
1265         source_set_pending(s, false);
1266
1267         if (s->type == SOURCE_REALTIME) {
1268                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1269                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1270         } else {
1271                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1272                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1273         }
1274
1275         return 0;
1276 }
1277
/* Stores the timer source's coalescing accuracy (usec) in *usec. */
_public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
        assert_return(s, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *usec = s->time.accuracy;
        return 0;
}
1287
1288 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1289         assert_return(s, -EINVAL);
1290         assert_return(usec != (uint64_t) -1, -EINVAL);
1291         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1292         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1293         assert_return(!event_pid_changed(s->event), -ECHILD);
1294
1295         if (usec == 0)
1296                 usec = DEFAULT_ACCURACY_USEC;
1297
1298         s->time.accuracy = usec;
1299
1300         source_set_pending(s, false);
1301
1302         if (s->type == SOURCE_REALTIME)
1303                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1304         else
1305                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1306
1307         return 0;
1308 }
1309
/* Stores the PID a child source waits for in *pid; -EDOM for other types. */
_public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
        assert_return(s, -EINVAL);
        assert_return(pid, -EINVAL);
        assert_return(s->type == SOURCE_CHILD, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *pid = s->child.pid;
        return 0;
}
1319
1320 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_event_handler_t callback) {
1321         int r;
1322
1323         assert_return(s, -EINVAL);
1324         assert_return(s->type != SOURCE_QUIT, -EDOM);
1325         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1326         assert_return(!event_pid_changed(s->event), -ECHILD);
1327
1328         if (s->prepare == callback)
1329                 return 0;
1330
1331         if (callback && s->prepare) {
1332                 s->prepare = callback;
1333                 return 0;
1334         }
1335
1336         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1337         if (r < 0)
1338                 return r;
1339
1340         s->prepare = callback;
1341
1342         if (callback) {
1343                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1344                 if (r < 0)
1345                         return r;
1346         } else
1347                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1348
1349         return 0;
1350 }
1351
/* Returns the opaque userdata pointer registered with the source. */
_public_ void* sd_event_source_get_userdata(sd_event_source *s) {
        assert_return(s, NULL);

        return s->userdata;
}
1357
/* Picks a wake-up time in [a, b], preferring system-wide synchronized
 * instants (see the comment below). Returns 0 if a == 0, and never returns
 * a value outside [a, b]. Note: a and b are unsigned (usec_t), so the
 * `a <= 0` test is effectively `a == 0`. */
static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
        usec_t c;
        assert(e);
        assert(a <= b);

        if (a <= 0)
                return 0;

        if (b <= a + 1)
                return a;

        /*
          Find a good time to wake up again between times a and b. We
          have two goals here:

          a) We want to wake up as seldom as possible, hence prefer
             later times over earlier times.

          b) But if we have to wake up, then let's make sure to
             dispatch as much as possible on the entire system.

          We implement this by waking up everywhere at the same time
          within any given minute if we can, synchronised via the
          perturbation value determined from the boot ID. If we can't,
          then we try to find the same spot in every 1s and then 250ms
          step. Otherwise, we pick the last possible time to wake up.
        */

        /* Try the per-minute synchronization point first. */
        c = (b / USEC_PER_MINUTE) * USEC_PER_MINUTE + e->perturb;
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_MINUTE))
                        return b;

                c -= USEC_PER_MINUTE;
        }

        if (c >= a)
                return c;

        /* Then the per-second point. */
        c = (b / USEC_PER_SEC) * USEC_PER_SEC + (e->perturb % USEC_PER_SEC);
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_SEC))
                        return b;

                c -= USEC_PER_SEC;
        }

        if (c >= a)
                return c;

        /* Then the per-250ms point. */
        c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_MSEC*250))
                        return b;

                c -= USEC_PER_MSEC*250;
        }

        if (c >= a)
                return c;

        /* No synchronized spot fits; wake at the latest allowed time. */
        return b;
}
1421
1422 static int event_arm_timer(
1423                 sd_event *e,
1424                 int timer_fd,
1425                 Prioq *earliest,
1426                 Prioq *latest,
1427                 usec_t *next) {
1428
1429         struct itimerspec its = {};
1430         sd_event_source *a, *b;
1431         usec_t t;
1432         int r;
1433
1434         assert(e);
1435         assert(next);
1436
1437         a = prioq_peek(earliest);
1438         if (!a || a->enabled == SD_EVENT_OFF) {
1439
1440                 if (timer_fd < 0)
1441                         return 0;
1442
1443                 if (*next == (usec_t) -1)
1444                         return 0;
1445
1446                 /* disarm */
1447                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1448                 if (r < 0)
1449                         return r;
1450
1451                 *next = (usec_t) -1;
1452
1453                 return 0;
1454         }
1455
1456         b = prioq_peek(latest);
1457         assert_se(b && b->enabled != SD_EVENT_OFF);
1458
1459         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1460         if (*next == t)
1461                 return 0;
1462
1463         assert_se(timer_fd >= 0);
1464
1465         if (t == 0) {
1466                 /* We don' want to disarm here, just mean some time looooong ago. */
1467                 its.it_value.tv_sec = 0;
1468                 its.it_value.tv_nsec = 1;
1469         } else
1470                 timespec_store(&its.it_value, t);
1471
1472         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1473         if (r < 0)
1474                 return -errno;
1475
1476         *next = t;
1477         return 0;
1478 }
1479
/* Records the epoll events that fired for an I/O source and queues it for dispatch. */
static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
        assert(e);
        assert(s);
        assert(s->type == SOURCE_IO);

        s->io.revents = events;

        return source_set_pending(s, true);
}
1489
1490 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1491         uint64_t x;
1492         ssize_t ss;
1493
1494         assert(e);
1495         assert(fd >= 0);
1496
1497         assert_return(events == EPOLLIN, -EIO);
1498
1499         ss = read(fd, &x, sizeof(x));
1500         if (ss < 0) {
1501                 if (errno == EAGAIN || errno == EINTR)
1502                         return 0;
1503
1504                 return -errno;
1505         }
1506
1507         if (ss != sizeof(x))
1508                 return -EIO;
1509
1510         if (next)
1511                 *next = (usec_t) -1;
1512
1513         return 0;
1514 }
1515
1516 static int process_timer(
1517                 sd_event *e,
1518                 usec_t n,
1519                 Prioq *earliest,
1520                 Prioq *latest) {
1521
1522         sd_event_source *s;
1523         int r;
1524
1525         assert(e);
1526
1527         for (;;) {
1528                 s = prioq_peek(earliest);
1529                 if (!s ||
1530                     s->time.next > n ||
1531                     s->enabled == SD_EVENT_OFF ||
1532                     s->pending)
1533                         break;
1534
1535                 r = source_set_pending(s, true);
1536                 if (r < 0)
1537                         return r;
1538
1539                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1540                 prioq_reshuffle(latest, s, &s->time.latest_index);
1541         }
1542
1543         return 0;
1544 }
1545
/* Polls every registered child source with waitid(WNOHANG) and marks those
 * with a state change as pending. Returns 0 or a negative errno. */
static int process_child(sd_event *e) {
        sd_event_source *s;
        Iterator i;
        int r;

        assert(e);

        e->need_process_child = false;

        /*
           So, this is ugly. We iteratively invoke waitid() with P_PID
           + WNOHANG for each PID we wait for, instead of using
           P_ALL. This is because we only want to get child
           information of very specific child processes, and not all
           of them. We might not have processed the SIGCHLD even of a
           previous invocation and we don't want to maintain a
           unbounded *per-child* event queue, hence we really don't
           want anything flushed out of the kernel's queue that we
           don't care about. Since this is O(n) this means that if you
           have a lot of processes you probably want to handle SIGCHLD
           yourself.

           We do not reap the children here (by using WNOWAIT), this
           is only done after the event source is dispatched so that
           the callback still sees the process as a zombie.
        */

        HASHMAP_FOREACH(s, e->child_sources, i) {
                assert(s->type == SOURCE_CHILD);

                /* Already queued from a previous pass — don't overwrite siginfo. */
                if (s->pending)
                        continue;

                if (s->enabled == SD_EVENT_OFF)
                        continue;

                zero(s->child.siginfo);
                r = waitid(P_PID, s->child.pid, &s->child.siginfo,
                           WNOHANG | (s->child.options & WEXITED ? WNOWAIT : 0) | s->child.options);
                if (r < 0)
                        return -errno;

                /* si_pid stays 0 when WNOHANG found no state change. */
                if (s->child.siginfo.si_pid != 0) {
                        bool zombie =
                                s->child.siginfo.si_code == CLD_EXITED ||
                                s->child.siginfo.si_code == CLD_KILLED ||
                                s->child.siginfo.si_code == CLD_DUMPED;

                        if (!zombie && (s->child.options & WEXITED)) {
                                /* If the child isn't dead then let's
                                 * immediately remove the state change
                                 * from the queue, since there's no
                                 * benefit in leaving it queued */

                                assert(s->child.options & (WSTOPPED|WCONTINUED));
                                waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|(s->child.options & (WSTOPPED|WCONTINUED)));
                        }

                        r = source_set_pending(s, true);
                        if (r < 0)
                                return r;
                }
        }

        return 0;
}
1612
/* Drains the signalfd and queues the matching signal sources for dispatch.
 * SIGCHLD additionally triggers child-source processing. Returns from
 * inside the loop: >= 0 once the fd is drained (EAGAIN), negative errno on
 * failure; the trailing return is unreachable. */
static int process_signal(sd_event *e, uint32_t events) {
        bool read_one = false;
        int r;

        assert(e);
        assert(e->signal_sources);

        assert_return(events == EPOLLIN, -EIO);

        for (;;) {
                struct signalfd_siginfo si;
                ssize_t ss;
                sd_event_source *s;

                ss = read(e->signal_fd, &si, sizeof(si));
                if (ss < 0) {
                        /* Drained (or interrupted): report whether we read anything. */
                        if (errno == EAGAIN || errno == EINTR)
                                return read_one;

                        return -errno;
                }

                if (ss != sizeof(si))
                        return -EIO;

                read_one = true;

                s = e->signal_sources[si.ssi_signo];
                if (si.ssi_signo == SIGCHLD) {
                        /* Child sources piggyback on SIGCHLD; a dedicated
                         * SIGCHLD signal source (s) may coexist with them. */
                        r = process_child(e);
                        if (r < 0)
                                return r;
                        if (r > 0 || !s)
                                continue;
                } else
                        if (!s)
                                return -EIO;

                s->signal.siginfo = si;
                r = source_set_pending(s, true);
                if (r < 0)
                        return r;
        }

        return 0;
}
1659
/* Invokes one source's callback. Clears the pending flag first (except for
 * defer/quit sources, which stay "pending" by design), disables ONESHOT
 * sources before the call, and holds a reference across the callback so it
 * may safely unref its own source. Returns the callback's return value. */
static int source_dispatch(sd_event_source *s) {
        int r = 0;

        assert(s);
        assert(s->pending || s->type == SOURCE_QUIT);

        if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) {
                r = source_set_pending(s, false);
                if (r < 0)
                        return r;
        }

        /* ONESHOT: turn the source off *before* dispatching, so the
         * callback may re-enable it. */
        if (s->enabled == SD_EVENT_ONESHOT) {
                r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
                if (r < 0)
                        return r;
        }

        sd_event_source_ref(s);

        switch (s->type) {

        case SOURCE_IO:
                r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
                break;

        case SOURCE_MONOTONIC:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_REALTIME:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_SIGNAL:
                r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
                break;

        case SOURCE_CHILD: {
                bool zombie;

                zombie = s->child.siginfo.si_code == CLD_EXITED ||
                         s->child.siginfo.si_code == CLD_KILLED ||
                         s->child.siginfo.si_code == CLD_DUMPED;

                r = s->child.callback(s, &s->child.siginfo, s->userdata);

                /* Now, reap the PID for good. */
                if (zombie)
                        waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|WEXITED);

                break;
        }

        case SOURCE_DEFER:
                r = s->defer.callback(s, s->userdata);
                break;

        case SOURCE_QUIT:
                r = s->quit.callback(s, s->userdata);
                break;
        }

        sd_event_source_unref(s);

        return r;
}
1727
1728 static int event_prepare(sd_event *e) {
1729         int r;
1730
1731         assert(e);
1732
1733         for (;;) {
1734                 sd_event_source *s;
1735
1736                 s = prioq_peek(e->prepare);
1737                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1738                         break;
1739
1740                 s->prepare_iteration = e->iteration;
1741                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1742                 if (r < 0)
1743                         return r;
1744
1745                 assert(s->prepare);
1746                 r = s->prepare(s, s->userdata);
1747                 if (r < 0)
1748                         return r;
1749
1750         }
1751
1752         return 0;
1753 }
1754
/* Dispatches the highest-priority enabled quit handler; marks the loop
 * FINISHED once none remain. Returns the handler's return value or 0. */
static int dispatch_quit(sd_event *e) {
        sd_event_source *p;
        int r;

        assert(e);

        p = prioq_peek(e->quit);
        if (!p || p->enabled == SD_EVENT_OFF) {
                e->state = SD_EVENT_FINISHED;
                return 0;
        }

        /* Keep the event object alive across the callback. */
        sd_event_ref(e);
        e->iteration++;
        e->state = SD_EVENT_QUITTING;

        r = source_dispatch(p);

        e->state = SD_EVENT_PASSIVE;
        sd_event_unref(e);

        return r;
}
1778
1779 static sd_event_source* event_next_pending(sd_event *e) {
1780         sd_event_source *p;
1781
1782         assert(e);
1783
1784         p = prioq_peek(e->pending);
1785         if (!p)
1786                 return NULL;
1787
1788         if (p->enabled == SD_EVENT_OFF)
1789                 return NULL;
1790
1791         return p;
1792 }
1793
1794 static int arm_watchdog(sd_event *e) {
1795         struct itimerspec its = {};
1796         usec_t t;
1797         int r;
1798
1799         assert(e);
1800         assert(e->watchdog_fd >= 0);
1801
1802         t = sleep_between(e,
1803                           e->watchdog_last + (e->watchdog_period / 2),
1804                           e->watchdog_last + (e->watchdog_period * 3 / 4));
1805
1806         timespec_store(&its.it_value, t);
1807
1808         r = timerfd_settime(e->watchdog_fd, TFD_TIMER_ABSTIME, &its, NULL);
1809         if (r < 0)
1810                 return -errno;
1811
1812         return 0;
1813 }
1814
1815 static int process_watchdog(sd_event *e) {
1816         assert(e);
1817
1818         if (!e->watchdog)
1819                 return 0;
1820
1821         /* Don't notify watchdog too often */
1822         if (e->watchdog_last + e->watchdog_period / 4 > e->timestamp.monotonic)
1823                 return 0;
1824
1825         sd_notify(false, "WATCHDOG=1");
1826         e->watchdog_last = e->timestamp.monotonic;
1827
1828         return arm_watchdog(e);
1829 }
1830
/* Runs a single event loop iteration: prepare callbacks, timer (re)arming,
 * epoll_wait with the given timeout (usec; (uint64_t)-1 = infinite), event
 * processing, then dispatch of exactly one pending source. Returns the
 * dispatched callback's return value, 0 if nothing was dispatched, or a
 * negative errno. */
_public_ int sd_event_run(sd_event *e, uint64_t timeout) {
        struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
        sd_event_source *p;
        int r, i, m;

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);

        if (e->quit_requested)
                return dispatch_quit(e);

        sd_event_ref(e);
        e->iteration++;
        e->state = SD_EVENT_RUNNING;

        r = event_prepare(e);
        if (r < 0)
                goto finish;

        /* (Re)program both timerfds from their priority queues. */
        r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
        if (r < 0)
                goto finish;

        r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
        if (r < 0)
                goto finish;

        /* If work is already queued, just poll without blocking. */
        if (event_next_pending(e) || e->need_process_child)
                timeout = 0;

        m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
                       timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
        if (m < 0) {
                r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
                goto finish;
        }

        dual_timestamp_get(&e->timestamp);

        /* The data.ptr of internal fds encodes the source type; anything
         * else is a user I/O source. */
        for (i = 0; i < m; i++) {

                if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
                        r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
                        r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
                        r = process_signal(e, ev_queue[i].events);
                else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_WATCHDOG))
                        r = flush_timer(e, e->watchdog_fd, ev_queue[i].events, NULL);
                else
                        r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);

                if (r < 0)
                        goto finish;
        }

        r = process_watchdog(e);
        if (r < 0)
                goto finish;

        r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
        if (r < 0)
                goto finish;

        r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
        if (r < 0)
                goto finish;

        if (e->need_process_child) {
                r = process_child(e);
                if (r < 0)
                        goto finish;
        }

        /* Dispatch at most one source per iteration. */
        p = event_next_pending(e);
        if (!p) {
                r = 0;
                goto finish;
        }

        r = source_dispatch(p);

finish:
        e->state = SD_EVENT_PASSIVE;
        sd_event_unref(e);

        return r;
}
1921
1922 _public_ int sd_event_loop(sd_event *e) {
1923         int r;
1924
1925         assert_return(e, -EINVAL);
1926         assert_return(!event_pid_changed(e), -ECHILD);
1927         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1928
1929         sd_event_ref(e);
1930
1931         while (e->state != SD_EVENT_FINISHED) {
1932                 r = sd_event_run(e, (uint64_t) -1);
1933                 if (r < 0)
1934                         goto finish;
1935         }
1936
1937         r = 0;
1938
1939 finish:
1940         sd_event_unref(e);
1941         return r;
1942 }
1943
1944 _public_ int sd_event_get_state(sd_event *e) {
1945         assert_return(e, -EINVAL);
1946         assert_return(!event_pid_changed(e), -ECHILD);
1947
1948         return e->state;
1949 }
1950
1951 _public_ int sd_event_get_quit(sd_event *e) {
1952         assert_return(e, -EINVAL);
1953         assert_return(!event_pid_changed(e), -ECHILD);
1954
1955         return e->quit_requested;
1956 }
1957
1958 _public_ int sd_event_request_quit(sd_event *e) {
1959         assert_return(e, -EINVAL);
1960         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1961         assert_return(!event_pid_changed(e), -ECHILD);
1962
1963         e->quit_requested = true;
1964         return 0;
1965 }
1966
1967 _public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
1968         assert_return(e, -EINVAL);
1969         assert_return(usec, -EINVAL);
1970         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1971         assert_return(!event_pid_changed(e), -ECHILD);
1972
1973         *usec = e->timestamp.realtime;
1974         return 0;
1975 }
1976
1977 _public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
1978         assert_return(e, -EINVAL);
1979         assert_return(usec, -EINVAL);
1980         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
1981         assert_return(!event_pid_changed(e), -ECHILD);
1982
1983         *usec = e->timestamp.monotonic;
1984         return 0;
1985 }
1986
1987 _public_ int sd_event_default(sd_event **ret) {
1988
1989         static __thread sd_event *default_event = NULL;
1990         sd_event *e;
1991         int r;
1992
1993         if (!ret)
1994                 return !!default_event;
1995
1996         if (default_event) {
1997                 *ret = sd_event_ref(default_event);
1998                 return 0;
1999         }
2000
2001         r = sd_event_new(&e);
2002         if (r < 0)
2003                 return r;
2004
2005         e->default_event_ptr = &default_event;
2006         e->tid = gettid();
2007         default_event = e;
2008
2009         *ret = e;
2010         return 1;
2011 }
2012
2013 _public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
2014         assert_return(e, -EINVAL);
2015         assert_return(tid, -EINVAL);
2016         assert_return(!event_pid_changed(e), -ECHILD);
2017
2018         if (e->tid != 0) {
2019                 *tid = e->tid;
2020                 return 0;
2021         }
2022
2023         return -ENXIO;
2024 }
2025
2026 _public_ int sd_event_set_watchdog(sd_event *e, int b) {
2027         int r;
2028
2029         assert_return(e, -EINVAL);
2030
2031         if (e->watchdog == !!b)
2032                 return e->watchdog;
2033
2034         if (b) {
2035                 struct epoll_event ev = {};
2036                 const char *env;
2037
2038                 env = getenv("WATCHDOG_USEC");
2039                 if (!env)
2040                         return false;
2041
2042                 r = safe_atou64(env, &e->watchdog_period);
2043                 if (r < 0)
2044                         return r;
2045                 if (e->watchdog_period <= 0)
2046                         return -EIO;
2047
2048                 /* Issue first ping immediately */
2049                 sd_notify(false, "WATCHDOG=1");
2050                 e->watchdog_last = now(CLOCK_MONOTONIC);
2051
2052                 e->watchdog_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
2053                 if (e->watchdog_fd < 0)
2054                         return -errno;
2055
2056                 r = arm_watchdog(e);
2057                 if (r < 0)
2058                         goto fail;
2059
2060                 ev.events = EPOLLIN;
2061                 ev.data.ptr = INT_TO_PTR(SOURCE_WATCHDOG);
2062
2063                 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->watchdog_fd, &ev);
2064                 if (r < 0) {
2065                         r = -errno;
2066                         goto fail;
2067                 }
2068
2069         } else {
2070                 if (e->watchdog_fd >= 0) {
2071                         epoll_ctl(e->epoll_fd, EPOLL_CTL_DEL, e->watchdog_fd, NULL);
2072                         close_nointr_nofail(e->watchdog_fd);
2073                         e->watchdog_fd = -1;
2074                 }
2075         }
2076
2077         e->watchdog = !!b;
2078         return e->watchdog;
2079
2080 fail:
2081         close_nointr_nofail(e->watchdog_fd);
2082         e->watchdog_fd = -1;
2083         return r;
2084 }