chiark / gitweb /
event: add timer accuracy/coalescing logic
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "macro.h"
27 #include "refcnt.h"
28 #include "prioq.h"
29 #include "hashmap.h"
30 #include "util.h"
31 #include "time-util.h"
32 #include "sd-id128.h"
33
34 #include "sd-event.h"
35
/* Upper bound on epoll events handled per wait; presumably the size of the
 * epoll_wait() buffer in the run loop, which is outside this view — TODO
 * confirm. */
#define EPOLL_QUEUE_MAX 64
/* Accuracy used for timer sources that are added with an accuracy of 0
 * (see event_add_time_internal()). */
#define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
38
/* The kinds of event sources an sd_event loop manages. A source's type
 * selects which member of the union in struct sd_event_source is valid. */
typedef enum EventSourceType {
        SOURCE_IO,              /* file descriptor watched through epoll */
        SOURCE_MONOTONIC,       /* timer on CLOCK_MONOTONIC */
        SOURCE_REALTIME,        /* timer on CLOCK_REALTIME */
        SOURCE_SIGNAL,          /* UNIX signal read through a signalfd */
        SOURCE_CHILD,           /* state change of a child process */
        SOURCE_DEFER            /* callback queued as pending right on creation */
} EventSourceType;
47
/* A single event source attached to an sd_event loop. Sources are reference
 * counted (n_ref) and released through source_free() once the count drops
 * to zero. */
struct sd_event_source {
        RefCount n_ref;

        /* Owning loop; source_new() takes a reference on it, which
         * source_free() drops again. */
        sd_event *event;
        /* Opaque pointer supplied by the caller when adding the source. */
        void *userdata;
        /* Optional prepare callback; sources that have one are tracked in
         * event->prepare (see prepare_prioq_compare()). */
        sd_prepare_handler_t prepare;

        /* Selects which member of the union below is valid. */
        EventSourceType type:4;
        sd_event_mute_t mute:3;
        /* True while the source is queued in event->pending. */
        bool pending:1;

        /* Lower values are ordered first in the pending and prepare queues. */
        int priority;
        /* Positions inside event->pending / event->prepare, maintained by
         * the prioq code; initialized to PRIOQ_IDX_NULL by source_new(). */
        unsigned pending_index;
        unsigned prepare_index;
        /* Value of event->iteration when the source last became pending;
         * among equal priorities older pending sources are ordered first. */
        unsigned pending_iteration;
        /* Iteration in which the source was last prepared. */
        unsigned prepare_iteration;

        union {
                struct {
                        sd_io_handler_t callback;
                        int fd;
                        /* EPOLL* mask requested by the caller. */
                        uint32_t events;
                        /* EPOLL* mask returned by
                         * sd_event_source_get_io_revents() while pending;
                         * presumably filled in from epoll_wait() results
                         * outside this view — TODO confirm. */
                        uint32_t revents;
                        /* Whether fd is currently added to event->epoll_fd. */
                        bool registered:1;
                } io;
                struct {
                        sd_time_handler_t callback;
                        /* The source may fire anywhere within
                         * [next, next + accuracy], both in usec. */
                        usec_t next, accuracy;
                        /* Positions in the per-clock earliest/latest queues. */
                        unsigned earliest_index;
                        unsigned latest_index;
                } time;
                struct {
                        sd_signal_handler_t callback;
                        struct signalfd_siginfo siginfo;
                        int sig;
                } signal;
                struct {
                        sd_child_handler_t callback;
                        siginfo_t siginfo;
                        pid_t pid;
                        /* Subset of WEXITED|WSTOPPED|WCONTINUED. */
                        int options;
                } child;
                struct {
                        sd_defer_handler_t callback;
                } defer;
        };
};
95
/* The event loop object. Reference counted; released through event_free()
 * once the last reference is dropped. */
struct sd_event {
        RefCount n_ref;

        /* The epoll instance; the signalfd, both timerfds and all IO source
         * fds are registered with it. */
        int epoll_fd;
        /* signalfd covering sigset; -1 until first needed. */
        int signal_fd;
        /* timerfds for CLOCK_REALTIME / CLOCK_MONOTONIC, created lazily when
         * the first timer source for that clock is added; -1 until then. */
        int realtime_fd;
        int monotonic_fd;

        /* Sources ready for dispatch, ordered by pending_prioq_compare(). */
        Prioq *pending;
        /* Sources with a prepare callback, ordered by
         * prepare_prioq_compare(); allocated outside this view. */
        Prioq *prepare;

        /* For both clocks we maintain two priority queues each, one
         * ordered by the earliest times the events may be
         * dispatched, and one ordered by the latest times they must
         * have been dispatched. The range between the top entries in
         * the two prioqs is the time window we can freely schedule
         * wakeups in */
        Prioq *monotonic_earliest;
        Prioq *monotonic_latest;
        Prioq *realtime_earliest;
        Prioq *realtime_latest;

        /* Signals currently wanted on signal_fd. */
        sigset_t sigset;
        /* Signal sources indexed by signal number (_NSIG entries). */
        sd_event_source **signal_sources;

        /* Child sources keyed by pid. */
        Hashmap *child_sources;
        unsigned n_unmuted_child_sources;

        /* Loop iteration counter, copied into sources' pending_iteration. */
        unsigned iteration;

        /* Initialized to (usec_t) -1; presumably the currently armed timer
         * expiry per clock — TODO confirm against the run loop. */
        usec_t realtime_next, monotonic_next;

        /* Per-boot offset in [0, USEC_PER_SEC), derived from the boot ID. */
        usec_t perturb;

        bool quit;
        /* Set when a child source is added. */
        bool need_process_child;
};
133
134 static int pending_prioq_compare(const void *a, const void *b) {
135         const sd_event_source *x = a, *y = b;
136
137         assert(x->pending);
138         assert(y->pending);
139
140         /* Unmuted ones first */
141         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
142                 return -1;
143         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
144                 return 1;
145
146         /* Lower priority values first */
147         if (x->priority < y->priority)
148                 return -1;
149         if (x->priority > y->priority)
150                 return 1;
151
152         /* Older entries first */
153         if (x->pending_iteration < y->pending_iteration)
154                 return -1;
155         if (x->pending_iteration > y->pending_iteration)
156                 return 1;
157
158         /* Stability for the rest */
159         if (x < y)
160                 return -1;
161         if (x > y)
162                 return 1;
163
164         return 0;
165 }
166
167 static int prepare_prioq_compare(const void *a, const void *b) {
168         const sd_event_source *x = a, *y = b;
169
170         assert(x->prepare);
171         assert(y->prepare);
172
173         /* Move most recently prepared ones last, so that we can stop
174          * preparing as soon as we hit one that has already been
175          * prepared in the current iteration */
176         if (x->prepare_iteration < y->prepare_iteration)
177                 return -1;
178         if (x->prepare_iteration > y->prepare_iteration)
179                 return 1;
180
181         /* Unmuted ones first */
182         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
183                 return -1;
184         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
185                 return 1;
186
187         /* Lower priority values first */
188         if (x->priority < y->priority)
189                 return -1;
190         if (x->priority > y->priority)
191                 return 1;
192
193         /* Stability for the rest */
194         if (x < y)
195                 return -1;
196         if (x > y)
197                 return 1;
198
199         return 0;
200 }
201
202 static int earliest_time_prioq_compare(const void *a, const void *b) {
203         const sd_event_source *x = a, *y = b;
204
205         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
206         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
207
208         /* Unmuted ones first */
209         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
210                 return -1;
211         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
212                 return 1;
213
214         /* Move the pending ones to the end */
215         if (!x->pending && y->pending)
216                 return -1;
217         if (x->pending && !y->pending)
218                 return 1;
219
220         /* Order by time */
221         if (x->time.next < y->time.next)
222                 return -1;
223         if (x->time.next > y->time.next)
224                 return -1;
225
226         /* Stability for the rest */
227         if (x < y)
228                 return -1;
229         if (x > y)
230                 return 1;
231
232         return 0;
233 }
234
235 static int latest_time_prioq_compare(const void *a, const void *b) {
236         const sd_event_source *x = a, *y = b;
237
238         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
239         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
240
241         /* Unmuted ones first */
242         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
243                 return -1;
244         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
245                 return 1;
246
247         /* Move the pending ones to the end */
248         if (!x->pending && y->pending)
249                 return -1;
250         if (x->pending && !y->pending)
251                 return 1;
252
253         /* Order by time */
254         if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
255                 return -1;
256         if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
257                 return -1;
258
259         /* Stability for the rest */
260         if (x < y)
261                 return -1;
262         if (x > y)
263                 return 1;
264
265         return 0;
266 }
267
268 static void event_free(sd_event *e) {
269         assert(e);
270
271         if (e->epoll_fd >= 0)
272                 close_nointr_nofail(e->epoll_fd);
273
274         if (e->signal_fd >= 0)
275                 close_nointr_nofail(e->signal_fd);
276
277         if (e->realtime_fd >= 0)
278                 close_nointr_nofail(e->realtime_fd);
279
280         if (e->monotonic_fd >= 0)
281                 close_nointr_nofail(e->monotonic_fd);
282
283         prioq_free(e->pending);
284         prioq_free(e->prepare);
285         prioq_free(e->monotonic_earliest);
286         prioq_free(e->monotonic_latest);
287         prioq_free(e->realtime_earliest);
288         prioq_free(e->realtime_latest);
289
290         free(e->signal_sources);
291
292         hashmap_free(e->child_sources);
293         free(e);
294 }
295
296 int sd_event_new(sd_event** ret) {
297         sd_event *e;
298         int r;
299
300         if (!ret)
301                 return -EINVAL;
302
303         e = new0(sd_event, 1);
304         if (!e)
305                 return -ENOMEM;
306
307         e->n_ref = REFCNT_INIT;
308         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
309         e->realtime_next = e->monotonic_next = (usec_t) -1;
310
311         assert_se(sigemptyset(&e->sigset) == 0);
312
313         e->pending = prioq_new(pending_prioq_compare);
314         if (!e->pending) {
315                 r = -ENOMEM;
316                 goto fail;
317         }
318
319         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
320         if (e->epoll_fd < 0) {
321                 r = -errno;
322                 goto fail;
323         }
324
325         *ret = e;
326         return 0;
327
328 fail:
329         event_free(e);
330         return r;
331 }
332
333 sd_event* sd_event_ref(sd_event *e) {
334         if (!e)
335                 return NULL;
336
337         assert_se(REFCNT_INC(e->n_ref) >= 2);
338
339         return e;
340 }
341
342 sd_event* sd_event_unref(sd_event *e) {
343         if (!e)
344                 return NULL;
345
346         if (REFCNT_DEC(e->n_ref) <= 0)
347                 event_free(e);
348
349         return NULL;
350 }
351
352 static int source_io_unregister(sd_event_source *s) {
353         int r;
354
355         assert(s);
356         assert(s->type == SOURCE_IO);
357
358         if (!s->io.registered)
359                 return 0;
360
361         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
362         if (r < 0)
363                 return -errno;
364
365         s->io.registered = false;
366         return 0;
367 }
368
369 static int source_io_register(sd_event_source *s, sd_event_mute_t m, uint32_t events) {
370         struct epoll_event ev = {};
371         int r;
372
373         assert(s);
374         assert(s->type == SOURCE_IO);
375         assert(m != SD_EVENT_MUTED);
376
377         ev.events = events;
378         ev.data.ptr = s;
379
380         if (m == SD_EVENT_ONESHOT)
381                 ev.events |= EPOLLONESHOT;
382
383         if (s->io.registered)
384                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
385         else
386                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
387
388         if (r < 0)
389                 return -errno;
390
391         s->io.registered = true;
392
393         return 0;
394 }
395
396 static void source_free(sd_event_source *s) {
397         assert(s);
398
399         if (s->event) {
400                 switch (s->type) {
401
402                 case SOURCE_IO:
403                         if (s->io.fd >= 0)
404                                 source_io_unregister(s);
405
406                         break;
407
408                 case SOURCE_MONOTONIC:
409                         prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
410                         prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
411                         break;
412
413                 case SOURCE_REALTIME:
414                         prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
415                         prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
416                         break;
417
418                 case SOURCE_SIGNAL:
419                         if (s->signal.sig > 0) {
420                                 if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0)
421                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
422
423                                 if (s->event->signal_sources)
424                                         s->event->signal_sources[s->signal.sig] = NULL;
425                         }
426
427                         break;
428
429                 case SOURCE_CHILD:
430                         if (s->child.pid > 0) {
431                                 if (s->mute != SD_EVENT_MUTED) {
432                                         assert(s->event->n_unmuted_child_sources > 0);
433                                         s->event->n_unmuted_child_sources--;
434                                 }
435
436                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
437                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
438
439                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
440                         }
441
442                         break;
443                 }
444
445                 if (s->pending)
446                         prioq_remove(s->event->pending, s, &s->pending_index);
447
448                 if (s->prepare)
449                         prioq_remove(s->event->prepare, s, &s->prepare_index);
450
451                 sd_event_unref(s->event);
452         }
453
454         free(s);
455 }
456
457 static int source_set_pending(sd_event_source *s, bool b) {
458         int r;
459
460         assert(s);
461
462         if (s->pending == b)
463                 return 0;
464
465         s->pending = b;
466
467         if (b) {
468                 s->pending_iteration = s->event->iteration;
469
470                 r = prioq_put(s->event->pending, s, &s->pending_index);
471                 if (r < 0) {
472                         s->pending = false;
473                         return r;
474                 }
475         } else
476                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
477
478         return 0;
479 }
480
481 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
482         sd_event_source *s;
483
484         assert(e);
485
486         s = new0(sd_event_source, 1);
487         if (!s)
488                 return NULL;
489
490         s->n_ref = REFCNT_INIT;
491         s->event = sd_event_ref(e);
492         s->type = type;
493         s->mute = SD_EVENT_UNMUTED;
494         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
495
496         return s;
497 }
498
499 int sd_event_add_io(
500                 sd_event *e,
501                 int fd,
502                 uint32_t events,
503                 sd_io_handler_t callback,
504                 void *userdata,
505                 sd_event_source **ret) {
506
507         sd_event_source *s;
508         int r;
509
510         if (!e)
511                 return -EINVAL;
512         if (fd < 0)
513                 return -EINVAL;
514         if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
515                 return -EINVAL;
516         if (!callback)
517                 return -EINVAL;
518         if (!ret)
519                 return -EINVAL;
520
521         s = source_new(e, SOURCE_IO);
522         if (!s)
523                 return -ENOMEM;
524
525         s->io.fd = fd;
526         s->io.events = events;
527         s->io.callback = callback;
528         s->userdata = userdata;
529
530         r = source_io_register(s, s->mute, events);
531         if (r < 0) {
532                 source_free(s);
533                 return -errno;
534         }
535
536         *ret = s;
537         return 0;
538 }
539
540 static int event_setup_timer_fd(
541                 sd_event *e,
542                 EventSourceType type,
543                 int *timer_fd,
544                 clockid_t id) {
545
546         struct epoll_event ev = {};
547         int r, fd;
548         sd_id128_t bootid;
549
550         assert(e);
551         assert(timer_fd);
552
553         if (_likely_(*timer_fd >= 0))
554                 return 0;
555
556         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
557         if (fd < 0)
558                 return -errno;
559
560         ev.events = EPOLLIN;
561         ev.data.ptr = INT_TO_PTR(type);
562
563         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
564         if (r < 0) {
565                 close_nointr_nofail(fd);
566                 return -errno;
567         }
568
569         /* When we sleep for longer, we try to realign the wakeup to
570            the same time wihtin each second, so that events all across
571            the system can be coalesced into a single CPU
572            wakeup. However, let's take some system-specific randomness
573            for this value, so that in a network of systems with synced
574            clocks timer events are distributed a bit. Here, we
575            calculate a perturbation usec offset from the boot ID. */
576
577         if (sd_id128_get_boot(&bootid) >= 0)
578                 e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
579
580         *timer_fd = fd;
581         return 0;
582 }
583
584 static int event_add_time_internal(
585                 sd_event *e,
586                 EventSourceType type,
587                 int *timer_fd,
588                 clockid_t id,
589                 Prioq **earliest,
590                 Prioq **latest,
591                 uint64_t usec,
592                 uint64_t accuracy,
593                 sd_time_handler_t callback,
594                 void *userdata,
595                 sd_event_source **ret) {
596
597         sd_event_source *s;
598         int r;
599
600         if (!e)
601                 return -EINVAL;
602         if (!callback)
603                 return -EINVAL;
604         if (!ret)
605                 return -EINVAL;
606         if (usec == (uint64_t) -1)
607                 return -EINVAL;
608         if (accuracy == (uint64_t) -1)
609                 return -EINVAL;
610
611         assert(timer_fd);
612         assert(earliest);
613         assert(latest);
614
615         if (!*earliest) {
616                 *earliest = prioq_new(earliest_time_prioq_compare);
617                 if (!*earliest)
618                         return -ENOMEM;
619         }
620
621         if (!*latest) {
622                 *latest = prioq_new(latest_time_prioq_compare);
623                 if (!*latest)
624                         return -ENOMEM;
625         }
626
627         if (*timer_fd < 0) {
628                 r = event_setup_timer_fd(e, type, timer_fd, id);
629                 if (r < 0)
630                         return r;
631         }
632
633         s = source_new(e, type);
634         if (!s)
635                 return -ENOMEM;
636
637         s->time.next = usec;
638         s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
639         s->time.callback = callback;
640         s->time.earliest_index = PRIOQ_IDX_NULL;
641         s->time.latest_index = PRIOQ_IDX_NULL;
642         s->userdata = userdata;
643
644         r = prioq_put(*earliest, s, &s->time.earliest_index);
645         if (r < 0)
646                 goto fail;
647
648         r = prioq_put(*latest, s, &s->time.latest_index);
649         if (r < 0)
650                 goto fail;
651
652         *ret = s;
653         return 0;
654
655 fail:
656         source_free(s);
657         return r;
658 }
659
660 int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
661         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
662 }
663
664 int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
665         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
666 }
667
668 static int event_update_signal_fd(sd_event *e) {
669         struct epoll_event ev = {};
670         bool add_to_epoll;
671         int r;
672
673         assert(e);
674
675         add_to_epoll = e->signal_fd < 0;
676
677         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
678         if (r < 0)
679                 return -errno;
680
681         e->signal_fd = r;
682
683         if (!add_to_epoll)
684                 return 0;
685
686         ev.events = EPOLLIN;
687         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
688
689         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
690         if (r < 0) {
691                 close_nointr_nofail(e->signal_fd);
692                 e->signal_fd = -1;
693
694                 return -errno;
695         }
696
697         return 0;
698 }
699
700 int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) {
701         sd_event_source *s;
702         int r;
703
704         if (!e)
705                 return -EINVAL;
706         if (sig <= 0)
707                 return -EINVAL;
708         if (sig >= _NSIG)
709                 return -EINVAL;
710         if (!callback)
711                 return -EINVAL;
712         if (!ret)
713                 return -EINVAL;
714
715         if (!e->signal_sources) {
716                 e->signal_sources = new0(sd_event_source*, _NSIG);
717                 if (!e->signal_sources)
718                         return -ENOMEM;
719         } else if (e->signal_sources[sig])
720                 return -EBUSY;
721
722         s = source_new(e, SOURCE_SIGNAL);
723         if (!s)
724                 return -ENOMEM;
725
726         s->signal.sig = sig;
727         s->signal.callback = callback;
728         s->userdata = userdata;
729
730         e->signal_sources[sig] = s;
731         assert_se(sigaddset(&e->sigset, sig) == 0);
732
733         if (sig != SIGCHLD || e->n_unmuted_child_sources == 0) {
734                 r = event_update_signal_fd(e);
735                 if (r < 0) {
736                         source_free(s);
737                         return r;
738                 }
739         }
740
741         *ret = s;
742         return 0;
743 }
744
745 int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) {
746         sd_event_source *s;
747         int r;
748
749         if (!e)
750                 return -EINVAL;
751         if (pid <= 1)
752                 return -EINVAL;
753         if (options & ~(WEXITED|WSTOPPED|WCONTINUED))
754                 return -EINVAL;
755         if (!callback)
756                 return -EINVAL;
757         if (!ret)
758                 return -EINVAL;
759
760         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
761         if (r < 0)
762                 return r;
763
764         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
765                 return -EBUSY;
766
767         s = source_new(e, SOURCE_CHILD);
768         if (!s)
769                 return -ENOMEM;
770
771         s->child.pid = pid;
772         s->child.options = options;
773         s->child.callback = callback;
774         s->userdata = userdata;
775
776         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
777         if (r < 0) {
778                 source_free(s);
779                 return r;
780         }
781
782         e->n_unmuted_child_sources ++;
783
784         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
785
786         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
787                 r = event_update_signal_fd(e);
788                 if (r < 0) {
789                         source_free(s);
790                         return -errno;
791                 }
792         }
793
794         e->need_process_child = true;
795
796         *ret = s;
797         return 0;
798 }
799
800 int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) {
801         sd_event_source *s;
802         int r;
803
804         if (!e)
805                 return -EINVAL;
806         if (!ret)
807                 return -EINVAL;
808
809         s = source_new(e, SOURCE_DEFER);
810         if (!s)
811                 return -ENOMEM;
812
813         s->defer.callback = callback;
814         s->userdata = userdata;
815
816         r = source_set_pending(s, true);
817         if (r < 0) {
818                 source_free(s);
819                 return r;
820         }
821
822         *ret = s;
823         return 0;
824 }
825
826 sd_event_source* sd_event_source_ref(sd_event_source *s) {
827         if (!s)
828                 return NULL;
829
830         assert_se(REFCNT_INC(s->n_ref) >= 2);
831
832         return s;
833 }
834
835 sd_event_source* sd_event_source_unref(sd_event_source *s) {
836         if (!s)
837                 return NULL;
838
839         if (REFCNT_DEC(s->n_ref) <= 0)
840                 source_free(s);
841
842         return NULL;
843 }
844
845 int sd_event_source_get_pending(sd_event_source *s) {
846         if (!s)
847                 return -EINVAL;
848
849         return s->pending;
850 }
851
852 int sd_event_source_get_io_fd(sd_event_source *s) {
853         if (!s)
854                 return -EINVAL;
855         if (s->type != SOURCE_IO)
856                 return -EDOM;
857
858         return s->io.fd;
859 }
860
861 int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
862         if (!s)
863                 return -EINVAL;
864         if (s->type != SOURCE_IO)
865                 return -EDOM;
866         if (!events)
867                 return -EINVAL;
868
869         *events = s->io.events;
870         return 0;
871 }
872
873 int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
874         int r;
875
876         if (!s)
877                 return -EINVAL;
878         if (!s->type != SOURCE_IO)
879                 return -EDOM;
880         if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
881                 return -EINVAL;
882
883         if (s->io.events == events)
884                 return 0;
885
886         if (s->mute != SD_EVENT_MUTED) {
887                 r = source_io_register(s, s->io.events, events);
888                 if (r < 0)
889                         return r;
890         }
891
892         s->io.events = events;
893
894         return 0;
895 }
896
897 int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
898         if (!s)
899                 return -EINVAL;
900         if (s->type != SOURCE_IO)
901                 return -EDOM;
902         if (!revents)
903                 return -EINVAL;
904         if (!s->pending)
905                 return -ENODATA;
906
907         *revents = s->io.revents;
908         return 0;
909 }
910
911 int sd_event_source_get_signal(sd_event_source *s) {
912         if (!s)
913                 return -EINVAL;
914         if (s->type != SOURCE_SIGNAL)
915                 return -EDOM;
916
917         return s->signal.sig;
918 }
919
920 int sd_event_source_get_priority(sd_event_source *s, int *priority) {
921         if (!s)
922                 return -EINVAL;
923
924         return s->priority;
925 }
926
927 int sd_event_source_set_priority(sd_event_source *s, int priority) {
928         if (!s)
929                 return -EINVAL;
930
931         if (s->priority == priority)
932                 return 0;
933
934         s->priority = priority;
935
936         if (s->pending)
937                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
938
939         if (s->prepare)
940                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
941
942         return 0;
943 }
944
945 int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m) {
946         if (!s)
947                 return -EINVAL;
948         if (!m)
949                 return -EINVAL;
950
951         *m = s->mute;
952         return 0;
953 }
954
955 int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) {
956         int r;
957
958         if (!s)
959                 return -EINVAL;
960         if (m != SD_EVENT_MUTED && m != SD_EVENT_UNMUTED && !SD_EVENT_ONESHOT)
961                 return -EINVAL;
962
963         if (s->mute == m)
964                 return 0;
965
966         if (m == SD_EVENT_MUTED) {
967
968                 switch (s->type) {
969
970                 case SOURCE_IO:
971                         r = source_io_unregister(s);
972                         if (r < 0)
973                                 return r;
974
975                         s->mute = m;
976                         break;
977
978                 case SOURCE_MONOTONIC:
979                         s->mute = m;
980                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
981                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
982                         break;
983
984                 case SOURCE_REALTIME:
985                         s->mute = m;
986                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
987                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
988                         break;
989
990                 case SOURCE_SIGNAL:
991                         s->mute = m;
992                         if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) {
993                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
994                                 event_update_signal_fd(s->event);
995                         }
996
997                         break;
998
999                 case SOURCE_CHILD:
1000                         s->mute = m;
1001
1002                         assert(s->event->n_unmuted_child_sources > 0);
1003                         s->event->n_unmuted_child_sources--;
1004
1005                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1006                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
1007                                 event_update_signal_fd(s->event);
1008                         }
1009
1010                         break;
1011
1012                 case SOURCE_DEFER:
1013                         s->mute = m;
1014                         break;
1015                 }
1016
1017         } else {
1018                 switch (s->type) {
1019
1020                 case SOURCE_IO:
1021                         r = source_io_register(s, m, s->io.events);
1022                         if (r < 0)
1023                                 return r;
1024
1025                         s->mute = m;
1026                         break;
1027
1028                 case SOURCE_MONOTONIC:
1029                         s->mute = m;
1030                         prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1031                         prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1032                         break;
1033
1034                 case SOURCE_REALTIME:
1035                         s->mute = m;
1036                         prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1037                         prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1038                         break;
1039
1040                 case SOURCE_SIGNAL:
1041                         s->mute = m;
1042
1043                         if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0)  {
1044                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
1045                                 event_update_signal_fd(s->event);
1046                         }
1047                         break;
1048
1049                 case SOURCE_CHILD:
1050                         s->mute = m;
1051
1052                         if (s->mute == SD_EVENT_MUTED) {
1053                                 s->event->n_unmuted_child_sources++;
1054
1055                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
1056                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
1057                                         event_update_signal_fd(s->event);
1058                                 }
1059                         }
1060                         break;
1061
1062                 case SOURCE_DEFER:
1063                         s->mute = m;
1064                         break;
1065                 }
1066         }
1067
1068         if (s->pending)
1069                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
1070
1071         if (s->prepare)
1072                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
1073
1074         return 0;
1075 }
1076
1077 int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
1078         if (!s)
1079                 return -EINVAL;
1080         if (!usec)
1081                 return -EINVAL;
1082         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1083                 return -EDOM;
1084
1085         *usec = s->time.next;
1086         return 0;
1087 }
1088
1089 int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1090         if (!s)
1091                 return -EINVAL;
1092         if (usec == (uint64_t) -1)
1093                 return -EINVAL;
1094         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1095                 return -EDOM;
1096
1097         if (s->time.next == usec)
1098                 return 0;
1099
1100         s->time.next = usec;
1101
1102         if (s->type == SOURCE_REALTIME) {
1103                 prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
1104                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1105         } else {
1106                 prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
1107                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1108         }
1109
1110         return 0;
1111 }
1112
1113 int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1114         int r;
1115
1116         if (!s)
1117                 return -EINVAL;
1118
1119         if (s->prepare == callback)
1120                 return 0;
1121
1122         if (callback && s->prepare) {
1123                 s->prepare = callback;
1124                 return 0;
1125         }
1126
1127         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1128         if (r < 0)
1129                 return r;
1130
1131         s->prepare = callback;
1132
1133         if (callback) {
1134                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1135                 if (r < 0)
1136                         return r;
1137         } else
1138                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1139
1140         return 0;
1141 }
1142
1143 void* sd_event_source_get_userdata(sd_event_source *s) {
1144         if (!s)
1145                 return NULL;
1146
1147         return s->userdata;
1148 }
1149
1150 static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
1151         usec_t c;
1152         assert(e);
1153         assert(a <= b);
1154
1155         if (a <= 0)
1156                 return 0;
1157
1158         if (b <= a + 1)
1159                 return a;
1160
1161         /*
1162           Find a good time to wake up again between times a and b. We
1163           have two goals here:
1164
1165           a) We want to wake up as seldom as possible, hence prefer
1166              later times over earlier times.
1167
1168           b) But if we have to wake up, then let's make sure to
1169              dispatch as much as possible on the entire system.
1170
1171           We implement this by waking up everywhere at the same time
1172           within any given second if we can, synchronised via the
1173           perturbation value determined from the boot ID. If we can't,
1174           then we try to find the same spot in every a 250ms
1175           step. Otherwise, we pick the last possible time to wake up.
1176         */
1177
1178         c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
1179         if (c >= b) {
1180                 if (_unlikely_(c < USEC_PER_SEC))
1181                         return b;
1182
1183                 c -= USEC_PER_SEC;
1184         }
1185
1186         if (c >= a)
1187                 return c;
1188
1189         c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
1190         if (c >= b) {
1191                 if (_unlikely_(c < USEC_PER_MSEC*250))
1192                         return b;
1193
1194                 c -= USEC_PER_MSEC*250;
1195         }
1196
1197         if (c >= a)
1198                 return c;
1199
1200         return b;
1201 }
1202
1203 static int event_arm_timer(
1204                 sd_event *e,
1205                 int timer_fd,
1206                 Prioq *earliest,
1207                 Prioq *latest,
1208                 usec_t *next) {
1209
1210         struct itimerspec its = {};
1211         sd_event_source *a, *b;
1212         usec_t t;
1213         int r;
1214
1215         assert_se(e);
1216         assert_se(next);
1217
1218         a = prioq_peek(earliest);
1219         if (!a || a->mute == SD_EVENT_MUTED)
1220                 return 0;
1221
1222         b = prioq_peek(latest);
1223         assert_se(b && b->mute != SD_EVENT_MUTED);
1224
1225         t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
1226         if (*next == t)
1227                 return 0;
1228
1229         assert_se(timer_fd >= 0);
1230
1231         if (t == 0) {
1232                 /* We don' want to disarm here, just mean some time looooong ago. */
1233                 its.it_value.tv_sec = 0;
1234                 its.it_value.tv_nsec = 1;
1235         } else
1236                 timespec_store(&its.it_value, t);
1237
1238         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1239         if (r < 0)
1240                 return r;
1241
1242         *next = t;
1243         return 0;
1244 }
1245
1246 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1247         assert(e);
1248         assert(s);
1249         assert(s->type == SOURCE_IO);
1250
1251         s->io.revents = events;
1252
1253         /*
1254            If this is a oneshot event source, then we added it to the
1255            epoll with EPOLLONESHOT, hence we know it's not registered
1256            anymore. We can save a syscall here...
1257         */
1258
1259         if (s->mute == SD_EVENT_ONESHOT)
1260                 s->io.registered = false;
1261
1262         return source_set_pending(s, true);
1263 }
1264
1265 static int flush_timer(sd_event *e, int fd, uint32_t events) {
1266         uint64_t x;
1267         ssize_t ss;
1268
1269         assert(e);
1270
1271         if (events != EPOLLIN)
1272                 return -EIO;
1273
1274         ss = read(fd, &x, sizeof(x));
1275         if (ss < 0) {
1276                 if (errno == EAGAIN || errno == EINTR)
1277                         return 0;
1278
1279                 return -errno;
1280         }
1281
1282         if (ss != sizeof(x))
1283                 return -EIO;
1284
1285         return 0;
1286 }
1287
1288 static int process_timer(sd_event *e, usec_t n, Prioq *earliest, Prioq *latest) {
1289         sd_event_source *s;
1290         int r;
1291
1292         assert(e);
1293
1294         for (;;) {
1295                 s = prioq_peek(earliest);
1296                 if (!s ||
1297                     s->time.next > n ||
1298                     s->mute == SD_EVENT_MUTED ||
1299                     s->pending)
1300                         break;
1301
1302                 r = source_set_pending(s, true);
1303                 if (r < 0)
1304                         return r;
1305
1306                 prioq_reshuffle(earliest, s, &s->time.earliest_index);
1307                 prioq_reshuffle(latest, s, &s->time.latest_index);
1308         }
1309
1310         return 0;
1311 }
1312
1313 static int process_child(sd_event *e) {
1314         sd_event_source *s;
1315         Iterator i;
1316         int r;
1317
1318         assert(e);
1319
1320         e->need_process_child = false;
1321
1322         /*
1323            So, this is ugly. We iteratively invoke waitid() with P_PID
1324            + WNOHANG for each PID we wait for, instead of using
1325            P_ALL. This is because we only want to get child
1326            information of very specific child processes, and not all
1327            of them. We might not have processed the SIGCHLD even of a
1328            previous invocation and we don't want to maintain a
1329            unbounded *per-child* event queue, hence we really don't
1330            want anything flushed out of the kernel's queue that we
1331            don't care about. Since this is O(n) this means that if you
1332            have a lot of processes you probably want to handle SIGCHLD
1333            yourself.
1334         */
1335
1336         HASHMAP_FOREACH(s, e->child_sources, i) {
1337                 assert(s->type == SOURCE_CHILD);
1338
1339                 if (s->pending)
1340                         continue;
1341
1342                 if (s->mute == SD_EVENT_MUTED)
1343                         continue;
1344
1345                 zero(s->child.siginfo);
1346                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1347                 if (r < 0)
1348                         return -errno;
1349
1350                 if (s->child.siginfo.si_pid != 0) {
1351                         r = source_set_pending(s, true);
1352                         if (r < 0)
1353                                 return r;
1354                 }
1355         }
1356
1357         return 0;
1358 }
1359
1360 static int process_signal(sd_event *e, uint32_t events) {
1361         struct signalfd_siginfo si;
1362         bool read_one = false;
1363         ssize_t ss;
1364         int r;
1365
1366         if (events != EPOLLIN)
1367                 return -EIO;
1368
1369         for (;;) {
1370                 sd_event_source *s;
1371
1372                 ss = read(e->signal_fd, &si, sizeof(si));
1373                 if (ss < 0) {
1374                         if (errno == EAGAIN || errno == EINTR)
1375                                 return read_one;
1376
1377                         return -errno;
1378                 }
1379
1380                 if (ss != sizeof(si))
1381                         return -EIO;
1382
1383                 read_one = true;
1384
1385                 if (si.ssi_signo == SIGCHLD) {
1386                         r = process_child(e);
1387                         if (r < 0)
1388                                 return r;
1389                         if (r > 0 || !e->signal_sources[si.ssi_signo])
1390                                 continue;
1391                 } else {
1392                         s = e->signal_sources[si.ssi_signo];
1393                         if (!s)
1394                                 return -EIO;
1395                 }
1396
1397                 s->signal.siginfo = si;
1398                 r = source_set_pending(s, true);
1399                 if (r < 0)
1400                         return r;
1401         }
1402
1403
1404         return 0;
1405 }
1406
1407 static int source_dispatch(sd_event_source *s) {
1408         int r;
1409
1410         assert(s);
1411         assert(s->pending);
1412
1413         r = source_set_pending(s, false);
1414         if (r < 0)
1415                 return r;
1416
1417         if (s->mute == SD_EVENT_ONESHOT) {
1418                 r = sd_event_source_set_mute(s, SD_EVENT_MUTED);
1419                 if (r < 0)
1420                         return r;
1421         }
1422
1423         switch (s->type) {
1424
1425         case SOURCE_IO:
1426                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1427                 break;
1428
1429         case SOURCE_MONOTONIC:
1430                 r = s->time.callback(s, s->time.next, s->userdata);
1431                 break;
1432
1433         case SOURCE_REALTIME:
1434                 r = s->time.callback(s, s->time.next, s->userdata);
1435                 break;
1436
1437         case SOURCE_SIGNAL:
1438                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1439                 break;
1440
1441         case SOURCE_CHILD:
1442                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1443                 break;
1444
1445         case SOURCE_DEFER:
1446                 r = s->defer.callback(s, s->userdata);
1447                 break;
1448         }
1449
1450         return r;
1451 }
1452
1453 static int event_prepare(sd_event *e) {
1454         int r;
1455
1456         assert(e);
1457
1458         for (;;) {
1459                 sd_event_source *s;
1460
1461                 s = prioq_peek(e->prepare);
1462                 if (!s || s->prepare_iteration == e->iteration || s->mute == SD_EVENT_MUTED)
1463                         break;
1464
1465                 s->prepare_iteration = e->iteration;
1466                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1467                 if (r < 0)
1468                         return r;
1469
1470                 assert(s->prepare);
1471                 r = s->prepare(s, s->userdata);
1472                 if (r < 0)
1473                         return r;
1474
1475         }
1476
1477         return 0;
1478 }
1479
1480 static sd_event_source* event_next_pending(sd_event *e) {
1481         sd_event_source *p;
1482
1483         p = prioq_peek(e->pending);
1484         if (!p)
1485                 return NULL;
1486
1487         if (p->mute == SD_EVENT_MUTED)
1488                 return NULL;
1489
1490         return p;
1491 }
1492
1493 int sd_event_run(sd_event *e, uint64_t timeout) {
1494         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1495         sd_event_source *p;
1496         int r, i, m;
1497         dual_timestamp n;
1498
1499         if (!e)
1500                 return -EINVAL;
1501         if (e->quit)
1502                 return -ESTALE;
1503
1504         e->iteration++;
1505
1506         r = event_prepare(e);
1507         if (r < 0)
1508                 return r;
1509
1510         if (event_next_pending(e) || e->need_process_child)
1511                 timeout = 0;
1512
1513         if (timeout > 0) {
1514                 r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
1515                 if (r < 0)
1516                         return r;
1517
1518                 r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
1519                 if (r < 0)
1520                         return r;
1521         }
1522
1523         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
1524                        timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1525         if (m < 0)
1526                 return m;
1527
1528         dual_timestamp_get(&n);
1529
1530         for (i = 0; i < m; i++) {
1531
1532                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1533                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
1534                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1535                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
1536                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1537                         r = process_signal(e, ev_queue[i].events);
1538                 else
1539                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1540
1541                 if (r < 0)
1542                         return r;
1543         }
1544
1545         r = process_timer(e, n.monotonic, e->monotonic_earliest, e->monotonic_latest);
1546         if (r < 0)
1547                 return r;
1548
1549         r = process_timer(e, n.realtime, e->realtime_earliest, e->realtime_latest);
1550         if (r < 0)
1551                 return r;
1552
1553         if (e->need_process_child) {
1554                 r = process_child(e);
1555                 if (r < 0)
1556                         return r;
1557         }
1558
1559         p = event_next_pending(e);
1560         if (!p)
1561                 return 0;
1562
1563         return source_dispatch(p);
1564 }
1565
1566 int sd_event_loop(sd_event *e) {
1567         int r;
1568
1569         if (!e)
1570                 return -EINVAL;
1571
1572         while (!e->quit) {
1573                 r = sd_event_run(e, (uint64_t) -1);
1574                 if (r < 0)
1575                         return r;
1576         }
1577
1578         return 0;
1579 }
1580
1581 int sd_event_quit(sd_event *e) {
1582         if (!e)
1583                 return EINVAL;
1584
1585         return e->quit;
1586 }
1587
1588 int sd_event_request_quit(sd_event *e) {
1589         if (!e)
1590                 return -EINVAL;
1591
1592         e->quit = true;
1593         return 0;
1594 }
1595
1596 sd_event *sd_event_get(sd_event_source *s) {
1597         if (!s)
1598                 return NULL;
1599
1600         return s->event;
1601 }
1602
1603 int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
1604         if (!s)
1605                 return -EINVAL;
1606         if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
1607                 return -EDOM;
1608
1609         if (usec == 0)
1610                 usec = DEFAULT_ACCURACY_USEC;
1611
1612         if (s->time.accuracy == usec)
1613                 return 0;
1614
1615
1616         s->time.accuracy = usec;
1617
1618         if (s->type == SOURCE_REALTIME)
1619                 prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
1620         else
1621                 prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
1622
1623         return 0;
1624 }
1625
1626 int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
1627         if (!s)
1628                 return -EINVAL;
1629         if (!usec)
1630                 return -EINVAL;
1631         if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
1632                 return -EDOM;
1633
1634         *usec = s->time.accuracy;
1635         return 0;
1636 }