chiark / gitweb /
bus: add minimal event loop API
[elogind.git] / src / libsystemd-bus / sd-event.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/timerfd.h>
24 #include <sys/wait.h>
25
26 #include "macro.h"
27 #include "refcnt.h"
28 #include "prioq.h"
29 #include "hashmap.h"
30 #include "util.h"
31 #include "time-util.h"
32
33 #include "sd-event.h"
34
35 #define EPOLL_QUEUE_MAX 64
36
37 typedef enum EventSourceType {
38         SOURCE_IO,
39         SOURCE_MONOTONIC,
40         SOURCE_REALTIME,
41         SOURCE_SIGNAL,
42         SOURCE_CHILD,
43         SOURCE_DEFER
44 } EventSourceType;
45
46 struct sd_event_source {
47         RefCount n_ref;
48
49         sd_event *event;
50         void *userdata;
51         sd_prepare_handler_t prepare;
52
53         EventSourceType type:4;
54         sd_event_mute_t mute:3;
55         bool pending:1;
56
57         int priority;
58         unsigned pending_index;
59         unsigned prepare_index;
60         unsigned pending_iteration;
61         unsigned prepare_iteration;
62
63         union {
64                 struct {
65                         sd_io_handler_t callback;
66                         int fd;
67                         uint32_t events;
68                         uint32_t revents;
69                         bool registered:1;
70                 } io;
71                 struct {
72                         sd_time_handler_t callback;
73                         usec_t next;
74                         unsigned prioq_index;
75                 } time;
76                 struct {
77                         sd_signal_handler_t callback;
78                         struct signalfd_siginfo siginfo;
79                         int sig;
80                 } signal;
81                 struct {
82                         sd_child_handler_t callback;
83                         siginfo_t siginfo;
84                         pid_t pid;
85                         int options;
86                 } child;
87                 struct {
88                         sd_defer_handler_t callback;
89                 } defer;
90         };
91 };
92
93 struct sd_event {
94         RefCount n_ref;
95
96         int epoll_fd;
97         int signal_fd;
98         int realtime_fd;
99         int monotonic_fd;
100
101         Prioq *pending;
102         Prioq *prepare;
103         Prioq *monotonic;
104         Prioq *realtime;
105
106         sigset_t sigset;
107         sd_event_source **signal_sources;
108
109         Hashmap *child_sources;
110         unsigned n_unmuted_child_sources;
111
112         unsigned iteration;
113         unsigned processed_children;
114
115         usec_t realtime_next, monotonic_next;
116
117         bool quit;
118 };
119
120 static int pending_prioq_compare(const void *a, const void *b) {
121         const sd_event_source *x = a, *y = b;
122
123         assert(x->pending);
124         assert(y->pending);
125
126         /* Unmuted ones first */
127         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
128                 return -1;
129         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
130                 return 1;
131
132         /* Lower priority values first */
133         if (x->priority < y->priority)
134                 return -1;
135         if (x->priority > y->priority)
136                 return 1;
137
138         /* Older entries first */
139         if (x->pending_iteration < y->pending_iteration)
140                 return -1;
141         if (x->pending_iteration > y->pending_iteration)
142                 return 1;
143
144         /* Stability for the rest */
145         if (x < y)
146                 return -1;
147         if (y > x)
148                 return 1;
149
150         return 0;
151 }
152
153 static int prepare_prioq_compare(const void *a, const void *b) {
154         const sd_event_source *x = a, *y = b;
155
156         assert(x->prepare);
157         assert(y->prepare);
158
159         /* Move most recently prepared ones last, so that we can stop
160          * preparing as soon as we hit one that has already been
161          * prepared in the current iteration */
162         if (x->prepare_iteration < y->prepare_iteration)
163                 return -1;
164         if (x->prepare_iteration > y->prepare_iteration)
165                 return 1;
166
167         /* Unmuted ones first */
168         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
169                 return -1;
170         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
171                 return 1;
172
173         /* Lower priority values first */
174         if (x->priority < y->priority)
175                 return -1;
176         if (x->priority > y->priority)
177                 return 1;
178
179         /* Stability for the rest */
180         if (x < y)
181                 return -1;
182         if (y > x)
183                 return 1;
184
185         return 0;
186 }
187
188 static int time_prioq_compare(const void *a, const void *b) {
189         const sd_event_source *x = a, *y = b;
190
191         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
192         assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
193
194         /* Unmuted ones first */
195         if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
196                 return -1;
197         if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
198                 return 1;
199
200         /* Move the pending ones to the end */
201         if (!x->pending && y->pending)
202                 return -1;
203         if (x->pending && !y->pending)
204                 return 1;
205
206         /* Order by time */
207         if (x->time.next < y->time.next)
208                 return -1;
209         if (x->time.next > y->time.next)
210                 return -1;
211
212         /* Stability for the rest */
213         if (x < y)
214                 return -1;
215         if (y > x)
216                 return 1;
217
218         return 0;
219 }
220
221 static void event_free(sd_event *e) {
222         assert(e);
223
224         if (e->epoll_fd >= 0)
225                 close_nointr_nofail(e->epoll_fd);
226
227         if (e->signal_fd >= 0)
228                 close_nointr_nofail(e->signal_fd);
229
230         if (e->realtime_fd >= 0)
231                 close_nointr_nofail(e->realtime_fd);
232
233         if (e->monotonic_fd >= 0)
234                 close_nointr_nofail(e->monotonic_fd);
235
236         prioq_free(e->pending);
237         prioq_free(e->prepare);
238         prioq_free(e->monotonic);
239         prioq_free(e->realtime);
240
241         free(e->signal_sources);
242
243         hashmap_free(e->child_sources);
244         free(e);
245 }
246
247 int sd_event_new(sd_event** ret) {
248         sd_event *e;
249         int r;
250
251         if (!ret)
252                 return -EINVAL;
253
254         e = new0(sd_event, 1);
255         if (!e)
256                 return -ENOMEM;
257
258         e->n_ref = REFCNT_INIT;
259         e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
260         e->realtime_next = e->monotonic_next = (usec_t) -1;
261
262         assert_se(sigemptyset(&e->sigset) == 0);
263
264         e->pending = prioq_new(pending_prioq_compare);
265         if (!e->pending) {
266                 r = -ENOMEM;
267                 goto fail;
268         }
269
270         e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
271         if (e->epoll_fd < 0) {
272                 r = -errno;
273                 goto fail;
274         }
275
276         *ret = e;
277         return 0;
278
279 fail:
280         event_free(e);
281         return r;
282 }
283
284 sd_event* sd_event_ref(sd_event *e) {
285         if (!e)
286                 return NULL;
287
288         assert_se(REFCNT_INC(e->n_ref) >= 2);
289
290         return e;
291 }
292
293 sd_event* sd_event_unref(sd_event *e) {
294         if (!e)
295                 return NULL;
296
297         if (REFCNT_DEC(e->n_ref) <= 0)
298                 event_free(e);
299
300         return NULL;
301 }
302
303 static int source_io_unregister(sd_event_source *s) {
304         int r;
305
306         assert(s);
307         assert(s->type == SOURCE_IO);
308
309         if (!s->io.registered)
310                 return 0;
311
312         r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
313         if (r < 0)
314                 return -errno;
315
316         s->io.registered = false;
317         return 0;
318 }
319
320 static int source_io_register(sd_event_source *s, sd_event_mute_t m, uint32_t events) {
321         struct epoll_event ev = {};
322         int r;
323
324         assert(s);
325         assert(s->type == SOURCE_IO);
326         assert(m != SD_EVENT_MUTED);
327
328         ev.events = events;
329         ev.data.ptr = s;
330
331         if (m == SD_EVENT_ONESHOT)
332                 ev.events |= EPOLLONESHOT;
333
334         if (s->io.registered)
335                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
336         else
337                 r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
338
339         if (r < 0)
340                 return -errno;
341
342         s->io.registered = true;
343
344         return 0;
345 }
346
347 static void source_free(sd_event_source *s) {
348         assert(s);
349
350         if (s->event) {
351                 switch (s->type) {
352
353                 case SOURCE_IO:
354                         if (s->io.fd >= 0)
355                                 source_io_unregister(s);
356
357                         break;
358
359                 case SOURCE_MONOTONIC:
360                         prioq_remove(s->event->monotonic, s, &s->time.prioq_index);
361                         break;
362
363                 case SOURCE_REALTIME:
364                         prioq_remove(s->event->realtime, s, &s->time.prioq_index);
365                         break;
366
367                 case SOURCE_SIGNAL:
368                         if (s->signal.sig > 0) {
369                                 if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0)
370                                         assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
371
372                                 if (s->event->signal_sources)
373                                         s->event->signal_sources[s->signal.sig] = NULL;
374                         }
375
376                         break;
377
378                 case SOURCE_CHILD:
379                         if (s->child.pid > 0) {
380                                 if (s->mute != SD_EVENT_MUTED) {
381                                         assert(s->event->n_unmuted_child_sources > 0);
382                                         s->event->n_unmuted_child_sources--;
383                                 }
384
385                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
386                                         assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
387
388                                 hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
389                         }
390
391                         break;
392                 }
393
394                 if (s->pending)
395                         prioq_remove(s->event->pending, s, &s->pending_index);
396
397                 if (s->prepare)
398                         prioq_remove(s->event->prepare, s, &s->prepare_index);
399
400                 sd_event_unref(s->event);
401         }
402
403         free(s);
404 }
405
406 static int source_set_pending(sd_event_source *s, bool b) {
407         int r;
408
409         assert(s);
410
411         if (s->pending == b)
412                 return 0;
413
414         s->pending = b;
415
416         if (b) {
417                 s->pending_iteration = s->event->iteration;
418
419                 r = prioq_put(s->event->pending, s, &s->pending_index);
420                 if (r < 0) {
421                         s->pending = false;
422                         return r;
423                 }
424         } else
425                 assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
426
427         return 0;
428 }
429
430 static sd_event_source *source_new(sd_event *e, EventSourceType type) {
431         sd_event_source *s;
432
433         assert(e);
434
435         s = new0(sd_event_source, 1);
436         if (!s)
437                 return NULL;
438
439         s->n_ref = REFCNT_INIT;
440         s->event = sd_event_ref(e);
441         s->type = type;
442         s->mute = SD_EVENT_UNMUTED;
443         s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
444
445         return s;
446 }
447
448 int sd_event_add_io(
449                 sd_event *e,
450                 int fd,
451                 uint32_t events,
452                 sd_io_handler_t callback,
453                 void *userdata,
454                 sd_event_source **ret) {
455
456         sd_event_source *s;
457         int r;
458
459         if (!e)
460                 return -EINVAL;
461         if (fd < 0)
462                 return -EINVAL;
463         if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
464                 return -EINVAL;
465         if (!callback)
466                 return -EINVAL;
467         if (!ret)
468                 return -EINVAL;
469
470         s = source_new(e, SOURCE_IO);
471         if (!s)
472                 return -ENOMEM;
473
474         s->io.fd = fd;
475         s->io.events = events;
476         s->io.callback = callback;
477         s->userdata = userdata;
478
479         r = source_io_register(s, s->mute, events);
480         if (r < 0) {
481                 source_free(s);
482                 return -errno;
483         }
484
485         *ret = s;
486         return 0;
487 }
488
489 static int event_setup_timer_fd(
490                 sd_event *e,
491                 EventSourceType type,
492                 int *timer_fd,
493                 clockid_t id) {
494
495         struct epoll_event ev = {};
496         int r, fd;
497
498         assert(e);
499         assert(timer_fd);
500
501         if (_likely_(*timer_fd >= 0))
502                 return 0;
503
504         fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
505         if (fd < 0)
506                 return -errno;
507
508         ev.events = EPOLLIN;
509         ev.data.ptr = INT_TO_PTR(type);
510
511         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
512         if (r < 0) {
513                 close_nointr_nofail(fd);
514                 return -errno;
515         }
516
517         *timer_fd = fd;
518         return 0;
519 }
520
521 static int event_add_time_internal(
522                 sd_event *e,
523                 EventSourceType type,
524                 int *timer_fd,
525                 clockid_t id,
526                 Prioq **prioq,
527                 uint64_t usec,
528                 sd_time_handler_t callback,
529                 void *userdata,
530                 sd_event_source **ret) {
531
532         sd_event_source *s;
533         int r;
534
535         if (!e)
536                 return -EINVAL;
537         if (!callback)
538                 return -EINVAL;
539         if (!ret)
540                 return -EINVAL;
541
542         assert(timer_fd);
543         assert(prioq);
544
545         if (!*prioq) {
546                 *prioq = prioq_new(time_prioq_compare);
547                 if (!*prioq)
548                         return -ENOMEM;
549         }
550
551         if (*timer_fd < 0) {
552                 r = event_setup_timer_fd(e, type, timer_fd, id);
553                 if (r < 0)
554                         return r;
555         }
556
557         s = source_new(e, type);
558         if (!s)
559                 return -ENOMEM;
560
561         s->time.next = usec;
562         s->time.callback = callback;
563         s->time.prioq_index = PRIOQ_IDX_NULL;
564         s->userdata = userdata;
565
566         r = prioq_put(*prioq, s, &s->time.prioq_index);
567         if (r < 0) {
568                 source_free(s);
569                 return r;
570         }
571
572         *ret = s;
573         return 0;
574 }
575
576 int sd_event_add_monotonic(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
577         return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic, usec, callback, userdata, ret);
578 }
579
580 int sd_event_add_realtime(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
581         return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime, usec, callback, userdata, ret);
582 }
583
584 static int event_update_signal_fd(sd_event *e) {
585         struct epoll_event ev = {};
586         bool add_to_epoll;
587         int r;
588
589         assert(e);
590
591         add_to_epoll = e->signal_fd < 0;
592
593         r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
594         if (r < 0)
595                 return -errno;
596
597         e->signal_fd = r;
598
599         if (!add_to_epoll)
600                 return 0;
601
602         ev.events = EPOLLIN;
603         ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
604
605         r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
606         if (r < 0) {
607                 close_nointr_nofail(e->signal_fd);
608                 e->signal_fd = -1;
609
610                 return -errno;
611         }
612
613         return 0;
614 }
615
616 int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) {
617         sd_event_source *s;
618         int r;
619
620         if (!e)
621                 return -EINVAL;
622         if (sig <= 0)
623                 return -EINVAL;
624         if (sig >= _NSIG)
625                 return -EINVAL;
626         if (!callback)
627                 return -EINVAL;
628         if (!ret)
629                 return -EINVAL;
630
631         if (!e->signal_sources) {
632                 e->signal_sources = new0(sd_event_source*, _NSIG);
633                 if (!e->signal_sources)
634                         return -ENOMEM;
635         } else if (e->signal_sources[sig])
636                 return -EBUSY;
637
638         s = source_new(e, SOURCE_SIGNAL);
639         if (!s)
640                 return -ENOMEM;
641
642         s->signal.sig = sig;
643         s->signal.callback = callback;
644         s->userdata = userdata;
645
646         e->signal_sources[sig] = s;
647         assert_se(sigaddset(&e->sigset, sig) == 0);
648
649         if (sig != SIGCHLD || e->n_unmuted_child_sources == 0) {
650                 r = event_update_signal_fd(e);
651                 if (r < 0) {
652                         source_free(s);
653                         return r;
654                 }
655         }
656
657         *ret = s;
658         return 0;
659 }
660
661 int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) {
662         sd_event_source *s;
663         int r;
664
665         if (!e)
666                 return -EINVAL;
667         if (pid <= 1)
668                 return -EINVAL;
669         if (options & ~(WEXITED|WSTOPPED|WCONTINUED))
670                 return -EINVAL;
671         if (!callback)
672                 return -EINVAL;
673         if (!ret)
674                 return -EINVAL;
675
676         r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
677         if (r < 0)
678                 return r;
679
680         if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
681                 return -EBUSY;
682
683         s = source_new(e, SOURCE_CHILD);
684         if (!s)
685                 return -ENOMEM;
686
687         s->child.pid = pid;
688         s->child.options = options;
689         s->child.callback = callback;
690         s->userdata = userdata;
691
692         r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
693         if (r < 0) {
694                 source_free(s);
695                 return r;
696         }
697
698         e->n_unmuted_child_sources ++;
699
700         assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
701
702         if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
703                 r = event_update_signal_fd(e);
704                 if (r < 0) {
705                         source_free(s);
706                         return -errno;
707                 }
708         }
709
710         *ret = s;
711         return 0;
712 }
713
714 int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) {
715         sd_event_source *s;
716         int r;
717
718         if (!e)
719                 return -EINVAL;
720         if (!ret)
721                 return -EINVAL;
722
723         s = source_new(e, SOURCE_DEFER);
724         if (!s)
725                 return -ENOMEM;
726
727         s->defer.callback = callback;
728         s->userdata = userdata;
729
730         r = source_set_pending(s, true);
731         if (r < 0) {
732                 source_free(s);
733                 return r;
734         }
735
736         *ret = s;
737         return 0;
738 }
739
740 sd_event_source* sd_event_source_ref(sd_event_source *s) {
741         if (!s)
742                 return NULL;
743
744         assert_se(REFCNT_INC(s->n_ref) >= 2);
745
746         return s;
747 }
748
749 sd_event_source* sd_event_source_unref(sd_event_source *s) {
750         if (!s)
751                 return NULL;
752
753         if (REFCNT_DEC(s->n_ref) <= 0)
754                 source_free(s);
755
756         return NULL;
757 }
758
759 int sd_event_source_get_pending(sd_event_source *s) {
760         if (!s)
761                 return -EINVAL;
762
763         return s->pending;
764 }
765
766 int sd_event_source_get_io_fd(sd_event_source *s) {
767         if (!s)
768                 return -EINVAL;
769         if (s->type != SOURCE_IO)
770                 return -EDOM;
771
772         return s->io.fd;
773 }
774
775 int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
776         if (!s)
777                 return -EINVAL;
778         if (s->type != SOURCE_IO)
779                 return -EDOM;
780         if (!events)
781                 return -EINVAL;
782
783         *events = s->io.events;
784         return 0;
785 }
786
787 int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
788         int r;
789
790         if (!s)
791                 return -EINVAL;
792         if (!s->type != SOURCE_IO)
793                 return -EDOM;
794         if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
795                 return -EINVAL;
796
797         if (s->io.events == events)
798                 return 0;
799
800         if (s->mute != SD_EVENT_MUTED) {
801                 r = source_io_register(s, s->io.events, events);
802                 if (r < 0)
803                         return r;
804         }
805
806         s->io.events = events;
807
808         return 0;
809 }
810
811 int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
812         if (!s)
813                 return -EINVAL;
814         if (s->type != SOURCE_IO)
815                 return -EDOM;
816         if (!revents)
817                 return -EINVAL;
818         if (!s->pending)
819                 return -ENODATA;
820
821         *revents = s->io.revents;
822         return 0;
823 }
824
825 int sd_event_source_get_signal(sd_event_source *s) {
826         if (!s)
827                 return -EINVAL;
828         if (s->type != SOURCE_SIGNAL)
829                 return -EDOM;
830
831         return s->signal.sig;
832 }
833
834 int sd_event_source_get_priority(sd_event_source *s, int *priority) {
835         if (!s)
836                 return -EINVAL;
837
838         return s->priority;
839 }
840
841 int sd_event_source_set_priority(sd_event_source *s, int priority) {
842         if (!s)
843                 return -EINVAL;
844
845         if (s->priority == priority)
846                 return 0;
847
848         s->priority = priority;
849
850         if (s->pending)
851                 assert_se(prioq_reshuffle(s->event->pending, s, &s->pending_index) == 0);
852
853         if (s->prepare)
854                 assert_se(prioq_reshuffle(s->event->prepare, s, &s->prepare_index) == 0);
855
856         return 0;
857 }
858
859 int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m) {
860         if (!s)
861                 return -EINVAL;
862         if (!m)
863                 return -EINVAL;
864
865         *m = s->mute;
866         return 0;
867 }
868
869 int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) {
870         int r;
871
872         if (!s)
873                 return -EINVAL;
874         if (m != SD_EVENT_MUTED && m != SD_EVENT_UNMUTED && !SD_EVENT_ONESHOT)
875                 return -EINVAL;
876
877         if (s->mute == m)
878                 return 0;
879
880         if (m == SD_EVENT_MUTED) {
881
882                 switch (s->type) {
883
884                 case SOURCE_IO:
885                         r = source_io_unregister(s);
886                         if (r < 0)
887                                 return r;
888
889                         s->mute = m;
890                         break;
891
892                 case SOURCE_MONOTONIC:
893                         s->mute = m;
894                         prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
895                         break;
896
897                 case SOURCE_REALTIME:
898                         s->mute = m;
899                         prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
900                         break;
901
902                 case SOURCE_SIGNAL:
903                         s->mute = m;
904                         if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) {
905                                 assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
906                                 event_update_signal_fd(s->event);
907                         }
908
909                         break;
910
911                 case SOURCE_CHILD:
912                         s->mute = m;
913
914                         assert(s->event->n_unmuted_child_sources > 0);
915                         s->event->n_unmuted_child_sources--;
916
917                         if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
918                                 assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
919                                 event_update_signal_fd(s->event);
920                         }
921
922                         break;
923
924                 case SOURCE_DEFER:
925                         s->mute = m;
926                         break;
927                 }
928
929         } else {
930                 switch (s->type) {
931
932                 case SOURCE_IO:
933                         r = source_io_register(s, m, s->io.events);
934                         if (r < 0)
935                                 return r;
936
937                         s->mute = m;
938                         break;
939
940                 case SOURCE_MONOTONIC:
941                         s->mute = m;
942                         prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
943                         break;
944
945                 case SOURCE_REALTIME:
946                         s->mute = m;
947                         prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
948                         break;
949
950                 case SOURCE_SIGNAL:
951                         s->mute = m;
952
953                         if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0)  {
954                                 assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
955                                 event_update_signal_fd(s->event);
956                         }
957                         break;
958
959                 case SOURCE_CHILD:
960                         s->mute = m;
961
962                         if (s->mute == SD_EVENT_MUTED) {
963                                 s->event->n_unmuted_child_sources++;
964
965                                 if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
966                                         assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
967                                         event_update_signal_fd(s->event);
968                                 }
969                         }
970                         break;
971
972                 case SOURCE_DEFER:
973                         s->mute = m;
974                         break;
975                 }
976         }
977
978         if (s->pending)
979                 prioq_reshuffle(s->event->pending, s, &s->pending_index);
980
981         if (s->prepare)
982                 prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
983
984         return 0;
985 }
986
987 int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
988         if (!s)
989                 return -EINVAL;
990         if (!usec)
991                 return -EINVAL;
992         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
993                 return -EDOM;
994
995         *usec = s->time.next;
996         return 0;
997 }
998
999 int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
1000         if (!s)
1001                 return -EINVAL;
1002         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
1003                 return -EDOM;
1004
1005         if (s->time.next == usec)
1006                 return 0;
1007
1008         s->time.next = usec;
1009
1010         if (s->type == SOURCE_REALTIME)
1011                 prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
1012         else
1013                 prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
1014
1015         return 0;
1016 }
1017
1018 int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
1019         int r;
1020
1021         if (!s)
1022                 return -EINVAL;
1023
1024         if (s->prepare == callback)
1025                 return 0;
1026
1027         if (callback && s->prepare) {
1028                 s->prepare = callback;
1029                 return 0;
1030         }
1031
1032         r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
1033         if (r < 0)
1034                 return r;
1035
1036         s->prepare = callback;
1037
1038         if (callback) {
1039                 r = prioq_put(s->event->prepare, s, &s->prepare_index);
1040                 if (r < 0)
1041                         return r;
1042         } else
1043                 prioq_remove(s->event->prepare, s, &s->prepare_index);
1044
1045         return 0;
1046 }
1047
1048 void* sd_event_source_get_userdata(sd_event_source *s) {
1049         if (!s)
1050                 return NULL;
1051
1052         return s->userdata;
1053 }
1054
1055 static int event_arm_timer(
1056                 sd_event *e,
1057                 int timer_fd,
1058                 Prioq *prioq,
1059                 usec_t *next) {
1060
1061         struct itimerspec its = {};
1062         sd_event_source *s;
1063         int r;
1064
1065         assert_se(e);
1066         assert_se(next);
1067
1068         s = prioq_peek(prioq);
1069         if (!s || s->mute == SD_EVENT_MUTED)
1070                 return 0;
1071
1072         if (*next == s->time.next)
1073                 return 0;
1074
1075         assert_se(timer_fd >= 0);
1076
1077         if (s->time.next == 0) {
1078                 /* We don' want to disarm here, just mean some time looooong ago. */
1079                 its.it_value.tv_sec = 0;
1080                 its.it_value.tv_nsec = 1;
1081         } else
1082                 timespec_store(&its.it_value, s->time.next);
1083
1084         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
1085         if (r < 0)
1086                 return r;
1087
1088         *next = s->time.next;
1089         return 0;
1090 }
1091
1092 static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
1093         assert(e);
1094         assert(s);
1095         assert(s->type == SOURCE_IO);
1096
1097         s->io.revents = events;
1098
1099         /*
1100            If this is a oneshot event source, then we added it to the
1101            epoll with EPOLLONESHOT, hence we know it's not registered
1102            anymore. We can save a syscall here...
1103         */
1104
1105         if (s->mute == SD_EVENT_ONESHOT)
1106                 s->io.registered = false;
1107
1108         return source_set_pending(s, true);
1109 }
1110
1111 static int flush_timer(sd_event *e, int fd, uint32_t events) {
1112         uint64_t x;
1113         ssize_t ss;
1114
1115         assert(e);
1116
1117         if (events != EPOLLIN)
1118                 return -EIO;
1119
1120         ss = read(fd, &x, sizeof(x));
1121         if (ss < 0) {
1122                 if (errno == EAGAIN || errno == EINTR)
1123                         return 0;
1124
1125                 return -errno;
1126         }
1127
1128         if (ss != sizeof(x))
1129                 return -EIO;
1130
1131         return 0;
1132 }
1133
1134 static int process_timer(sd_event *e, usec_t n, Prioq *prioq) {
1135         sd_event_source *s;
1136         int r;
1137
1138         assert(e);
1139
1140         for (;;) {
1141                 s = prioq_peek(prioq);
1142                 if (!s ||
1143                     s->time.next > n ||
1144                     s->mute == SD_EVENT_MUTED ||
1145                     s->pending)
1146                         break;
1147
1148                 r = source_set_pending(s, true);
1149                 if (r < 0)
1150                         return r;
1151
1152                 r = prioq_reshuffle(prioq, s, &s->time.prioq_index);
1153                 if (r < 0)
1154                         return r;
1155         }
1156
1157         return 0;
1158 }
1159
1160 static int process_child(sd_event *e) {
1161         sd_event_source *s;
1162         Iterator i;
1163         int r;
1164
1165         assert(e);
1166
1167         /*
1168            So, this is ugly. We iteratively invoke waitid() with P_PID
1169            + WNOHANG for each PID we wait for, instead of using
1170            P_ALL. This is because we only want to get child
1171            information of very specific child processes, and not all
1172            of them. We might not have processed the SIGCHLD even of a
1173            previous invocation and we don't want to maintain a
1174            unbounded *per-child* event queue, hence we really don't
1175            want anything flushed out of the kernel's queue that we
1176            don't care about. Since this is O(n) this means that if you
1177            have a lot of processes you probably want to handle SIGCHLD
1178            yourself.
1179         */
1180
1181         HASHMAP_FOREACH(s, e->child_sources, i) {
1182                 assert(s->type == SOURCE_CHILD);
1183
1184                 if (s->pending)
1185                         continue;
1186
1187                 if (s->mute == SD_EVENT_MUTED)
1188                         continue;
1189
1190                 zero(s->child.siginfo);
1191                 r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
1192                 if (r < 0)
1193                         return -errno;
1194
1195                 if (s->child.siginfo.si_pid != 0) {
1196                         r = source_set_pending(s, true);
1197                         if (r < 0)
1198                                 return r;
1199                 }
1200         }
1201
1202         e->processed_children = e->iteration;
1203         return 0;
1204 }
1205
1206 static int process_signal(sd_event *e, uint32_t events) {
1207         struct signalfd_siginfo si;
1208         bool read_one = false;
1209         ssize_t ss;
1210         int r;
1211
1212         if (events != EPOLLIN)
1213                 return -EIO;
1214
1215         for (;;) {
1216                 sd_event_source *s;
1217
1218                 ss = read(e->signal_fd, &si, sizeof(si));
1219                 if (ss < 0) {
1220                         if (errno == EAGAIN || errno == EINTR)
1221                                 return read_one;
1222
1223                         return -errno;
1224                 }
1225
1226                 if (ss != sizeof(si))
1227                         return -EIO;
1228
1229                 read_one = true;
1230
1231                 if (si.ssi_signo == SIGCHLD) {
1232                         r = process_child(e);
1233                         if (r < 0)
1234                                 return r;
1235                         if (r > 0 || !e->signal_sources[si.ssi_signo])
1236                                 continue;
1237                 } else {
1238                         s = e->signal_sources[si.ssi_signo];
1239                         if (!s)
1240                                 return -EIO;
1241                 }
1242
1243                 s->signal.siginfo = si;
1244                 r = source_set_pending(s, true);
1245                 if (r < 0)
1246                         return r;
1247         }
1248
1249
1250         return 0;
1251 }
1252
1253 static int source_dispatch(sd_event_source *s) {
1254         int r;
1255
1256         assert(s);
1257         assert(s->pending);
1258
1259         r = source_set_pending(s, false);
1260         if (r < 0)
1261                 return r;
1262
1263         if (s->mute == SD_EVENT_ONESHOT) {
1264                 r = sd_event_source_set_mute(s, SD_EVENT_MUTED);
1265                 if (r < 0)
1266                         return r;
1267         }
1268
1269         switch (s->type) {
1270
1271         case SOURCE_IO:
1272                 r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
1273                 break;
1274
1275         case SOURCE_MONOTONIC:
1276                 r = s->time.callback(s, s->time.next, s->userdata);
1277                 break;
1278
1279         case SOURCE_REALTIME:
1280                 r = s->time.callback(s, s->time.next, s->userdata);
1281                 break;
1282
1283         case SOURCE_SIGNAL:
1284                 r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
1285                 break;
1286
1287         case SOURCE_CHILD:
1288                 r = s->child.callback(s, &s->child.siginfo, s->userdata);
1289                 break;
1290
1291         case SOURCE_DEFER:
1292                 r = s->defer.callback(s, s->userdata);
1293                 break;
1294         }
1295
1296         return r;
1297 }
1298
1299 static int event_prepare(sd_event *e) {
1300         int r;
1301
1302         assert(e);
1303
1304         for (;;) {
1305                 sd_event_source *s;
1306
1307                 s = prioq_peek(e->prepare);
1308                 if (!s || s->prepare_iteration == e->iteration || s->mute == SD_EVENT_MUTED)
1309                         break;
1310
1311                 s->prepare_iteration = e->iteration;
1312                 r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
1313                 if (r < 0)
1314                         return r;
1315
1316                 assert(s->prepare);
1317                 r = s->prepare(s, s->userdata);
1318                 if (r < 0)
1319                         return r;
1320
1321         }
1322
1323         return 0;
1324 }
1325
1326 int sd_event_run(sd_event *e, uint64_t timeout) {
1327         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
1328         sd_event_source *p;
1329         int r, i, m;
1330         dual_timestamp n;
1331
1332         if (!e)
1333                 return -EINVAL;
1334         if (e->quit)
1335                 return -ESTALE;
1336
1337         e->iteration++;
1338
1339         r = event_prepare(e);
1340         if (r < 0)
1341                 return r;
1342
1343         r = event_arm_timer(e, e->monotonic_fd, e->monotonic, &e->monotonic_next);
1344         if (r < 0)
1345                 return r;
1346
1347         r = event_arm_timer(e, e->realtime_fd, e->realtime, &e->realtime_next);
1348         if (r < 0)
1349                 return r;
1350
1351         if (e->iteration == 1 && !hashmap_isempty(e->child_sources))
1352                 /* On the first iteration, there might be already some
1353                  * zombies for us to care for, hence, don't wait */
1354                 timeout = 0;
1355         else {
1356                 p = prioq_peek(e->pending);
1357                 if (p && p->mute != SD_EVENT_MUTED)
1358                         timeout = 0;
1359         }
1360
1361         m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
1362         if (m < 0)
1363                 return m;
1364
1365         dual_timestamp_get(&n);
1366
1367         for (i = 0; i < m; i++) {
1368
1369                 if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
1370                         r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
1371                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
1372                         r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
1373                 else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
1374                         r = process_signal(e, ev_queue[i].events);
1375                 else
1376                         r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
1377
1378                 if (r < 0)
1379                         return r;
1380         }
1381
1382         r = process_timer(e, n.monotonic, e->monotonic);
1383         if (r < 0)
1384                 return r;
1385
1386         r = process_timer(e, n.realtime, e->realtime);
1387         if (r < 0)
1388                 return r;
1389
1390         if (e->iteration == 1 && e->processed_children != 1) {
1391                 /* On the first iteration, make sure we really process
1392                  * all children which might already be zombies. */
1393                 r = process_child(e);
1394                 if (r < 0)
1395                         return r;
1396         }
1397
1398         p = prioq_peek(e->pending);
1399         if (!p || p->mute == SD_EVENT_MUTED)
1400                 return 0;
1401
1402         return source_dispatch(p);
1403 }
1404
1405 int sd_event_loop(sd_event *e) {
1406         int r;
1407
1408         if (!e)
1409                 return -EINVAL;
1410
1411         while (!e->quit) {
1412                 r = sd_event_run(e, (uint64_t) -1);
1413                 if (r < 0)
1414                         return r;
1415         }
1416
1417         return 0;
1418 }
1419
1420 int sd_event_quit(sd_event *e) {
1421         if (!e)
1422                 return EINVAL;
1423
1424         return e->quit;
1425 }
1426
1427 int sd_event_request_quit(sd_event *e) {
1428         if (!e)
1429                 return -EINVAL;
1430
1431         e->quit = true;
1432         return 0;
1433 }
1434
1435 sd_event *sd_event_get(sd_event_source *s) {
1436         if (!s)
1437                 return NULL;
1438
1439         return s->event;
1440 }