chiark / gitweb /
bus: rework how we attach fds to event loops
[elogind.git] / src / libsystemd-bus / sd-bus.c
index edd917e30375786811727fd44590625e99ce3df1..9ab4367819c9f58a46efee4708c4223ae3e86742 100644 (file)
 #include "bus-protocol.h"
 
 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
+static int attach_io_events(sd_bus *b);
+static void detach_io_events(sd_bus *b);
 
 static void bus_close_fds(sd_bus *b) {
         assert(b);
 
+        detach_io_events(b);
+
         if (b->input_fd >= 0)
                 close_nointr_nofail(b->input_fd);
 
@@ -882,36 +886,27 @@ static int bus_start_address(sd_bus *b) {
         assert(b);
 
         for (;;) {
-                sd_bus_close(b);
+                bool skipped = false;
 
-                if (b->exec_path) {
+                bus_close_fds(b);
 
+                if (b->exec_path)
                         r = bus_socket_exec(b);
-                        if (r >= 0)
-                                return r;
-
-                        b->last_connect_error = -r;
-                } else if (b->kernel) {
-
+                else if (b->kernel)
                         r = bus_kernel_connect(b);
-                        if (r >= 0)
-                                return r;
-
-                        b->last_connect_error = -r;
-
-                } else if (b->machine) {
-
+                else if (b->machine)
                         r = bus_container_connect(b);
-                        if (r >= 0)
-                                return r;
-
-                        b->last_connect_error = -r;
-
-                } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
-
+                else if (b->sockaddr.sa.sa_family != AF_UNSPEC)
                         r = bus_socket_connect(b);
-                        if (r >= 0)
-                                return r;
+                else
+                        skipped = true;
+
+                if (!skipped) {
+                        if (r >= 0) {
+                                r = attach_io_events(b);
+                                if (r >= 0)
+                                        return r;
+                        }
 
                         b->last_connect_error = -r;
                 }
@@ -1276,11 +1271,16 @@ _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
         return 0;
 }
 
-static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
+static int bus_seal_message(sd_bus *b, sd_bus_message *m, usec_t timeout) {
         assert(b);
         assert(m);
 
-        if (m->header->version > b->message_version)
+        if (b->message_version != 0 &&
+            m->header->version != b->message_version)
+                return -EPERM;
+
+        if (b->message_endian != 0 &&
+            m->header->endian != b->message_endian)
                 return -EPERM;
 
         if (m->sealed) {
@@ -1291,16 +1291,16 @@ static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
                 return 0;
         }
 
-        return bus_message_seal(m, ++b->serial);
+        if (timeout == 0)
+                timeout = BUS_DEFAULT_TIMEOUT;
+
+        return bus_message_seal(m, ++b->serial, timeout);
 }
 
 int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
         assert(b);
         assert(m);
 
-        if (m->header->version > b->message_version)
-                return -EPERM;
-
         /* The bus specification says the serial number cannot be 0,
          * hence let's fill something in for synthetic messages. Since
          * synthetic messages might have a fake sender and we don't
@@ -1309,7 +1309,7 @@ int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
          * than (uint64_t) -1 since dbus1 only had 32bit identifiers,
          * even though kdbus can do 64bit. */
 
-        return bus_message_seal(m, 0xFFFFFFFFULL);
+        return bus_message_seal(m, 0xFFFFFFFFULL, 0);
 }
 
 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
@@ -1439,7 +1439,7 @@ _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
         if (!serial && !m->sealed)
                 m->header->flags |= BUS_MESSAGE_NO_REPLY_EXPECTED;
 
-        r = bus_seal_message(bus, m);
+        r = bus_seal_message(bus, m, 0);
         if (r < 0)
                 return r;
 
@@ -1514,9 +1514,6 @@ static usec_t calc_elapse(uint64_t usec) {
         if (usec == (uint64_t) -1)
                 return 0;
 
-        if (usec == 0)
-                usec = BUS_DEFAULT_TIMEOUT;
-
         return now(CLOCK_MONOTONIC) + usec;
 }
 
@@ -1561,13 +1558,11 @@ _public_ int sd_bus_call_async(
         if (r < 0)
                 return r;
 
-        if (usec != (uint64_t) -1) {
-                r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
-                if (r < 0)
-                        return r;
-        }
+        r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
+        if (r < 0)
+                return r;
 
-        r = bus_seal_message(bus, m);
+        r = bus_seal_message(bus, m, usec);
         if (r < 0)
                 return r;
 
@@ -1578,7 +1573,7 @@ _public_ int sd_bus_call_async(
         c->callback = callback;
         c->userdata = userdata;
         c->serial = BUS_MESSAGE_SERIAL(m);
-        c->timeout = calc_elapse(usec);
+        c->timeout = calc_elapse(m->timeout);
 
         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
         if (r < 0) {
@@ -1673,11 +1668,15 @@ _public_ int sd_bus_call(
 
         i = bus->rqueue_size;
 
+        r = bus_seal_message(bus, m, usec);
+        if (r < 0)
+                return r;
+
         r = sd_bus_send(bus, m, &serial);
         if (r < 0)
                 return r;
 
-        timeout = calc_elapse(usec);
+        timeout = calc_elapse(m->timeout);
 
         for (;;) {
                 usec_t left;
@@ -1754,6 +1753,8 @@ _public_ int sd_bus_call(
                 r = bus_poll(bus, true, left);
                 if (r < 0)
                         return r;
+                if (r == 0)
+                        return -ETIMEDOUT;
 
                 r = dispatch_wqueue(bus);
                 if (r < 0) {
@@ -2592,6 +2593,66 @@ static int quit_callback(sd_event_source *event, void *userdata) {
         return 1;
 }
 
+static int attach_io_events(sd_bus *bus) {
+        int r;
+
+        assert(bus);
+
+        if (bus->input_fd < 0)
+                return 0;
+
+        if (!bus->event)
+                return 0;
+
+        if (!bus->input_io_event_source) {
+                r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
+                if (r < 0)
+                        return r;
+
+                r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
+                if (r < 0)
+                        return r;
+
+                r = sd_event_source_set_priority(bus->input_io_event_source, bus->event_priority);
+        } else
+                r = sd_event_source_set_io_fd(bus->input_io_event_source, bus->input_fd);
+
+        if (r < 0)
+                return r;
+
+        if (bus->output_fd != bus->input_fd) {
+                assert(bus->output_fd >= 0);
+
+                if (!bus->output_io_event_source) {
+                        r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
+                        if (r < 0)
+                                return r;
+
+                        r = sd_event_source_set_priority(bus->output_io_event_source, bus->event_priority);
+                } else
+                        r = sd_event_source_set_io_fd(bus->output_io_event_source, bus->output_fd);
+
+                if (r < 0)
+                        return r;
+        }
+
+        return 0;
+}
+
+static void detach_io_events(sd_bus *bus) {
+        assert(bus);
+
+        if (bus->input_io_event_source) {
+                sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
+                bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
+        }
+
+        if (bus->output_io_event_source) {
+                sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
+                bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
+        }
+}
+
 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
         int r;
 
@@ -2610,37 +2671,21 @@ _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
                         return r;
         }
 
-        r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
-        if (r < 0)
-                goto fail;
-
-        r = sd_event_source_set_priority(bus->input_io_event_source, priority);
-        if (r < 0)
-                goto fail;
-
-        if (bus->output_fd != bus->input_fd) {
-                r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
-                if (r < 0)
-                        goto fail;
-
-                r = sd_event_source_set_priority(bus->output_io_event_source, priority);
-                if (r < 0)
-                        goto fail;
-        }
+        bus->event_priority = priority;
 
-        r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
+        r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
         if (r < 0)
                 goto fail;
 
-        r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
+        r = sd_event_source_set_priority(bus->time_event_source, priority);
         if (r < 0)
                 goto fail;
 
-        r = sd_event_source_set_priority(bus->time_event_source, priority);
+        r = sd_event_add_exit(bus->event, quit_callback, bus, &bus->quit_event_source);
         if (r < 0)
                 goto fail;
 
-        r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
+        r = attach_io_events(bus);
         if (r < 0)
                 goto fail;
 
@@ -2657,15 +2702,7 @@ _public_ int sd_bus_detach_event(sd_bus *bus) {
         if (!bus->event)
                 return 0;
 
-        if (bus->input_io_event_source) {
-                sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
-                bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
-        }
-
-        if (bus->output_io_event_source) {
-                sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
-                bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
-        }
+        detach_io_events(bus);
 
         if (bus->time_event_source) {
                 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);