chiark / gitweb /
event: add timer accuracy/coalescing logic
[elogind.git] / src / libsystemd-bus / sd-event.c
index de96fde8e242c5ca633c1ddd528231e581cc6421..511b271d4506351a262d60d2794f164c34aa5a24 100644 (file)
 #include "hashmap.h"
 #include "util.h"
 #include "time-util.h"
+#include "sd-id128.h"
 
 #include "sd-event.h"
 
 #define EPOLL_QUEUE_MAX 64
+#define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC)
 
 typedef enum EventSourceType {
         SOURCE_IO,
@@ -70,8 +72,9 @@ struct sd_event_source {
                 } io;
                 struct {
                         sd_time_handler_t callback;
-                        usec_t next;
-                        unsigned prioq_index;
+                        usec_t next, accuracy;
+                        unsigned earliest_index;
+                        unsigned latest_index;
                 } time;
                 struct {
                         sd_signal_handler_t callback;
@@ -100,8 +103,17 @@ struct sd_event {
 
         Prioq *pending;
         Prioq *prepare;
-        Prioq *monotonic;
-        Prioq *realtime;
+
+        /* For both clocks we maintain two priority queues each, one
+         * ordered for the earliest times the events may be
+         * dispatched, and one ordered by the latest times they must
+         * have been dispatched. The range between the top entries in
+         * the two prioqs is the time window we can freely schedule
+         * wakeups in */
+        Prioq *monotonic_earliest;
+        Prioq *monotonic_latest;
+        Prioq *realtime_earliest;
+        Prioq *realtime_latest;
 
         sigset_t sigset;
         sd_event_source **signal_sources;
@@ -110,11 +122,13 @@ struct sd_event {
         unsigned n_unmuted_child_sources;
 
         unsigned iteration;
-        unsigned processed_children;
 
         usec_t realtime_next, monotonic_next;
 
+        usec_t perturb;
+
         bool quit;
+        bool need_process_child;
 };
 
 static int pending_prioq_compare(const void *a, const void *b) {
@@ -185,7 +199,7 @@ static int prepare_prioq_compare(const void *a, const void *b) {
         return 0;
 }
 
-static int time_prioq_compare(const void *a, const void *b) {
+static int earliest_time_prioq_compare(const void *a, const void *b) {
         const sd_event_source *x = a, *y = b;
 
         assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
@@ -218,6 +232,39 @@ static int time_prioq_compare(const void *a, const void *b) {
         return 0;
 }
 
+static int latest_time_prioq_compare(const void *a, const void *b) {
+        const sd_event_source *x = a, *y = b;
+
+        assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
+        assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
+
+        /* Unmuted ones first */
+        if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
+                return -1;
+        if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
+                return 1;
+
+        /* Move the pending ones to the end */
+        if (!x->pending && y->pending)
+                return -1;
+        if (x->pending && !y->pending)
+                return 1;
+
+        /* Order by time */
+        if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy)
+                return -1;
+        if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy)
+                return -1;
+
+        /* Stability for the rest */
+        if (x < y)
+                return -1;
+        if (x > y)
+                return 1;
+
+        return 0;
+}
+
 static void event_free(sd_event *e) {
         assert(e);
 
@@ -235,8 +282,10 @@ static void event_free(sd_event *e) {
 
         prioq_free(e->pending);
         prioq_free(e->prepare);
-        prioq_free(e->monotonic);
-        prioq_free(e->realtime);
+        prioq_free(e->monotonic_earliest);
+        prioq_free(e->monotonic_latest);
+        prioq_free(e->realtime_earliest);
+        prioq_free(e->realtime_latest);
 
         free(e->signal_sources);
 
@@ -357,11 +406,13 @@ static void source_free(sd_event_source *s) {
                         break;
 
                 case SOURCE_MONOTONIC:
-                        prioq_remove(s->event->monotonic, s, &s->time.prioq_index);
+                        prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index);
+                        prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index);
                         break;
 
                 case SOURCE_REALTIME:
-                        prioq_remove(s->event->realtime, s, &s->time.prioq_index);
+                        prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index);
+                        prioq_remove(s->event->realtime_latest, s, &s->time.latest_index);
                         break;
 
                 case SOURCE_SIGNAL:
@@ -494,6 +545,7 @@ static int event_setup_timer_fd(
 
         struct epoll_event ev = {};
         int r, fd;
+        sd_id128_t bootid;
 
         assert(e);
         assert(timer_fd);
@@ -514,6 +566,17 @@ static int event_setup_timer_fd(
                 return -errno;
         }
 
+        /* When we sleep for longer, we try to realign the wakeup to
+           the same time wihtin each second, so that events all across
+           the system can be coalesced into a single CPU
+           wakeup. However, let's take some system-specific randomness
+           for this value, so that in a network of systems with synced
+           clocks timer events are distributed a bit. Here, we
+           calculate a perturbation usec offset from the boot ID. */
+
+        if (sd_id128_get_boot(&bootid) >= 0)
+                e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC;
+
         *timer_fd = fd;
         return 0;
 }
@@ -523,8 +586,10 @@ static int event_add_time_internal(
                 EventSourceType type,
                 int *timer_fd,
                 clockid_t id,
-                Prioq **prioq,
+                Prioq **earliest,
+                Prioq **latest,
                 uint64_t usec,
+                uint64_t accuracy,
                 sd_time_handler_t callback,
                 void *userdata,
                 sd_event_source **ret) {
@@ -538,13 +603,24 @@ static int event_add_time_internal(
                 return -EINVAL;
         if (!ret)
                 return -EINVAL;
+        if (usec == (uint64_t) -1)
+                return -EINVAL;
+        if (accuracy == (uint64_t) -1)
+                return -EINVAL;
 
         assert(timer_fd);
-        assert(prioq);
+        assert(earliest);
+        assert(latest);
+
+        if (!*earliest) {
+                *earliest = prioq_new(earliest_time_prioq_compare);
+                if (!*earliest)
+                        return -ENOMEM;
+        }
 
-        if (!*prioq) {
-                *prioq = prioq_new(time_prioq_compare);
-                if (!*prioq)
+        if (!*latest) {
+                *latest = prioq_new(latest_time_prioq_compare);
+                if (!*latest)
                         return -ENOMEM;
         }
 
@@ -559,26 +635,34 @@ static int event_add_time_internal(
                 return -ENOMEM;
 
         s->time.next = usec;
+        s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy;
         s->time.callback = callback;
-        s->time.prioq_index = PRIOQ_IDX_NULL;
+        s->time.earliest_index = PRIOQ_IDX_NULL;
+        s->time.latest_index = PRIOQ_IDX_NULL;
         s->userdata = userdata;
 
-        r = prioq_put(*prioq, s, &s->time.prioq_index);
-        if (r < 0) {
-                source_free(s);
-                return r;
-        }
+        r = prioq_put(*earliest, s, &s->time.earliest_index);
+        if (r < 0)
+                goto fail;
+
+        r = prioq_put(*latest, s, &s->time.latest_index);
+        if (r < 0)
+                goto fail;
 
         *ret = s;
         return 0;
+
+fail:
+        source_free(s);
+        return r;
 }
 
-int sd_event_add_monotonic(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
-        return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic, usec, callback, userdata, ret);
+int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
+        return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
 }
 
-int sd_event_add_realtime(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
-        return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime, usec, callback, userdata, ret);
+int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
+        return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret);
 }
 
 static int event_update_signal_fd(sd_event *e) {
@@ -707,6 +791,8 @@ int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t c
                 }
         }
 
+        e->need_process_child = true;
+
         *ret = s;
         return 0;
 }
@@ -848,10 +934,10 @@ int sd_event_source_set_priority(sd_event_source *s, int priority) {
         s->priority = priority;
 
         if (s->pending)
-                assert_se(prioq_reshuffle(s->event->pending, s, &s->pending_index) == 0);
+                prioq_reshuffle(s->event->pending, s, &s->pending_index);
 
         if (s->prepare)
-                assert_se(prioq_reshuffle(s->event->prepare, s, &s->prepare_index) == 0);
+                prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
 
         return 0;
 }
@@ -891,12 +977,14 @@ int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) {
 
                 case SOURCE_MONOTONIC:
                         s->mute = m;
-                        prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
+                        prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
+                        prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
                         break;
 
                 case SOURCE_REALTIME:
                         s->mute = m;
-                        prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
+                        prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
+                        prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
                         break;
 
                 case SOURCE_SIGNAL:
@@ -939,12 +1027,14 @@ int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) {
 
                 case SOURCE_MONOTONIC:
                         s->mute = m;
-                        prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
+                        prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
+                        prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
                         break;
 
                 case SOURCE_REALTIME:
                         s->mute = m;
-                        prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
+                        prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
+                        prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
                         break;
 
                 case SOURCE_SIGNAL:
@@ -999,6 +1089,8 @@ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
 int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
         if (!s)
                 return -EINVAL;
+        if (usec == (uint64_t) -1)
+                return -EINVAL;
         if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
                 return -EDOM;
 
@@ -1007,10 +1099,13 @@ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
 
         s->time.next = usec;
 
-        if (s->type == SOURCE_REALTIME)
-                prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
-        else
-                prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
+        if (s->type == SOURCE_REALTIME) {
+                prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index);
+                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
+        } else {
+                prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index);
+                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
+        }
 
         return 0;
 }
@@ -1052,40 +1147,99 @@ void* sd_event_source_get_userdata(sd_event_source *s) {
         return s->userdata;
 }
 
+static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) {
+        usec_t c;
+        assert(e);
+        assert(a <= b);
+
+        if (a <= 0)
+                return 0;
+
+        if (b <= a + 1)
+                return a;
+
+        /*
+          Find a good time to wake up again between times a and b. We
+          have two goals here:
+
+          a) We want to wake up as seldom as possible, hence prefer
+             later times over earlier times.
+
+          b) But if we have to wake up, then let's make sure to
+             dispatch as much as possible on the entire system.
+
+          We implement this by waking up everywhere at the same time
+          within any given second if we can, synchronised via the
+          perturbation value determined from the boot ID. If we can't,
+          then we try to find the same spot in every a 250ms
+          step. Otherwise, we pick the last possible time to wake up.
+        */
+
+        c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb;
+        if (c >= b) {
+                if (_unlikely_(c < USEC_PER_SEC))
+                        return b;
+
+                c -= USEC_PER_SEC;
+        }
+
+        if (c >= a)
+                return c;
+
+        c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250));
+        if (c >= b) {
+                if (_unlikely_(c < USEC_PER_MSEC*250))
+                        return b;
+
+                c -= USEC_PER_MSEC*250;
+        }
+
+        if (c >= a)
+                return c;
+
+        return b;
+}
+
 static int event_arm_timer(
                 sd_event *e,
                 int timer_fd,
-                Prioq *prioq,
+                Prioq *earliest,
+                Prioq *latest,
                 usec_t *next) {
 
         struct itimerspec its = {};
-        sd_event_source *s;
+        sd_event_source *a, *b;
+        usec_t t;
         int r;
 
         assert_se(e);
         assert_se(next);
 
-        s = prioq_peek(prioq);
-        if (!s || s->mute == SD_EVENT_MUTED)
+        a = prioq_peek(earliest);
+        if (!a || a->mute == SD_EVENT_MUTED)
                 return 0;
 
-        if (*next == s->time.next)
+        b = prioq_peek(latest);
+        assert_se(b && b->mute != SD_EVENT_MUTED);
+
+        t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy);
+        if (*next == t)
                 return 0;
 
         assert_se(timer_fd >= 0);
 
-        if (s->time.next == 0) {
+        if (t == 0) {
                 /* We don' want to disarm here, just mean some time looooong ago. */
                 its.it_value.tv_sec = 0;
                 its.it_value.tv_nsec = 1;
         } else
-                timespec_store(&its.it_value, s->time.next);
+                timespec_store(&its.it_value, t);
 
         r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
         if (r < 0)
                 return r;
 
-        *next = s->time.next;
+        *next = t;
         return 0;
 }
 
@@ -1131,14 +1285,14 @@ static int flush_timer(sd_event *e, int fd, uint32_t events) {
         return 0;
 }
 
-static int process_timer(sd_event *e, usec_t n, Prioq *prioq) {
+static int process_timer(sd_event *e, usec_t n, Prioq *earliest, Prioq *latest) {
         sd_event_source *s;
         int r;
 
         assert(e);
 
         for (;;) {
-                s = prioq_peek(prioq);
+                s = prioq_peek(earliest);
                 if (!s ||
                     s->time.next > n ||
                     s->mute == SD_EVENT_MUTED ||
@@ -1149,9 +1303,8 @@ static int process_timer(sd_event *e, usec_t n, Prioq *prioq) {
                 if (r < 0)
                         return r;
 
-                r = prioq_reshuffle(prioq, s, &s->time.prioq_index);
-                if (r < 0)
-                        return r;
+                prioq_reshuffle(earliest, s, &s->time.earliest_index);
+                prioq_reshuffle(latest, s, &s->time.latest_index);
         }
 
         return 0;
@@ -1164,6 +1317,8 @@ static int process_child(sd_event *e) {
 
         assert(e);
 
+        e->need_process_child = false;
+
         /*
            So, this is ugly. We iteratively invoke waitid() with P_PID
            + WNOHANG for each PID we wait for, instead of using
@@ -1199,7 +1354,6 @@ static int process_child(sd_event *e) {
                 }
         }
 
-        e->processed_children = e->iteration;
         return 0;
 }
 
@@ -1323,6 +1477,19 @@ static int event_prepare(sd_event *e) {
         return 0;
 }
 
+static sd_event_source* event_next_pending(sd_event *e) {
+        sd_event_source *p;
+
+        p = prioq_peek(e->pending);
+        if (!p)
+                return NULL;
+
+        if (p->mute == SD_EVENT_MUTED)
+                return NULL;
+
+        return p;
+}
+
 int sd_event_run(sd_event *e, uint64_t timeout) {
         struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
         sd_event_source *p;
@@ -1340,25 +1507,21 @@ int sd_event_run(sd_event *e, uint64_t timeout) {
         if (r < 0)
                 return r;
 
-        r = event_arm_timer(e, e->monotonic_fd, e->monotonic, &e->monotonic_next);
-        if (r < 0)
-                return r;
+        if (event_next_pending(e) || e->need_process_child)
+                timeout = 0;
 
-        r = event_arm_timer(e, e->realtime_fd, e->realtime, &e->realtime_next);
-        if (r < 0)
-                return r;
+        if (timeout > 0) {
+                r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next);
+                if (r < 0)
+                        return r;
 
-        if (e->iteration == 1 && !hashmap_isempty(e->child_sources))
-                /* On the first iteration, there might be already some
-                 * zombies for us to care for, hence, don't wait */
-                timeout = 0;
-        else {
-                p = prioq_peek(e->pending);
-                if (p && p->mute != SD_EVENT_MUTED)
-                        timeout = 0;
+                r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next);
+                if (r < 0)
+                        return r;
         }
 
-        m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
+        m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX,
+                       timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
         if (m < 0)
                 return m;
 
@@ -1379,24 +1542,22 @@ int sd_event_run(sd_event *e, uint64_t timeout) {
                         return r;
         }
 
-        r = process_timer(e, n.monotonic, e->monotonic);
+        r = process_timer(e, n.monotonic, e->monotonic_earliest, e->monotonic_latest);
         if (r < 0)
                 return r;
 
-        r = process_timer(e, n.realtime, e->realtime);
+        r = process_timer(e, n.realtime, e->realtime_earliest, e->realtime_latest);
         if (r < 0)
                 return r;
 
-        if (e->iteration == 1 && e->processed_children != 1) {
-                /* On the first iteration, make sure we really process
-                 * all children which might already be zombies. */
+        if (e->need_process_child) {
                 r = process_child(e);
                 if (r < 0)
                         return r;
         }
 
-        p = prioq_peek(e->pending);
-        if (!p || p->mute == SD_EVENT_MUTED)
+        p = event_next_pending(e);
+        if (!p)
                 return 0;
 
         return source_dispatch(p);
@@ -1438,3 +1599,38 @@ sd_event *sd_event_get(sd_event_source *s) {
 
         return s->event;
 }
+
+int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) {
+        if (!s)
+                return -EINVAL;
+        if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
+                return -EDOM;
+
+        if (usec == 0)
+                usec = DEFAULT_ACCURACY_USEC;
+
+        if (s->time.accuracy == usec)
+                return 0;
+
+
+        s->time.accuracy = usec;
+
+        if (s->type == SOURCE_REALTIME)
+                prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index);
+        else
+                prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index);
+
+        return 0;
+}
+
+int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) {
+        if (!s)
+                return -EINVAL;
+        if (!usec)
+                return -EINVAL;
+        if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME)
+                return -EDOM;
+
+        *usec = s->time.accuracy;
+        return 0;
+}