chiark / gitweb /
6af52ecb3cdf45325d91e74d91c0a4801397dbc9
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "sd-id128.h"
27 #include "sd-daemon.h"
28 #include "macro.h"
29 #include "prioq.h"
30 #include "hashmap.h"
31 #include "util.h"
32 #include "time-util.h"
33 #include "missing.h"
34
35 #include "sd-event.h"
36
37 #define EPOLL_QUEUE_MAX 64
38 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
39
/* Kind of event an sd_event_source stands for. Besides tagging sources,
 * these values are also stored (via INT_TO_PTR) as the epoll user data of
 * the loop's internal timer and signal file descriptors. */
typedef enum EventSourceType {
        SOURCE_IO        = 0,
        SOURCE_MONOTONIC = 1,
        SOURCE_REALTIME  = 2,
        SOURCE_SIGNAL    = 3,
        SOURCE_CHILD     = 4,
        SOURCE_DEFER     = 5,
        SOURCE_EXIT      = 6,
        SOURCE_WATCHDOG  = 7
} EventSourceType;
50
51 struct sd_event_source {
52         unsigned n_ref;
53
54         sd_event *event;
55         void *userdata;
56         sd_event_handler_t prepare;
57
58         EventSourceType type:4;
59         int enabled:3;
60         bool pending:1;
61         bool dispatching:1;
62
63         int priority;
64         unsigned pending_index;
65         unsigned prepare_index;
66         unsigned pending_iteration;
67         unsigned prepare_iteration;
68
69         union {
70                 struct {
71                         sd_event_io_handler_t callback;
72                         int fd;
73                         uint32_t events;
74                         uint32_t revents;
75                         bool registered:1;
76                 } io;
77                 struct {
78                         sd_event_time_handler_t callback;
79                         usec_t next, accuracy;
80                         unsigned earliest_index;
81                         unsigned latest_index;
82                 } time;
83                 struct {
84                         sd_event_signal_handler_t callback;
85                         struct signalfd_siginfo siginfo;
86                         int sig;
87                 } signal;
88                 struct {
89                         sd_event_child_handler_t callback;
90                         siginfo_t siginfo;
91                         pid_t pid;
92                         int options;
93                 } child;
94                 struct {
95                         sd_event_handler_t callback;
96                 } defer;
97                 struct {
98                         sd_event_handler_t callback;
99                         unsigned prioq_index;
100                 } exit;
101         };
102 };
103
104 struct sd_event {
105         unsigned n_ref;
106
107         int epoll_fd;
108         int signal_fd;
109         int realtime_fd;
110         int monotonic_fd;
111         int watchdog_fd;
112
113         Prioq *pending;
114         Prioq *prepare;
115
116         /* For both clocks we maintain two priority queues each, one
117          * ordered for the earliest times the events may be
118          * dispatched, and one ordered by the latest times they must
119          * have been dispatched. The range between the top entries in
120          * the two prioqs is the time window we can freely schedule
121          * wakeups in */
122         Prioq *monotonic_earliest;
123         Prioq *monotonic_latest;
124         Prioq *realtime_earliest;
125         Prioq *realtime_latest;
126
127         usec_t realtime_next, monotonic_next;
128         usec_t perturb;
129
130         sigset_t sigset;
131         sd_event_source **signal_sources;
132
133         Hashmap *child_sources;
134         unsigned n_enabled_child_sources;
135
136         Prioq *exit;
137
138         pid_t original_pid;
139
140         unsigned iteration;
141         dual_timestamp timestamp;
142         int state;
143
144         bool exit_requested:1;
145         bool need_process_child:1;
146         bool watchdog:1;
147
148         int exit_code;
149
150         pid_t tid;
151         sd_event **default_event_ptr;
152
153         usec_t watchdog_last, watchdog_period;
154 };
155
156 static int pending_prioq_compare(const void *a, const void *b) {
157         const sd_event_source *x = a, *y = b;
158
159         assert(x->pending);
160         assert(y->pending);
161
162         /* Enabled ones first */
163         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
164                 return -1;
165         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
166                 return 1;
167
168         /* Lower priority values first */
169         if (x->priority < y->priority)
170                 return -1;
171         if (x->priority > y->priority)
172                 return 1;
173
174         /* Older entries first */
175         if (x->pending_iteration < y->pending_iteration)
176                 return -1;
177         if (x->pending_iteration > y->pending_iteration)
178                 return 1;
179
180         /* Stability for the rest */
181         if (x < y)
182                 return -1;
183         if (x > y)
184                 return 1;
185
186         return 0;
187 }
188
189 static int prepare_prioq_compare(const void *a, const void *b) {
190         const sd_event_source *x = a, *y = b;
191
192         assert(x->prepare);
193         assert(y->prepare);
194
195         /* Move most recently prepared ones last, so that we can stop
196          * preparing as soon as we hit one that has already been
197          * prepared in the current iteration */
198         if (x->prepare_iteration < y->prepare_iteration)
199                 return -1;
200         if (x->prepare_iteration > y->prepare_iteration)
201                 return 1;
202
203         /* Enabled ones first */
204         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
205                 return -1;
206         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
207                 return 1;
208
209         /* Lower priority values first */
210         if (x->priority < y->priority)
211                 return -1;
212         if (x->priority > y->priority)
213                 return 1;
214
215         /* Stability for the rest */
216         if (x < y)
217                 return -1;
218         if (x > y)
219                 return 1;
220
221         return 0;
222 }
223
224 static int earliest_time_prioq_compare(const void *a, const void *b) {
225         const sd_event_source *x = a, *y = b;
226
227         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
228         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
229
230         /* Enabled ones first */
231         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
232                 return -1;
233         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
234                 return 1;
235
236         /* Move the pending ones to the end */
237         if (!x->pending && y->pending)
238                 return -1;
239         if (x->pending && !y->pending)
240                 return 1;
241
242         /* Order by time */
243         if (x->time.next < y->time.next)
244                 return -1;
245         if (x->time.next > y->time.next)
246                 return 1;
247
248         /* Stability for the rest */
249         if (x < y)
250                 return -1;
251         if (x > y)
252                 return 1;
253
254         return 0;
255 }
256
257 static int latest_time_prioq_compare(const void *a, const void *b) {
258         const sd_event_source *x = a, *y = b;
259
260         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
261                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
262
263         /* Enabled ones first */
264         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
265                 return -1;
266         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
267                 return 1;
268
269         /* Move the pending ones to the end */
270         if (!x->pending && y->pending)
271                 return -1;
272         if (x->pending && !y->pending)
273                 return 1;
274
275         /* Order by time */
276         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
277                 return -1;
278         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
279                 return 1;
280
281         /* Stability for the rest */
282         if (x < y)
283                 return -1;
284         if (x > y)
285                 return 1;
286
287         return 0;
288 }
289
290 static int exit_prioq_compare(const void *a, const void *b) {
291         const sd_event_source *x = a, *y = b;
292
293         assert(x->type == SOURCE_EXIT);
294         assert(y->type == SOURCE_EXIT);
295
296         /* Enabled ones first */
297         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
298                 return -1;
299         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
300                 return 1;
301
302         /* Lower priority values first */
303         if (x->priority < y->priority)
304                 return -1;
305         if (x->priority > y->priority)
306                 return 1;
307
308         /* Stability for the rest */
309         if (x < y)
310                 return -1;
311         if (x > y)
312                 return 1;
313
314         return 0;
315 }
316
317 static void event_free(sd_event *e) {
318         assert(e);
319
320         if (e->default_event_ptr)
321                 *(e->default_event_ptr) = NULL;
322
323         if (e->epoll_fd >= 0)
324                 close_nointr_nofail(e->epoll_fd);
325
326         if (e->signal_fd >= 0)
327                 close_nointr_nofail(e->signal_fd);
328
329         if (e->realtime_fd >= 0)
330                 close_nointr_nofail(e->realtime_fd);
331
332         if (e->monotonic_fd >= 0)
333                 close_nointr_nofail(e->monotonic_fd);
334
335         if (e->watchdog_fd >= 0)
336                 close_nointr_nofail(e->watchdog_fd);
337
338         prioq_free(e->pending);
339         prioq_free(e->prepare);
340         prioq_free(e->monotonic_earliest);
341         prioq_free(e->monotonic_latest);
342         prioq_free(e->realtime_earliest);
343         prioq_free(e->realtime_latest);
344         prioq_free(e->exit);
345
346         free(e->signal_sources);
347
348         hashmap_free(e->child_sources);
349         free(e);
350 }
351
352 _public_ int sd_event_new(sd_event** ret) {
353         sd_event *e;
354         int r;
355
356         assert_return(ret, -EINVAL);
357
358         e = new0(sd_event, 1);
359         if (!e)
360                 return -ENOMEM;
361
362         e->n_ref = 1;
363         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->watchdog_fd = e->epoll_fd = -1;
364         e->realtime_next = e->monotonic_next = (usec_t) -1;
365         e->original_pid = getpid();
366
367         assert_se(sigemptyset(&e->sigset) == 0);
368
369         e->pending = prioq_new(pending_prioq_compare);
370         if (!e->pending) {
371                 r = -ENOMEM;
372                 goto fail;
373         }
374
375         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
376         if (e->epoll_fd < 0) {
377                 r = -errno;
378                 goto fail;
379         }
380
381         *ret = e;
382         return 0;
383
384 fail:
385         event_free(e);
386         return r;
387 }
388
389 _public_ sd_event* sd_event_ref(sd_event *e) {
390         assert_return(e, NULL);
391
392         assert(e->n_ref >= 1);
393         e->n_ref++;
394
395         return e;
396 }
397
398 _public_ sd_event* sd_event_unref(sd_event *e) {
399
400         if (!e)
401                 return NULL;
402
403         assert(e->n_ref >= 1);
404         e->n_ref--;
405
406         if (e->n_ref <= 0)
407                 event_free(e);
408
409         return NULL;
410 }
411
412 static bool event_pid_changed(sd_event *e) {
413         assert(e);
414
415         /* We don't support people creating am event loop and keeping
416          * it around over a fork(). Let's complain. */
417
418         return e->original_pid != getpid();
419 }
420
421 static int source_io_unregister(sd_event_source *s) {
422         int r;
423
424         assert(s);
425         assert(s->type == SOURCE_IO);
426
427         if (!s->io.registered)
428                 return 0;
429
430         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
431         if (r < 0)
432                 return -errno;
433
434         s->io.registered = false;
435         return 0;
436 }
437
438 static int source_io_register(
439                 sd_event_source *s,
440                 int enabled,
441                 uint32_t events) {
442
443         struct epoll_event ev = {};
444         int r;
445
446         assert(s);
447         assert(s->type == SOURCE_IO);
448         assert(enabled != SD_EVENT_OFF);
449
450         ev.events = events;
451         ev.data.ptr = s;
452
453         if (enabled == SD_EVENT_ONESHOT)
454                 ev.events |= EPOLLONESHOT;
455
456         if (s->io.registered)
457                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
458         else
459                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
460
461         if (r < 0)
462                 return -errno;
463
464         s->io.registered = true;
465
466         return 0;
467 }
468
469 static void source_free(sd_event_source *s) {
470         assert(s);
471
472         if (s->event) {
473                 switch (s->type) {
474
475                 case SOURCE_IO:
476                         if (s->io.fd >= 0)
477                                 source_io_unregister(s);
478
479                         break;
480
481                 case SOURCE_MONOTONIC:
482                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
483                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
484                         break;
485
486                 case SOURCE_REALTIME:
487                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
488                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
489                         break;
490
491                 case SOURCE_SIGNAL:
492                         if (s->signal.sig > 0) {
493                                 if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
494                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
495
496                                 if (s->event->signal_sources)
497                                         s->event->signal_sources[s->signal.sig] = NULL;
498                         }
499
500                         break;
501
502                 case SOURCE_CHILD:
503                         if (s->child.pid > 0) {
504                                 if (s->enabled != SD_EVENT_OFF) {
505                                         assert(s->event->n_enabled_child_sources > 0);
506                                         s->event->n_enabled_child_sources--;
507                                 }
508
509                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
510                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
511
512                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
513                         }
514
515                         break;
516
517                 case SOURCE_DEFER:
518                         /* nothing */
519                         break;
520
521                 case SOURCE_EXIT:
522                         prioq_remove(s->event->exit, s, &s->exit.prioq_index);
523                         break;
524                 }
525
526                 if (s->pending)
527                         prioq_remove(s->event->pending, s, &s->pending_index);
528
529                 if (s->prepare)
530                         prioq_remove(s->event->prepare, s, &s->prepare_index);
531
532                 sd_event_unref(s->event);
533         }
534
535         free(s);
536 }
537
538 static int source_set_pending(sd_event_source *s, bool b) {
539         int r;
540
541         assert(s);
542         assert(s->type != SOURCE_EXIT);
543
544         if (s->pending == b)
545                 return 0;
546
547         s->pending = b;
548
549         if (b) {
550                 s->pending_iteration = s->event->iteration;
551
552                 r = prioq_put(s->event->pending, s, &s->pending_index);
553                 if (r < 0) {
554                         s->pending = false;
555                         return r;
556                 }
557         } else
558                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
559
560         if (s->type == SOURCE_REALTIME) {
561                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
562                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
563         } else if (s->type == SOURCE_MONOTONIC) {
564                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
565                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
566         }
567
568         return 0;
569 }
570
571 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
572         sd_event_source *s;
573
574         assert(e);
575
576         s = new0(sd_event_source, 1);
577         if (!s)
578                 return NULL;
579
580         s->n_ref = 1;
581         s->event = sd_event_ref(e);
582         s->type = type;
583         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
584
585         return s;
586 }
587
588 _public_ int sd_event_add_io(
589                 sd_event *e,
590                 int fd,
591                 uint32_t events,
592                 sd_event_io_handler_t callback,
593                 void *userdata,
594                 sd_event_source **ret) {
595
596         sd_event_source *s;
597         int r;
598
599         assert_return(e, -EINVAL);
600         assert_return(fd >= 0, -EINVAL);
601         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
602         assert_return(callback, -EINVAL);
603         assert_return(ret, -EINVAL);
604         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
605         assert_return(!event_pid_changed(e), -ECHILD);
606
607         s = source_new(e, SOURCE_IO);
608         if (!s)
609                 return -ENOMEM;
610
611         s->io.fd = fd;
612         s->io.events = events;
613         s->io.callback = callback;
614         s->userdata = userdata;
615         s->enabled = SD_EVENT_ON;
616
617         r = source_io_register(s, s->enabled, events);
618         if (r < 0) {
619                 source_free(s);
620                 return -errno;
621         }
622
623         *ret = s;
624         return 0;
625 }
626
627 static int event_setup_timer_fd(
628                 sd_event *e,
629                 EventSourceType type,
630                 int *timer_fd,
631                 clockid_t id) {
632
633         struct epoll_event ev = {};
634         int r, fd;
635         sd_id128_t bootid;
636
637         assert(e);
638         assert(timer_fd);
639
640         if (_likely_(*timer_fd >= 0))
641                 return 0;
642
643         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
644         if (fd < 0)
645                 return -errno;
646
647         ev.events = EPOLLIN;
648         ev.data.ptr = INT_TO_PTR(type);
649
650         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
651         if (r < 0) {
652                 close_nointr_nofail(fd);
653                 return -errno;
654         }
655
656         /* When we sleep for longer, we try to realign the wakeup to
657            the same time wihtin each minute/second/250ms, so that
658            events all across the system can be coalesced into a single
659            CPU wakeup. However, let's take some system-specific
660            randomness for this value, so that in a network of systems
661            with synced clocks timer events are distributed a
662            bit. Here, we calculate a perturbation usec offset from the
663            boot ID. */
664
665         if (sd_id128_get_boot(&bootid) >= 0)
666                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_MINUTE;
667
668         *timer_fd = fd;
669         return 0;
670 }
671
672 static int event_add_time_internal(
673                 sd_event *e,
674                 EventSourceType type,
675                 int *timer_fd,
676                 clockid_t id,
677                 Prioq **earliest,
678                 Prioq **latest,
679                 uint64_t usec,
680                 uint64_t accuracy,
681                 sd_event_time_handler_t callback,
682                 void *userdata,
683                 sd_event_source **ret) {
684
685         sd_event_source *s;
686         int r;
687
688         assert_return(e, -EINVAL);
689         assert_return(callback, -EINVAL);
690         assert_return(ret, -EINVAL);
691         assert_return(usec != (uint64_t) -1, -EINVAL);
692         assert_return(accuracy != (uint64_t) -1, -EINVAL);
693         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
694         assert_return(!event_pid_changed(e), -ECHILD);
695
696         assert(timer_fd);
697         assert(earliest);
698         assert(latest);
699
700         if (!*earliest) {
701                 *earliest = prioq_new(earliest_time_prioq_compare);
702                 if (!*earliest)
703                         return -ENOMEM;
704         }
705
706         if (!*latest) {
707                 *latest = prioq_new(latest_time_prioq_compare);
708                 if (!*latest)
709                         return -ENOMEM;
710         }
711
712         if (*timer_fd < 0) {
713                 r = event_setup_timer_fd(e, type, timer_fd, id);
714                 if (r < 0)
715                         return r;
716         }
717
718         s = source_new(e, type);
719         if (!s)
720                 return -ENOMEM;
721
722         s->time.next = usec;
723         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
724         s->time.callback = callback;
725         s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
726         s->userdata = userdata;
727         s->enabled = SD_EVENT_ONESHOT;
728
729         r = prioq_put(*earliest, s, &s->time.earliest_index);
730         if (r < 0)
731                 goto fail;
732
733         r = prioq_put(*latest, s, &s->time.latest_index);
734         if (r < 0)
735                 goto fail;
736
737         *ret = s;
738         return 0;
739
740 fail:
741         source_free(s);
742         return r;
743 }
744
745 _public_ int sd_event_add_monotonic(sd_event *e,
746                                     uint64_t usec,
747                                     uint64_t accuracy,
748                                     sd_event_time_handler_t callback,
749                                     void *userdata,
750                                     sd_event_source **ret) {
751
752         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
753 }
754
755 _public_ int sd_event_add_realtime(sd_event *e,
756                                    uint64_t usec,
757                                    uint64_t accuracy,
758                                    sd_event_time_handler_t callback,
759                                    void *userdata,
760                                    sd_event_source **ret) {
761
762         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
763 }
764
765 static int event_update_signal_fd(sd_event *e) {
766         struct epoll_event ev = {};
767         bool add_to_epoll;
768         int r;
769
770         assert(e);
771
772         add_to_epoll = e->signal_fd < 0;
773
774         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
775         if (r < 0)
776                 return -errno;
777
778         e->signal_fd = r;
779
780         if (!add_to_epoll)
781                 return 0;
782
783         ev.events = EPOLLIN;
784         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
785
786         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
787         if (r < 0) {
788                 close_nointr_nofail(e->signal_fd);
789                 e->signal_fd = -1;
790
791                 return -errno;
792         }
793
794         return 0;
795 }
796
797 _public_ int sd_event_add_signal(
798                 sd_event *e,
799                 int sig,
800                 sd_event_signal_handler_t callback,
801                 void *userdata,
802                 sd_event_source **ret) {
803
804         sd_event_source *s;
805         int r;
806
807         assert_return(e, -EINVAL);
808         assert_return(sig > 0, -EINVAL);
809         assert_return(sig < _NSIG, -EINVAL);
810         assert_return(callback, -EINVAL);
811         assert_return(ret, -EINVAL);
812         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
813         assert_return(!event_pid_changed(e), -ECHILD);
814
815         if (!e->signal_sources) {
816                 e->signal_sources = new0(sd_event_source*, _NSIG);
817                 if (!e->signal_sources)
818                         return -ENOMEM;
819         } else if (e->signal_sources[sig])
820                 return -EBUSY;
821
822         s = source_new(e, SOURCE_SIGNAL);
823         if (!s)
824                 return -ENOMEM;
825
826         s->signal.sig = sig;
827         s->signal.callback = callback;
828         s->userdata = userdata;
829         s->enabled = SD_EVENT_ON;
830
831         e->signal_sources[sig] = s;
832         assert_se(sigaddset(&e->sigset, sig) == 0);
833
834         if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
835                 r = event_update_signal_fd(e);
836                 if (r < 0) {
837                         source_free(s);
838                         return r;
839                 }
840         }
841
842         *ret = s;
843         return 0;
844 }
845
846 _public_ int sd_event_add_child(
847                 sd_event *e,
848                 pid_t pid,
849                 int options,
850                 sd_event_child_handler_t callback,
851                 void *userdata,
852                 sd_event_source **ret) {
853
854         sd_event_source *s;
855         int r;
856
857         assert_return(e, -EINVAL);
858         assert_return(pid > 1, -EINVAL);
859         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
860         assert_return(options != 0, -EINVAL);
861         assert_return(callback, -EINVAL);
862         assert_return(ret, -EINVAL);
863         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
864         assert_return(!event_pid_changed(e), -ECHILD);
865
866         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
867         if (r < 0)
868                 return r;
869
870         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
871                 return -EBUSY;
872
873         s = source_new(e, SOURCE_CHILD);
874         if (!s)
875                 return -ENOMEM;
876
877         s->child.pid = pid;
878         s->child.options = options;
879         s->child.callback = callback;
880         s->userdata = userdata;
881         s->enabled = SD_EVENT_ONESHOT;
882
883         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
884         if (r < 0) {
885                 source_free(s);
886                 return r;
887         }
888
889         e->n_enabled_child_sources ++;
890
891         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
892
893         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
894                 r = event_update_signal_fd(e);
895                 if (r < 0) {
896                         source_free(s);
897                         return -errno;
898                 }
899         }
900
901         e->need_process_child = true;
902
903         *ret = s;
904         return 0;
905 }
906
907 _public_ int sd_event_add_defer(
908                 sd_event *e,
909                 sd_event_handler_t callback,
910                 void *userdata,
911                 sd_event_source **ret) {
912
913         sd_event_source *s;
914         int r;
915
916         assert_return(e, -EINVAL);
917         assert_return(callback, -EINVAL);
918         assert_return(ret, -EINVAL);
919         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
920         assert_return(!event_pid_changed(e), -ECHILD);
921
922         s = source_new(e, SOURCE_DEFER);
923         if (!s)
924                 return -ENOMEM;
925
926         s->defer.callback = callback;
927         s->userdata = userdata;
928         s->enabled = SD_EVENT_ONESHOT;
929
930         r = source_set_pending(s, true);
931         if (r < 0) {
932                 source_free(s);
933                 return r;
934         }
935
936         *ret = s;
937         return 0;
938 }
939
940 _public_ int sd_event_add_exit(
941                 sd_event *e,
942                 sd_event_handler_t callback,
943                 void *userdata,
944                 sd_event_source **ret) {
945
946         sd_event_source *s;
947         int r;
948
949         assert_return(e, -EINVAL);
950         assert_return(callback, -EINVAL);
951         assert_return(ret, -EINVAL);
952         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
953         assert_return(!event_pid_changed(e), -ECHILD);
954
955         if (!e->exit) {
956                 e->exit = prioq_new(exit_prioq_compare);
957                 if (!e->exit)
958                         return -ENOMEM;
959         }
960
961         s = source_new(e, SOURCE_EXIT);
962         if (!s)
963                 return -ENOMEM;
964
965         s->exit.callback = callback;
966         s->userdata = userdata;
967         s->exit.prioq_index = PRIOQ_IDX_NULL;
968         s->enabled = SD_EVENT_ONESHOT;
969
970         r = prioq_put(s->event->exit, s, &s->exit.prioq_index);
971         if (r < 0) {
972                 source_free(s);
973                 return r;
974         }
975
976         *ret = s;
977         return 0;
978 }
979
980 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
981         assert_return(s, NULL);
982
983         assert(s->n_ref >= 1);
984         s->n_ref++;
985
986         return s;
987 }
988
989 _public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {
990
991         if (!s)
992                 return NULL;
993
994         assert(s->n_ref >= 1);
995         s->n_ref--;
996
997         if (s->n_ref <= 0) {
998                 /* Here's a special hack: when we are called from a
999                  * dispatch handler we won't free the event source
1000                  * immediately, but we will detach the fd from the
1001                  * epoll. This way it is safe for the caller to unref
1002                  * the event source and immediately close the fd, but
1003                  * we still retain a valid event source object after
1004                  * the callback. */
1005
1006                 if (s->dispatching) {
1007                         if (s->type == SOURCE_IO)
1008                                 source_io_unregister(s);
1009                 } else
1010                         source_free(s);
1011         }
1012
1013         return NULL;
1014 }
1015
1016 _public_ sd_event *sd_event_source_get_event(sd_event_source *s) {
1017         assert_return(s, NULL);
1018
1019         return s->event;
1020 }
1021
1022 _public_ int sd_event_source_get_pending(sd_event_source *s) {
1023         assert_return(s, -EINVAL);
1024         assert_return(s->type != SOURCE_EXIT, -EDOM);
1025         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1026         assert_return(!event_pid_changed(s->event), -ECHILD);
1027
1028         return s->pending;
1029 }
1030
1031 _public_ int sd_event_source_get_io_fd(sd_event_source *s) {
1032         assert_return(s, -EINVAL);
1033         assert_return(s->type == SOURCE_IO, -EDOM);
1034         assert_return(!event_pid_changed(s->event), -ECHILD);
1035
1036         return s->io.fd;
1037 }
1038
1039 _public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
1040         assert_return(s, -EINVAL);
1041         assert_return(events, -EINVAL);
1042         assert_return(s->type == SOURCE_IO, -EDOM);
1043         assert_return(!event_pid_changed(s->event), -ECHILD);
1044
1045         *events = s->io.events;
1046         return 0;
1047 }
1048
1049 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1050         int r;
1051
1052         assert_return(s, -EINVAL);
1053         assert_return(s->type == SOURCE_IO, -EDOM);
1054         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
1055         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1056         assert_return(!event_pid_changed(s->event), -ECHILD);
1057
1058         if (s->io.events == events)
1059                 return 0;
1060
1061         if (s->enabled != SD_EVENT_OFF) {
1062                 r = source_io_register(s, s->enabled, events);
1063                 if (r < 0)
1064                         return r;
1065         }
1066
1067         s->io.events = events;
1068         source_set_pending(s, false);
1069
1070         return 0;
1071 }
1072
1073 _public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
1074         assert_return(s, -EINVAL);
1075         assert_return(revents, -EINVAL);
1076         assert_return(s->type == SOURCE_IO, -EDOM);
1077         assert_return(s->pending, -ENODATA);
1078         assert_return(!event_pid_changed(s->event), -ECHILD);
1079
1080         *revents = s->io.revents;
1081         return 0;
1082 }
1083
1084 _public_ int sd_event_source_get_signal(sd_event_source *s) {
1085         assert_return(s, -EINVAL);
1086         assert_return(s->type == SOURCE_SIGNAL, -EDOM);
1087         assert_return(!event_pid_changed(s->event), -ECHILD);
1088
1089         return s->signal.sig;
1090 }
1091
1092 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1093         assert_return(s, -EINVAL);
1094         assert_return(!event_pid_changed(s->event), -ECHILD);
1095
1096         return s->priority;
1097 }
1098
1099 _public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
1100         assert_return(s, -EINVAL);
1101         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1102         assert_return(!event_pid_changed(s->event), -ECHILD);
1103
1104         if (s->priority == priority)
1105                 return 0;
1106
1107         s->priority = priority;
1108
1109         if (s->pending)
1110                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1111
1112         if (s->prepare)
1113                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1114
1115         if (s->type == SOURCE_EXIT)
1116                 prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
1117
1118         return 0;
1119 }
1120
1121 _public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
1122         assert_return(s, -EINVAL);
1123         assert_return(m, -EINVAL);
1124         assert_return(!event_pid_changed(s->event), -ECHILD);
1125
1126         *m = s->enabled;
1127         return 0;
1128 }
1129
1130 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
1131         int r;
1132
1133         assert_return(s, -EINVAL);
1134         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1135         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1136         assert_return(!event_pid_changed(s->event), -ECHILD);
1137
1138         if (s->enabled == m)
1139                 return 0;
1140
1141         if (m == SD_EVENT_OFF) {
1142
1143                 switch (s->type) {
1144
1145                 case SOURCE_IO:
1146                         r = source_io_unregister(s);
1147                         if (r < 0)
1148                                 return r;
1149
1150                         s->enabled = m;
1151                         break;
1152
1153                 case SOURCE_MONOTONIC:
1154                         s->enabled = m;
1155                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1156                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1157                         break;
1158
1159                 case SOURCE_REALTIME:
1160                         s->enabled = m;
1161                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1162                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1163                         break;
1164
1165                 case SOURCE_SIGNAL:
1166                         s->enabled = m;
1167                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1168                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1169                                 event_update_signal_fd(s->event);
1170                         }
1171
1172                         break;
1173
1174                 case SOURCE_CHILD:
1175                         s->enabled = m;
1176
1177                         assert(s->event->n_enabled_child_sources > 0);
1178                         s->event->n_enabled_child_sources--;
1179
1180                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1181                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1182                                 event_update_signal_fd(s->event);
1183                         }
1184
1185                         break;
1186
1187                 case SOURCE_EXIT:
1188                         s->enabled = m;
1189                         prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
1190                         break;
1191
1192                 case SOURCE_DEFER:
1193                         s->enabled = m;
1194                         break;
1195                 }
1196
1197         } else {
1198                 switch (s->type) {
1199
1200                 case SOURCE_IO:
1201                         r = source_io_register(s, m, s->io.events);
1202                         if (r < 0)
1203                                 return r;
1204
1205                         s->enabled = m;
1206                         break;
1207
1208                 case SOURCE_MONOTONIC:
1209                         s->enabled = m;
1210                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1211                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1212                         break;
1213
1214                 case SOURCE_REALTIME:
1215                         s->enabled = m;
1216                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1217                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1218                         break;
1219
1220                 case SOURCE_SIGNAL:
1221                         s->enabled = m;
1222
1223                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1224                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1225                                 event_update_signal_fd(s->event);
1226                         }
1227                         break;
1228
1229                 case SOURCE_CHILD:
1230                         s->enabled = m;
1231
1232                         if (s->enabled == SD_EVENT_OFF) {
1233                                 s->event->n_enabled_child_sources++;
1234
1235                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1236                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1237                                         event_update_signal_fd(s->event);
1238                                 }
1239                         }
1240                         break;
1241
1242                 case SOURCE_EXIT:
1243                         s->enabled = m;
1244                         prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
1245                         break;
1246
1247                 case SOURCE_DEFER:
1248                         s->enabled = m;
1249                         break;
1250                 }
1251         }
1252
1253         if (s->pending)
1254                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1255
1256         if (s->prepare)
1257                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1258
1259         return 0;
1260 }
1261
1262 _public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1263         assert_return(s, -EINVAL);
1264         assert_return(usec, -EINVAL);
1265         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1266         assert_return(!event_pid_changed(s->event), -ECHILD);
1267
1268         *usec = s->time.next;
1269         return 0;
1270 }
1271
1272 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1273         assert_return(s, -EINVAL);
1274         assert_return(usec != (uint64_t) -1, -EINVAL);
1275         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1276         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1277         assert_return(!event_pid_changed(s->event), -ECHILD);
1278
1279         s->time.next = usec;
1280
1281         source_set_pending(s, false);
1282
1283         if (s->type == SOURCE_REALTIME) {
1284                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1285                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1286         } else {
1287                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1288                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1289         }
1290
1291         return 0;
1292 }
1293
1294 _public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1295         assert_return(s, -EINVAL);
1296         assert_return(usec, -EINVAL);
1297         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1298         assert_return(!event_pid_changed(s->event), -ECHILD);
1299
1300         *usec = s->time.accuracy;
1301         return 0;
1302 }
1303
1304 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1305         assert_return(s, -EINVAL);
1306         assert_return(usec != (uint64_t) -1, -EINVAL);
1307         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1308         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1309         assert_return(!event_pid_changed(s->event), -ECHILD);
1310
1311         if (usec == 0)
1312                 usec = DEFAULT_ACCURACY_USEC;
1313
1314         s->time.accuracy = usec;
1315
1316         source_set_pending(s, false);
1317
1318         if (s->type == SOURCE_REALTIME)
1319                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1320         else
1321                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1322
1323         return 0;
1324 }
1325
1326 _public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
1327         assert_return(s, -EINVAL);
1328         assert_return(pid, -EINVAL);
1329         assert_return(s->type == SOURCE_CHILD, -EDOM);
1330         assert_return(!event_pid_changed(s->event), -ECHILD);
1331
1332         *pid = s->child.pid;
1333         return 0;
1334 }
1335
1336 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_event_handler_t callback) {
1337         int r;
1338
1339         assert_return(s, -EINVAL);
1340         assert_return(s->type != SOURCE_EXIT, -EDOM);
1341         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1342         assert_return(!event_pid_changed(s->event), -ECHILD);
1343
1344         if (s->prepare == callback)
1345                 return 0;
1346
1347         if (callback && s->prepare) {
1348                 s->prepare = callback;
1349                 return 0;
1350         }
1351
1352         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1353         if (r < 0)
1354                 return r;
1355
1356         s->prepare = callback;
1357
1358         if (callback) {
1359                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1360                 if (r < 0)
1361                         return r;
1362         } else
1363                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1364
1365         return 0;
1366 }
1367
1368 _public_ void* sd_event_source_get_userdata(sd_event_source *s) {
1369         assert_return(s, NULL);
1370
1371         return s->userdata;
1372 }
1373
1374 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1375         usec_t c;
1376         assert(e);
1377         assert(a <= b);
1378
1379         if (a <= 0)
1380                 return 0;
1381
1382         if (b <= a + 1)
1383                 return a;
1384
1385         /*
1386           Find a good time to wake up again between times a and b. We
1387           have two goals here:
1388
1389           a) We want to wake up as seldom as possible, hence prefer
1390              later times over earlier times.
1391
1392           b) But if we have to wake up, then let's make sure to
1393              dispatch as much as possible on the entire system.
1394
1395           We implement this by waking up everywhere at the same time
1396           within any given minute if we can, synchronised via the
1397           perturbation value determined from the boot ID. If we can't,
1398           then we try to find the same spot in every 10s, then 1s and
1399           then 250ms step. Otherwise, we pick the last possible time
1400           to wake up.
1401         */
1402
1403         c = (b / USEC_PER_MINUTE) * USEC_PER_MINUTE + e->perturb;
1404         if (c >= b) {
1405                 if (_unlikely_(c < USEC_PER_MINUTE))
1406                         return b;
1407
1408                 c -= USEC_PER_MINUTE;
1409         }
1410
1411         if (c >= a)
1412                 return c;
1413
1414         c = (b / (USEC_PER_SEC*10)) * (USEC_PER_SEC*10) + (e->perturb % (USEC_PER_SEC*10));
1415         if (c >= b) {
1416                 if (_unlikely_(c < USEC_PER_SEC*10))
1417                         return b;
1418
1419                 c -= USEC_PER_SEC*10;
1420         }
1421
1422         if (c >= a)
1423                 return c;
1424
1425         c = (b / USEC_PER_SEC) * USEC_PER_SEC + (e->perturb % USEC_PER_SEC);
1426         if (c >= b) {
1427                 if (_unlikely_(c < USEC_PER_SEC))
1428                         return b;
1429
1430                 c -= USEC_PER_SEC;
1431         }
1432
1433         if (c >= a)
1434                 return c;
1435
1436         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1437         if (c >= b) {
1438                 if (_unlikely_(c < USEC_PER_MSEC*250))
1439                         return b;
1440
1441                 c -= USEC_PER_MSEC*250;
1442         }
1443
1444         if (c >= a)
1445                 return c;
1446
1447         return b;
1448 }
1449
1450 static int event_arm_timer(
1451                 sd_event *e,
1452                 int timer_fd,
1453                 Prioq *earliest,
1454                 Prioq *latest,
1455                 usec_t *next) {
1456
1457         struct itimerspec its = {};
1458         sd_event_source *a, *b;
1459         usec_t t;
1460         int r;
1461
1462         assert(e);
1463         assert(next);
1464
1465         a = prioq_peek(earliest);
1466         if (!a || a->enabled == SD_EVENT_OFF) {
1467
1468                 if (timer_fd < 0)
1469                         return 0;
1470
1471                 if (*next == (usec_t) -1)
1472                         return 0;
1473
1474                 /* disarm */
1475                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1476                 if (r < 0)
1477                         return r;
1478
1479                 *next = (usec_t) -1;
1480
1481                 return 0;
1482         }
1483
1484         b = prioq_peek(latest);
1485         assert_se(b && b->enabled != SD_EVENT_OFF);
1486
1487         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1488         if (*next == t)
1489                 return 0;
1490
1491         assert_se(timer_fd >= 0);
1492
1493         if (t == 0) {
1494                 /* We don' want to disarm here, just mean some time looooong ago. */
1495                 its.it_value.tv_sec = 0;
1496                 its.it_value.tv_nsec = 1;
1497         } else
1498                 timespec_store(&its.it_value, t);
1499
1500         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1501         if (r < 0)
1502                 return -errno;
1503
1504         *next = t;
1505         return 0;
1506 }
1507
1508 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1509         assert(e);
1510         assert(s);
1511         assert(s->type == SOURCE_IO);
1512
1513         s->io.revents = events;
1514
1515         return source_set_pending(s, true);
1516 }
1517
1518 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1519         uint64_t x;
1520         ssize_t ss;
1521
1522         assert(e);
1523         assert(fd >= 0);
1524
1525         assert_return(events == EPOLLIN, -EIO);
1526
1527         ss = read(fd, &x, sizeof(x));
1528         if (ss < 0) {
1529                 if (errno == EAGAIN || errno == EINTR)
1530                         return 0;
1531
1532                 return -errno;
1533         }
1534
1535         if (ss != sizeof(x))
1536                 return -EIO;
1537
1538         if (next)
1539                 *next = (usec_t) -1;
1540
1541         return 0;
1542 }
1543
1544 static int process_timer(
1545                 sd_event *e,
1546                 usec_t n,
1547                 Prioq *earliest,
1548                 Prioq *latest) {
1549
1550         sd_event_source *s;
1551         int r;
1552
1553         assert(e);
1554
1555         for (;;) {
1556                 s = prioq_peek(earliest);
1557                 if (!s ||
1558                     s->time.next > n ||
1559                     s->enabled == SD_EVENT_OFF ||
1560                     s->pending)
1561                         break;
1562
1563                 r = source_set_pending(s, true);
1564                 if (r < 0)
1565                         return r;
1566
1567                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1568                 prioq_reshuffle(latest, s, &s->time.latest_index);
1569         }
1570
1571         return 0;
1572 }
1573
1574 static int process_child(sd_event *e) {
1575         sd_event_source *s;
1576         Iterator i;
1577         int r;
1578
1579         assert(e);
1580
1581         e->need_process_child = false;
1582
1583         /*
1584            So, this is ugly. We iteratively invoke waitid() with P_PID
1585            + WNOHANG for each PID we wait for, instead of using
1586            P_ALL. This is because we only want to get child
1587            information of very specific child processes, and not all
1588            of them. We might not have processed the SIGCHLD even of a
1589            previous invocation and we don't want to maintain a
1590            unbounded *per-child* event queue, hence we really don't
1591            want anything flushed out of the kernel's queue that we
1592            don't care about. Since this is O(n) this means that if you
1593            have a lot of processes you probably want to handle SIGCHLD
1594            yourself.
1595
1596            We do not reap the children here (by using WNOWAIT), this
1597            is only done after the event source is dispatched so that
1598            the callback still sees the process as a zombie.
1599         */
1600
1601         HASHMAP_FOREACH(s, e->child_sources, i) {
1602                 assert(s->type == SOURCE_CHILD);
1603
1604                 if (s->pending)
1605                         continue;
1606
1607                 if (s->enabled == SD_EVENT_OFF)
1608                         continue;
1609
1610                 zero(s->child.siginfo);
1611                 r = waitid(P_PID, s->child.pid, &s->child.siginfo,
1612                            WNOHANG | (s->child.options & WEXITED ? WNOWAIT : 0) | s->child.options);
1613                 if (r < 0)
1614                         return -errno;
1615
1616                 if (s->child.siginfo.si_pid != 0) {
1617                         bool zombie =
1618                                 s->child.siginfo.si_code == CLD_EXITED ||
1619                                 s->child.siginfo.si_code == CLD_KILLED ||
1620                                 s->child.siginfo.si_code == CLD_DUMPED;
1621
1622                         if (!zombie && (s->child.options & WEXITED)) {
1623                                 /* If the child isn't dead then let's
1624                                  * immediately remove the state change
1625                                  * from the queue, since there's no
1626                                  * benefit in leaving it queued */
1627
1628                                 assert(s->child.options & (WSTOPPED|WCONTINUED));
1629                                 waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|(s->child.options & (WSTOPPED|WCONTINUED)));
1630                         }
1631
1632                         r = source_set_pending(s, true);
1633                         if (r < 0)
1634                                 return r;
1635                 }
1636         }
1637
1638         return 0;
1639 }
1640
1641 static int process_signal(sd_event *e, uint32_t events) {
1642         bool read_one = false;
1643         int r;
1644
1645         assert(e);
1646         assert(e->signal_sources);
1647
1648         assert_return(events == EPOLLIN, -EIO);
1649
1650         for (;;) {
1651                 struct signalfd_siginfo si;
1652                 ssize_t ss;
1653                 sd_event_source *s;
1654
1655                 ss = read(e->signal_fd, &si, sizeof(si));
1656                 if (ss < 0) {
1657                         if (errno == EAGAIN || errno == EINTR)
1658                                 return read_one;
1659
1660                         return -errno;
1661                 }
1662
1663                 if (ss != sizeof(si))
1664                         return -EIO;
1665
1666                 read_one = true;
1667
1668                 s = e->signal_sources[si.ssi_signo];
1669                 if (si.ssi_signo == SIGCHLD) {
1670                         r = process_child(e);
1671                         if (r < 0)
1672                                 return r;
1673                         if (r > 0 || !s)
1674                                 continue;
1675                 } else
1676                         if (!s)
1677                                 return -EIO;
1678
1679                 s->signal.siginfo = si;
1680                 r = source_set_pending(s, true);
1681                 if (r < 0)
1682                         return r;
1683         }
1684
1685         return 0;
1686 }
1687
1688 static int source_dispatch(sd_event_source *s) {
1689         int r = 0;
1690
1691         assert(s);
1692         assert(s->pending || s->type == SOURCE_EXIT);
1693
1694         if (s->type != SOURCE_DEFER && s->type != SOURCE_EXIT) {
1695                 r = source_set_pending(s, false);
1696                 if (r < 0)
1697                         return r;
1698         }
1699
1700         if (s->enabled == SD_EVENT_ONESHOT) {
1701                 r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
1702                 if (r < 0)
1703                         return r;
1704         }
1705
1706         s->dispatching = true;
1707
1708         switch (s->type) {
1709
1710         case SOURCE_IO:
1711                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1712                 break;
1713
1714         case SOURCE_MONOTONIC:
1715                 r = s->time.callback(s, s->time.next, s->userdata);
1716                 break;
1717
1718         case SOURCE_REALTIME:
1719                 r = s->time.callback(s, s->time.next, s->userdata);
1720                 break;
1721
1722         case SOURCE_SIGNAL:
1723                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1724                 break;
1725
1726         case SOURCE_CHILD: {
1727                 bool zombie;
1728
1729                 zombie = s->child.siginfo.si_code == CLD_EXITED ||
1730                          s->child.siginfo.si_code == CLD_KILLED ||
1731                          s->child.siginfo.si_code == CLD_DUMPED;
1732
1733                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1734
1735                 /* Now, reap the PID for good. */
1736                 if (zombie)
1737                         waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|WEXITED);
1738
1739                 break;
1740         }
1741
1742         case SOURCE_DEFER:
1743                 r = s->defer.callback(s, s->userdata);
1744                 break;
1745
1746         case SOURCE_EXIT:
1747                 r = s->exit.callback(s, s->userdata);
1748                 break;
1749         }
1750
1751         s->dispatching = false;
1752
1753         if (r < 0)
1754                 log_debug("Event source %p returned error, disabling: %s", s, strerror(-r));
1755
1756         if (s->n_ref == 0)
1757                 source_free(s);
1758         else if (r < 0)
1759                 sd_event_source_set_enabled(s, SD_EVENT_OFF);
1760
1761         return 1;
1762 }
1763
1764 static int event_prepare(sd_event *e) {
1765         int r;
1766
1767         assert(e);
1768
1769         for (;;) {
1770                 sd_event_source *s;
1771
1772                 s = prioq_peek(e->prepare);
1773                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1774                         break;
1775
1776                 s->prepare_iteration = e->iteration;
1777                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1778                 if (r < 0)
1779                         return r;
1780
1781                 assert(s->prepare);
1782
1783                 s->dispatching = true;
1784                 r = s->prepare(s, s->userdata);
1785                 s->dispatching = false;
1786
1787                 if (r < 0)
1788                         log_debug("Prepare callback of event source %p returned error, disabling: %s", s, strerror(-r));
1789
1790                 if (s->n_ref == 0)
1791                         source_free(s);
1792                 else if (r < 0)
1793                         sd_event_source_set_enabled(s, SD_EVENT_OFF);
1794         }
1795
1796         return 0;
1797 }
1798
1799 static int dispatch_exit(sd_event *e) {
1800         sd_event_source *p;
1801         int r;
1802
1803         assert(e);
1804
1805         p = prioq_peek(e->exit);
1806         if (!p || p->enabled == SD_EVENT_OFF) {
1807                 e->state = SD_EVENT_FINISHED;
1808                 return 0;
1809         }
1810
1811         sd_event_ref(e);
1812         e->iteration++;
1813         e->state = SD_EVENT_EXITING;
1814
1815         r = source_dispatch(p);
1816
1817         e->state = SD_EVENT_PASSIVE;
1818         sd_event_unref(e);
1819
1820         return r;
1821 }
1822
1823 static sd_event_source* event_next_pending(sd_event *e) {
1824         sd_event_source *p;
1825
1826         assert(e);
1827
1828         p = prioq_peek(e->pending);
1829         if (!p)
1830                 return NULL;
1831
1832         if (p->enabled == SD_EVENT_OFF)
1833                 return NULL;
1834
1835         return p;
1836 }
1837
1838 static int arm_watchdog(sd_event *e) {
1839         struct itimerspec its = {};
1840         usec_t t;
1841         int r;
1842
1843         assert(e);
1844         assert(e->watchdog_fd >= 0);
1845
1846         t = sleep_between(e,
1847                           e->watchdog_last + (e->watchdog_period / 2),
1848                           e->watchdog_last + (e->watchdog_period * 3 / 4));
1849
1850         timespec_store(&its.it_value, t);
1851
1852         r = timerfd_settime(e->watchdog_fd, TFD_TIMER_ABSTIME, &its, NULL);
1853         if (r < 0)
1854                 return -errno;
1855
1856         return 0;
1857 }
1858
1859 static int process_watchdog(sd_event *e) {
1860         assert(e);
1861
1862         if (!e->watchdog)
1863                 return 0;
1864
1865         /* Don't notify watchdog too often */
1866         if (e->watchdog_last + e->watchdog_period / 4 > e->timestamp.monotonic)
1867                 return 0;
1868
1869         sd_notify(false, "WATCHDOG=1");
1870         e->watchdog_last = e->timestamp.monotonic;
1871
1872         return arm_watchdog(e);
1873 }
1874
1875 _public_ int sd_event_run(sd_event *e, uint64_t timeout) {
1876         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1877         sd_event_source *p;
1878         int r, i, m;
1879
1880         assert_return(e, -EINVAL);
1881         assert_return(!event_pid_changed(e), -ECHILD);
1882         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1883         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1884
1885         if (e->exit_requested)
1886                 return dispatch_exit(e);
1887
1888         sd_event_ref(e);
1889         e->iteration++;
1890         e->state = SD_EVENT_RUNNING;
1891
1892         r = event_prepare(e);
1893         if (r < 0)
1894                 goto finish;
1895
1896         r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1897         if (r < 0)
1898                 goto finish;
1899
1900         r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1901         if (r < 0)
1902                 goto finish;
1903
1904         if (event_next_pending(e) || e->need_process_child)
1905                 timeout = 0;
1906
1907         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1908                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1909         if (m < 0) {
1910                 r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
1911                 goto finish;
1912         }
1913
1914         dual_timestamp_get(&e->timestamp);
1915
1916         for (i = 0; i < m; i++) {
1917
1918                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1919                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
1920                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1921                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
1922                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1923                         r = process_signal(e, ev_queue[i].events);
1924                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_WATCHDOG))
1925                         r = flush_timer(e, e->watchdog_fd, ev_queue[i].events, NULL);
1926                 else
1927                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1928
1929                 if (r < 0)
1930                         goto finish;
1931         }
1932
1933         r = process_watchdog(e);
1934         if (r < 0)
1935                 goto finish;
1936
1937         r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
1938         if (r < 0)
1939                 goto finish;
1940
1941         r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
1942         if (r < 0)
1943                 goto finish;
1944
1945         if (e->need_process_child) {
1946                 r = process_child(e);
1947                 if (r < 0)
1948                         goto finish;
1949         }
1950
1951         p = event_next_pending(e);
1952         if (!p) {
1953                 r = 0;
1954                 goto finish;
1955         }
1956
1957         r = source_dispatch(p);
1958
1959 finish:
1960         e->state = SD_EVENT_PASSIVE;
1961         sd_event_unref(e);
1962
1963         return r;
1964 }
1965
1966 _public_ int sd_event_loop(sd_event *e) {
1967         int r;
1968
1969         assert_return(e, -EINVAL);
1970         assert_return(!event_pid_changed(e), -ECHILD);
1971         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1972
1973         sd_event_ref(e);
1974
1975         while (e->state != SD_EVENT_FINISHED) {
1976                 r = sd_event_run(e, (uint64_t) -1);
1977                 if (r < 0)
1978                         goto finish;
1979         }
1980
1981         r = e->exit_code;
1982
1983 finish:
1984         sd_event_unref(e);
1985         return r;
1986 }
1987
1988 _public_ int sd_event_get_state(sd_event *e) {
1989         assert_return(e, -EINVAL);
1990         assert_return(!event_pid_changed(e), -ECHILD);
1991
1992         return e->state;
1993 }
1994
1995 _public_ int sd_event_get_exit_code(sd_event *e, int *code) {
1996         assert_return(e, -EINVAL);
1997         assert_return(code, -EINVAL);
1998         assert_return(!event_pid_changed(e), -ECHILD);
1999
2000         if (!e->exit_requested)
2001                 return -ENODATA;
2002
2003         *code = e->exit_code;
2004         return 0;
2005 }
2006
2007 _public_ int sd_event_exit(sd_event *e, int code) {
2008         assert_return(e, -EINVAL);
2009         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
2010         assert_return(!event_pid_changed(e), -ECHILD);
2011
2012         e->exit_requested = true;
2013         e->exit_code = code;
2014
2015         return 0;
2016 }
2017
2018 _public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
2019         assert_return(e, -EINVAL);
2020         assert_return(usec, -EINVAL);
2021         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
2022         assert_return(!event_pid_changed(e), -ECHILD);
2023
2024         *usec = e->timestamp.realtime;
2025         return 0;
2026 }
2027
2028 _public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
2029         assert_return(e, -EINVAL);
2030         assert_return(usec, -EINVAL);
2031         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
2032         assert_return(!event_pid_changed(e), -ECHILD);
2033
2034         *usec = e->timestamp.monotonic;
2035         return 0;
2036 }
2037
2038 _public_ int sd_event_default(sd_event **ret) {
2039
2040         static __thread sd_event *default_event = NULL;
2041         sd_event *e;
2042         int r;
2043
2044         if (!ret)
2045                 return !!default_event;
2046
2047         if (default_event) {
2048                 *ret = sd_event_ref(default_event);
2049                 return 0;
2050         }
2051
2052         r = sd_event_new(&e);
2053         if (r < 0)
2054                 return r;
2055
2056         e->default_event_ptr = &default_event;
2057         e->tid = gettid();
2058         default_event = e;
2059
2060         *ret = e;
2061         return 1;
2062 }
2063
2064 _public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
2065         assert_return(e, -EINVAL);
2066         assert_return(tid, -EINVAL);
2067         assert_return(!event_pid_changed(e), -ECHILD);
2068
2069         if (e->tid != 0) {
2070                 *tid = e->tid;
2071                 return 0;
2072         }
2073
2074         return -ENXIO;
2075 }
2076
2077 _public_ int sd_event_set_watchdog(sd_event *e, int b) {
2078         int r;
2079
2080         assert_return(e, -EINVAL);
2081
2082         if (e->watchdog == !!b)
2083                 return e->watchdog;
2084
2085         if (b) {
2086                 struct epoll_event ev = {};
2087                 const char *env;
2088
2089                 env = getenv("WATCHDOG_USEC");
2090                 if (!env)
2091                         return false;
2092
2093                 r = safe_atou64(env, &e->watchdog_period);
2094                 if (r < 0)
2095                         return r;
2096                 if (e->watchdog_period <= 0)
2097                         return -EIO;
2098
2099                 /* Issue first ping immediately */
2100                 sd_notify(false, "WATCHDOG=1");
2101                 e->watchdog_last = now(CLOCK_MONOTONIC);
2102
2103                 e->watchdog_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
2104                 if (e->watchdog_fd < 0)
2105                         return -errno;
2106
2107                 r = arm_watchdog(e);
2108                 if (r < 0)
2109                         goto fail;
2110
2111                 ev.events = EPOLLIN;
2112                 ev.data.ptr = INT_TO_PTR(SOURCE_WATCHDOG);
2113
2114                 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->watchdog_fd, &ev);
2115                 if (r < 0) {
2116                         r = -errno;
2117                         goto fail;
2118                 }
2119
2120         } else {
2121                 if (e->watchdog_fd >= 0) {
2122                         epoll_ctl(e->epoll_fd, EPOLL_CTL_DEL, e->watchdog_fd, NULL);
2123                         close_nointr_nofail(e->watchdog_fd);
2124                         e->watchdog_fd = -1;
2125                 }
2126         }
2127
2128         e->watchdog = !!b;
2129         return e->watchdog;
2130
2131 fail:
2132         close_nointr_nofail(e->watchdog_fd);
2133         e->watchdog_fd = -1;
2134         return r;
2135 }