chiark / gitweb /
event: dynamically adjust size of events array instead of pre-allocating it possibly...
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "sd-id128.h"
27 #include "sd-daemon.h"
28 #include "macro.h"
29 #include "prioq.h"
30 #include "hashmap.h"
31 #include "util.h"
32 #include "time-util.h"
33 #include "missing.h"
34
35 #include "sd-event.h"
36
37 #define EPOLL_QUEUE_MAX 512U
38 #define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
39
/* The kinds of event sources an sd_event loop can carry. Also used as an
 * epoll data tag for the loop's internal fds (timerfd, signalfd, watchdog). */
typedef enum EventSourceType {
        SOURCE_IO,         /* a user fd watched via epoll */
        SOURCE_MONOTONIC,  /* CLOCK_MONOTONIC timer */
        SOURCE_REALTIME,   /* CLOCK_REALTIME timer */
        SOURCE_SIGNAL,     /* signal delivered through the signalfd */
        SOURCE_CHILD,      /* child process state change (driven by SIGCHLD) */
        SOURCE_DEFER,      /* dispatched on the next loop iteration */
        SOURCE_EXIT,       /* dispatched when the loop is exiting */
        SOURCE_WATCHDOG    /* internal tag only; no source object of this type
                            * is ever created (see source_free()) */
} EventSourceType;
50
struct sd_event_source {
        unsigned n_ref;                 /* reference count; freed when it reaches 0 */

        sd_event *event;                /* owning loop; the source holds a ref on it */
        void *userdata;                 /* opaque pointer handed to the callback */
        sd_event_handler_t prepare;     /* optional callback run before polling */

        /* Bit-fields keep the frequently-checked flags compact. */
        EventSourceType type:4;
        int enabled:3;                  /* SD_EVENT_OFF, SD_EVENT_ON or SD_EVENT_ONESHOT */
        bool pending:1;                 /* queued in event->pending, awaiting dispatch */
        bool dispatching:1;             /* currently inside its callback
                                         * (see sd_event_source_unref()) */

        int priority;                   /* lower value is dispatched earlier */
        unsigned pending_index;         /* slot in event->pending prioq */
        unsigned prepare_index;         /* slot in event->prepare prioq */
        unsigned pending_iteration;     /* loop iteration when it became pending */
        unsigned prepare_iteration;     /* loop iteration when prepare() last ran */

        /* Per-type state: only the member matching 'type' is valid. */
        union {
                struct {
                        sd_event_io_handler_t callback;
                        int fd;
                        uint32_t events;        /* requested epoll event mask */
                        uint32_t revents;       /* events last reported by epoll */
                        bool registered:1;      /* fd currently added to the epoll instance */
                } io;
                struct {
                        sd_event_time_handler_t callback;
                        usec_t next, accuracy;  /* earliest wakeup and allowed slack */
                        unsigned earliest_index; /* slot in *_earliest prioq */
                        unsigned latest_index;   /* slot in *_latest prioq */
                } time;
                struct {
                        sd_event_signal_handler_t callback;
                        struct signalfd_siginfo siginfo; /* filled on receipt */
                        int sig;
                } signal;
                struct {
                        sd_event_child_handler_t callback;
                        siginfo_t siginfo;      /* filled by waitid() */
                        pid_t pid;
                        int options;            /* WEXITED|WSTOPPED|WCONTINUED subset */
                } child;
                struct {
                        sd_event_handler_t callback;
                } defer;
                struct {
                        sd_event_handler_t callback;
                        unsigned prioq_index;   /* slot in event->exit prioq */
                } exit;
        };
};
103
struct sd_event {
        unsigned n_ref;                 /* reference count for the loop itself */

        int epoll_fd;                   /* central epoll instance */
        int signal_fd;                  /* shared signalfd for all signal sources, or -1 */
        int realtime_fd;                /* CLOCK_REALTIME timerfd, or -1 */
        int monotonic_fd;               /* CLOCK_MONOTONIC timerfd, or -1 */
        int watchdog_fd;                /* timerfd driving sd_notify() watchdog pings, or -1 */

        Prioq *pending;                 /* sources awaiting dispatch, best first */
        Prioq *prepare;                 /* sources with a prepare() callback */

        /* For both clocks we maintain two priority queues each, one
         * ordered for the earliest times the events may be
         * dispatched, and one ordered by the latest times they must
         * have been dispatched. The range between the top entries in
         * the two prioqs is the time window we can freely schedule
         * wakeups in */
        Prioq *monotonic_earliest;
        Prioq *monotonic_latest;
        Prioq *realtime_earliest;
        Prioq *realtime_latest;

        usec_t realtime_next, monotonic_next;  /* currently armed timerfd deadlines */
        usec_t perturb;                 /* boot-id derived offset to de-synchronize
                                         * wakeups across machines */

        sigset_t sigset;                /* signals routed through signal_fd */
        sd_event_source **signal_sources; /* _NSIG-sized array indexed by signal number */

        Hashmap *child_sources;         /* pid -> SOURCE_CHILD source */
        unsigned n_enabled_child_sources;

        Prioq *exit;                    /* SOURCE_EXIT sources, by priority */

        pid_t original_pid;             /* pid at creation; loops don't survive fork() */

        unsigned iteration;             /* monotonically increasing loop counter */
        dual_timestamp timestamp;       /* time of the current wakeup */
        int state;

        bool exit_requested:1;
        bool need_process_child:1;      /* SIGCHLD seen, waitid() sweep pending */
        bool watchdog:1;

        int exit_code;

        pid_t tid;
        sd_event **default_event_ptr;   /* weak back-pointer cleared in event_free() */

        usec_t watchdog_last, watchdog_period;

        unsigned n_sources;             /* live sources attached to this loop */
};
157
158 static int pending_prioq_compare(const void *a, const void *b) {
159         const sd_event_source *x = a, *y = b;
160
161         assert(x->pending);
162         assert(y->pending);
163
164         /* Enabled ones first */
165         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
166                 return -1;
167         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
168                 return 1;
169
170         /* Lower priority values first */
171         if (x->priority < y->priority)
172                 return -1;
173         if (x->priority > y->priority)
174                 return 1;
175
176         /* Older entries first */
177         if (x->pending_iteration < y->pending_iteration)
178                 return -1;
179         if (x->pending_iteration > y->pending_iteration)
180                 return 1;
181
182         /* Stability for the rest */
183         if (x < y)
184                 return -1;
185         if (x > y)
186                 return 1;
187
188         return 0;
189 }
190
191 static int prepare_prioq_compare(const void *a, const void *b) {
192         const sd_event_source *x = a, *y = b;
193
194         assert(x->prepare);
195         assert(y->prepare);
196
197         /* Move most recently prepared ones last, so that we can stop
198          * preparing as soon as we hit one that has already been
199          * prepared in the current iteration */
200         if (x->prepare_iteration < y->prepare_iteration)
201                 return -1;
202         if (x->prepare_iteration > y->prepare_iteration)
203                 return 1;
204
205         /* Enabled ones first */
206         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
207                 return -1;
208         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
209                 return 1;
210
211         /* Lower priority values first */
212         if (x->priority < y->priority)
213                 return -1;
214         if (x->priority > y->priority)
215                 return 1;
216
217         /* Stability for the rest */
218         if (x < y)
219                 return -1;
220         if (x > y)
221                 return 1;
222
223         return 0;
224 }
225
226 static int earliest_time_prioq_compare(const void *a, const void *b) {
227         const sd_event_source *x = a, *y = b;
228
229         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
230         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
231
232         /* Enabled ones first */
233         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
234                 return -1;
235         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
236                 return 1;
237
238         /* Move the pending ones to the end */
239         if (!x->pending && y->pending)
240                 return -1;
241         if (x->pending && !y->pending)
242                 return 1;
243
244         /* Order by time */
245         if (x->time.next < y->time.next)
246                 return -1;
247         if (x->time.next > y->time.next)
248                 return 1;
249
250         /* Stability for the rest */
251         if (x < y)
252                 return -1;
253         if (x > y)
254                 return 1;
255
256         return 0;
257 }
258
259 static int latest_time_prioq_compare(const void *a, const void *b) {
260         const sd_event_source *x = a, *y = b;
261
262         assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) ||
263                (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME));
264
265         /* Enabled ones first */
266         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
267                 return -1;
268         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
269                 return 1;
270
271         /* Move the pending ones to the end */
272         if (!x->pending && y->pending)
273                 return -1;
274         if (x->pending && !y->pending)
275                 return 1;
276
277         /* Order by time */
278         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
279                 return -1;
280         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
281                 return 1;
282
283         /* Stability for the rest */
284         if (x < y)
285                 return -1;
286         if (x > y)
287                 return 1;
288
289         return 0;
290 }
291
292 static int exit_prioq_compare(const void *a, const void *b) {
293         const sd_event_source *x = a, *y = b;
294
295         assert(x->type == SOURCE_EXIT);
296         assert(y->type == SOURCE_EXIT);
297
298         /* Enabled ones first */
299         if (x->enabled != SD_EVENT_OFF && y->enabled == SD_EVENT_OFF)
300                 return -1;
301         if (x->enabled == SD_EVENT_OFF && y->enabled != SD_EVENT_OFF)
302                 return 1;
303
304         /* Lower priority values first */
305         if (x->priority < y->priority)
306                 return -1;
307         if (x->priority > y->priority)
308                 return 1;
309
310         /* Stability for the rest */
311         if (x < y)
312                 return -1;
313         if (x > y)
314                 return 1;
315
316         return 0;
317 }
318
/* Releases all resources owned by the loop. Only callable once the last
 * source is gone, since every source holds a reference on the event. */
static void event_free(sd_event *e) {
        assert(e);
        assert(e->n_sources == 0);

        /* Clear the weak "default event" back-pointer if we are it */
        if (e->default_event_ptr)
                *(e->default_event_ptr) = NULL;

        if (e->epoll_fd >= 0)
                close_nointr_nofail(e->epoll_fd);

        if (e->signal_fd >= 0)
                close_nointr_nofail(e->signal_fd);

        if (e->realtime_fd >= 0)
                close_nointr_nofail(e->realtime_fd);

        if (e->monotonic_fd >= 0)
                close_nointr_nofail(e->monotonic_fd);

        if (e->watchdog_fd >= 0)
                close_nointr_nofail(e->watchdog_fd);

        /* The queues are empty at this point (n_sources == 0), hence
         * freeing them does not orphan any entries. */
        prioq_free(e->pending);
        prioq_free(e->prepare);
        prioq_free(e->monotonic_earliest);
        prioq_free(e->monotonic_latest);
        prioq_free(e->realtime_earliest);
        prioq_free(e->realtime_latest);
        prioq_free(e->exit);

        free(e->signal_sources);

        hashmap_free(e->child_sources);
        free(e);
}
354
/* Allocates a new event loop with a single reference and an epoll
 * instance. Returns 0 on success, -ENOMEM or -errno on failure. */
_public_ int sd_event_new(sd_event** ret) {
        sd_event *e;
        int r;

        assert_return(ret, -EINVAL);

        e = new0(sd_event, 1);
        if (!e)
                return -ENOMEM;

        e->n_ref = 1;
        /* All fds start unset; they are created lazily when first needed */
        e->signal_fd = e->realtime_fd = e->monotonic_fd = e->watchdog_fd = e->epoll_fd = -1;
        e->realtime_next = e->monotonic_next = (usec_t) -1;
        /* Remember the pid so we can refuse use after fork() */
        e->original_pid = getpid();

        assert_se(sigemptyset(&e->sigset) == 0);

        e->pending = prioq_new(pending_prioq_compare);
        if (!e->pending) {
                r = -ENOMEM;
                goto fail;
        }

        e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
        if (e->epoll_fd < 0) {
                r = -errno;
                goto fail;
        }

        *ret = e;
        return 0;

fail:
        event_free(e);
        return r;
}
391
392 _public_ sd_event* sd_event_ref(sd_event *e) {
393         assert_return(e, NULL);
394
395         assert(e->n_ref >= 1);
396         e->n_ref++;
397
398         return e;
399 }
400
401 _public_ sd_event* sd_event_unref(sd_event *e) {
402
403         if (!e)
404                 return NULL;
405
406         assert(e->n_ref >= 1);
407         e->n_ref--;
408
409         if (e->n_ref <= 0)
410                 event_free(e);
411
412         return NULL;
413 }
414
/* Returns true if the process forked since the loop was created. */
static bool event_pid_changed(sd_event *e) {
        assert(e);

        /* We don't support people creating an event loop and keeping
         * it around over a fork(). Let's complain. */

        return e->original_pid != getpid();
}
423
424 static int source_io_unregister(sd_event_source *s) {
425         int r;
426
427         assert(s);
428         assert(s->type == SOURCE_IO);
429
430         if (!s->io.registered)
431                 return 0;
432
433         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
434         if (r < 0)
435                 return -errno;
436
437         s->io.registered = false;
438         return 0;
439 }
440
441 static int source_io_register(
442                 sd_event_source *s,
443                 int enabled,
444                 uint32_t events) {
445
446         struct epoll_event ev = {};
447         int r;
448
449         assert(s);
450         assert(s->type == SOURCE_IO);
451         assert(enabled != SD_EVENT_OFF);
452
453         ev.events = events;
454         ev.data.ptr = s;
455
456         if (enabled == SD_EVENT_ONESHOT)
457                 ev.events |= EPOLLONESHOT;
458
459         if (s->io.registered)
460                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
461         else
462                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
463
464         if (r < 0)
465                 return -errno;
466
467         s->io.registered = true;
468
469         return 0;
470 }
471
/* Detaches the source from its loop (per-type bookkeeping first, then
 * the shared queues) and frees it. Safe on a partially-initialized
 * source, as used by the add_* error paths. */
static void source_free(sd_event_source *s) {
        assert(s);

        if (s->event) {
                assert(s->event->n_sources > 0);

                switch (s->type) {

                case SOURCE_IO:
                        if (s->io.fd >= 0)
                                source_io_unregister(s);

                        break;

                case SOURCE_MONOTONIC:
                        prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_REALTIME:
                        prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
                        prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
                        break;

                case SOURCE_SIGNAL:
                        if (s->signal.sig > 0) {
                                /* Keep SIGCHLD in the sigset while child
                                 * sources still depend on it */
                                if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)
                                        assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);

                                if (s->event->signal_sources)
                                        s->event->signal_sources[s->signal.sig] = NULL;
                        }

                        break;

                case SOURCE_CHILD:
                        if (s->child.pid > 0) {
                                if (s->enabled != SD_EVENT_OFF) {
                                        assert(s->event->n_enabled_child_sources > 0);
                                        s->event->n_enabled_child_sources--;
                                }

                                /* Keep SIGCHLD in the sigset while an
                                 * explicit SIGCHLD source still exists */
                                if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
                                        assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);

                                hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
                        }

                        break;

                case SOURCE_DEFER:
                        /* nothing */
                        break;

                case SOURCE_EXIT:
                        prioq_remove(s->event->exit, s, &s->exit.prioq_index);
                        break;

                case SOURCE_WATCHDOG:
                        assert_not_reached("Wut? I shouldn't exist.");
                }

                if (s->pending)
                        prioq_remove(s->event->pending, s, &s->pending_index);

                if (s->prepare)
                        prioq_remove(s->event->prepare, s, &s->prepare_index);

                /* Drop the reference the source held on its loop */
                s->event->n_sources--;
                sd_event_unref(s->event);
        }

        free(s);
}
546
/* Marks a source as pending (queued for dispatch) or clears it, keeping
 * the pending prioq and — for timer sources — the per-clock queues in
 * sync, since 'pending' is part of their sort keys. Returns 0 or a
 * negative error from prioq_put(). */
static int source_set_pending(sd_event_source *s, bool b) {
        int r;

        assert(s);
        /* Exit sources are dispatched from their own queue, never via 'pending' */
        assert(s->type != SOURCE_EXIT);

        if (s->pending == b)
                return 0;

        s->pending = b;

        if (b) {
                /* Record when it became pending, for FIFO ordering
                 * within the same priority */
                s->pending_iteration = s->event->iteration;

                r = prioq_put(s->event->pending, s, &s->pending_index);
                if (r < 0) {
                        s->pending = false;
                        return r;
                }
        } else
                assert_se(prioq_remove(s->event->pending, s, &s->pending_index));

        /* The comparators for the time queues look at 'pending', hence
         * reshuffle the source after the flag changed */
        if (s->type == SOURCE_REALTIME) {
                prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
        } else if (s->type == SOURCE_MONOTONIC) {
                prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
        }

        return 0;
}
579
580 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
581         sd_event_source *s;
582
583         assert(e);
584
585         s = new0(sd_event_source, 1);
586         if (!s)
587                 return NULL;
588
589         s->n_ref = 1;
590         s->event = sd_event_ref(e);
591         s->type = type;
592         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
593
594         e->n_sources ++;
595
596         return s;
597 }
598
599 _public_ int sd_event_add_io(
600                 sd_event *e,
601                 int fd,
602                 uint32_t events,
603                 sd_event_io_handler_t callback,
604                 void *userdata,
605                 sd_event_source **ret) {
606
607         sd_event_source *s;
608         int r;
609
610         assert_return(e, -EINVAL);
611         assert_return(fd >= 0, -EINVAL);
612         assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
613         assert_return(callback, -EINVAL);
614         assert_return(ret, -EINVAL);
615         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
616         assert_return(!event_pid_changed(e), -ECHILD);
617
618         s = source_new(e, SOURCE_IO);
619         if (!s)
620                 return -ENOMEM;
621
622         s->io.fd = fd;
623         s->io.events = events;
624         s->io.callback = callback;
625         s->userdata = userdata;
626         s->enabled = SD_EVENT_ON;
627
628         r = source_io_register(s, s->enabled, events);
629         if (r < 0) {
630                 source_free(s);
631                 return -errno;
632         }
633
634         *ret = s;
635         return 0;
636 }
637
/* Lazily creates the timerfd for one clock and registers it with the
 * epoll instance, tagged with the source type so wakeups can be routed.
 * Also derives the per-machine wakeup perturbation on first use.
 * Returns 0 (also when the fd already exists) or -errno. */
static int event_setup_timer_fd(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id) {

        struct epoll_event ev = {};
        int r, fd;
        sd_id128_t bootid;

        assert(e);
        assert(timer_fd);

        if (_likely_(*timer_fd >= 0))
                return 0;

        fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
        if (fd < 0)
                return -errno;

        ev.events = EPOLLIN;
        /* Tag the fd with the source type rather than a source pointer */
        ev.data.ptr = INT_TO_PTR(type);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
        if (r < 0) {
                close_nointr_nofail(fd);
                return -errno;
        }

        /* When we sleep for longer, we try to realign the wakeup to
           the same time within each minute/second/250ms, so that
           events all across the system can be coalesced into a single
           CPU wakeup. However, let's take some system-specific
           randomness for this value, so that in a network of systems
           with synced clocks timer events are distributed a
           bit. Here, we calculate a perturbation usec offset from the
           boot ID. */

        if (sd_id128_get_boot(&bootid) >= 0)
                e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_MINUTE;

        *timer_fd = fd;
        return 0;
}
682
/* Common implementation behind sd_event_add_monotonic/_realtime:
 * lazily allocates the clock's two priority queues and its timerfd,
 * then creates a ONESHOT timer source firing in the window
 * [usec, usec + accuracy]. An accuracy of 0 selects the default
 * (250ms). Returns 0 or a negative errno-style code. */
static int event_add_time_internal(
                sd_event *e,
                EventSourceType type,
                int *timer_fd,
                clockid_t id,
                Prioq **earliest,
                Prioq **latest,
                uint64_t usec,
                uint64_t accuracy,
                sd_event_time_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(usec != (uint64_t) -1, -EINVAL);
        assert_return(accuracy != (uint64_t) -1, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        assert(timer_fd);
        assert(earliest);
        assert(latest);

        /* Allocate the two scheduling queues on first use */
        if (!*earliest) {
                *earliest = prioq_new(earliest_time_prioq_compare);
                if (!*earliest)
                        return -ENOMEM;
        }

        if (!*latest) {
                *latest = prioq_new(latest_time_prioq_compare);
                if (!*latest)
                        return -ENOMEM;
        }

        /* And the timerfd backing this clock */
        if (*timer_fd < 0) {
                r = event_setup_timer_fd(e, type, timer_fd, id);
                if (r < 0)
                        return r;
        }

        s = source_new(e, type);
        if (!s)
                return -ENOMEM;

        s->time.next = usec;
        s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
        s->time.callback = callback;
        s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL;
        s->userdata = userdata;
        /* Timers fire once, then must be rearmed by the caller */
        s->enabled = SD_EVENT_ONESHOT;

        r = prioq_put(*earliest, s, &s->time.earliest_index);
        if (r < 0)
                goto fail;

        r = prioq_put(*latest, s, &s->time.latest_index);
        if (r < 0)
                goto fail;

        *ret = s;
        return 0;

fail:
        source_free(s);
        return r;
}
755
756 _public_ int sd_event_add_monotonic(sd_event *e,
757                                     uint64_t usec,
758                                     uint64_t accuracy,
759                                     sd_event_time_handler_t callback,
760                                     void *userdata,
761                                     sd_event_source **ret) {
762
763         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
764 }
765
766 _public_ int sd_event_add_realtime(sd_event *e,
767                                    uint64_t usec,
768                                    uint64_t accuracy,
769                                    sd_event_time_handler_t callback,
770                                    void *userdata,
771                                    sd_event_source **ret) {
772
773         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
774 }
775
/* (Re)creates the loop's signalfd from the current sigset. signalfd()
 * with fd == -1 allocates a new fd, otherwise it updates the mask of
 * the existing one in place; only a newly created fd needs to be added
 * to the epoll instance. Returns 0 or -errno. */
static int event_update_signal_fd(sd_event *e) {
        struct epoll_event ev = {};
        bool add_to_epoll;
        int r;

        assert(e);

        add_to_epoll = e->signal_fd < 0;

        r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
        if (r < 0)
                return -errno;

        e->signal_fd = r;

        if (!add_to_epoll)
                return 0;

        ev.events = EPOLLIN;
        /* Tag with the source type; dispatch routes on this */
        ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);

        r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
        if (r < 0) {
                close_nointr_nofail(e->signal_fd);
                e->signal_fd = -1;

                return -errno;
        }

        return 0;
}
807
/* Adds a source dispatched when 'sig' arrives via the shared signalfd.
 * Only one source per signal number is allowed (-EBUSY otherwise). The
 * caller is expected to have blocked the signal. Returns 0 or a
 * negative errno-style code. */
_public_ int sd_event_add_signal(
                sd_event *e,
                int sig,
                sd_event_signal_handler_t callback,
                void *userdata,
                sd_event_source **ret) {

        sd_event_source *s;
        int r;

        assert_return(e, -EINVAL);
        assert_return(sig > 0, -EINVAL);
        assert_return(sig < _NSIG, -EINVAL);
        assert_return(callback, -EINVAL);
        assert_return(ret, -EINVAL);
        assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(e), -ECHILD);

        /* The array is indexed by signal number, one slot per signal */
        if (!e->signal_sources) {
                e->signal_sources = new0(sd_event_source*, _NSIG);
                if (!e->signal_sources)
                        return -ENOMEM;
        } else if (e->signal_sources[sig])
                return -EBUSY;

        s = source_new(e, SOURCE_SIGNAL);
        if (!s)
                return -ENOMEM;

        s->signal.sig = sig;
        s->signal.callback = callback;
        s->userdata = userdata;
        s->enabled = SD_EVENT_ON;

        e->signal_sources[sig] = s;
        assert_se(sigaddset(&e->sigset, sig) == 0);

        /* If child sources already route SIGCHLD through the signalfd
         * the mask is unchanged and no update is needed */
        if (sig != SIGCHLD || e->n_enabled_child_sources == 0) {
                r = event_update_signal_fd(e);
                if (r < 0) {
                        /* source_free() undoes the sigset/array changes above */
                        source_free(s);
                        return r;
                }
        }

        *ret = s;
        return 0;
}
856
857 _public_ int sd_event_add_child(
858                 sd_event *e,
859                 pid_t pid,
860                 int options,
861                 sd_event_child_handler_t callback,
862                 void *userdata,
863                 sd_event_source **ret) {
864
865         sd_event_source *s;
866         int r;
867
868         assert_return(e, -EINVAL);
869         assert_return(pid > 1, -EINVAL);
870         assert_return(!(options & ~(WEXITED|WSTOPPED|WCONTINUED)), -EINVAL);
871         assert_return(options != 0, -EINVAL);
872         assert_return(callback, -EINVAL);
873         assert_return(ret, -EINVAL);
874         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
875         assert_return(!event_pid_changed(e), -ECHILD);
876
877         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
878         if (r < 0)
879                 return r;
880
881         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
882                 return -EBUSY;
883
884         s = source_new(e, SOURCE_CHILD);
885         if (!s)
886                 return -ENOMEM;
887
888         s->child.pid = pid;
889         s->child.options = options;
890         s->child.callback = callback;
891         s->userdata = userdata;
892         s->enabled = SD_EVENT_ONESHOT;
893
894         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
895         if (r < 0) {
896                 source_free(s);
897                 return r;
898         }
899
900         e->n_enabled_child_sources ++;
901
902         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
903
904         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
905                 r = event_update_signal_fd(e);
906                 if (r < 0) {
907                         source_free(s);
908                         return -errno;
909                 }
910         }
911
912         e->need_process_child = true;
913
914         *ret = s;
915         return 0;
916 }
917
918 _public_ int sd_event_add_defer(
919                 sd_event *e,
920                 sd_event_handler_t callback,
921                 void *userdata,
922                 sd_event_source **ret) {
923
924         sd_event_source *s;
925         int r;
926
927         assert_return(e, -EINVAL);
928         assert_return(callback, -EINVAL);
929         assert_return(ret, -EINVAL);
930         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
931         assert_return(!event_pid_changed(e), -ECHILD);
932
933         s = source_new(e, SOURCE_DEFER);
934         if (!s)
935                 return -ENOMEM;
936
937         s->defer.callback = callback;
938         s->userdata = userdata;
939         s->enabled = SD_EVENT_ONESHOT;
940
941         r = source_set_pending(s, true);
942         if (r < 0) {
943                 source_free(s);
944                 return r;
945         }
946
947         *ret = s;
948         return 0;
949 }
950
951 _public_ int sd_event_add_exit(
952                 sd_event *e,
953                 sd_event_handler_t callback,
954                 void *userdata,
955                 sd_event_source **ret) {
956
957         sd_event_source *s;
958         int r;
959
960         assert_return(e, -EINVAL);
961         assert_return(callback, -EINVAL);
962         assert_return(ret, -EINVAL);
963         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
964         assert_return(!event_pid_changed(e), -ECHILD);
965
966         if (!e->exit) {
967                 e->exit = prioq_new(exit_prioq_compare);
968                 if (!e->exit)
969                         return -ENOMEM;
970         }
971
972         s = source_new(e, SOURCE_EXIT);
973         if (!s)
974                 return -ENOMEM;
975
976         s->exit.callback = callback;
977         s->userdata = userdata;
978         s->exit.prioq_index = PRIOQ_IDX_NULL;
979         s->enabled = SD_EVENT_ONESHOT;
980
981         r = prioq_put(s->event->exit, s, &s->exit.prioq_index);
982         if (r < 0) {
983                 source_free(s);
984                 return r;
985         }
986
987         *ret = s;
988         return 0;
989 }
990
991 _public_ sd_event_source* sd_event_source_ref(sd_event_source *s) {
992         assert_return(s, NULL);
993
994         assert(s->n_ref >= 1);
995         s->n_ref++;
996
997         return s;
998 }
999
_public_ sd_event_source* sd_event_source_unref(sd_event_source *s) {

        /* Drops one reference. When the last reference is gone the
         * source is freed — unless we are inside the source's own
         * dispatch callback, see below. Accepts NULL; always returns
         * NULL so callers can write p = sd_event_source_unref(p). */

        if (!s)
                return NULL;

        assert(s->n_ref >= 1);
        s->n_ref--;

        if (s->n_ref <= 0) {
                /* Here's a special hack: when we are called from a
                 * dispatch handler we won't free the event source
                 * immediately, but we will detach the fd from the
                 * epoll. This way it is safe for the caller to unref
                 * the event source and immediately close the fd, but
                 * we still retain a valid event source object after
                 * the callback. */

                if (s->dispatching) {
                        if (s->type == SOURCE_IO)
                                source_io_unregister(s);
                } else
                        source_free(s);
        }

        return NULL;
}
1026
_public_ sd_event *sd_event_source_get_event(sd_event_source *s) {
        /* Returns the event loop object this source is attached to.
         * No reference is taken. */
        assert_return(s, NULL);

        return s->event;
}
1032
_public_ int sd_event_source_get_pending(sd_event_source *s) {
        /* Returns whether the source is currently queued for dispatch.
         * Exit sources have no regular pending state, hence -EDOM. */
        assert_return(s, -EINVAL);
        assert_return(s->type != SOURCE_EXIT, -EDOM);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->pending;
}
1041
_public_ int sd_event_source_get_io_fd(sd_event_source *s) {
        /* Returns the file descriptor this I/O source watches. */
        assert_return(s, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->io.fd;
}
1049
_public_ int sd_event_source_set_io_fd(sd_event_source *s, int fd) {
        int r;

        /* Replaces the file descriptor of an I/O source. The caller
         * keeps ownership of both the old and the new fd; neither is
         * closed here. */

        assert_return(s, -EINVAL);
        assert_return(fd >= 0, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->io.fd == fd)
                return 0;

        if (s->enabled == SD_EVENT_OFF) {
                /* Not registered with epoll right now: just remember
                 * the fd; registration happens on enable. */
                s->io.fd = fd;
                s->io.registered = false;
        } else {
                int saved_fd;

                /* The source is live: register the new fd first and
                 * only then drop the old one, so a failure can be
                 * rolled back to a fully working state. */
                saved_fd = s->io.fd;
                assert(s->io.registered);

                s->io.fd = fd;
                s->io.registered = false;

                r = source_io_register(s, s->enabled, s->io.events);
                if (r < 0) {
                        /* Restore the previous fd, which is still
                         * registered with the epoll instance. */
                        s->io.fd = saved_fd;
                        s->io.registered = true;
                        return r;
                }

                epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, saved_fd, NULL);
        }

        return 0;
}
1085
_public_ int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
        /* Retrieves the epoll event mask this I/O source was
         * configured with. */
        assert_return(s, -EINVAL);
        assert_return(events, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *events = s->io.events;
        return 0;
}
1095
_public_ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
        int r;

        assert_return(s, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        /* Only event flags we know how to handle may be set. */
        assert_return(!(events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP|EPOLLET)), -EINVAL);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->io.events == events)
                return 0;

        /* If the source is currently armed, update the epoll
         * registration first; the new mask is only committed on
         * success. */
        if (s->enabled != SD_EVENT_OFF) {
                r = source_io_register(s, s->enabled, events);
                if (r < 0)
                        return r;
        }

        s->io.events = events;
        source_set_pending(s, false);

        return 0;
}
1119
_public_ int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
        /* Retrieves the events that actually triggered; only
         * meaningful while the source is pending, hence -ENODATA
         * otherwise. */
        assert_return(s, -EINVAL);
        assert_return(revents, -EINVAL);
        assert_return(s->type == SOURCE_IO, -EDOM);
        assert_return(s->pending, -ENODATA);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *revents = s->io.revents;
        return 0;
}
1130
_public_ int sd_event_source_get_signal(sd_event_source *s) {
        /* Returns the signal number this signal source watches. */
        assert_return(s, -EINVAL);
        assert_return(s->type == SOURCE_SIGNAL, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        return s->signal.sig;
}
1138
1139 _public_ int sd_event_source_get_priority(sd_event_source *s, int *priority) {
1140         assert_return(s, -EINVAL);
1141         assert_return(!event_pid_changed(s->event), -ECHILD);
1142
1143         return s->priority;
1144 }
1145
_public_ int sd_event_source_set_priority(sd_event_source *s, int priority) {
        /* Changes the dispatch priority of this source. Every priority
         * queue the source may currently sit in has to be re-sorted. */
        assert_return(s, -EINVAL);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->priority == priority)
                return 0;

        s->priority = priority;

        if (s->pending)
                prioq_reshuffle(s->event->pending, s, &s->pending_index);

        if (s->prepare)
                prioq_reshuffle(s->event->prepare, s, &s->prepare_index);

        if (s->type == SOURCE_EXIT)
                prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);

        return 0;
}
1167
_public_ int sd_event_source_get_enabled(sd_event_source *s, int *m) {
        /* Retrieves the enablement state: SD_EVENT_OFF, SD_EVENT_ON or
         * SD_EVENT_ONESHOT. */
        assert_return(s, -EINVAL);
        assert_return(m, -EINVAL);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *m = s->enabled;
        return 0;
}
1176
1177 _public_ int sd_event_source_set_enabled(sd_event_source *s, int m) {
1178         int r;
1179
1180         assert_return(s, -EINVAL);
1181         assert_return(m == SD_EVENT_OFF || m == SD_EVENT_ON || m == SD_EVENT_ONESHOT, -EINVAL);
1182         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1183         assert_return(!event_pid_changed(s->event), -ECHILD);
1184
1185         if (s->enabled == m)
1186                 return 0;
1187
1188         if (m == SD_EVENT_OFF) {
1189
1190                 switch (s->type) {
1191
1192                 case SOURCE_IO:
1193                         r = source_io_unregister(s);
1194                         if (r < 0)
1195                                 return r;
1196
1197                         s->enabled = m;
1198                         break;
1199
1200                 case SOURCE_MONOTONIC:
1201                         s->enabled = m;
1202                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1203                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1204                         break;
1205
1206                 case SOURCE_REALTIME:
1207                         s->enabled = m;
1208                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1209                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1210                         break;
1211
1212                 case SOURCE_SIGNAL:
1213                         s->enabled = m;
1214                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0) {
1215                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
1216                                 event_update_signal_fd(s->event);
1217                         }
1218
1219                         break;
1220
1221                 case SOURCE_CHILD:
1222                         s->enabled = m;
1223
1224                         assert(s->event->n_enabled_child_sources > 0);
1225                         s->event->n_enabled_child_sources--;
1226
1227                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1228                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1229                                 event_update_signal_fd(s->event);
1230                         }
1231
1232                         break;
1233
1234                 case SOURCE_EXIT:
1235                         s->enabled = m;
1236                         prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
1237                         break;
1238
1239                 case SOURCE_DEFER:
1240                         s->enabled = m;
1241                         break;
1242
1243                 case SOURCE_WATCHDOG:
1244                         assert_not_reached("Wut? I shouldn't exist.");
1245                 }
1246
1247         } else {
1248                 switch (s->type) {
1249
1250                 case SOURCE_IO:
1251                         r = source_io_register(s, m, s->io.events);
1252                         if (r < 0)
1253                                 return r;
1254
1255                         s->enabled = m;
1256                         break;
1257
1258                 case SOURCE_MONOTONIC:
1259                         s->enabled = m;
1260                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1261                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1262                         break;
1263
1264                 case SOURCE_REALTIME:
1265                         s->enabled = m;
1266                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1267                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1268                         break;
1269
1270                 case SOURCE_SIGNAL:
1271                         s->enabled = m;
1272
1273                         if (s->signal.sig != SIGCHLD || s->event->n_enabled_child_sources == 0)  {
1274                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1275                                 event_update_signal_fd(s->event);
1276                         }
1277                         break;
1278
1279                 case SOURCE_CHILD:
1280                         s->enabled = m;
1281
1282                         if (s->enabled == SD_EVENT_OFF) {
1283                                 s->event->n_enabled_child_sources++;
1284
1285                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1286                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1287                                         event_update_signal_fd(s->event);
1288                                 }
1289                         }
1290                         break;
1291
1292                 case SOURCE_EXIT:
1293                         s->enabled = m;
1294                         prioq_reshuffle(s->event->exit, s, &s->exit.prioq_index);
1295                         break;
1296
1297                 case SOURCE_DEFER:
1298                         s->enabled = m;
1299                         break;
1300
1301                 case SOURCE_WATCHDOG:
1302                         assert_not_reached("Wut? I shouldn't exist.");
1303                 }
1304         }
1305
1306         if (s->pending)
1307                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1308
1309         if (s->prepare)
1310                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1311
1312         return 0;
1313 }
1314
_public_ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
        /* Retrieves the configured absolute elapse time (in µs) of
         * this timer source. */
        assert_return(s, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *usec = s->time.next;
        return 0;
}
1324
1325 _public_ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1326         assert_return(s, -EINVAL);
1327         assert_return(usec != (uint64_t) -1, -EINVAL);
1328         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1329         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1330         assert_return(!event_pid_changed(s->event), -ECHILD);
1331
1332         s->time.next = usec;
1333
1334         source_set_pending(s, false);
1335
1336         if (s->type == SOURCE_REALTIME) {
1337                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1338                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1339         } else {
1340                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1341                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1342         }
1343
1344         return 0;
1345 }
1346
_public_ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
        /* Retrieves the configured wakeup accuracy of this timer
         * source, in µs. */
        assert_return(s, -EINVAL);
        assert_return(usec, -EINVAL);
        assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *usec = s->time.accuracy;
        return 0;
}
1356
1357 _public_ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1358         assert_return(s, -EINVAL);
1359         assert_return(usec != (uint64_t) -1, -EINVAL);
1360         assert_return(s->type == SOURCE_REALTIME || s->type == SOURCE_MONOTONIC, -EDOM);
1361         assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
1362         assert_return(!event_pid_changed(s->event), -ECHILD);
1363
1364         if (usec == 0)
1365                 usec = DEFAULT_ACCURACY_USEC;
1366
1367         s->time.accuracy = usec;
1368
1369         source_set_pending(s, false);
1370
1371         if (s->type == SOURCE_REALTIME)
1372                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1373         else
1374                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1375
1376         return 0;
1377 }
1378
_public_ int sd_event_source_get_child_pid(sd_event_source *s, pid_t *pid) {
        /* Retrieves the PID this child source watches. */
        assert_return(s, -EINVAL);
        assert_return(pid, -EINVAL);
        assert_return(s->type == SOURCE_CHILD, -EDOM);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        *pid = s->child.pid;
        return 0;
}
1388
_public_ int sd_event_source_set_prepare(sd_event_source *s, sd_event_handler_t callback) {
        int r;

        /* Installs (callback != NULL) or removes (callback == NULL)
         * the per-iteration prepare callback for this source. */

        assert_return(s, -EINVAL);
        assert_return(s->type != SOURCE_EXIT, -EDOM);
        assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE);
        assert_return(!event_pid_changed(s->event), -ECHILD);

        if (s->prepare == callback)
                return 0;

        /* Replacing one non-NULL callback with another: the source is
         * already queued in the prepare prioq, just swap the pointer. */
        if (callback && s->prepare) {
                s->prepare = callback;
                return 0;
        }

        /* From here on we either add the source to the prepare queue
         * or remove it. */
        r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
        if (r < 0)
                return r;

        s->prepare = callback;

        if (callback) {
                r = prioq_put(s->event->prepare, s, &s->prepare_index);
                if (r < 0)
                        return r;
        } else
                prioq_remove(s->event->prepare, s, &s->prepare_index);

        return 0;
}
1420
_public_ void* sd_event_source_get_userdata(sd_event_source *s) {
        /* Returns the userdata pointer currently set on this source. */
        assert_return(s, NULL);

        return s->userdata;
}
1426
1427 _public_ void *sd_event_source_set_userdata(sd_event_source *s, void *userdata) {
1428         void *ret;
1429
1430         assert_return(s, NULL);
1431
1432         ret = s->userdata;
1433         s->userdata = userdata;
1434
1435         return ret;
1436 }
1437
1438 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1439         usec_t c;
1440         assert(e);
1441         assert(a <= b);
1442
1443         if (a <= 0)
1444                 return 0;
1445
1446         if (b <= a + 1)
1447                 return a;
1448
1449         /*
1450           Find a good time to wake up again between times a and b. We
1451           have two goals here:
1452
1453           a) We want to wake up as seldom as possible, hence prefer
1454              later times over earlier times.
1455
1456           b) But if we have to wake up, then let's make sure to
1457              dispatch as much as possible on the entire system.
1458
1459           We implement this by waking up everywhere at the same time
1460           within any given minute if we can, synchronised via the
1461           perturbation value determined from the boot ID. If we can't,
1462           then we try to find the same spot in every 10s, then 1s and
1463           then 250ms step. Otherwise, we pick the last possible time
1464           to wake up.
1465         */
1466
1467         c = (b / USEC_PER_MINUTE) * USEC_PER_MINUTE + e->perturb;
1468         if (c >= b) {
1469                 if (_unlikely_(c < USEC_PER_MINUTE))
1470                         return b;
1471
1472                 c -= USEC_PER_MINUTE;
1473         }
1474
1475         if (c >= a)
1476                 return c;
1477
1478         c = (b / (USEC_PER_SEC*10)) * (USEC_PER_SEC*10) + (e->perturb % (USEC_PER_SEC*10));
1479         if (c >= b) {
1480                 if (_unlikely_(c < USEC_PER_SEC*10))
1481                         return b;
1482
1483                 c -= USEC_PER_SEC*10;
1484         }
1485
1486         if (c >= a)
1487                 return c;
1488
1489         c = (b / USEC_PER_SEC) * USEC_PER_SEC + (e->perturb % USEC_PER_SEC);
1490         if (c >= b) {
1491                 if (_unlikely_(c < USEC_PER_SEC))
1492                         return b;
1493
1494                 c -= USEC_PER_SEC;
1495         }
1496
1497         if (c >= a)
1498                 return c;
1499
1500         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1501         if (c >= b) {
1502                 if (_unlikely_(c < USEC_PER_MSEC*250))
1503                         return b;
1504
1505                 c -= USEC_PER_MSEC*250;
1506         }
1507
1508         if (c >= a)
1509                 return c;
1510
1511         return b;
1512 }
1513
1514 static int event_arm_timer(
1515                 sd_event *e,
1516                 int timer_fd,
1517                 Prioq *earliest,
1518                 Prioq *latest,
1519                 usec_t *next) {
1520
1521         struct itimerspec its = {};
1522         sd_event_source *a, *b;
1523         usec_t t;
1524         int r;
1525
1526         assert(e);
1527         assert(next);
1528
1529         a = prioq_peek(earliest);
1530         if (!a || a->enabled == SD_EVENT_OFF) {
1531
1532                 if (timer_fd < 0)
1533                         return 0;
1534
1535                 if (*next == (usec_t) -1)
1536                         return 0;
1537
1538                 /* disarm */
1539                 r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1540                 if (r < 0)
1541                         return r;
1542
1543                 *next = (usec_t) -1;
1544
1545                 return 0;
1546         }
1547
1548         b = prioq_peek(latest);
1549         assert_se(b && b->enabled != SD_EVENT_OFF);
1550
1551         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1552         if (*next == t)
1553                 return 0;
1554
1555         assert_se(timer_fd >= 0);
1556
1557         if (t == 0) {
1558                 /* We don' want to disarm here, just mean some time looooong ago. */
1559                 its.it_value.tv_sec = 0;
1560                 its.it_value.tv_nsec = 1;
1561         } else
1562                 timespec_store(&its.it_value, t);
1563
1564         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1565         if (r < 0)
1566                 return -errno;
1567
1568         *next = t;
1569         return 0;
1570 }
1571
static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
        /* Records the events epoll reported for this I/O source and
         * queues the source for dispatch. */
        assert(e);
        assert(s);
        assert(s->type == SOURCE_IO);

        s->io.revents = events;

        return source_set_pending(s, true);
}
1581
1582 static int flush_timer(sd_event *e, int fd, uint32_t events, usec_t *next) {
1583         uint64_t x;
1584         ssize_t ss;
1585
1586         assert(e);
1587         assert(fd >= 0);
1588
1589         assert_return(events == EPOLLIN, -EIO);
1590
1591         ss = read(fd, &x, sizeof(x));
1592         if (ss < 0) {
1593                 if (errno == EAGAIN || errno == EINTR)
1594                         return 0;
1595
1596                 return -errno;
1597         }
1598
1599         if (ss != sizeof(x))
1600                 return -EIO;
1601
1602         if (next)
1603                 *next = (usec_t) -1;
1604
1605         return 0;
1606 }
1607
1608 static int process_timer(
1609                 sd_event *e,
1610                 usec_t n,
1611                 Prioq *earliest,
1612                 Prioq *latest) {
1613
1614         sd_event_source *s;
1615         int r;
1616
1617         assert(e);
1618
1619         for (;;) {
1620                 s = prioq_peek(earliest);
1621                 if (!s ||
1622                     s->time.next > n ||
1623                     s->enabled == SD_EVENT_OFF ||
1624                     s->pending)
1625                         break;
1626
1627                 r = source_set_pending(s, true);
1628                 if (r < 0)
1629                         return r;
1630
1631                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1632                 prioq_reshuffle(latest, s, &s->time.latest_index);
1633         }
1634
1635         return 0;
1636 }
1637
static int process_child(sd_event *e) {
        sd_event_source *s;
        Iterator i;
        int r;

        assert(e);

        /* Called after a SIGCHLD was seen: polls every watched PID
         * with waitid() and marks the sources whose child changed
         * state as pending. Returns 0 on success, negative errno on
         * failure. */

        e->need_process_child = false;

        /*
           So, this is ugly. We iteratively invoke waitid() with P_PID
           + WNOHANG for each PID we wait for, instead of using
           P_ALL. This is because we only want to get child
           information of very specific child processes, and not all
           of them. We might not have processed the SIGCHLD event of a
           previous invocation and we don't want to maintain a
           unbounded *per-child* event queue, hence we really don't
           want anything flushed out of the kernel's queue that we
           don't care about. Since this is O(n) this means that if you
           have a lot of processes you probably want to handle SIGCHLD
           yourself.

           We do not reap the children here (by using WNOWAIT), this
           is only done after the event source is dispatched so that
           the callback still sees the process as a zombie.
        */

        HASHMAP_FOREACH(s, e->child_sources, i) {
                assert(s->type == SOURCE_CHILD);

                if (s->pending)
                        continue;

                if (s->enabled == SD_EVENT_OFF)
                        continue;

                zero(s->child.siginfo);
                r = waitid(P_PID, s->child.pid, &s->child.siginfo,
                           WNOHANG | (s->child.options & WEXITED ? WNOWAIT : 0) | s->child.options);
                if (r < 0)
                        return -errno;

                /* si_pid stays 0 if waitid() found no state change for
                 * this child. */
                if (s->child.siginfo.si_pid != 0) {
                        bool zombie =
                                s->child.siginfo.si_code == CLD_EXITED ||
                                s->child.siginfo.si_code == CLD_KILLED ||
                                s->child.siginfo.si_code == CLD_DUMPED;

                        if (!zombie && (s->child.options & WEXITED)) {
                                /* If the child isn't dead then let's
                                 * immediately remove the state change
                                 * from the queue, since there's no
                                 * benefit in leaving it queued */

                                assert(s->child.options & (WSTOPPED|WCONTINUED));
                                waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|(s->child.options & (WSTOPPED|WCONTINUED)));
                        }

                        r = source_set_pending(s, true);
                        if (r < 0)
                                return r;
                }
        }

        return 0;
}
1704
static int process_signal(sd_event *e, uint32_t events) {
        bool read_one = false;
        int r;

        assert(e);
        assert(e->signal_sources);

        assert_return(events == EPOLLIN, -EIO);

        /* Drains the signalfd and marks the matching signal sources as
         * pending. When the fd is exhausted (EAGAIN), returns whether
         * at least one signal was consumed. */

        for (;;) {
                struct signalfd_siginfo si;
                ssize_t ss;
                sd_event_source *s;

                ss = read(e->signal_fd, &si, sizeof(si));
                if (ss < 0) {
                        if (errno == EAGAIN || errno == EINTR)
                                return read_one;

                        return -errno;
                }

                if (ss != sizeof(si))
                        return -EIO;

                read_one = true;

                s = e->signal_sources[si.ssi_signo];
                if (si.ssi_signo == SIGCHLD) {
                        /* SIGCHLD gets special treatment: run the
                         * child-source machinery first; only fall
                         * through to an explicit SIGCHLD signal source
                         * if one exists. */
                        r = process_child(e);
                        if (r < 0)
                                return r;
                        if (r > 0 || !s)
                                continue;
                } else
                        /* A signal we never subscribed to shouldn't
                         * show up in the signalfd at all. */
                        if (!s)
                                return -EIO;

                s->signal.siginfo = si;
                r = source_set_pending(s, true);
                if (r < 0)
                        return r;
        }

        return 0;
}
1751
static int source_dispatch(sd_event_source *s) {
        int r = 0;

        assert(s);
        assert(s->pending || s->type == SOURCE_EXIT);

        /* Invokes the user callback of a pending (or exit) source.
         * Returns 1 on success or a negative errno if internal
         * bookkeeping failed; an error from the callback itself only
         * disables the source. */

        if (s->type != SOURCE_DEFER && s->type != SOURCE_EXIT) {
                /* Defer and exit sources are not unqueued here; all
                 * other types leave the pending queue before dispatch. */
                r = source_set_pending(s, false);
                if (r < 0)
                        return r;
        }

        if (s->enabled == SD_EVENT_ONESHOT) {
                /* Disable before the callback runs, so the callback
                 * may re-enable the source if it wants to. */
                r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
                if (r < 0)
                        return r;
        }

        /* While this flag is set sd_event_source_unref() defers
         * freeing, so the callback may safely drop its own last
         * reference. */
        s->dispatching = true;

        switch (s->type) {

        case SOURCE_IO:
                r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
                break;

        case SOURCE_MONOTONIC:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_REALTIME:
                r = s->time.callback(s, s->time.next, s->userdata);
                break;

        case SOURCE_SIGNAL:
                r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
                break;

        case SOURCE_CHILD: {
                bool zombie;

                /* Determine before the callback whether the child is
                 * dead, since the callback may reap it itself. */
                zombie = s->child.siginfo.si_code == CLD_EXITED ||
                         s->child.siginfo.si_code == CLD_KILLED ||
                         s->child.siginfo.si_code == CLD_DUMPED;

                r = s->child.callback(s, &s->child.siginfo, s->userdata);

                /* Now, reap the PID for good. */
                if (zombie)
                        waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|WEXITED);

                break;
        }

        case SOURCE_DEFER:
                r = s->defer.callback(s, s->userdata);
                break;

        case SOURCE_EXIT:
                r = s->exit.callback(s, s->userdata);
                break;

        case SOURCE_WATCHDOG:
                assert_not_reached("Wut? I shouldn't exist.");
        }

        s->dispatching = false;

        if (r < 0)
                log_debug("Event source %p returned error, disabling: %s", s, strerror(-r));

        /* If the callback dropped the last reference, the free was
         * deferred to here (see sd_event_source_unref()). */
        if (s->n_ref == 0)
                source_free(s);
        else if (r < 0)
                sd_event_source_set_enabled(s, SD_EVENT_OFF);

        return 1;
}
1830
static int event_prepare(sd_event *e) {
        int r;

        assert(e);

        /* Runs the prepare callback of every enabled source that has
         * one, at most once per event loop iteration each. */
        for (;;) {
                sd_event_source *s;

                s = prioq_peek(e->prepare);
                if (!s || s->prepare_iteration == e->iteration || s->enabled == SD_EVENT_OFF)
                        break;

                /* Stamp the source with the current iteration and
                 * re-sort it, so the peek above eventually sees only
                 * already-prepared sources and terminates. */
                s->prepare_iteration = e->iteration;
                r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
                if (r < 0)
                        return r;

                assert(s->prepare);

                /* See source_dispatch() for why this flag is set
                 * around the callback. */
                s->dispatching = true;
                r = s->prepare(s, s->userdata);
                s->dispatching = false;

                if (r < 0)
                        log_debug("Prepare callback of event source %p returned error, disabling: %s", s, strerror(-r));

                /* The callback may have dropped the last reference. */
                if (s->n_ref == 0)
                        source_free(s);
                else if (r < 0)
                        sd_event_source_set_enabled(s, SD_EVENT_OFF);
        }

        return 0;
}
1865
1866 static int dispatch_exit(sd_event *e) {
1867         sd_event_source *p;
1868         int r;
1869
1870         assert(e);
1871
1872         p = prioq_peek(e->exit);
1873         if (!p || p->enabled == SD_EVENT_OFF) {
1874                 e->state = SD_EVENT_FINISHED;
1875                 return 0;
1876         }
1877
1878         sd_event_ref(e);
1879         e->iteration++;
1880         e->state = SD_EVENT_EXITING;
1881
1882         r = source_dispatch(p);
1883
1884         e->state = SD_EVENT_PASSIVE;
1885         sd_event_unref(e);
1886
1887         return r;
1888 }
1889
1890 static sd_event_source* event_next_pending(sd_event *e) {
1891         sd_event_source *p;
1892
1893         assert(e);
1894
1895         p = prioq_peek(e->pending);
1896         if (!p)
1897                 return NULL;
1898
1899         if (p->enabled == SD_EVENT_OFF)
1900                 return NULL;
1901
1902         return p;
1903 }
1904
1905 static int arm_watchdog(sd_event *e) {
1906         struct itimerspec its = {};
1907         usec_t t;
1908         int r;
1909
1910         assert(e);
1911         assert(e->watchdog_fd >= 0);
1912
1913         t = sleep_between(e,
1914                           e->watchdog_last + (e->watchdog_period / 2),
1915                           e->watchdog_last + (e->watchdog_period * 3 / 4));
1916
1917         timespec_store(&its.it_value, t);
1918
1919         r = timerfd_settime(e->watchdog_fd, TFD_TIMER_ABSTIME, &its, NULL);
1920         if (r < 0)
1921                 return -errno;
1922
1923         return 0;
1924 }
1925
1926 static int process_watchdog(sd_event *e) {
1927         assert(e);
1928
1929         if (!e->watchdog)
1930                 return 0;
1931
1932         /* Don't notify watchdog too often */
1933         if (e->watchdog_last + e->watchdog_period / 4 > e->timestamp.monotonic)
1934                 return 0;
1935
1936         sd_notify(false, "WATCHDOG=1");
1937         e->watchdog_last = e->timestamp.monotonic;
1938
1939         return arm_watchdog(e);
1940 }
1941
1942 _public_ int sd_event_run(sd_event *e, uint64_t timeout) {
1943         struct epoll_event *ev_queue;
1944         unsigned ev_queue_max;
1945         sd_event_source *p;
1946         int r, i, m;
1947
1948         assert_return(e, -EINVAL);
1949         assert_return(!event_pid_changed(e), -ECHILD);
1950         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
1951         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
1952
1953         if (e->exit_requested)
1954                 return dispatch_exit(e);
1955
1956         sd_event_ref(e);
1957         e->iteration++;
1958         e->state = SD_EVENT_RUNNING;
1959
1960         r = event_prepare(e);
1961         if (r < 0)
1962                 goto finish;
1963
1964         r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1965         if (r < 0)
1966                 goto finish;
1967
1968         r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1969         if (r < 0)
1970                 goto finish;
1971
1972         if (event_next_pending(e) || e->need_process_child)
1973                 timeout = 0;
1974         ev_queue_max = CLAMP(e->n_sources, 1U, EPOLL_QUEUE_MAX);
1975         ev_queue = newa(struct epoll_event, ev_queue_max);
1976
1977         m = epoll_wait(e->epoll_fd, ev_queue, ev_queue_max,
1978                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1979         if (m < 0) {
1980                 r = errno == EAGAIN || errno == EINTR ? 0 : -errno;
1981                 goto finish;
1982         }
1983
1984         dual_timestamp_get(&e->timestamp);
1985
1986         for (i = 0; i < m; i++) {
1987
1988                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1989                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events, &e->monotonic_next);
1990                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1991                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events, &e->realtime_next);
1992                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1993                         r = process_signal(e, ev_queue[i].events);
1994                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_WATCHDOG))
1995                         r = flush_timer(e, e->watchdog_fd, ev_queue[i].events, NULL);
1996                 else
1997                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1998
1999                 if (r < 0)
2000                         goto finish;
2001         }
2002
2003         r = process_watchdog(e);
2004         if (r < 0)
2005                 goto finish;
2006
2007         r = process_timer(e, e->timestamp.monotonic, e->monotonic_earliest, e->monotonic_latest);
2008         if (r < 0)
2009                 goto finish;
2010
2011         r = process_timer(e, e->timestamp.realtime, e->realtime_earliest, e->realtime_latest);
2012         if (r < 0)
2013                 goto finish;
2014
2015         if (e->need_process_child) {
2016                 r = process_child(e);
2017                 if (r < 0)
2018                         goto finish;
2019         }
2020
2021         p = event_next_pending(e);
2022         if (!p) {
2023                 r = 0;
2024                 goto finish;
2025         }
2026
2027         r = source_dispatch(p);
2028
2029 finish:
2030         e->state = SD_EVENT_PASSIVE;
2031         sd_event_unref(e);
2032
2033         return r;
2034 }
2035
2036 _public_ int sd_event_loop(sd_event *e) {
2037         int r;
2038
2039         assert_return(e, -EINVAL);
2040         assert_return(!event_pid_changed(e), -ECHILD);
2041         assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY);
2042
2043         sd_event_ref(e);
2044
2045         while (e->state != SD_EVENT_FINISHED) {
2046                 r = sd_event_run(e, (uint64_t) -1);
2047                 if (r < 0)
2048                         goto finish;
2049         }
2050
2051         r = e->exit_code;
2052
2053 finish:
2054         sd_event_unref(e);
2055         return r;
2056 }
2057
2058 _public_ int sd_event_get_state(sd_event *e) {
2059         assert_return(e, -EINVAL);
2060         assert_return(!event_pid_changed(e), -ECHILD);
2061
2062         return e->state;
2063 }
2064
2065 _public_ int sd_event_get_exit_code(sd_event *e, int *code) {
2066         assert_return(e, -EINVAL);
2067         assert_return(code, -EINVAL);
2068         assert_return(!event_pid_changed(e), -ECHILD);
2069
2070         if (!e->exit_requested)
2071                 return -ENODATA;
2072
2073         *code = e->exit_code;
2074         return 0;
2075 }
2076
2077 _public_ int sd_event_exit(sd_event *e, int code) {
2078         assert_return(e, -EINVAL);
2079         assert_return(e->state != SD_EVENT_FINISHED, -ESTALE);
2080         assert_return(!event_pid_changed(e), -ECHILD);
2081
2082         e->exit_requested = true;
2083         e->exit_code = code;
2084
2085         return 0;
2086 }
2087
2088 _public_ int sd_event_get_now_realtime(sd_event *e, uint64_t *usec) {
2089         assert_return(e, -EINVAL);
2090         assert_return(usec, -EINVAL);
2091         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
2092         assert_return(!event_pid_changed(e), -ECHILD);
2093
2094         *usec = e->timestamp.realtime;
2095         return 0;
2096 }
2097
2098 _public_ int sd_event_get_now_monotonic(sd_event *e, uint64_t *usec) {
2099         assert_return(e, -EINVAL);
2100         assert_return(usec, -EINVAL);
2101         assert_return(dual_timestamp_is_set(&e->timestamp), -ENODATA);
2102         assert_return(!event_pid_changed(e), -ECHILD);
2103
2104         *usec = e->timestamp.monotonic;
2105         return 0;
2106 }
2107
2108 _public_ int sd_event_default(sd_event **ret) {
2109
2110         static __thread sd_event *default_event = NULL;
2111         sd_event *e;
2112         int r;
2113
2114         if (!ret)
2115                 return !!default_event;
2116
2117         if (default_event) {
2118                 *ret = sd_event_ref(default_event);
2119                 return 0;
2120         }
2121
2122         r = sd_event_new(&e);
2123         if (r < 0)
2124                 return r;
2125
2126         e->default_event_ptr = &default_event;
2127         e->tid = gettid();
2128         default_event = e;
2129
2130         *ret = e;
2131         return 1;
2132 }
2133
2134 _public_ int sd_event_get_tid(sd_event *e, pid_t *tid) {
2135         assert_return(e, -EINVAL);
2136         assert_return(tid, -EINVAL);
2137         assert_return(!event_pid_changed(e), -ECHILD);
2138
2139         if (e->tid != 0) {
2140                 *tid = e->tid;
2141                 return 0;
2142         }
2143
2144         return -ENXIO;
2145 }
2146
2147 _public_ int sd_event_set_watchdog(sd_event *e, int b) {
2148         int r;
2149
2150         assert_return(e, -EINVAL);
2151         assert_return(!event_pid_changed(e), -ECHILD);
2152
2153         if (e->watchdog == !!b)
2154                 return e->watchdog;
2155
2156         if (b) {
2157                 struct epoll_event ev = {};
2158                 const char *env;
2159
2160                 env = getenv("WATCHDOG_USEC");
2161                 if (!env)
2162                         return false;
2163
2164                 r = safe_atou64(env, &e->watchdog_period);
2165                 if (r < 0)
2166                         return r;
2167                 if (e->watchdog_period <= 0)
2168                         return -EIO;
2169
2170                 /* Issue first ping immediately */
2171                 sd_notify(false, "WATCHDOG=1");
2172                 e->watchdog_last = now(CLOCK_MONOTONIC);
2173
2174                 e->watchdog_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK|TFD_CLOEXEC);
2175                 if (e->watchdog_fd < 0)
2176                         return -errno;
2177
2178                 r = arm_watchdog(e);
2179                 if (r < 0)
2180                         goto fail;
2181
2182                 ev.events = EPOLLIN;
2183                 ev.data.ptr = INT_TO_PTR(SOURCE_WATCHDOG);
2184
2185                 r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->watchdog_fd, &ev);
2186                 if (r < 0) {
2187                         r = -errno;
2188                         goto fail;
2189                 }
2190
2191         } else {
2192                 if (e->watchdog_fd >= 0) {
2193                         epoll_ctl(e->epoll_fd, EPOLL_CTL_DEL, e->watchdog_fd, NULL);
2194                         close_nointr_nofail(e->watchdog_fd);
2195                         e->watchdog_fd = -1;
2196                 }
2197         }
2198
2199         e->watchdog = !!b;
2200         return e->watchdog;
2201
2202 fail:
2203         close_nointr_nofail(e->watchdog_fd);
2204         e->watchdog_fd = -1;
2205         return r;
2206 }
2207
2208 _public_ int sd_event_get_watchdog(sd_event *e) {
2209         assert_return(e, -EINVAL);
2210         assert_return(!event_pid_changed(e), -ECHILD);
2211
2212         return e->watchdog;
2213 }