chiark / gitweb /
core: convert PID 1 to libsystemd-bus
[elogind.git] / src / libsystemd-bus / sd-bus.c
index c8248e19aa074fa1235bef566ec6f44907b8ccec..bc88ac977c96f3c4903995485e0eb95f2b2a6a71 100644 (file)
@@ -1204,6 +1204,18 @@ _public_ void sd_bus_close(sd_bus *bus) {
          * freed. */
 }
 
+static void bus_enter_closing(sd_bus *bus) {
+        assert(bus);
+
+        if (bus->state != BUS_OPENING &&
+            bus->state != BUS_AUTHENTICATING &&
+            bus->state != BUS_HELLO &&
+            bus->state != BUS_RUNNING)
+                return;
+
+        bus->state = BUS_CLOSING;
+}
+
 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
         assert_return(bus, NULL);
 
@@ -1282,6 +1294,20 @@ static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
         return bus_message_seal(m, ++b->serial);
 }
 
+static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
+        int r;
+
+        assert(bus);
+        assert(message);
+
+        if (bus->is_kernel)
+                r = bus_kernel_write_message(bus, message);
+        else
+                r = bus_socket_write_message(bus, message, idx);
+
+        return r;
+}
+
 static int dispatch_wqueue(sd_bus *bus) {
         int r, ret = 0;
 
@@ -1290,15 +1316,10 @@ static int dispatch_wqueue(sd_bus *bus) {
 
         while (bus->wqueue_size > 0) {
 
-                if (bus->is_kernel)
-                        r = bus_kernel_write_message(bus, bus->wqueue[0]);
-                else
-                        r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
-
-                if (r < 0) {
-                        sd_bus_close(bus);
+                r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
+                if (r < 0)
                         return r;
-                else if (r == 0)
+                else if (r == 0)
                         /* Didn't do anything this time */
                         return ret;
                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
@@ -1324,6 +1345,20 @@ static int dispatch_wqueue(sd_bus *bus) {
         return ret;
 }
 
+static int bus_read_message(sd_bus *bus, sd_bus_message **m) {
+        int r;
+
+        assert(bus);
+        assert(m);
+
+        if (bus->is_kernel)
+                r = bus_kernel_read_message(bus, m);
+        else
+                r = bus_socket_read_message(bus, m);
+
+        return r;
+}
+
 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
         sd_bus_message *z = NULL;
         int r, ret = 0;
@@ -1343,15 +1378,9 @@ static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
 
         /* Try to read a new message */
         do {
-                if (bus->is_kernel)
-                        r = bus_kernel_read_message(bus, &z);
-                else
-                        r = bus_socket_read_message(bus, &z);
-
-                if (r < 0) {
-                        sd_bus_close(bus);
+                r = bus_read_message(bus, &z);
+                if (r < 0)
                         return r;
-                }
                 if (r == 0)
                         return ret;
 
@@ -1395,15 +1424,10 @@ _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
                 size_t idx = 0;
 
-                if (bus->is_kernel)
-                        r = bus_kernel_write_message(bus, m);
-                else
-                        r = bus_socket_write_message(bus, m, &idx);
-
-                if (r < 0) {
-                        sd_bus_close(bus);
+                r = bus_write_message(bus, m, &idx);
+                if (r < 0)
                         return r;
-                else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
+                else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
                         /* Wasn't fully written. So let's remember how
                          * much was written. Note that the first entry
                          * of the wqueue array is always allocated so
@@ -1573,7 +1597,7 @@ int bus_ensure_running(sd_bus *bus) {
 
         assert(bus);
 
-        if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
+        if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
                 return -ENOTCONN;
         if (bus->state == BUS_RUNNING)
                 return 1;
@@ -1644,12 +1668,10 @@ _public_ int sd_bus_call(
                         room = true;
                 }
 
-                if (bus->is_kernel)
-                        r = bus_kernel_read_message(bus, &incoming);
-                else
-                        r = bus_socket_read_message(bus, &incoming);
+                r = bus_read_message(bus, &incoming);
                 if (r < 0)
                         return r;
+
                 if (incoming) {
 
                         if (incoming->reply_serial == serial) {
@@ -1731,7 +1753,6 @@ _public_ int sd_bus_call(
 _public_ int sd_bus_get_fd(sd_bus *bus) {
 
         assert_return(bus, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
         assert_return(bus->input_fd == bus->output_fd, -EPERM);
         assert_return(!bus_pid_changed(bus), -ECHILD);
 
@@ -1742,7 +1763,7 @@ _public_ int sd_bus_get_events(sd_bus *bus) {
         int flags = 0;
 
         assert_return(bus, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
+        assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
         assert_return(!bus_pid_changed(bus), -ECHILD);
 
         if (bus->state == BUS_OPENING)
@@ -1769,9 +1790,14 @@ _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
 
         assert_return(bus, -EINVAL);
         assert_return(timeout_usec, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
+        assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
         assert_return(!bus_pid_changed(bus), -ECHILD);
 
+        if (bus->state == BUS_CLOSING) {
+                *timeout_usec = 0;
+                return 1;
+        }
+
         if (bus->state == BUS_AUTHENTICATING) {
                 *timeout_usec = bus->auth_timeout;
                 return 1;
@@ -1821,12 +1847,21 @@ static int process_timeout(sd_bus *bus) {
         if (r < 0)
                 return r;
 
+        r = bus_seal_message(bus, m);
+        if (r < 0)
+                return r;
+
         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
         hashmap_remove(bus->reply_callbacks, &c->serial);
 
+        bus->current = m;
+        bus->iteration_counter ++;
+
         r = c->callback(bus, m, c->userdata);
         free(c);
 
+        bus->current = NULL;
+
         return r < 0 ? r : 1;
 }
 
@@ -1877,7 +1912,7 @@ static int process_reply(sd_bus *bus, sd_bus_message *m) {
         r = c->callback(bus, m, c->userdata);
         free(c);
 
-        return r;
+        return r < 0 ? r : 1;
 }
 
 static int process_filter(sd_bus *bus, sd_bus_message *m) {
@@ -2078,6 +2113,85 @@ null_message:
         return r;
 }
 
+static int process_closing(sd_bus *bus, sd_bus_message **ret) {
+        _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
+        struct reply_callback *c;
+        int r;
+
+        assert(bus);
+        assert(bus->state == BUS_CLOSING);
+
+        c = hashmap_first(bus->reply_callbacks);
+        if (c) {
+                /* First, fail all outstanding method calls */
+                r = bus_message_new_synthetic_error(
+                                bus,
+                                c->serial,
+                                &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
+                                &m);
+                if (r < 0)
+                        return r;
+
+                r = bus_seal_message(bus, m);
+                if (r < 0)
+                        return r;
+
+                if (c->timeout != 0)
+                        prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
+
+                hashmap_remove(bus->reply_callbacks, &c->serial);
+
+                bus->current = m;
+                bus->iteration_counter++;
+
+                r = c->callback(bus, m, c->userdata);
+                free(c);
+
+                if (r >= 0)
+                        r = 1;
+
+                goto finish;
+        }
+
+        /* Then, synthesize a Disconnected message */
+        r = sd_bus_message_new_signal(
+                        bus,
+                        "/org/freedesktop/DBus/Local",
+                        "org.freedesktop.DBus.Local",
+                        "Disconnected",
+                        &m);
+        if (r < 0)
+                return r;
+
+        r = bus_seal_message(bus, m);
+        if (r < 0)
+                return r;
+
+        sd_bus_close(bus);
+
+        bus->current = m;
+        bus->iteration_counter++;
+
+        r = process_filter(bus, m);
+        if (r != 0)
+                goto finish;
+
+        r = process_match(bus, m);
+        if (r != 0)
+                goto finish;
+
+        if (ret) {
+                *ret = m;
+                m = NULL;
+        }
+
+        r = 1;
+
+finish:
+        bus->current = NULL;
+        return r;
+}
+
 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
         BUS_DONT_DESTROY(bus);
         int r;
@@ -2091,7 +2205,7 @@ _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
         assert_return(!bus_pid_changed(bus), -ECHILD);
 
         /* We don't allow recursively invoking sd_bus_process(). */
-        assert_return(!bus->processing, -EBUSY);
+        assert_return(!bus->current, -EBUSY);
 
         switch (bus->state) {
 
@@ -2101,29 +2215,43 @@ _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
 
         case BUS_OPENING:
                 r = bus_socket_process_opening(bus);
-                if (r < 0)
+                if (r == -ECONNRESET || r == -EPIPE) {
+                        bus_enter_closing(bus);
+                        r = 1;
+                } else if (r < 0)
                         return r;
                 if (ret)
                         *ret = NULL;
                 return r;
 
         case BUS_AUTHENTICATING:
-
                 r = bus_socket_process_authenticating(bus);
-                if (r < 0)
+                if (r == -ECONNRESET || r == -EPIPE) {
+                        bus_enter_closing(bus);
+                        r = 1;
+                } else if (r < 0)
                         return r;
+
                 if (ret)
                         *ret = NULL;
+
                 return r;
 
         case BUS_RUNNING:
         case BUS_HELLO:
-
-                bus->processing = true;
                 r = process_running(bus, ret);
-                bus->processing = false;
+                if (r == -ECONNRESET || r == -EPIPE) {
+                        bus_enter_closing(bus);
+                        r = 1;
+
+                        if (ret)
+                                *ret = NULL;
+                }
 
                 return r;
+
+        case BUS_CLOSING:
+                return process_closing(bus, ret);
         }
 
         assert_not_reached("Unknown state");
@@ -2136,6 +2264,10 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
         usec_t m = (usec_t) -1;
 
         assert(bus);
+
+        if (bus->state == BUS_CLOSING)
+                return 1;
+
         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
 
         e = sd_bus_get_events(bus);
@@ -2186,9 +2318,13 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
 
         assert_return(bus, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
         assert_return(!bus_pid_changed(bus), -ECHILD);
 
+        if (bus->state == BUS_CLOSING)
+                return 0;
+
+        assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
+
         if (bus->rqueue_size > 0)
                 return 0;
 
@@ -2199,9 +2335,13 @@ _public_ int sd_bus_flush(sd_bus *bus) {
         int r;
 
         assert_return(bus, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
         assert_return(!bus_pid_changed(bus), -ECHILD);
 
+        if (bus->state == BUS_CLOSING)
+                return 0;
+
+        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
+
         r = bus_ensure_running(bus);
         if (r < 0)
                 return r;