chiark / gitweb /
3bda7f31adf8c97ec7b1d77705a2ad2710244f0e
[elogind.git] / src / libsystemd / sd-event / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25 #include <pthread.h>
26
27 #include "sd-id128.h"
28 #include "sd-daemon.h"
29 #include "macro.h"
30 #include "prioq.h"
31 #include "hashmap.h"
32 #include "util.h"
33 #include "time-util.h"
34 #include "missing.h"
35 #include "set.h"
36
37 #include "sd-event.h"
38
39 #define EPOLL_QUEUE_MAX 512U
40 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
41
/* Internal discriminator for the per-type union in sd_event_source:
 * identifies what kind of trigger a source reacts to. */
typedef enum EventSourceType {
        SOURCE_IO,          /* an fd watched via the epoll instance */
        SOURCE_MONOTONIC,   /* CLOCK_MONOTONIC timer source */
        SOURCE_REALTIME,    /* CLOCK_REALTIME timer source */
        SOURCE_SIGNAL,      /* a signal delivered through the signalfd */
        SOURCE_CHILD,       /* child process state change, driven by SIGCHLD */
        SOURCE_DEFER,       /* no external trigger; created pending (see sd_event_add_defer()) */
        SOURCE_POST,        /* kept in the event loop's post_sources set */
        SOURCE_EXIT,        /* kept in the exit prioq, see sd_event_add_exit() */
        SOURCE_WATCHDOG     /* pseudo-type used only as an epoll tag; a real source of this
                             * type must never exist (see assert in source_free()) */
} EventSourceType;
53
/* A single registered event source. Which member of the anonymous
 * union below is valid is determined by "type". */
struct sd_event_source {
        unsigned n_ref;              /* reference count, see sd_event_source_ref()/unref() */

        sd_event *event;             /* owning loop; holds a reference taken in source_new() */
        void *userdata;              /* opaque pointer handed back to the callback */
        sd_event_handler_t prepare;  /* optional callback; sources with one are kept in event->prepare */

        EventSourceType type:4;      /* discriminates the union below */
        int enabled:3;               /* SD_EVENT_OFF/ON/ONESHOT */
        bool pending:1;              /* queued in event->pending, awaiting dispatch */
        bool dispatching:1;          /* presumably set while the callback runs — see the
                                      * unref hack in sd_event_source_unref(); TODO confirm */

        int64_t priority;            /* lower values are dispatched first (see comparators) */
        unsigned pending_index;      /* index in event->pending prioq, PRIOQ_IDX_NULL if absent */
        unsigned prepare_index;      /* index in event->prepare prioq, PRIOQ_IDX_NULL if absent */
        unsigned pending_iteration;  /* loop iteration in which the source became pending */
        unsigned prepare_iteration;  /* loop iteration in which the source was last prepared */

        union {
                struct {
                        sd_event_io_handler_t callback;
                        int fd;
                        uint32_t events;        /* EPOLL* mask requested by the caller */
                        uint32_t revents;       /* EPOLL* mask actually reported */
                        bool registered:1;      /* fd currently added to the epoll instance */
                } io;
                struct {
                        sd_event_time_handler_t callback;
                        usec_t next, accuracy;  /* earliest trigger time and permitted slack */
                        unsigned earliest_index; /* index in the per-clock "earliest" prioq */
                        unsigned latest_index;   /* index in the per-clock "latest" prioq */
                } time;
                struct {
                        sd_event_signal_handler_t callback;
                        struct signalfd_siginfo siginfo;
                        int sig;                /* signal number being watched */
                } signal;
                struct {
                        sd_event_child_handler_t callback;
                        siginfo_t siginfo;
                        pid_t pid;              /* child process being watched */
                        int options;            /* WEXITED|WSTOPPED|WCONTINUED */
                } child;
                struct {
                        sd_event_handler_t callback;
                } defer;
                struct {
                        sd_event_handler_t callback;
                } post;
                struct {
                        sd_event_handler_t callback;
                        unsigned prioq_index;   /* index in event->exit prioq */
                } exit;
        };
};
109
/* The event loop object proper. */
struct sd_event {
        unsigned n_ref;              /* reference count, see sd_event_ref()/unref() */

        int epoll_fd;                /* the central epoll instance, created in sd_event_new() */
        int signal_fd;               /* signalfd covering "sigset"; -1 until first needed */
        int realtime_fd;             /* timerfd for CLOCK_REALTIME sources; -1 until first needed */
        int monotonic_fd;            /* timerfd for CLOCK_MONOTONIC sources; -1 until first needed */
        int watchdog_fd;

        Prioq *pending;              /* all pending sources, ordered by pending_prioq_compare() */
        Prioq *prepare;              /* all sources with a prepare callback, see prepare_prioq_compare() */

        /* For both clocks we maintain two priority queues each, one
         * ordered for the earliest times the events may be
         * dispatched, and one ordered by the latest times they must
         * have been dispatched. The range between the top entries in
         * the two prioqs is the time window we can freely schedule
         * wakeups in */
        Prioq *monotonic_earliest;
        Prioq *monotonic_latest;
        Prioq *realtime_earliest;
        Prioq *realtime_latest;

        usec_t realtime_next, monotonic_next;  /* initialized to (usec_t) -1 in sd_event_new() */
        usec_t perturb;              /* boot-id-derived offset for spreading coalesced wakeups,
                                      * computed in event_setup_timer_fd() */

        sigset_t sigset;             /* set of signals routed through signal_fd */
        sd_event_source **signal_sources;  /* _NSIG-sized array indexed by signal number; lazily allocated */

        Hashmap *child_sources;      /* pid -> SOURCE_CHILD source */
        unsigned n_enabled_child_sources;  /* count of enabled SOURCE_CHILD sources */

        Set *post_sources;           /* all SOURCE_POST sources */

        Prioq *exit;                 /* SOURCE_EXIT sources, ordered by exit_prioq_compare() */

        pid_t original_pid;          /* pid at creation time, to detect use across fork() */

        unsigned iteration;          /* loop iteration counter, snapshotted into
                                      * pending_iteration/prepare_iteration */
        dual_timestamp timestamp;
        int state;                   /* SD_EVENT_* state machine value */

        bool exit_requested:1;
        bool need_process_child:1;   /* set in sd_event_add_child(); child state needs reaping */
        bool watchdog:1;

        int exit_code;

        pid_t tid;
        sd_event **default_event_ptr;  /* address of the default-event variable pointing at us;
                                        * cleared in event_free() so it doesn't dangle */

        usec_t watchdog_last, watchdog_period;

        unsigned n_sources;          /* number of attached event sources */
};
165
166 static int pending_prioq_compare(const void *a, const void *b) {
167         const sd_event_source *x = a, *y = b;
168
169         assert(x->pending);
170         assert(y->pending);
171
172         /* Enabled ones first */
173         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
174                 return -1;
175         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
176                 return 1;
177
178         /* Lower priority values first */
179         if (x->priority < y->priority)
180                 return -1;
181         if (x->priority > y->priority)
182                 return 1;
183
184         /* Older entries first */
185         if (x->pending_iteration < y->pending_iteration)
186                 return -1;
187         if (x->pending_iteration > y->pending_iteration)
188                 return 1;
189
190         /* Stability for the rest */
191         if (x < y)
192                 return -1;
193         if (x > y)
194                 return 1;
195
196         return 0;
197 }
198
199 static int prepare_prioq_compare(const void *a, const void *b) {
200         const sd_event_source *x = a, *y = b;
201
202         assert(x->prepare);
203         assert(y->prepare);
204
205         /* Move most recently prepared ones last, so that we can stop
206          * preparing as soon as we hit one that has already been
207          * prepared in the current iteration */
208         if (x->prepare_iteration < y->prepare_iteration)
209                 return -1;
210         if (x->prepare_iteration > y->prepare_iteration)
211                 return 1;
212
213         /* Enabled ones first */
214         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
215                 return -1;
216         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
217                 return 1;
218
219         /* Lower priority values first */
220         if (x->priority < y->priority)
221                 return -1;
222         if (x->priority > y->priority)
223                 return 1;
224
225         /* Stability for the rest */
226         if (x < y)
227                 return -1;
228         if (x > y)
229                 return 1;
230
231         return 0;
232 }
233
234 static int earliest_time_prioq_compare(const void *a, const void *b) {
235         const sd_event_source *x = a, *y = b;
236
237         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
238         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
239
240         /* Enabled ones first */
241         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
242                 return -1;
243         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
244                 return 1;
245
246         /* Move the pending ones to the end */
247         if (!x->pending && y->pending)
248                 return -1;
249         if (x->pending && !y->pending)
250                 return 1;
251
252         /* Order by time */
253         if (x->time.next < y->time.next)
254                 return -1;
255         if (x->time.next > y->time.next)
256                 return 1;
257
258         /* Stability for the rest */
259         if (x < y)
260                 return -1;
261         if (x > y)
262                 return 1;
263
264         return 0;
265 }
266
267 static int latest_time_prioq_compare(const void *a, const void *b) {
268         const sd_event_source *x = a, *y = b;
269
270         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
271                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
272
273         /* Enabled ones first */
274         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
275                 return -1;
276         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
277                 return 1;
278
279         /* Move the pending ones to the end */
280         if (!x->pending && y->pending)
281                 return -1;
282         if (x->pending && !y->pending)
283                 return 1;
284
285         /* Order by time */
286         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
287                 return -1;
288         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
289                 return 1;
290
291         /* Stability for the rest */
292         if (x < y)
293                 return -1;
294         if (x > y)
295                 return 1;
296
297         return 0;
298 }
299
300 static int exit_prioq_compare(const void *a, const void *b) {
301         const sd_event_source *x = a, *y = b;
302
303         assert(x->type == SOURCE_EXIT);
304         assert(y->type == SOURCE_EXIT);
305
306         /* Enabled ones first */
307         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
308                 return -1;
309         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
310                 return 1;
311
312         /* Lower priority values first */
313         if (x->priority < y->priority)
314                 return -1;
315         if (x->priority > y->priority)
316                 return 1;
317
318         /* Stability for the rest */
319         if (x < y)
320                 return -1;
321         if (x > y)
322                 return 1;
323
324         return 0;
325 }
326
/* Destroys the loop object. All event sources must already be gone
 * (asserted). Reached from sd_event_unref() when the last reference
 * is dropped, and from the sd_event_new() error path. */
static void event_free(sd_event *e) {
        assert(e);
        assert(e->n_sources == 0);

        /* If a default-event pointer refers to us, clear it so it
         * doesn't dangle */
        if (e->default_event_ptr)
                *(e->default_event_ptr) = NULL;

        safe_close(e->epoll_fd);
        safe_close(e->signal_fd);
        safe_close(e->realtime_fd);
        safe_close(e->monotonic_fd);
        safe_close(e->watchdog_fd);

        prioq_free(e->pending);
        prioq_free(e->prepare);
        prioq_free(e->monotonic_earliest);
        prioq_free(e->monotonic_latest);
        prioq_free(e->realtime_earliest);
        prioq_free(e->realtime_latest);
        prioq_free(e->exit);

        free(e->signal_sources);

        hashmap_free(e->child_sources);
        set_free(e->post_sources);
        free(e);
}
354
/* Allocates a new event loop with a single reference and a fresh epoll
 * instance. Returns 0 on success, -ENOMEM or another negative errno on
 * failure. */
_public_ int sd_event_new(sd_event** ret) {
        sd_event *e;
        int r;

        assert_return(ret, -EINVAL);

        e = new0(sd_event, 1);
        if (!e)
                return -ENOMEM;

        e->n_ref = 1;
        /* All other fds are created lazily, on first use */
        e->signal_fd = e->realtime_fd = e->monotonic_fd = e->watchdog_fd = e->epoll_fd = -1;
        e->realtime_next = e->monotonic_next = (usec_t) -1;
        e->original_pid = getpid();  /* for fork() detection, see event_pid_changed() */

        assert_se(sigemptyset(&e->sigset) == 0);

        e->pending = prioq_new(pending_prioq_compare);
        if (!e->pending) {
                r = -ENOMEM;
                goto fail;
        }

        e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
        if (e->epoll_fd < 0) {
                r = -errno;
                goto fail;
        }

        *ret = e;
        return 0;

fail:
        event_free(e);
        return r;
}
391
/* Takes an additional reference on the loop; returns the loop, or NULL
 * if e is NULL. */
_public_ sd_event* sd_event_ref(sd_event *e) {
        assert_return(e, NULL);

        assert(e->n_ref >= 1);
        e->n_ref++;

        return e;
}
400
/* Drops a reference; frees the loop when the last one goes away.
 * Always returns NULL, so callers can write "e = sd_event_unref(e);". */
_public_ sd_event* sd_event_unref(sd_event *e) {

        if (!e)
                return NULL;

        assert(e->n_ref >= 1);
        e->n_ref--;

        if (e->n_ref <= 0)
                event_free(e);

        return NULL;
}
414
/* Returns true if the loop has been carried across a fork(). */
static bool event_pid_changed(sd_event *e) {
        assert(e);

        /* We don't support people creating an event loop and keeping
         * it around over a fork(). Let's complain. */

        return e->original_pid != getpid();
}
423
424 static int source_io_unregister(sd_event_source *s) {
425         int r;
426
427         assert(s);
428         assert(s->type == SOURCE_IO);
429
430         if (!s->io.registered)
431                 return 0;
432
433         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
434         if (r < 0)
435                 return -errno;
436
437         s->io.registered = false;
438         return 0;
439 }
440
441 static int source_io_register(
442                 sd_event_source *s,
443                 int enabled,
444                 uint32_t events) {
445
446         struct epoll_event ev = {};
447         int r;
448
449         assert(s);
450         assert(s->type == SOURCE_IO);
451         assert(enabled != SD_EVENT_OFF);
452
453         ev.events = events;
454         ev.data.ptr = s;
455
456         if (enabled == SD_EVENT_ONESHOT)
457                 ev.events |= EPOLLONESHOT;
458
459         if (s->io.registered)
460                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
461         else
462                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
463
464         if (r < 0)
465                 return -errno;
466
467         s->io.registered = true;
468
469         return 0;
470 }
471
/* Detaches a source from its loop and frees it, undoing per type the
 * registration done by the corresponding sd_event_add_*() call. */
static void source_free(sd_event_source *s) {
        assert(s);

        if (s->event) {
                assert(s->event->n_sources > 0);

                switch (s->type) {

                case SOURCE_IO:
                        if (s->io.fd >= 0)
                                source_io_unregister(s);

                        break;

                case SOURCE_MONOTONIC:
                        prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_REALTIME:
                        prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_SIGNAL:
                        if (s->signal.sig > 0) {
                                /* Keep SIGCHLD in the sigset if enabled
                                 * child sources still depend on it */
                                if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
                                        assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);

                                if (s->event->signal_sources)
                                        s->event->signal_sources[s->signal.sig] = NULL;
                        }

                        break;

                case SOURCE_CHILD:
                        if (s->child.pid > 0) {
                                if (s->enabled != SD_EVENT_OFF) {
                                        assert(s->event->n_enabled_child_sources > 0);
                                        s->event->n_enabled_child_sources--;
                                }

                                /* NOTE(review): this drops SIGCHLD from
                                 * the sigset whenever there is no
                                 * explicit SIGCHLD signal source, even
                                 * if other child sources remain that
                                 * still need it — verify this is
                                 * intended. */
                                if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
                                        assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);

                                hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
                        }

                        break;

                case SOURCE_DEFER:
                        /* nothing */
                        break;

                case SOURCE_POST:
                        set_remove(s->event->post_sources, s);
                        break;

                case SOURCE_EXIT:
                        prioq_remove(s->event->exit, s, &s->exit.prioq_index);
                        break;

                case SOURCE_WATCHDOG:
                        assert_not_reached("Wut? I shouldn't exist.");
                }

                /* Drop the source from the generic bookkeeping queues */
                if (s->pending)
                        prioq_remove(s->event->pending, s, &s->pending_index);

                if (s->prepare)
                        prioq_remove(s->event->prepare, s, &s->prepare_index);

                /* Release the loop reference taken in source_new() */
                s->event->n_sources--;
                sd_event_unref(s->event);
        }

        free(s);
}
550
/* Marks a source as pending (b=true) or clears that state (b=false),
 * keeping the pending prioq in sync. For timer sources the per-clock
 * prioqs are reshuffled too, since their comparators sort pending
 * entries to the end. Returns 0 on success, negative on OOM. */
static int source_set_pending(sd_event_source *s, bool b) {
        int r;

        assert(s);
        assert(s->type != SOURCE_EXIT);

        if (s->pending == b)
                return 0;

        s->pending = b;

        if (b) {
                /* Remember when this became pending, for fair ordering
                 * in pending_prioq_compare() */
                s->pending_iteration = s->event->iteration;

                r = prioq_put(s->event->pending, s, &s->pending_index);
                if (r < 0) {
                        /* Roll the flag back so state stays consistent */
                        s->pending = false;
                        return r;
                }
        } else
                assert_se(prioq_remove(s->event->pending, s, &s->pending_index));

        if (s->type == SOURCE_REALTIME) {
                prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
        } else if (s->type == SOURCE_MONOTONIC) {
                prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
        }

        return 0;
}
583
584 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
585         sd_event_source *s;
586
587         assert(e);
588
589         s = new0(sd_event_source, 1);
590         if (!s)
591                 return NULL;
592
593         s->n_ref = 1;
594         s->event = sd_event_ref(e);
595         s->type = type;
596         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
597
598         e->n_sources ++;
599
600         return s;
601 }
602
603 _public_ int sd_event_add_io(
604                 sd_event *e,
605                 sd_event_source **ret,
606                 int fd,
607                 uint32_t events,
608                 sd_event_io_handler_t callback,
609                 void *userdata) {
610
611         sd_event_source *s;
612         int r;
613
614         assert_return(e, -EINVAL);
615         assert_return(fd >= 0, -EINVAL);
616         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
617         assert_return(callback, -EINVAL);
618         assert_return(ret, -EINVAL);
619         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
620         assert_return(!event_pid_changed(e), -ECHILD);
621
622         s = source_new(e, SOURCE_IO);
623         if (!s)
624                 return -ENOMEM;
625
626         s->io.fd = fd;
627         s->io.events = events;
628         s->io.callback = callback;
629         s->userdata = userdata;
630         s->enabled = SD_EVENT_ON;
631
632         r = source_io_register(s, s->enabled, events);
633         if (r < 0) {
634                 source_free(s);
635                 return -errno;
636         }
637
638         *ret = s;
639         return 0;
640 }
641
/* Creates the timerfd for the given clock, if not created yet, and
 * registers it with the epoll instance, tagging the epoll data pointer
 * with the source type (INT_TO_PTR(type)). Also derives the per-system
 * wakeup perturbation offset. Returns 0 or a negative errno. */
static int event_setup_timer_fd(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id) {

        sd_id128_t bootid = {};
        struct epoll_event ev = {};
        int r, fd;

        assert(e);
        assert(timer_fd);

        if (_likely_(*timer_fd >= 0))
                return 0;

        fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
        if (fd < 0)
                return -errno;

        ev.events = EPOLLIN;
        ev.data.ptr = INT_TO_PTR(type);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
        if (r < 0) {
                safe_close(fd);
                return -errno;
        }

        /* When we sleep for longer, we try to realign the wakeup to
           the same time within each minute/second/250ms, so that
           events all across the system can be coalesced into a single
           CPU wakeup. However, let's take some system-specific
           randomness for this value, so that in a network of systems
           with synced clocks timer events are distributed a
           bit. Here, we calculate a perturbation usec offset from the
           boot ID. */

        if (sd_id128_get_boot(&bootid) >= 0)
                e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_MINUTE;

        *timer_fd = fd;
        return 0;
}
686
/* Common implementation behind sd_event_add_monotonic() and
 * sd_event_add_realtime(): lazily allocates the two per-clock prioqs
 * and the shared timerfd, then creates a one-shot timer source.
 *
 * usec is the earliest dispatch time; accuracy is the slack we may add
 * to coalesce wakeups (0 selects DEFAULT_ACCURACY_USEC). Returns 0 on
 * success, negative errno-style error otherwise. */
static int event_add_time_internal(
                sd_event *e,
                sd_event_source **ret,
                EventSourceType type,
                int *timer_fd,
                clockid_t id,
                Prioq **earliest,
                Prioq **latest,
                uint64_t usec,
                uint64_t accuracy,
                sd_event_time_handler_t callback,
                void *userdata) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(usec != (uint64_t) -1, -EINVAL);
        assert_return(accuracy != (uint64_t) -1, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        assert(timer_fd);
        assert(earliest);
        assert(latest);

        /* Create the two scheduling prioqs for this clock on first use */
        if (!*earliest) {
                *earliest = prioq_new(earliest_time_prioq_compare);
                if (!*earliest)
                        return -ENOMEM;
        }

        if (!*latest) {
                *latest = prioq_new(latest_time_prioq_compare);
                if (!*latest)
                        return -ENOMEM;
        }

        /* And the timerfd backing this clock, likewise on first use */
        if (*timer_fd < 0) {
                r = event_setup_timer_fd(e, type, timer_fd, id);
                if (r < 0)
                        return r;
        }

        s = source_new(e, type);
        if (!s)
                return -ENOMEM;

        s->time.next = usec;
        s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
        s->time.callback = callback;
        s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ONESHOT;

        r = prioq_put(*earliest, s, &s->time.earliest_index);
        if (r < 0)
                goto fail;

        r = prioq_put(*latest, s, &s->time.latest_index);
        if (r < 0)
                goto fail;

        *ret = s;
        return 0;

fail:
        source_free(s);
        return r;
}
759
/* Adds a one-shot CLOCK_MONOTONIC timer source; see
 * event_add_time_internal() for the actual work. */
_public_ int sd_event_add_monotonic(sd_event *e,
                                    sd_event_source **ret,
                                    uint64_t usec,
                                    uint64_t accuracy,
                                    sd_event_time_handler_t callback,
                                    void *userdata) {

        return event_add_time_internal(e, ret, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata);
}
769
/* Adds a one-shot CLOCK_REALTIME timer source; see
 * event_add_time_internal() for the actual work. */
_public_ int sd_event_add_realtime(sd_event *e,
                                   sd_event_source **ret,
                                   uint64_t usec,
                                   uint64_t accuracy,
                                   sd_event_time_handler_t callback,
                                   void *userdata) {

        return event_add_time_internal(e, ret, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->realtime_latest, usec, accuracy, callback, userdata);
}
779
/* (Re)creates the signalfd from e->sigset. Passing the existing fd to
 * signalfd() just updates its mask; a brand-new fd additionally has to
 * be added to the epoll instance, tagged with SOURCE_SIGNAL.
 * Returns 0 or a negative errno. */
static int event_update_signal_fd(sd_event *e) {
        struct epoll_event ev = {};
        bool add_to_epoll;
        int r;

        assert(e);

        add_to_epoll = e->signal_fd < 0;

        r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
        if (r < 0)
                return -errno;

        e->signal_fd = r;

        if (!add_to_epoll)
                return 0;

        ev.events = EPOLLIN;
        ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
        if (r < 0) {
                /* Don't leave a signalfd around that epoll doesn't know about */
                e->signal_fd = safe_close(e->signal_fd);
                return -errno;
        }

        return 0;
}
809
810 _public_ int sd_event_add_signal(
811                 sd_event *e,
812                 sd_event_source **ret,
813                 int sig,
814                 sd_event_signal_handler_t callback,
815                 void *userdata) {
816
817         sd_event_source *s;
818         sigset_t ss;
819         int r;
820
821         assert_return(e, -EINVAL);
822         assert_return(sig > 0, -EINVAL);
823         assert_return(sig < _NSIG, -EINVAL);
824         assert_return(callback, -EINVAL);
825         assert_return(ret, -EINVAL);
826         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
827         assert_return(!event_pid_changed(e), -ECHILD);
828
829         r = pthread_sigmask(SIG_SETMASK, NULL, &ss);
830         if (r < 0)
831                 return -errno;
832
833         if (!sigismember(&ss, sig))
834                 return -EBUSY;
835
836         if (!e->signal_sources) {
837                 e->signal_sources = new0(sd_event_source*, _NSIG);
838                 if (!e->signal_sources)
839                         return -ENOMEM;
840         } else if (e->signal_sources[sig])
841                 return -EBUSY;
842
843         s = source_new(e, SOURCE_SIGNAL);
844         if (!s)
845                 return -ENOMEM;
846
847         s->signal.sig = sig;
848         s->signal.callback = callback;
849         s->userdata = userdata;
850         s->enabled = SD_EVENT_ON;
851
852         e->signal_sources[sig] = s;
853         assert_se(sigaddset(&e->sigset, sig) == 0);
854
855         if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
856                 r = event_update_signal_fd(e);
857                 if (r < 0) {
858                         source_free(s);
859                         return r;
860                 }
861         }
862
863         *ret = s;
864         return 0;
865 }
866
867 _public_ int sd_event_add_child(
868                 sd_event *e,
869                 sd_event_source **ret,
870                 pid_t pid,
871                 int options,
872                 sd_event_child_handler_t callback,
873                 void *userdata) {
874
875         sd_event_source *s;
876         int r;
877
878         assert_return(e, -EINVAL);
879         assert_return(pid > 1, -EINVAL);
880         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
881         assert_return(options != 0, -EINVAL);
882         assert_return(callback, -EINVAL);
883         assert_return(ret, -EINVAL);
884         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
885         assert_return(!event_pid_changed(e), -ECHILD);
886
887         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
888         if (r < 0)
889                 return r;
890
891         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
892                 return -EBUSY;
893
894         s = source_new(e, SOURCE_CHILD);
895         if (!s)
896                 return -ENOMEM;
897
898         s->child.pid = pid;
899         s->child.options = options;
900         s->child.callback = callback;
901         s->userdata = userdata;
902         s->enabled = SD_EVENT_ONESHOT;
903
904         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
905         if (r < 0) {
906                 source_free(s);
907                 return r;
908         }
909
910         e->n_enabled_child_sources ++;
911
912         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
913
914         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
915                 r = event_update_signal_fd(e);
916                 if (r < 0) {
917                         source_free(s);
918                         return -errno;
919                 }
920         }
921
922         e->need_process_child = true;
923
924         *ret = s;
925         return 0;
926 }
927
928 _public_ int sd_event_add_defer(
929                 sd_event *e,
930                 sd_event_source **ret,
931                 sd_event_handler_t callback,
932                 void *userdata) {
933
934         sd_event_source *s;
935         int r;
936
937         assert_return(e, -EINVAL);
938         assert_return(callback, -EINVAL);
939         assert_return(ret, -EINVAL);
940         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
941         assert_return(!event_pid_changed(e), -ECHILD);
942
943         s = source_new(e, SOURCE_DEFER);
944         if (!s)
945                 return -ENOMEM;
946
947         s->defer.callback = callback;
948         s->userdata = userdata;
949         s->enabled = SD_EVENT_ONESHOT;
950
951         r = source_set_pending(s, true);
952         if (r < 0) {
953                 source_free(s);
954                 return r;
955         }
956
957         *ret = s;
958         return 0;
959 }
960
961 _public_ int sd_event_add_post(
962                 sd_event *e,
963                 sd_event_source **ret,
964                 sd_event_handler_t callback,
965                 void *userdata) {
966
967         sd_event_source *s;
968         int r;
969
970         assert_return(e, -EINVAL);
971         assert_return(callback, -EINVAL);
972         assert_return(ret, -EINVAL);
973         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
974         assert_return(!event_pid_changed(e), -ECHILD);
975
976         r = set_ensure_allocated(&e->post_sources, trivial_hash_func, trivial_compare_func);
977         if (r < 0)
978                 return r;
979
980         s = source_new(e, SOURCE_POST);
981         if (!s)
982                 return -ENOMEM;
983
984         s->post.callback = callback;
985         s->userdata = userdata;
986         s->enabled = SD_EVENT_ON;
987
988         r = set_put(e->post_sources, s);
989         if (r < 0) {
990                 source_free(s);
991                 return r;
992         }
993
994         *ret = s;
995         return 0;
996 }
997
998 _public_ int sd_event_add_exit(
999                 sd_event *e,
1000                 sd_event_source **ret,
1001                 sd_event_handler_t callback,
1002                 void *userdata) {
1003
1004         sd_event_source *s;
1005         int r;
1006
1007         assert_return(e, -EINVAL);
1008         assert_return(callback, -EINVAL);
1009         assert_return(ret, -EINVAL);
1010         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1011         assert_return(!event_pid_changed(e), -ECHILD);
1012
1013         if (!e->exit) {
1014                 e->exit = prioq_new(exit_prioq_compare);
1015                 if (!e->exit)
1016                         return -ENOMEM;
1017         }
1018
1019         s = source_new(e, SOURCE_EXIT);
1020         if (!s)
1021                 return -ENOMEM;
1022
1023         s->exit.callback = callback;
1024         s->userdata = userdata;
1025         s->exit.prioq_index = PRIOQ_IDX_NULL;
1026         s->enabled = SD_EVENT_ONESHOT;
1027
1028         r = prioq_put(s->event->exit, s, &s->exit.prioq_index);
1029         if (r < 0) {
1030                 source_free(s);
1031                 return r;
1032         }
1033
1034         *ret = s;
1035         return 0;
1036 }
1037
/* Increments the reference count of an event source (which must be
 * alive already, i.e. n_ref >= 1) and returns it, for chaining. */
_public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
        assert_return(s, NULL);

        assert(s->n_ref >= 1);
        s->n_ref++;

        return s;
}
1046
/* Drops one reference from an event source; when the count reaches
 * zero the source is destroyed — unless we are currently inside its
 * own dispatch, in which case destruction is deferred (see comment
 * below) and completed by source_dispatch()/event_prepare() after the
 * callback returns. Always returns NULL, for chaining. */
_public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {

        if (!s)
                return NULL;

        assert(s->n_ref >= 1);
        s->n_ref--;

        if (s->n_ref <= 0) {
                /* Here's a special hack: when we are called from a
                 * dispatch handler we won't free the event source
                 * immediately, but we will detach the fd from the
                 * epoll. This way it is safe for the caller to unref
                 * the event source and immediately close the fd, but
                 * we still retain a valid event source object after
                 * the callback. */

                if (s->dispatching) {
                        if (s->type == SOURCE_IO)
                                source_io_unregister(s);
                } else
                        source_free(s);
        }

        return NULL;
}
1073
/* Returns the event loop object this source is attached to. */
_public_ sd_event *sd_event_source_get_event(sd_event_source *s) {
        assert_return(s, NULL);

        return s->event;
}
1079
/* Returns whether the source is currently queued for dispatch (1) or
 * not (0). Not defined for exit sources, which use a separate queue. */
_public_ int sd_event_source_get_pending(sd_event_source *s) {
        assert_return(s, -EINVAL);
        assert_return(s->type != SOURCE_EXIT, -EDOM);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->pending;
}
1088
/* Returns the file descriptor an IO event source watches. */
_public_ int sd_event_source_get_io_fd(sd_event_source *s) {
        assert_return(s, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->io.fd;
}
1096
/* Replaces the file descriptor an IO event source watches. For a
 * disabled source this is pure bookkeeping; for an enabled one the new
 * fd is registered with epoll first and the old registration dropped
 * only afterwards, so that a failure leaves the previous fd watched.
 * The old fd is not closed here — that remains the caller's job. */
_public_ int sd_event_source_set_io_fd(sd_event_source *s, int fd) {
        int r;

        assert_return(s, -EINVAL);
        assert_return(fd >= 0, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->io.fd == fd)
                return 0;

        if (s->enabled == SD_EVENT_OFF) {
                s->io.fd = fd;
                s->io.registered = false;
        } else {
                int saved_fd;

                saved_fd = s->io.fd;
                assert(s->io.registered);

                s->io.fd = fd;
                s->io.registered = false;

                r = source_io_register(s, s->enabled, s->io.events);
                if (r < 0) {
                        /* Roll back to the previous, still-registered fd. */
                        s->io.fd = saved_fd;
                        s->io.registered = true;
                        return r;
                }

                /* Best effort; the old fd may already have been closed. */
                epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, saved_fd, NULL);
        }

        return 0;
}
1132
/* Retrieves the epoll event mask (EPOLLIN|EPOLLOUT|...) configured for
 * an IO event source. */
_public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
        assert_return(s, -EINVAL);
        assert_return(events, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *events = s->io.events;
        return 0;
}
1142
1143 _public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
1144         int r;
1145
1146         assert_return(s, -EINVAL);
1147         assert_return(s->type == SOURCE_IO, -EDOM);
1148         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
1149         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1150         assert_return(!event_pid_changed(s->event), -ECHILD);
1151
1152         if (s->io.events == events)
1153                 return 0;
1154
1155         if (s->enabled != SD_EVENT_OFF) {
1156                 r = source_io_register(s, s->enabled, events);
1157                 if (r < 0)
1158                         return r;
1159         }
1160
1161         s->io.events = events;
1162         source_set_pending(s, false);
1163
1164         return 0;
1165 }
1166
/* Retrieves the events that actually triggered on an IO source. Only
 * meaningful while the source is pending, i.e. between the epoll wake-up
 * and the dispatch of its callback. */
_public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
        assert_return(s, -EINVAL);
        assert_return(revents, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(s->pending, -ENODATA);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *revents = s->io.revents;
        return 0;
}
1177
/* Returns the signal number a signal event source watches. */
_public_ int sd_event_source_get_signal(sd_event_source *s) {
        assert_return(s, -EINVAL);
        assert_return(s->type == SOURCE_SIGNAL, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->signal.sig;
}
1185
1186 _public_ int sd_event_source_get_priority(sd_event_source *s, int64_t *priority) {
1187         assert_return(s, -EINVAL);
1188         assert_return(!event_pid_changed(s->event), -ECHILD);
1189
1190         return s->priority;
1191 }
1192
1193 _public_ int sd_event_source_set_priority(sd_event_source *s, int64_t priority) {
1194         assert_return(s, -EINVAL);
1195         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1196         assert_return(!event_pid_changed(s->event), -ECHILD);
1197
1198         if (s->priority == priority)
1199                 return 0;
1200
1201         s->priority = priority;
1202
1203         if (s->pending)
1204                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1205
1206         if (s->prepare)
1207                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1208
1209         if (s->type == SOURCE_EXIT)
1210                 prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
1211
1212         return 0;
1213 }
1214
/* Retrieves the enablement state of an event source: SD_EVENT_OFF,
 * SD_EVENT_ON or SD_EVENT_ONESHOT. */
_public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
        assert_return(s, -EINVAL);
        assert_return(m, -EINVAL);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *m = s->enabled;
        return 0;
}
1223
/* Switches an event source between SD_EVENT_OFF, SD_EVENT_ON and
 * SD_EVENT_ONESHOT, synchronizing all per-type state that depends on
 * enablement: the epoll registration of IO sources, the timer priority
 * queues, the blocked-signal set + signalfd for signal and child
 * sources, and the exit queue. Finally the pending and prepare queues
 * are reshuffled, since enablement participates in their ordering. */
_public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
        int r;

        assert_return(s, -EINVAL);
        assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->enabled == m)
                return 0;

        if (m == SD_EVENT_OFF) {

                switch (s->type) {

                case SOURCE_IO:
                        /* Unregistering may fail, hence do it before committing the new state. */
                        r = source_io_unregister(s);
                        if (r < 0)
                                return r;

                        s->enabled = m;
                        break;

                case SOURCE_MONOTONIC:
                        s->enabled = m;
                        prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
                        prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_REALTIME:
                        s->enabled = m;
                        prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
                        prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_SIGNAL:
                        s->enabled = m;
                        /* Keep SIGCHLD in the watched set if enabled child sources still need it. */
                        if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
                                assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
                                event_update_signal_fd(s->event);
                        }

                        break;

                case SOURCE_CHILD:
                        s->enabled = m;

                        assert(s->event->n_enabled_child_sources > 0);
                        s->event->n_enabled_child_sources--;

                        /* NOTE(review): SIGCHLD is dropped whenever no explicit SIGCHLD signal
                         * source exists, even if other child sources remain enabled
                         * (n_enabled_child_sources > 0) — confirm this is intended. */
                        if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
                                assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
                                event_update_signal_fd(s->event);
                        }

                        break;

                case SOURCE_EXIT:
                        s->enabled = m;
                        prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
                        break;

                case SOURCE_DEFER:
                case SOURCE_POST:
                        /* Nothing to update besides the flag itself. */
                        s->enabled = m;
                        break;

                case SOURCE_WATCHDOG:
                        assert_not_reached("Wut? I shouldn't exist.");
                }

        } else {
                switch (s->type) {

                case SOURCE_IO:
                        /* Registering may fail, hence do it before committing the new state. */
                        r = source_io_register(s, m, s->io.events);
                        if (r < 0)
                                return r;

                        s->enabled = m;
                        break;

                case SOURCE_MONOTONIC:
                        s->enabled = m;
                        prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
                        prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_REALTIME:
                        s->enabled = m;
                        prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
                        prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_SIGNAL:
                        s->enabled = m;

                        /* If child sources already keep SIGCHLD blocked, the set is up to date. */
                        if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
                                assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
                                event_update_signal_fd(s->event);
                        }
                        break;

                case SOURCE_CHILD:
                        /* Only the OFF -> ON/ONESHOT transition changes the enabled count;
                         * flipping between ON and ONESHOT does not. */
                        if (s->enabled == SD_EVENT_OFF) {
                                s->event->n_enabled_child_sources++;

                                if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
                                        assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
                                        event_update_signal_fd(s->event);
                                }
                        }

                        s->enabled = m;
                        break;

                case SOURCE_EXIT:
                        s->enabled = m;
                        prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
                        break;

                case SOURCE_DEFER:
                case SOURCE_POST:
                        s->enabled = m;
                        break;

                case SOURCE_WATCHDOG:
                        assert_not_reached("Wut? I shouldn't exist.");
                }
        }

        if (s->pending)
                prioq_reshuffle(s->event->pending, s, &s->pending_index);

        if (s->prepare)
                prioq_reshuffle(s->event->prepare, s, &s->prepare_index);

        return 0;
}
1363
/* Retrieves the configured absolute expiry time (in µs) of a timer
 * event source. */
_public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
        assert_return(s, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *usec = s->time.next;
        return 0;
}
1373
1374 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1375         assert_return(s, -EINVAL);
1376         assert_return(usec != (uint64_t) -1, -EINVAL);
1377         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1378         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1379         assert_return(!event_pid_changed(s->event), -ECHILD);
1380
1381         s->time.next = usec;
1382
1383         source_set_pending(s, false);
1384
1385         if (s->type == SOURCE_REALTIME) {
1386                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1387                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1388         } else {
1389                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1390                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1391         }
1392
1393         return 0;
1394 }
1395
/* Retrieves the dispatch accuracy (in µs) of a timer event source. */
_public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
        assert_return(s, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *usec = s->time.accuracy;
        return 0;
}
1405
1406 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1407         assert_return(s, -EINVAL);
1408         assert_return(usec != (uint64_t) -1, -EINVAL);
1409         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1410         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1411         assert_return(!event_pid_changed(s->event), -ECHILD);
1412
1413         if (usec == 0)
1414                 usec = DEFAULT_ACCURACY_USEC;
1415
1416         s->time.accuracy = usec;
1417
1418         source_set_pending(s, false);
1419
1420         if (s->type == SOURCE_REALTIME)
1421                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1422         else
1423                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1424
1425         return 0;
1426 }
1427
/* Retrieves the PID a child event source watches. */
_public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
        assert_return(s, -EINVAL);
        assert_return(pid, -EINVAL);
        assert_return(s->type == SOURCE_CHILD, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *pid = s->child.pid;
        return 0;
}
1437
1438 _public_ int sd_event_source_set_prepare(sd_event_source *s, sd_event_handler_t callback) {
1439         int r;
1440
1441         assert_return(s, -EINVAL);
1442         assert_return(s->type != SOURCE_EXIT, -EDOM);
1443         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1444         assert_return(!event_pid_changed(s->event), -ECHILD);
1445
1446         if (s->prepare == callback)
1447                 return 0;
1448
1449         if (callback && s->prepare) {
1450                 s->prepare = callback;
1451                 return 0;
1452         }
1453
1454         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1455         if (r < 0)
1456                 return r;
1457
1458         s->prepare = callback;
1459
1460         if (callback) {
1461                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1462                 if (r < 0)
1463                         return r;
1464         } else
1465                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1466
1467         return 0;
1468 }
1469
/* Returns the userdata pointer associated with an event source. */
_public_ void* sd_event_source_get_userdata(sd_event_source *s) {
        assert_return(s, NULL);

        return s->userdata;
}
1475
1476 _public_ void *sd_event_source_set_userdata(sd_event_source *s, void *userdata) {
1477         void *ret;
1478
1479         assert_return(s, NULL);
1480
1481         ret = s->userdata;
1482         s->userdata = userdata;
1483
1484         return ret;
1485 }
1486
/* Picks the actual wake-up time within the admissible window [a, b]:
 * a is the earliest expiry, b the latest time allowed by the source's
 * accuracy. Prefers boot-ID-perturbed minute/10s/1s/250ms boundaries
 * (see comment below) and falls back to b. Returns 0 when a == 0,
 * i.e. the timer elapsed long ago. */
static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
        usec_t c;
        assert(e);
        assert(a <= b);

        if (a <= 0)
                return 0;

        /* Window too narrow for coalescing games. */
        if (b <= a + 1)
                return a;

        /*
          Find a good time to wake up again between times a and b. We
          have two goals here:

          a) We want to wake up as seldom as possible, hence prefer
             later times over earlier times.

          b) But if we have to wake up, then let's make sure to
             dispatch as much as possible on the entire system.

          We implement this by waking up everywhere at the same time
          within any given minute if we can, synchronised via the
          perturbation value determined from the boot ID. If we can't,
          then we try to find the same spot in every 10s, then 1s and
          then 250ms step. Otherwise, we pick the last possible time
          to wake up.
        */

        /* NOTE(review): this step uses e->perturb unreduced, unlike the
         * later steps which take it modulo the granularity — it relies on
         * the perturbation value being < 1min where it is initialized;
         * confirm. */
        c = (b / USEC_PER_MINUTE) * USEC_PER_MINUTE + e->perturb;
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_MINUTE))
                        return b;

                c -= USEC_PER_MINUTE;
        }

        if (c >= a)
                return c;

        c = (b / (USEC_PER_SEC*10)) * (USEC_PER_SEC*10) + (e->perturb % (USEC_PER_SEC*10));
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_SEC*10))
                        return b;

                c -= USEC_PER_SEC*10;
        }

        if (c >= a)
                return c;

        c = (b / USEC_PER_SEC) * USEC_PER_SEC + (e->perturb % USEC_PER_SEC);
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_SEC))
                        return b;

                c -= USEC_PER_SEC;
        }

        if (c >= a)
                return c;

        c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
        if (c >= b) {
                if (_unlikely_(c < USEC_PER_MSEC*250))
                        return b;

                c -= USEC_PER_MSEC*250;
        }

        if (c >= a)
                return c;

        /* No aligned instant fits; wake at the last possible moment. */
        return b;
}
1562
1563 static int event_arm_timer(
1564                 sd_event *e,
1565                 int timer_fd,
1566                 Prioq *earliest,
1567                 Prioq *latest,
1568                 usec_t *next) {
1569
1570         struct itimerspec its = {};
1571         sd_event_source *a, *b;
1572         usec_t t;
1573         int r;
1574
1575         assert(e);
1576         assert(next);
1577
1578         a = prioq_peek(earliest);
1579         if (!a || a->enabled == SD_EVENT_OFF) {
1580
1581                 if (timer_fd < 0)
1582                         return 0;
1583
1584                 if (*next == (usec_t) -1)
1585                         return 0;
1586
1587                 /* disarm */
1588                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1589                 if (r < 0)
1590                         return r;
1591
1592                 *next = (usec_t) -1;
1593
1594                 return 0;
1595         }
1596
1597         b = prioq_peek(latest);
1598         assert_se(b && b->enabled != SD_EVENT_OFF);
1599
1600         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1601         if (*next == t)
1602                 return 0;
1603
1604         assert_se(timer_fd >= 0);
1605
1606         if (t == 0) {
1607                 /* We don' want to disarm here, just mean some time looooong ago. */
1608                 its.it_value.tv_sec = 0;
1609                 its.it_value.tv_nsec = 1;
1610         } else
1611                 timespec_store(&its.it_value, t);
1612
1613         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1614         if (r < 0)
1615                 return -errno;
1616
1617         *next = t;
1618         return 0;
1619 }
1620
1621 static int process_io(sd_event *e, sd_event_source *s, uint32_t revents) {
1622         assert(e);
1623         assert(s);
1624         assert(s->type == SOURCE_IO);
1625
1626         /* If the event source was already pending, we just OR in the
1627          * new revents, otherwise we reset the value. The ORing is
1628          * necessary to handle EPOLLONESHOT events properly where
1629          * readability might happen independently of writability, and
1630          * we need to keep track of both */
1631
1632         if (s->pending)
1633                 s->io.revents |= revents;
1634         else
1635                 s->io.revents = revents;
1636
1637         return source_set_pending(s, true);
1638 }
1639
1640 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1641         uint64_t x;
1642         ssize_t ss;
1643
1644         assert(e);
1645         assert(fd >= 0);
1646
1647         assert_return(events == EPOLLIN, -EIO);
1648
1649         ss = read(fd, &x, sizeof(x));
1650         if (ss < 0) {
1651                 if (errno == EAGAIN || errno == EINTR)
1652                         return 0;
1653
1654                 return -errno;
1655         }
1656
1657         if (_unlikely_(ss != sizeof(x)))
1658                 return -EIO;
1659
1660         if (next)
1661                 *next = (usec_t) -1;
1662
1663         return 0;
1664 }
1665
1666 static int process_timer(
1667                 sd_event *e,
1668                 usec_t n,
1669                 Prioq *earliest,
1670                 Prioq *latest) {
1671
1672         sd_event_source *s;
1673         int r;
1674
1675         assert(e);
1676
1677         for (;;) {
1678                 s = prioq_peek(earliest);
1679                 if (!s ||
1680                     s->time.next > n ||
1681                     s->enabled == SD_EVENT_OFF ||
1682                     s->pending)
1683                         break;
1684
1685                 r = source_set_pending(s, true);
1686                 if (r < 0)
1687                         return r;
1688
1689                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1690                 prioq_reshuffle(latest, s, &s->time.latest_index);
1691         }
1692
1693         return 0;
1694 }
1695
/* Polls every enabled, not-yet-pending child source with
 * waitid(WNOHANG) — without reaping, see below — and marks those with
 * a state change as pending. Returns 0 on success (never > 0), or a
 * negative errno. */
static int process_child(sd_event *e) {
        sd_event_source *s;
        Iterator i;
        int r;

        assert(e);

        e->need_process_child = false;

        /*
           So, this is ugly. We iteratively invoke waitid() with P_PID
           + WNOHANG for each PID we wait for, instead of using
           P_ALL. This is because we only want to get child
           information of very specific child processes, and not all
           of them. We might not have processed the SIGCHLD even of a
           previous invocation and we don't want to maintain a
           unbounded *per-child* event queue, hence we really don't
           want anything flushed out of the kernel's queue that we
           don't care about. Since this is O(n) this means that if you
           have a lot of processes you probably want to handle SIGCHLD
           yourself.

           We do not reap the children here (by using WNOWAIT), this
           is only done after the event source is dispatched so that
           the callback still sees the process as a zombie.
        */

        HASHMAP_FOREACH(s, e->child_sources, i) {
                assert(s->type == SOURCE_CHILD);

                if (s->pending)
                        continue;

                if (s->enabled == SD_EVENT_OFF)
                        continue;

                /* WNOWAIT only when WEXITED is watched, so that an exit
                 * stays queued for the reap after dispatch. */
                zero(s->child.siginfo);
                r = waitid(P_PID, s->child.pid, &s->child.siginfo,
                           WNOHANG | (s->child.options & WEXITED ? WNOWAIT : 0) | s->child.options);
                if (r < 0)
                        return -errno;

                /* si_pid stays 0 when WNOHANG found no state change. */
                if (s->child.siginfo.si_pid != 0) {
                        bool zombie =
                                s->child.siginfo.si_code == CLD_EXITED ||
                                s->child.siginfo.si_code == CLD_KILLED ||
                                s->child.siginfo.si_code == CLD_DUMPED;

                        if (!zombie && (s->child.options & WEXITED)) {
                                /* If the child isn't dead then let's
                                 * immediately remove the state change
                                 * from the queue, since there's no
                                 * benefit in leaving it queued */

                                /* Return value ignored — best effort. */
                                assert(s->child.options & (WSTOPPED|WCONTINUED));
                                waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|(s->child.options & (WSTOPPED|WCONTINUED)));
                        }

                        r = source_set_pending(s, true);
                        if (r < 0)
                                return r;
                }
        }

        return 0;
}
1762
/* Drains the signalfd and marks the matching signal sources pending.
 * Returns > 0 if at least one siginfo was consumed, 0 if the fd was
 * already empty, a negative errno on failure. SIGCHLD additionally
 * triggers child-source processing, even when no explicit SIGCHLD
 * signal source exists. */
static int process_signal(sd_event *e, uint32_t events) {
        bool read_one = false;
        int r;

        assert(e);
        assert(e->signal_sources);

        assert_return(events == EPOLLIN, -EIO);

        for (;;) {
                struct signalfd_siginfo si;
                ssize_t ss;
                sd_event_source *s;

                ss = read(e->signal_fd, &si, sizeof(si));
                if (ss < 0) {
                        /* Nothing (more) queued. */
                        if (errno == EAGAIN || errno == EINTR)
                                return read_one;

                        return -errno;
                }

                /* signalfd delivers whole siginfo structures only. */
                if (_unlikely_(ss != sizeof(si)))
                        return -EIO;

                read_one = true;

                s = e->signal_sources[si.ssi_signo];
                if (si.ssi_signo == SIGCHLD) {
                        r = process_child(e);
                        if (r < 0)
                                return r;
                        /* NOTE(review): process_child() currently only returns <= 0, so
                         * the r > 0 half of this condition looks vestigial — confirm. */
                        if (r > 0 || !s)
                                continue;
                } else
                        if (!s)
                                return -EIO;

                s->signal.siginfo = si;
                r = source_set_pending(s, true);
                if (r < 0)
                        return r;
        }
}
1807
/* Invokes the callback of a pending (or exit) event source. Before the
 * callback runs, all enabled post sources are marked pending (for any
 * non-post source) and ONESHOT sources are switched off. The
 * "dispatching" flag makes sd_event_source_unref() defer destruction
 * until after the callback; if the last reference was dropped inside
 * the callback, the source is freed here. A callback returning < 0
 * merely disables the source. Returns 1 on success. */
static int source_dispatch(sd_event_source *s) {
        int r = 0;

        assert(s);
        assert(s->pending || s->type == SOURCE_EXIT);

        /* Defer and exit sources keep their pending state across dispatch. */
        if (s->type != SOURCE_DEFER && s->type != SOURCE_EXIT) {
                r = source_set_pending(s, false);
                if (r < 0)
                        return r;
        }

        if (s->type != SOURCE_POST) {
                sd_event_source *z;
                Iterator i;

                /* If we execute a non-post source, let's mark all
                 * post sources as pending */

                SET_FOREACH(z, s->event->post_sources, i) {
                        if (z->enabled == SD_EVENT_OFF)
                                continue;

                        r = source_set_pending(z, true);
                        if (r < 0)
                                return r;
                }
        }

        /* ONESHOT: disabled before, not after, the callback, so the
         * callback may re-enable the source if it wants to keep it. */
        if (s->enabled == SD_EVENT_ONESHOT) {
                r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
                if (r < 0)
                        return r;
        }

        s->dispatching = true;

        switch (s->type) {

        case SOURCE_IO:
                r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
                break;

        case SOURCE_MONOTONIC:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_REALTIME:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_SIGNAL:
                r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
                break;

        case SOURCE_CHILD: {
                bool zombie;

                zombie = s->child.siginfo.si_code == CLD_EXITED ||
                         s->child.siginfo.si_code == CLD_KILLED ||
                         s->child.siginfo.si_code == CLD_DUMPED;

                r = s->child.callback(s, &s->child.siginfo, s->userdata);

                /* Now, reap the PID for good. */
                if (zombie)
                        waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|WEXITED);

                break;
        }

        case SOURCE_DEFER:
                r = s->defer.callback(s, s->userdata);
                break;

        case SOURCE_POST:
                r = s->post.callback(s, s->userdata);
                break;

        case SOURCE_EXIT:
                r = s->exit.callback(s, s->userdata);
                break;

        case SOURCE_WATCHDOG:
                assert_not_reached("Wut? I shouldn't exist.");
        }

        s->dispatching = false;

        if (r < 0)
                log_debug("Event source %p returned error, disabling: %s", s, strerror(-r));

        /* The callback may have dropped the last reference; see
         * sd_event_source_unref(). */
        if (s->n_ref == 0)
                source_free(s);
        else if (r < 0)
                sd_event_source_set_enabled(s, SD_EVENT_OFF);

        return 1;
}
1907
1908 static int event_prepare(sd_event *e) {
1909         int r;
1910
1911         assert(e);
1912
1913         for (;;) {
1914                 sd_event_source *s;
1915
1916                 s = prioq_peek(e->prepare);
1917                 if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
1918                         break;
1919
1920                 s->prepare_iteration = e->iteration;
1921                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1922                 if (r < 0)
1923                         return r;
1924
1925                 assert(s->prepare);
1926
1927                 s->dispatching = true;
1928                 r = s->prepare(s, s->userdata);
1929                 s->dispatching = false;
1930
1931                 if (r < 0)
1932                         log_debug("Prepare callback of event source %p returned error, disabling: %s", s, strerror(-r));
1933
1934                 if (s->n_ref == 0)
1935                         source_free(s);
1936                 else if (r < 0)
1937                         sd_event_source_set_enabled(s, SD_EVENT_OFF);
1938         }
1939
1940         return 0;
1941 }
1942
1943 static int dispatch_exit(sd_event *e) {
1944         sd_event_source *p;
1945         int r;
1946
1947         assert(e);
1948
1949         p = prioq_peek(e->exit);
1950         if (!p || p->enabled == SD_EVENT_OFF) {
1951                 e->state = SD_EVENT_FINISHED;
1952                 return 0;
1953         }
1954
1955         sd_event_ref(e);
1956         e->iteration++;
1957         e->state = SD_EVENT_EXITING;
1958
1959         r = source_dispatch(p);
1960
1961         e->state = SD_EVENT_PASSIVE;
1962         sd_event_unref(e);
1963
1964         return r;
1965 }
1966
1967 static sd_event_source* event_next_pending(sd_event *e) {
1968         sd_event_source *p;
1969
1970         assert(e);
1971
1972         p = prioq_peek(e->pending);
1973         if (!p)
1974                 return NULL;
1975
1976         if (p->enabled == SD_EVENT_OFF)
1977                 return NULL;
1978
1979         return p;
1980 }
1981
1982 static int arm_watchdog(sd_event *e) {
1983         struct itimerspec its = {};
1984         usec_t t;
1985         int r;
1986
1987         assert(e);
1988         assert(e->watchdog_fd >= 0);
1989
1990         t = sleep_between(e,
1991                           e->watchdog_last + (e->watchdog_period / 2),
1992                           e->watchdog_last + (e->watchdog_period * 3 / 4));
1993
1994         timespec_store(&its.it_value, t);
1995
1996         r = timerfd_settime(e->watchdog_fd, TFD_TIMER_ABSTIME, &its, NULL);
1997         if (r < 0)
1998                 return -errno;
1999
2000         return 0;
2001 }
2002
2003 static int process_watchdog(sd_event *e) {
2004         assert(e);
2005
2006         if (!e->watchdog)
2007                 return 0;
2008
2009         /* Don't notify watchdog too often */
2010         if (e->watchdog_last + e->watchdog_period / 4 > e->timestamp.monotonic)
2011                 return 0;
2012
2013         sd_notify(false, "WATCHDOG=1");
2014         e->watchdog_last = e->timestamp.monotonic;
2015
2016         return arm_watchdog(e);
2017 }
2018
2019 _public_ int sd_event_run(sd_event *e, uint64_t timeout) {
2020         struct epoll_event *ev_queue;
2021         unsigned ev_queue_max;
2022         sd_event_source *p;
2023         int r, i, m;
2024
2025         assert_return(e, -EINVAL);
2026         assert_return(!event_pid_changed(e), -ECHILD);
2027         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
2028         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
2029
2030         if (e->exit_requested)
2031                 return dispatch_exit(e);
2032
2033         sd_event_ref(e);
2034         e->iteration++;
2035         e->state = SD_EVENT_RUNNING;
2036
2037         r = event_prepare(e);
2038         if (r < 0)
2039                 goto finish;
2040
2041         r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
2042         if (r < 0)
2043                 goto finish;
2044
2045         r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
2046         if (r < 0)
2047                 goto finish;
2048
2049         if (event_next_pending(e) || e->need_process_child)
2050                 timeout = 0;
2051         ev_queue_max = CLAMP(e->n_sources, 1U, EPOLL_QUEUE_MAX);
2052         ev_queue = newa(struct epoll_event, ev_queue_max);
2053
2054         m = epoll_wait(e->epoll_fd, ev_queue, ev_queue_max,
2055                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
2056         if (m < 0) {
2057                 r = errno == EAGAIN || errno == EINTR ? 1 : -errno;
2058                 goto finish;
2059         }
2060
2061         dual_timestamp_get(&e->timestamp);
2062
2063         for (i = 0; i < m; i++) {
2064
2065                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
2066                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
2067                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
2068                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
2069                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
2070                         r = process_signal(e, ev_queue[i].events);
2071                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_WATCHDOG))
2072                         r = flush_timer(e, e->watchdog_fd, ev_queue[i].events, NULL);
2073                 else
2074                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
2075
2076                 if (r < 0)
2077                         goto finish;
2078         }
2079
2080         r = process_watchdog(e);
2081         if (r < 0)
2082                 goto finish;
2083
2084         r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
2085         if (r < 0)
2086                 goto finish;
2087
2088         r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
2089         if (r < 0)
2090                 goto finish;
2091
2092         if (e->need_process_child) {
2093                 r = process_child(e);
2094                 if (r < 0)
2095                         goto finish;
2096         }
2097
2098         p = event_next_pending(e);
2099         if (!p) {
2100                 r = 1;
2101                 goto finish;
2102         }
2103
2104         r = source_dispatch(p);
2105
2106 finish:
2107         e->state = SD_EVENT_PASSIVE;
2108         sd_event_unref(e);
2109
2110         return r;
2111 }
2112
2113 _public_ int sd_event_loop(sd_event *e) {
2114         int r;
2115
2116         assert_return(e, -EINVAL);
2117         assert_return(!event_pid_changed(e), -ECHILD);
2118         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
2119
2120         sd_event_ref(e);
2121
2122         while (e->state != SD_EVENT_FINISHED) {
2123                 r = sd_event_run(e, (uint64_t) -1);
2124                 if (r < 0)
2125                         goto finish;
2126         }
2127
2128         r = e->exit_code;
2129
2130 finish:
2131         sd_event_unref(e);
2132         return r;
2133 }
2134
/* Returns the loop's current state (SD_EVENT_PASSIVE, _RUNNING, _EXITING,
 * _FINISHED, ...), or a negative errno-style error. */
_public_ int sd_event_get_state(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        return e->state;
}
2141
2142 _public_ int sd_event_get_exit_code(sd_event *e, int *code) {
2143         assert_return(e, -EINVAL);
2144         assert_return(code, -EINVAL);
2145         assert_return(!event_pid_changed(e), -ECHILD);
2146
2147         if (!e->exit_requested)
2148                 return -ENODATA;
2149
2150         *code = e->exit_code;
2151         return 0;
2152 }
2153
2154 _public_ int sd_event_exit(sd_event *e, int code) {
2155         assert_return(e, -EINVAL);
2156         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
2157         assert_return(!event_pid_changed(e), -ECHILD);
2158
2159         e->exit_requested = true;
2160         e->exit_code = code;
2161
2162         return 0;
2163 }
2164
/* Returns the CLOCK_REALTIME timestamp taken when the current (or most
 * recent) event loop iteration woke up. -ENODATA before the first wakeup. */
_public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
        assert_return(e, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
        assert_return(!event_pid_changed(e), -ECHILD);

        *usec = e->timestamp.realtime;
        return 0;
}
2174
/* Returns the CLOCK_MONOTONIC timestamp taken when the current (or most
 * recent) event loop iteration woke up. -ENODATA before the first wakeup. */
_public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
        assert_return(e, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
        assert_return(!event_pid_changed(e), -ECHILD);

        *usec = e->timestamp.monotonic;
        return 0;
}
2184
2185 _public_ int sd_event_default(sd_event **ret) {
2186
2187         static thread_local sd_event *default_event = NULL;
2188         sd_event *e = NULL;
2189         int r;
2190
2191         if (!ret)
2192                 return !!default_event;
2193
2194         if (default_event) {
2195                 *ret = sd_event_ref(default_event);
2196                 return 0;
2197         }
2198
2199         r = sd_event_new(&e);
2200         if (r < 0)
2201                 return r;
2202
2203         e->default_event_ptr = &default_event;
2204         e->tid = gettid();
2205         default_event = e;
2206
2207         *ret = e;
2208         return 1;
2209 }
2210
2211 _public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
2212         assert_return(e, -EINVAL);
2213         assert_return(tid, -EINVAL);
2214         assert_return(!event_pid_changed(e), -ECHILD);
2215
2216         if (e->tid != 0) {
2217                 *tid = e->tid;
2218                 return 0;
2219         }
2220
2221         return -ENXIO;
2222 }
2223
/* Enables/disables automatic watchdog notification: when enabled, the
 * event loop periodically sends WATCHDOG=1 to the service manager, at an
 * interval derived from the period reported by sd_watchdog_enabled().
 * Returns the new watchdog state, 0 if the manager did not request
 * watchdog notifications, or a negative errno-style error. */
_public_ int sd_event_set_watchdog(sd_event *e, int b) {
        int r;

        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        /* Already in the requested state? */
        if (e->watchdog == !!b)
                return e->watchdog;

        if (b) {
                struct epoll_event ev = {};

                /* r == 0: manager did not ask for watchdog pings; pass
                 * that through so the caller knows nothing was set up. */
                r = sd_watchdog_enabled(false, &e->watchdog_period);
                if (r <= 0)
                        return r;

                /* Issue first ping immediately */
                sd_notify(false, "WATCHDOG=1");
                e->watchdog_last = now(CLOCK_MONOTONIC);

                /* Dedicated timerfd that wakes the loop up for the pings */
                e->watchdog_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
                if (e->watchdog_fd < 0)
                        return -errno;

                /* Schedule the next ping before exposing the fd to epoll */
                r = arm_watchdog(e);
                if (r < 0)
                        goto fail;

                ev.events = EPOLLIN;
                ev.data.ptr = INT_TO_PTR(SOURCE_WATCHDOG);

                r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->watchdog_fd, &ev);
                if (r < 0) {
                        r = -errno;
                        goto fail;
                }

        } else {
                /* Disabling: unhook and close the timerfd if we set one up */
                if (e->watchdog_fd >= 0) {
                        epoll_ctl(e->epoll_fd, EPOLL_CTL_DEL, e->watchdog_fd, NULL);
                        e->watchdog_fd = safe_close(e->watchdog_fd);
                }
        }

        e->watchdog = !!b;
        return e->watchdog;

fail:
        /* Roll back the partially set up timerfd */
        e->watchdog_fd = safe_close(e->watchdog_fd);
        return r;
}
2275
/* Returns whether watchdog notification is currently enabled for this
 * event loop (see sd_event_set_watchdog()). */
_public_ int sd_event_get_watchdog(sd_event *e) {
        assert_return(e, -EINVAL);
        assert_return(!event_pid_changed(e), -ECHILD);

        return e->watchdog;
}