chiark / gitweb /
sd-resolve: rework sd-resolve to be callback based, similar in style to sd-bus and...
[elogind.git] / src / libsystemd / sd-bus / sd-bus.c
index fa6d43adc8de9014025a290917fed0476c666054..dc54e211a490b3918a400bc7c8eebfa5a0da780f 100644 (file)
@@ -111,6 +111,12 @@ static void bus_node_destroy(sd_bus *b, struct node *n) {
 static void bus_reset_queues(sd_bus *b) {
         assert(b);
 
+        /* NOTE: We _must_ decrement b->Xqueue_size before calling
+         * sd_bus_message_unref() for _each_ message. Otherwise the
+         * self-reference checks in sd_bus_unref() will fire for each message.
+         * We would thus recurse into sd_bus_message_unref() and trigger the
+         * assert(m->n_ref > 0) */
+
         while (b->rqueue_size > 0)
                 sd_bus_message_unref(b->rqueue[--b->rqueue_size]);
 
@@ -1388,6 +1394,24 @@ _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
         if (!bus)
                 return NULL;
 
+        /* TODO/FIXME: It's naive to think REFCNT_GET() is thread-safe in any
+         * way but exclusive REFCNT_DEC(). The current logic _must_ lock around
+         * REFCNT_GET() until REFCNT_DEC() or two threads might end up in
+         * parallel in bus_reset_queues(). But locking would totally break the
+         * recursion we introduce by bus_reset_queues()...
+         * (Imagine one thread in sd_bus_message_unref() setting n_ref to 0 and
+         * thus calling into sd_bus_unref(). If at the same time the real
+         * thread calls sd_bus_unref(), both end up with "q == true" and will
+         * call into bus_reset_queues().
+         * If we require the main bus to be alive until all dispatch threads
+         * are done, there is no need to do ref-counts at all. So in both ways,
+         * the REFCNT thing is humbug.)
+         *
+         * On a second note: messages are *not* required to have ->bus set nor
+         * does it have to be _this_ bus that they're assigned to. This whole
+         * ref-cnt checking breaks apart if a message is not assigned to us.
+         * (which is _very_ easy to trigger with the current API). */
+
         if (REFCNT_GET(bus->n_ref) == bus->rqueue_size + bus->wqueue_size + 1) {
                 bool q = true;
 
@@ -1408,7 +1432,10 @@ _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
                 /* We are the only holders on the messages, and the
                  * messages are the only holders on us, so let's drop
                  * the messages and thus implicitly also kill our own
-                 * last references */
+                 * last references.
+                 * bus_reset_queues() decrements the queue-size before
+                 * calling into sd_bus_message_unref(). Thus, it
+                 * protects us from recursion. */
 
                 if (q)
                         bus_reset_queues(bus);
@@ -1638,11 +1665,13 @@ static int bus_send_internal(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie,
         int r;
 
         assert_return(bus, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
         assert_return(m, -EINVAL);
         assert_return(!bus_pid_changed(bus), -ECHILD);
         assert_return(!bus->is_kernel || !(bus->hello_flags & KDBUS_HELLO_MONITOR), -EROFS);
 
+        if (!BUS_IS_OPEN(bus->state))
+                return -ENOTCONN;
+
         if (m->n_fds > 0) {
                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
                 if (r < 0)
@@ -1718,10 +1747,12 @@ _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destinat
         int r;
 
         assert_return(bus, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
         assert_return(m, -EINVAL);
         assert_return(!bus_pid_changed(bus), -ECHILD);
 
+        if (!BUS_IS_OPEN(bus->state))
+                return -ENOTCONN;
+
         if (!streq_ptr(m->destination, destination)) {
 
                 if (!destination)
@@ -1773,7 +1804,6 @@ _public_ int sd_bus_call_async(
         int r;
 
         assert_return(bus, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
         assert_return(m, -EINVAL);
         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
         assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
@@ -1781,6 +1811,9 @@ _public_ int sd_bus_call_async(
         assert_return(!bus_pid_changed(bus), -ECHILD);
         assert_return(!bus->is_kernel || !(bus->hello_flags & KDBUS_HELLO_MONITOR), -EROFS);
 
+        if (!BUS_IS_OPEN(bus->state))
+                return -ENOTCONN;
+
         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
         if (r < 0)
                 return r;
@@ -1887,7 +1920,6 @@ _public_ int sd_bus_call(
         int r;
 
         assert_return(bus, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
         assert_return(m, -EINVAL);
         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
         assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
@@ -1895,6 +1927,9 @@ _public_ int sd_bus_call(
         assert_return(!bus_pid_changed(bus), -ECHILD);
         assert_return(!bus->is_kernel || !(bus->hello_flags & KDBUS_HELLO_MONITOR), -EROFS);
 
+        if (!BUS_IS_OPEN(bus->state))
+                return -ENOTCONN;
+
         r = bus_ensure_running(bus);
         if (r < 0)
                 return r;
@@ -2025,9 +2060,11 @@ _public_ int sd_bus_get_events(sd_bus *bus) {
         int flags = 0;
 
         assert_return(bus, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
         assert_return(!bus_pid_changed(bus), -ECHILD);
 
+        if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING)
+                return -ENOTCONN;
+
         if (bus->state == BUS_OPENING)
                 flags |= POLLOUT;
         else if (bus->state == BUS_AUTHENTICATING) {
@@ -2052,9 +2089,11 @@ _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
 
         assert_return(bus, -EINVAL);
         assert_return(timeout_usec, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
         assert_return(!bus_pid_changed(bus), -ECHILD);
 
+        if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING)
+                return -ENOTCONN;
+
         if (bus->track_queue) {
                 *timeout_usec = 0;
                 return 1;
@@ -2642,7 +2681,8 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
         if (bus->state == BUS_CLOSING)
                 return 1;
 
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
+        if (!BUS_IS_OPEN(bus->state))
+                return -ENOTCONN;
 
         e = sd_bus_get_events(bus);
         if (e < 0)
@@ -2697,7 +2737,8 @@ _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
         if (bus->state == BUS_CLOSING)
                 return 0;
 
-        assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
+        if (!BUS_IS_OPEN(bus->state))
+                return -ENOTCONN;
 
         if (bus->rqueue_size > 0)
                 return 0;
@@ -2714,7 +2755,8 @@ _public_ int sd_bus_flush(sd_bus *bus) {
         if (bus->state == BUS_CLOSING)
                 return 0;
 
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
+        if (!BUS_IS_OPEN(bus->state))
+                return -ENOTCONN;
 
         r = bus_ensure_running(bus);
         if (r < 0)
@@ -3023,7 +3065,7 @@ _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
 
         bus->event_priority = priority;
 
-        r = sd_event_add_monotonic(bus->event, &bus->time_event_source, 0, 0, time_callback, bus);
+        r = sd_event_add_time(bus->event, &bus->time_event_source, CLOCK_MONOTONIC, 0, 0, time_callback, bus);
         if (r < 0)
                 goto fail;
 
@@ -3064,9 +3106,7 @@ _public_ int sd_bus_detach_event(sd_bus *bus) {
                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
         }
 
-        if (bus->event)
-                bus->event = sd_event_unref(bus->event);
-
+        bus->event = sd_event_unref(bus->event);
         return 1;
 }
 
@@ -3224,9 +3264,13 @@ _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **re
         assert_return(bus, -EINVAL);
         assert_return(mask <= _SD_BUS_CREDS_ALL, -ENOTSUP);
         assert_return(ret, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
         assert_return(!bus_pid_changed(bus), -ECHILD);
-        assert_return(!bus->is_kernel, -ENOTSUP);
+
+        if (bus->is_kernel)
+                return -ENOTSUP;
+
+        if (!BUS_IS_OPEN(bus->state))
+                return -ENOTCONN;
 
         if (!bus->ucred_valid && !isempty(bus->label))
                 return -ENODATA;
@@ -3265,9 +3309,13 @@ _public_ int sd_bus_try_close(sd_bus *bus) {
         int r;
 
         assert_return(bus, -EINVAL);
-        assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
         assert_return(!bus_pid_changed(bus), -ECHILD);
-        assert_return(bus->is_kernel, -ENOTSUP);
+
+        if (!bus->is_kernel)
+                return -ENOTSUP;
+
+        if (!BUS_IS_OPEN(bus->state))
+                return -ENOTCONN;
 
         if (bus->rqueue_size > 0)
                 return -EBUSY;