chiark / gitweb /
sd-bus: optionally, use inotify to wait for bus sockets to appear
authorLennart Poettering <lennart@poettering.net>
Fri, 15 Dec 2017 21:24:52 +0000 (22:24 +0100)
committerSven Eden <yamakuzure@gmx.net>
Wed, 30 May 2018 05:49:55 +0000 (07:49 +0200)
This adds a "watch-bind" feature to sd-bus connections. If set and the
AF_UNIX socket we are connecting to doesn't exist yet, we'll establish
an inotify watch instead, and wait for the socket to appear. In other
words, a missing AF_UNIX just makes connecting slower.

This is useful for daemons such as networkd or resolved that shall be
able to run during early-boot, before dbus-daemon is up, and want to
connect to dbus-daemon as soon as it becomes ready.

src/libelogind/libelogind.sym
src/libelogind/sd-bus/bus-internal.h
src/libelogind/sd-bus/bus-socket.c
src/libelogind/sd-bus/bus-socket.h
src/libelogind/sd-bus/sd-bus.c
src/libelogind/sd-bus/test-bus-watch-bind.c [new file with mode: 0644]
src/systemd/sd-bus.h
src/test/meson.build

index ddf51760b96e5efd8362f05114d4734822fd42ab..5aa25369776b66987438d4e9d2b8419a0d578031 100644 (file)
@@ -536,3 +536,9 @@ global:
         sd_bus_message_new;
         sd_bus_message_seal;
 } LIBSYSTEMD_234;
+
+LIBSYSTEMD_237 {
+global:
+        sd_bus_set_watch_bind;
+        sd_bus_get_watch_bind;
+} LIBSYSTEMD_236;
index 8557d67e119d2002b43a900e8d3f14f1314000f0..ab84dde5f6a66295237f15dd85a18f36c03f515a 100644 (file)
@@ -157,6 +157,7 @@ struct sd_bus_slot {
 
 enum bus_state {
         BUS_UNSET,
+        BUS_WATCH_BIND, /* waiting for the socket to appear via inotify */
         BUS_OPENING,
         BUS_AUTHENTICATING,
         BUS_HELLO,
@@ -188,6 +189,7 @@ struct sd_bus {
 
         enum bus_state state;
         int input_fd, output_fd;
+        int inotify_fd;
         int message_version;
         int message_endian;
 
@@ -210,6 +212,7 @@ struct sd_bus {
         bool exited:1;
         bool exit_triggered:1;
         bool is_local:1;
+        bool watch_bind:1;
 
         int use_memfd;
 
@@ -293,6 +296,7 @@ struct sd_bus {
         sd_event_source *output_io_event_source;
         sd_event_source *time_event_source;
         sd_event_source *quit_event_source;
+        sd_event_source *inotify_event_source;
         sd_event *event;
         int event_priority;
 
@@ -312,6 +316,9 @@ struct sd_bus {
 
         LIST_HEAD(sd_bus_slot, slots);
         LIST_HEAD(sd_bus_track, tracks);
+
+        int *inotify_watches;
+        size_t n_inotify_watches;
 };
 
 /* For method calls we time-out at 25s, like in the D-Bus reference implementation */
@@ -369,6 +376,12 @@ bool bus_pid_changed(sd_bus *bus);
 
 char *bus_address_escape(const char *v);
 
+int bus_attach_io_events(sd_bus *b);
+int bus_attach_inotify_event(sd_bus *b);
+
+void bus_close_inotify_fd(sd_bus *b);
+void bus_close_io_fds(sd_bus *b);
+
 #define OBJECT_PATH_FOREACH_PREFIX(prefix, path)                        \
         for (char *_slash = ({ strcpy((prefix), (path)); streq((prefix), "/") ? NULL : strrchr((prefix), '/'); }) ; \
              _slash && !(_slash[(_slash) == (prefix)] = 0);             \
index 5034d51472d9edc9a255fbbe092227092c1259ee..74d90ecdf21235155ae6defc82dc439a16836f70 100644 (file)
 #include "bus-socket.h"
 #include "fd-util.h"
 #include "format-util.h"
+//#include "fs-util.h"
 #include "hexdecoct.h"
+//#include "io-util.h"
 #include "macro.h"
 #include "missing.h"
+//#include "path-util.h"
 #include "selinux-util.h"
 #include "signal-util.h"
 #include "stdio-util.h"
@@ -688,30 +691,249 @@ int bus_socket_start_auth(sd_bus *b) {
                 return bus_socket_start_auth_client(b);
 }
 
+static int bus_socket_inotify_setup(sd_bus *b) {
+        _cleanup_free_ int *new_watches = NULL;
+        _cleanup_free_ char *absolute = NULL;
+        size_t n_allocated = 0, n = 0, done = 0, i;
+        unsigned max_follow = 32;
+        const char *p;
+        int wd, r;
+
+        assert(b);
+        assert(b->watch_bind);
+        assert(b->sockaddr.sa.sa_family == AF_UNIX);
+        assert(b->sockaddr.un.sun_path[0] != 0);
+
+        /* Sets up an inotify fd in case watch_bind is enabled: wait until the configured AF_UNIX file system socket
+         * appears before connecting to it. The implemented is pretty simplistic: we just subscribe to relevant changes
+         * to all prefix components of the path, and every time we get an event for that we try to reconnect again,
+         * without actually caring what precisely the event we got told us. If we still can't connect we re-subscribe
+         * to all relevant changes of anything in the path, so that our watches include any possibly newly created path
+         * components. */
+
+        if (b->inotify_fd < 0) {
+                b->inotify_fd = inotify_init1(IN_NONBLOCK|IN_CLOEXEC);
+                if (b->inotify_fd < 0)
+                        return -errno;
+        }
+
+        /* Make sure the path is NUL terminated */
+        p = strndupa(b->sockaddr.un.sun_path, sizeof(b->sockaddr.un.sun_path));
+
+        /* Make sure the path is absolute */
+        r = path_make_absolute_cwd(p, &absolute);
+        if (r < 0)
+                goto fail;
+
+        /* Watch all parent directories, and don't mind any prefix that doesn't exist yet. For the innermost directory
+         * that exists we want to know when files are created or moved into it. For all parents of it we just care if
+         * they are removed or renamed. */
+
+        if (!GREEDY_REALLOC(new_watches, n_allocated, n + 1)) {
+                r = -ENOMEM;
+                goto fail;
+        }
+
+        /* Start with the top-level directory, which is a bit simpler than the rest, since it can't be a symlink, and
+         * always exists */
+        wd = inotify_add_watch(b->inotify_fd, "/", IN_CREATE|IN_MOVED_TO);
+        if (wd < 0) {
+                r = log_debug_errno(errno, "Failed to add inotify watch on /: %m");
+                goto fail;
+        } else
+                new_watches[n++] = wd;
+
+        for (;;) {
+                _cleanup_free_ char *component = NULL, *prefix = NULL, *destination = NULL;
+                size_t n_slashes, n_component;
+                char *c = NULL;
+
+                n_slashes = strspn(absolute + done, "/");
+                n_component = n_slashes + strcspn(absolute + done + n_slashes, "/");
+
+                if (n_component == 0) /* The end */
+                        break;
+
+                component = strndup(absolute + done, n_component);
+                if (!component) {
+                        r = -ENOMEM;
+                        goto fail;
+                }
+
+                /* A trailing slash? That's a directory, and not a socket then */
+                if (path_equal(component, "/")) {
+                        r = -EISDIR;
+                        goto fail;
+                }
+
+                /* A single dot? Let's eat this up */
+                if (path_equal(component, "/.")) {
+                        done += n_component;
+                        continue;
+                }
+
+                prefix = strndup(absolute, done + n_component);
+                if (!prefix) {
+                        r = -ENOMEM;
+                        goto fail;
+                }
+
+                if (!GREEDY_REALLOC(new_watches, n_allocated, n + 1)) {
+                        r = -ENOMEM;
+                        goto fail;
+                }
+
+                wd = inotify_add_watch(b->inotify_fd, prefix, IN_DELETE_SELF|IN_MOVE_SELF|IN_ATTRIB|IN_CREATE|IN_MOVED_TO|IN_DONT_FOLLOW);
+                log_debug("Added inotify watch for %s on bus %s: %i", prefix, strna(b->description), wd);
+
+                if (wd < 0) {
+                        if (IN_SET(errno, ENOENT, ELOOP))
+                                break; /* This component doesn't exist yet, or the path contains a cyclic symlink right now */
+
+                        r = log_debug_errno(errno, "Failed to add inotify watch on %s: %m", isempty(prefix) ? "/" : prefix);
+                        goto fail;
+                } else
+                        new_watches[n++] = wd;
+
+                /* Check if this is possibly a symlink. If so, let's follow it and watch it too. */
+                r = readlink_malloc(prefix, &destination);
+                if (r == -EINVAL) { /* not a symlink */
+                        done += n_component;
+                        continue;
+                }
+                if (r < 0)
+                        goto fail;
+
+                if (isempty(destination)) { /* Empty symlink target? Yuck! */
+                        r = -EINVAL;
+                        goto fail;
+                }
+
+                if (max_follow <= 0) { /* Let's make sure we don't follow symlinks forever */
+                        r = -ELOOP;
+                        goto fail;
+                }
+
+                if (path_is_absolute(destination)) {
+                        /* For absolute symlinks we build the new path and start anew */
+                        c = strjoin(destination, absolute + done + n_component);
+                        done = 0;
+                } else {
+                        _cleanup_free_ char *t = NULL;
+
+                        /* For relative symlinks we replace the last component, and try again */
+                        t = strndup(absolute, done);
+                        if (!t)
+                                return -ENOMEM;
+
+                        c = strjoin(t, "/", destination, absolute + done + n_component);
+                }
+                if (!c) {
+                        r = -ENOMEM;
+                        goto fail;
+                }
+
+                free(absolute);
+                absolute = c;
+
+                max_follow--;
+        }
+
+        /* And now, let's remove all watches from the previous iteration we don't need anymore */
+        for (i = 0; i < b->n_inotify_watches; i++) {
+                bool found = false;
+                size_t j;
+
+                for (j = 0; j < n; j++)
+                        if (new_watches[j] == b->inotify_watches[i]) {
+                                found = true;
+                                break;
+                        }
+
+                if (found)
+                        continue;
+
+                (void) inotify_rm_watch(b->inotify_fd, b->inotify_watches[i]);
+        }
+
+        free_and_replace(b->inotify_watches, new_watches);
+        b->n_inotify_watches = n;
+
+        return 0;
+
+fail:
+        bus_close_inotify_fd(b);
+        return r;
+}
+
 int bus_socket_connect(sd_bus *b) {
+        bool inotify_done = false;
         int r;
 
         assert(b);
-        assert(b->input_fd < 0);
-        assert(b->output_fd < 0);
-        assert(b->sockaddr.sa.sa_family != AF_UNSPEC);
 
-        b->input_fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
-        if (b->input_fd < 0)
-                return -errno;
+        for (;;) {
+                assert(b->input_fd < 0);
+                assert(b->output_fd < 0);
+                assert(b->sockaddr.sa.sa_family != AF_UNSPEC);
 
-        b->output_fd = b->input_fd;
+                b->input_fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
+                if (b->input_fd < 0)
+                        return -errno;
 
-        bus_socket_setup(b);
+                b->output_fd = b->input_fd;
+                bus_socket_setup(b);
 
-        r = connect(b->input_fd, &b->sockaddr.sa, b->sockaddr_size);
-        if (r < 0) {
-                if (errno == EINPROGRESS)
-                        return 1;
+                if (connect(b->input_fd, &b->sockaddr.sa, b->sockaddr_size) < 0) {
+                        if (errno == EINPROGRESS) {
 
-                return -errno;
+                                /* If we have any inotify watches open, close them now, we don't need them anymore, as
+                                 * we have successfully initiated a connection */
+                                bus_close_inotify_fd(b);
+
+                                /* Note that very likely we are already in BUS_OPENING state here, as we enter it when
+                                 * we start parsing the address string. The only reason we set the state explicitly
+                                 * here, is to undo BUS_WATCH_BIND, in case we did the inotify magic. */
+                                b->state = BUS_OPENING;
+                                return 1;
+                        }
+
+                        if (IN_SET(errno, ENOENT, ECONNREFUSED) &&  /* ENOENT → unix socket doesn't exist at all; ECONNREFUSED → unix socket stale */
+                            b->watch_bind &&
+                            b->sockaddr.sa.sa_family == AF_UNIX &&
+                            b->sockaddr.un.sun_path[0] != 0) {
+
+                                /* This connection attempt failed, let's release the socket for now, and start with a
+                                 * fresh one when reconnecting. */
+                                bus_close_io_fds(b);
+
+                                if (inotify_done) {
+                                        /* inotify set up already, don't do it again, just return now, and remember
+                                         * that we are waiting for inotify events now. */
+                                        b->state = BUS_WATCH_BIND;
+                                        return 1;
+                                }
+
+                                /* This is a file system socket, and the inotify logic is enabled. Let's create the necessary inotify fd. */
+                                r = bus_socket_inotify_setup(b);
+                                if (r < 0)
+                                        return r;
+
+                                /* Let's now try to connect a second time, because in theory there's otherwise a race
+                                 * here: the socket might have been created in the time between our first connect() and
+                                 * the time we set up the inotify logic. But let's remember that we set up inotify now,
+                                 * so that we don't do the connect() more than twice. */
+                                inotify_done = true;
+
+                        } else
+                                return -errno;
+                } else
+                        break;
         }
 
+        /* Yay, established, we don't need no inotify anymore! */
+        bus_close_inotify_fd(b);
+
         return bus_socket_start_auth(b);
 }
 
@@ -1069,3 +1291,34 @@ int bus_socket_process_authenticating(sd_bus *b) {
 
         return bus_socket_read_auth(b);
 }
+
+int bus_socket_process_watch_bind(sd_bus *b) {
+        int r, q;
+
+        assert(b);
+        assert(b->state == BUS_WATCH_BIND);
+        assert(b->inotify_fd >= 0);
+
+        r = flush_fd(b->inotify_fd);
+        if (r <= 0)
+                return r;
+
+        log_debug("Got inotify event on bus %s.", strna(b->description));
+
+        /* We flushed events out of the inotify fd. In that case, maybe the socket is valid now? Let's try to connect
+         * to it again */
+
+        r = bus_socket_connect(b);
+        if (r < 0)
+                return r;
+
+        q = bus_attach_io_events(b);
+        if (q < 0)
+                return q;
+
+        q = bus_attach_inotify_event(b);
+        if (q < 0)
+                return q;
+
+        return r;
+}
index 915a283f5add329b1a436f70065ac2aaca5dda1c..c180562f981dbe3ef1980047c3862ee29ae700c1 100644 (file)
@@ -34,5 +34,6 @@ int bus_socket_read_message(sd_bus *bus);
 
 int bus_socket_process_opening(sd_bus *b);
 int bus_socket_process_authenticating(sd_bus *b);
+int bus_socket_process_watch_bind(sd_bus *b);
 
 bool bus_socket_auth_needs_write(sd_bus *b);
index c71b0fa228b202cf643531a4d7afd01a414ed303..380881972aaed916dba70e8e7b5270034a76b92b 100644 (file)
@@ -75,8 +75,8 @@
         } while (false)
 
 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
-static int attach_io_events(sd_bus *b);
-static void detach_io_events(sd_bus *b);
+static void bus_detach_io_events(sd_bus *b);
+static void bus_detach_inotify_event(sd_bus *b);
 
 static thread_local sd_bus *default_system_bus = NULL;
 #if 0 /// UNNEEDED by elogind
@@ -84,16 +84,26 @@ static thread_local sd_bus *default_user_bus = NULL;
 #endif // 0
 static thread_local sd_bus *default_starter_bus = NULL;
 
-static void bus_close_fds(sd_bus *b) {
+void bus_close_io_fds(sd_bus *b) {
         assert(b);
 
-        detach_io_events(b);
+        bus_detach_io_events(b);
 
         if (b->input_fd != b->output_fd)
                 safe_close(b->output_fd);
         b->output_fd = b->input_fd = safe_close(b->input_fd);
 }
 
+void bus_close_inotify_fd(sd_bus *b) {
+        assert(b);
+
+        bus_detach_inotify_event(b);
+
+        b->inotify_fd = safe_close(b->inotify_fd);
+        b->inotify_watches = mfree(b->inotify_watches);
+        b->n_inotify_watches = 0;
+}
+
 static void bus_reset_queues(sd_bus *b) {
         assert(b);
 
@@ -137,7 +147,8 @@ static void bus_free(sd_bus *b) {
         if (b->default_bus_ptr)
                 *b->default_bus_ptr = NULL;
 
-        bus_close_fds(b);
+        bus_close_io_fds(b);
+        bus_close_inotify_fd(b);
 
         free(b->label);
         free(b->groups);
@@ -187,6 +198,7 @@ _public_ int sd_bus_new(sd_bus **ret) {
 
         r->n_ref = REFCNT_INIT;
         r->input_fd = r->output_fd = -1;
+        r->inotify_fd = -1;
         r->message_version = 1;
         r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
@@ -384,6 +396,22 @@ _public_ int sd_bus_get_allow_interactive_authorization(sd_bus *bus) {
         return bus->allow_interactive_authorization;
 }
 
+_public_ int sd_bus_set_watch_bind(sd_bus *bus, int b) {
+        assert_return(bus, -EINVAL);
+        assert_return(bus->state == BUS_UNSET, -EPERM);
+        assert_return(!bus_pid_changed(bus), -ECHILD);
+
+        bus->watch_bind = b;
+        return 0;
+}
+
+_public_ int sd_bus_get_watch_bind(sd_bus *bus) {
+        assert_return(bus, -EINVAL);
+        assert_return(!bus_pid_changed(bus), -ECHILD);
+
+        return bus->watch_bind;
+}
+
 static int hello_callback(sd_bus_message *reply, void *userdata, sd_bus_error *error) {
         const char *s;
         sd_bus *bus;
@@ -906,7 +934,8 @@ static int bus_start_address(sd_bus *b) {
         assert(b);
 
         for (;;) {
-                bus_close_fds(b);
+                bus_close_io_fds(b);
+                bus_close_inotify_fd(b);
 
                 /* If you provide multiple different bus-addresses, we
                  * try all of them in order and use the first one that
@@ -914,20 +943,25 @@ static int bus_start_address(sd_bus *b) {
 
                 if (b->exec_path)
                         r = bus_socket_exec(b);
-
                 else if ((b->nspid > 0 || b->machine) && b->sockaddr.sa.sa_family != AF_UNSPEC)
                         r = bus_container_connect_socket(b);
-
                 else if (b->sockaddr.sa.sa_family != AF_UNSPEC)
                         r = bus_socket_connect(b);
-
                 else
                         goto next;
 
                 if (r >= 0) {
-                        r = attach_io_events(b);
-                        if (r >= 0)
-                                return r;
+                        int q;
+
+                        q = bus_attach_io_events(b);
+                        if (q < 0)
+                                return q;
+
+                        q = bus_attach_inotify_event(b);
+                        if (q < 0)
+                                return q;
+
+                        return r;
                 }
 
                 b->last_connect_error = -r;
@@ -1320,7 +1354,8 @@ _public_ void sd_bus_close(sd_bus *bus) {
          * the bus object and the bus may be freed */
         bus_reset_queues(bus);
 
-        bus_close_fds(bus);
+        bus_close_io_fds(bus);
+        bus_close_inotify_fd(bus);
 }
 
 _public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) {
@@ -1337,7 +1372,7 @@ _public_ sd_bus* sd_bus_flush_close_unref(sd_bus *bus) {
 static void bus_enter_closing(sd_bus *bus) {
         assert(bus);
 
-        if (!IN_SET(bus->state, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING))
+        if (!IN_SET(bus->state, BUS_WATCH_BIND, BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO, BUS_RUNNING))
                 return;
 
         bus->state = BUS_CLOSING;
@@ -1988,7 +2023,16 @@ _public_ int sd_bus_get_fd(sd_bus *bus) {
         assert_return(bus->input_fd == bus->output_fd, -EPERM);
         assert_return(!bus_pid_changed(bus), -ECHILD);
 
-        return bus->input_fd;
+        if (bus->state == BUS_CLOSED)
+                return -ENOTCONN;
+
+        if (bus->inotify_fd >= 0)
+                return bus->inotify_fd;
+
+        if (bus->input_fd >= 0)
+                return bus->input_fd;
+
+        return -ENOTCONN;
 }
 
 _public_ int sd_bus_get_events(sd_bus *bus) {
@@ -1997,23 +2041,40 @@ _public_ int sd_bus_get_events(sd_bus *bus) {
         assert_return(bus, -EINVAL);
         assert_return(!bus_pid_changed(bus), -ECHILD);
 
-        if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING)
+        switch (bus->state) {
+
+        case BUS_UNSET:
+        case BUS_CLOSED:
                 return -ENOTCONN;
 
-        if (bus->state == BUS_OPENING)
+        case BUS_WATCH_BIND:
+                flags |= POLLIN;
+                break;
+
+        case BUS_OPENING:
                 flags |= POLLOUT;
-        else if (bus->state == BUS_AUTHENTICATING) {
+                break;
 
+        case BUS_AUTHENTICATING:
                 if (bus_socket_auth_needs_write(bus))
                         flags |= POLLOUT;
 
                 flags |= POLLIN;
+                break;
 
-        } else if (IN_SET(bus->state, BUS_RUNNING, BUS_HELLO)) {
+        case BUS_RUNNING:
+        case BUS_HELLO:
                 if (bus->rqueue_size <= 0)
                         flags |= POLLIN;
                 if (bus->wqueue_size > 0)
                         flags |= POLLOUT;
+                break;
+
+        case BUS_CLOSING:
+                break;
+
+        default:
+                assert_not_reached("Unknown state");
         }
 
         return flags;
@@ -2034,39 +2095,45 @@ _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
                 return 1;
         }
 
-        if (bus->state == BUS_CLOSING) {
-                *timeout_usec = 0;
-                return 1;
-        }
+        switch (bus->state) {
 
-        if (bus->state == BUS_AUTHENTICATING) {
+        case BUS_AUTHENTICATING:
                 *timeout_usec = bus->auth_timeout;
                 return 1;
-        }
 
-        if (!IN_SET(bus->state, BUS_RUNNING, BUS_HELLO)) {
-                *timeout_usec = (uint64_t) -1;
-                return 0;
-        }
+        case BUS_RUNNING:
+        case BUS_HELLO:
+                if (bus->rqueue_size > 0) {
+                        *timeout_usec = 0;
+                        return 1;
+                }
+
+                c = prioq_peek(bus->reply_callbacks_prioq);
+                if (!c) {
+                        *timeout_usec = (uint64_t) -1;
+                        return 0;
+                }
+
+                if (c->timeout == 0) {
+                        *timeout_usec = (uint64_t) -1;
+                        return 0;
+                }
+
+                *timeout_usec = c->timeout;
+                return 1;
 
-        if (bus->rqueue_size > 0) {
+        case BUS_CLOSING:
                 *timeout_usec = 0;
                 return 1;
-        }
 
-        c = prioq_peek(bus->reply_callbacks_prioq);
-        if (!c) {
+        case BUS_WATCH_BIND:
+        case BUS_OPENING:
                 *timeout_usec = (uint64_t) -1;
                 return 0;
-        }
 
-        if (c->timeout == 0) {
-                *timeout_usec = (uint64_t) -1;
-                return 0;
+        default:
+                assert_not_reached("Unknown or unexpected stat");
         }
-
-        *timeout_usec = c->timeout;
-        return 1;
 }
 
 static int process_timeout(sd_bus *bus) {
@@ -2129,8 +2196,8 @@ static int process_timeout(sd_bus *bus) {
 
         sd_bus_slot_unref(slot);
 
-        /* When this is the hello message and it failed, then make sure to propagate the error up, don't just log and
-         * ignore the callback handler's return value. */
+        /* When this is the hello message and it timed out, then make sure to propagate the error up, don't just log
+         * and ignore the callback handler's return value. */
         if (is_hello)
                 return r;
 
@@ -2234,8 +2301,8 @@ static int process_reply(sd_bus *bus, sd_bus_message *m) {
 
         sd_bus_slot_unref(slot);
 
-        /* When this is the hello message and it timed out, then make sure to propagate the error up, don't just log
-         * and ignore the callback handler's return value. */
+        /* When this is the hello message and it failed, then make sure to propagate the error up, don't just log and
+         * ignore the callback handler's return value. */
         if (is_hello)
                 return r;
 
@@ -2671,48 +2738,44 @@ static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priorit
         case BUS_CLOSED:
                 return -ECONNRESET;
 
+        case BUS_WATCH_BIND:
+                r = bus_socket_process_watch_bind(bus);
+                break;
+
         case BUS_OPENING:
                 r = bus_socket_process_opening(bus);
-                if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) {
-                        bus_enter_closing(bus);
-                        r = 1;
-                } else if (r < 0)
-                        return r;
-                if (ret)
-                        *ret = NULL;
-                return r;
+                break;
 
         case BUS_AUTHENTICATING:
                 r = bus_socket_process_authenticating(bus);
-                if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) {
-                        bus_enter_closing(bus);
-                        r = 1;
-                } else if (r < 0)
-                        return r;
-
-                if (ret)
-                        *ret = NULL;
-
-                return r;
+                break;
 
         case BUS_RUNNING:
         case BUS_HELLO:
                 r = process_running(bus, hint_priority, priority, ret);
-                if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) {
-                        bus_enter_closing(bus);
-                        r = 1;
+                if (r >= 0)
+                        return r;
 
-                        if (ret)
-                                *ret = NULL;
-                }
-
-                return r;
+                /* This branch initializes *ret, hence we don't use the generic error checking below */
+                break;
 
         case BUS_CLOSING:
                 return process_closing(bus, ret);
+
+        default:
+                assert_not_reached("Unknown state");
         }
 
-        assert_not_reached("Unknown state");
+        if (IN_SET(r, -ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)) {
+                bus_enter_closing(bus);
+                r = 1;
+        } else if (r < 0)
+                return r;
+
+        if (ret)
+                *ret = NULL;
+
+        return r;
 }
 
 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
@@ -2725,7 +2788,7 @@ _public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_messa
 
 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
         struct pollfd p[2] = {};
-        int r, e, n;
+        int r, n;
         struct timespec ts;
         usec_t m = USEC_INFINITY;
 
@@ -2737,45 +2800,52 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
         if (!BUS_IS_OPEN(bus->state))
                 return -ENOTCONN;
 
-        e = sd_bus_get_events(bus);
-        if (e < 0)
-                return e;
-
-        if (need_more)
-                /* The caller really needs some more data, he doesn't
-                 * care about what's already read, or any timeouts
-                 * except its own. */
-                e |= POLLIN;
-        else {
-                usec_t until;
-                /* The caller wants to process if there's something to
-                 * process, but doesn't care otherwise */
-
-                r = sd_bus_get_timeout(bus, &until);
-                if (r < 0)
-                        return r;
-                if (r > 0) {
-                        usec_t nw;
-                        nw = now(CLOCK_MONOTONIC);
-                        m = until > nw ? until - nw : 0;
-                }
-        }
-
-        if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
-                m = timeout_usec;
+        if (bus->state == BUS_WATCH_BIND) {
+                assert(bus->inotify_fd >= 0);
 
-        p[0].fd = bus->input_fd;
-        if (bus->output_fd == bus->input_fd) {
-                p[0].events = e;
+                p[0].events = POLLIN;
+                p[0].fd = bus->inotify_fd;
                 n = 1;
         } else {
-                p[0].events = e & POLLIN;
-                p[1].fd = bus->output_fd;
-                p[1].events = e & POLLOUT;
-                n = 2;
+                int e;
+
+                e = sd_bus_get_events(bus);
+                if (e < 0)
+                        return e;
+
+                if (need_more)
+                        /* The caller really needs some more data, he doesn't
+                         * care about what's already read, or any timeouts
+                         * except its own. */
+                        e |= POLLIN;
+                else {
+                        usec_t until;
+                        /* The caller wants to process if there's something to
+                         * process, but doesn't care otherwise */
+
+                        r = sd_bus_get_timeout(bus, &until);
+                        if (r < 0)
+                                return r;
+                        if (r > 0)
+                                m = usec_sub_unsigned(until, now(CLOCK_MONOTONIC));
+                }
+
+                p[0].fd = bus->input_fd;
+                if (bus->output_fd == bus->input_fd) {
+                        p[0].events = e;
+                        n = 1;
+                } else {
+                        p[0].events = e & POLLIN;
+                        p[1].fd = bus->output_fd;
+                        p[1].events = e & POLLOUT;
+                        n = 2;
+                }
         }
 
-        r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
+        if (timeout_usec != (uint64_t) -1 && (m == USEC_INFINITY || timeout_usec < m))
+                m = timeout_usec;
+
+        r = ppoll(p, n, m == USEC_INFINITY ? NULL : timespec_store(&ts, m), NULL);
         if (r < 0)
                 return -errno;
 
@@ -2811,6 +2881,10 @@ _public_ int sd_bus_flush(sd_bus *bus) {
         if (!BUS_IS_OPEN(bus->state))
                 return -ENOTCONN;
 
+        /* We never were connected? Don't hang in inotify for good, as there's no timeout set for it */
+        if (bus->state == BUS_WATCH_BIND)
+                return -EUNATCH;
+
         r = bus_ensure_running(bus);
         if (r < 0)
                 return r;
@@ -2983,6 +3057,8 @@ static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userd
 
         assert(bus);
 
+        /* Note that this is called both on input_fd, output_fd as well as inotify_fd events */
+
         r = sd_bus_process(bus, NULL);
         if (r < 0) {
                 log_debug_errno(r, "Processing of bus failed, closing down: %m");
@@ -3070,7 +3146,7 @@ static int quit_callback(sd_event_source *event, void *userdata) {
         return 1;
 }
 
-static int attach_io_events(sd_bus *bus) {
+int bus_attach_io_events(sd_bus *bus) {
         int r;
 
         assert(bus);
@@ -3124,7 +3200,7 @@ static int attach_io_events(sd_bus *bus) {
         return 0;
 }
 
-static void detach_io_events(sd_bus *bus) {
+static void bus_detach_io_events(sd_bus *bus) {
         assert(bus);
 
         if (bus->input_io_event_source) {
@@ -3138,6 +3214,44 @@ static void detach_io_events(sd_bus *bus) {
         }
 }
 
+int bus_attach_inotify_event(sd_bus *bus) {
+        int r;
+
+        assert(bus);
+
+        if (bus->inotify_fd < 0)
+                return 0;
+
+        if (!bus->event)
+                return 0;
+
+        if (!bus->inotify_event_source) {
+                r = sd_event_add_io(bus->event, &bus->inotify_event_source, bus->inotify_fd, EPOLLIN, io_callback, bus);
+                if (r < 0)
+                        return r;
+
+                r = sd_event_source_set_priority(bus->inotify_event_source, bus->event_priority);
+                if (r < 0)
+                        return r;
+
+                r = sd_event_source_set_description(bus->inotify_event_source, "bus-inotify");
+        } else
+                r = sd_event_source_set_io_fd(bus->inotify_event_source, bus->inotify_fd);
+        if (r < 0)
+                return r;
+
+        return 0;
+}
+
+static void bus_detach_inotify_event(sd_bus *bus) {
+        assert(bus);
+
+        if (bus->inotify_event_source) {
+                sd_event_source_set_enabled(bus->inotify_event_source, SD_EVENT_OFF);
+                bus->inotify_event_source = sd_event_source_unref(bus->inotify_event_source);
+        }
+}
+
 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
         int r;
 
@@ -3178,7 +3292,11 @@ _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
         if (r < 0)
                 goto fail;
 
-        r = attach_io_events(bus);
+        r = bus_attach_io_events(bus);
+        if (r < 0)
+                goto fail;
+
+        r = bus_attach_inotify_event(bus);
         if (r < 0)
                 goto fail;
 
@@ -3195,7 +3313,8 @@ _public_ int sd_bus_detach_event(sd_bus *bus) {
         if (!bus->event)
                 return 0;
 
-        detach_io_events(bus);
+        bus_detach_io_events(bus);
+        bus_detach_inotify_event(bus);
 
         if (bus->time_event_source) {
                 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
diff --git a/src/libelogind/sd-bus/test-bus-watch-bind.c b/src/libelogind/sd-bus/test-bus-watch-bind.c
new file mode 100644 (file)
index 0000000..38c80fd
--- /dev/null
@@ -0,0 +1,239 @@
+/* SPDX-License-Identifier: LGPL-2.1+ */
+/***
+  This file is part of systemd.
+
+  Copyright 2017 Lennart Poettering
+
+  systemd is free software; you can redistribute it and/or modify it
+  under the terms of the GNU Lesser General Public License as published by
+  the Free Software Foundation; either version 2.1 of the License, or
+  (at your option) any later version.
+
+  systemd is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+//#include <pthread.h>
+
+//#include "sd-bus.h"
+//#include "sd-event.h"
+//#include "sd-id128.h"
+
+//#include "alloc-util.h"
+//#include "fd-util.h"
+//#include "fileio.h"
+//#include "fs-util.h"
+//#include "mkdir.h"
+//#include "path-util.h"
+//#include "random-util.h"
+//#include "rm-rf.h"
+//#include "socket-util.h"
+//#include "string-util.h"
+
+static int method_foobar(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
+        log_info("Got Foobar() call.");
+
+        assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0) >= 0);
+        return sd_bus_reply_method_return(m, NULL);
+}
+
+static int method_exit(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
+        log_info("Got Exit() call");
+        assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 1) >= 0);
+        return sd_bus_reply_method_return(m, NULL);
+}
+
+static const sd_bus_vtable vtable[] = {
+        SD_BUS_VTABLE_START(0),
+        SD_BUS_METHOD("Foobar", NULL, NULL, method_foobar, SD_BUS_VTABLE_UNPRIVILEGED),
+        SD_BUS_METHOD("Exit", NULL, NULL, method_exit, SD_BUS_VTABLE_UNPRIVILEGED),
+        SD_BUS_VTABLE_END,
+};
+
+static void* thread_server(void *p) {
+        _cleanup_free_ char *suffixed = NULL, *suffixed2 = NULL, *d = NULL;
+        _cleanup_close_ int fd = -1;
+        union sockaddr_union u = {
+                .un.sun_family = AF_UNIX,
+        };
+        const char *path = p;
+
+        log_debug("Initializing server");
+
+        /* Let's play some games, by slowly creating the socket directory, and renaming it in the middle */
+        (void) usleep(100 * USEC_PER_MSEC);
+
+        assert_se(mkdir_parents(path, 0755) >= 0);
+        (void) usleep(100 * USEC_PER_MSEC);
+
+        d = dirname_malloc(path);
+        assert_se(d);
+        assert_se(asprintf(&suffixed, "%s.%" PRIx64, d, random_u64()) >= 0);
+        assert_se(rename(d, suffixed) >= 0);
+        (void) usleep(100 * USEC_PER_MSEC);
+
+        assert_se(asprintf(&suffixed2, "%s.%" PRIx64, d, random_u64()) >= 0);
+        assert_se(symlink(suffixed2, d) >= 0);
+        (void) usleep(100 * USEC_PER_MSEC);
+
+        assert_se(symlink(basename(suffixed), suffixed2) >= 0);
+        (void) usleep(100 * USEC_PER_MSEC);
+
+        strncpy(u.un.sun_path, path, sizeof(u.un.sun_path));
+
+        fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
+        assert_se(fd >= 0);
+
+        assert_se(bind(fd, &u.sa, SOCKADDR_UN_LEN(u.un)) >= 0);
+        usleep(100 * USEC_PER_MSEC);
+
+        assert_se(listen(fd, SOMAXCONN) >= 0);
+        usleep(100 * USEC_PER_MSEC);
+
+        assert_se(touch(path) >= 0);
+        usleep(100 * USEC_PER_MSEC);
+
+        log_debug("Initialized server");
+
+        for (;;) {
+                _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
+                _cleanup_(sd_event_unrefp) sd_event *event = NULL;
+                sd_id128_t id;
+                int bus_fd, code;
+
+                assert_se(sd_id128_randomize(&id) >= 0);
+
+                assert_se(sd_event_new(&event) >= 0);
+
+                bus_fd = accept4(fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
+                assert_se(bus_fd >= 0);
+
+                log_debug("Accepted server connection");
+
+                assert_se(sd_bus_new(&bus) >= 0);
+                assert_se(sd_bus_set_description(bus, "server") >= 0);
+                assert_se(sd_bus_set_fd(bus, bus_fd, bus_fd) >= 0);
+                assert_se(sd_bus_set_server(bus, true, id) >= 0);
+                /* assert_se(sd_bus_set_anonymous(bus, true) >= 0); */
+
+                assert_se(sd_bus_attach_event(bus, event, 0) >= 0);
+
+                assert_se(sd_bus_add_object_vtable(bus, NULL, "/foo", "foo.TestInterface", vtable, NULL) >= 0);
+
+                assert_se(sd_bus_start(bus) >= 0);
+
+                assert_se(sd_event_loop(event) >= 0);
+
+                assert_se(sd_event_get_exit_code(event, &code) >= 0);
+
+                if (code > 0)
+                        break;
+        }
+
+        log_debug("Server done");
+
+        return NULL;
+}
+
+static void* thread_client1(void *p) {
+        _cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
+        _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
+        const char *path = p, *t;
+        int r;
+
+        log_debug("Initializing client1");
+
+        assert_se(sd_bus_new(&bus) >= 0);
+        assert_se(sd_bus_set_description(bus, "client1") >= 0);
+
+        t = strjoina("unix:path=", path);
+        assert_se(sd_bus_set_address(bus, t) >= 0);
+        assert_se(sd_bus_set_watch_bind(bus, true) >= 0);
+        assert_se(sd_bus_start(bus) >= 0);
+
+        r = sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Foobar", &error, NULL, NULL);
+        assert_se(r >= 0);
+
+        log_debug("Client1 done");
+
+        return NULL;
+}
+
+static int client2_callback(sd_bus_message *m, void *userdata, sd_bus_error *ret_error) {
+        assert_se(sd_bus_message_is_method_error(m, NULL) == 0);
+        assert_se(sd_event_exit(sd_bus_get_event(sd_bus_message_get_bus(m)), 0) >= 0);
+        return 0;
+}
+
+static void* thread_client2(void *p) {
+        _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
+        _cleanup_(sd_event_unrefp) sd_event *event = NULL;
+        const char *path = p, *t;
+
+        log_debug("Initializing client2");
+
+        assert_se(sd_event_new(&event) >= 0);
+        assert_se(sd_bus_new(&bus) >= 0);
+        assert_se(sd_bus_set_description(bus, "client2") >= 0);
+
+        t = strjoina("unix:path=", path);
+        assert_se(sd_bus_set_address(bus, t) >= 0);
+        assert_se(sd_bus_set_watch_bind(bus, true) >= 0);
+        assert_se(sd_bus_attach_event(bus, event, 0) >= 0);
+        assert_se(sd_bus_start(bus) >= 0);
+
+        assert_se(sd_bus_call_method_async(bus, NULL, "foo.bar", "/foo", "foo.TestInterface", "Foobar", client2_callback, NULL, NULL) >= 0);
+
+        assert_se(sd_event_loop(event) >= 0);
+
+        log_debug("Client2 done");
+
+        return NULL;
+}
+
+static void request_exit(const char *path) {
+        _cleanup_(sd_bus_flush_close_unrefp) sd_bus *bus = NULL;
+        const char *t;
+
+        assert_se(sd_bus_new(&bus) >= 0);
+
+        t = strjoina("unix:path=", path);
+        assert_se(sd_bus_set_address(bus, t) >= 0);
+        assert_se(sd_bus_set_watch_bind(bus, true) >= 0);
+        assert_se(sd_bus_set_description(bus, "request-exit") >= 0);
+        assert_se(sd_bus_start(bus) >= 0);
+
+        assert_se(sd_bus_call_method(bus, "foo.bar", "/foo", "foo.TestInterface", "Exit", NULL, NULL, NULL) >= 0);
+}
+
+int main(int argc, char *argv[]) {
+        _cleanup_(rm_rf_physical_and_freep) char *d = NULL;
+        pthread_t server, client1, client2;
+        char *path;
+
+        log_set_max_level(LOG_DEBUG);
+
+        /* We use /dev/shm here rather than /tmp, since some weird distros might set up /tmp as some weird fs that
+         * doesn't support inotify properly. */
+        assert_se(mkdtemp_malloc("/dev/shm/elogind-watch-bind-XXXXXX", &d) >= 0);
+
+        path = strjoina(d, "/this/is/a/socket");
+
+        assert_se(pthread_create(&server, NULL, thread_server, path) == 0);
+        assert_se(pthread_create(&client1, NULL, thread_client1, path) == 0);
+        assert_se(pthread_create(&client2, NULL, thread_client2, path) == 0);
+
+        assert_se(pthread_join(client1, NULL) == 0);
+        assert_se(pthread_join(client2, NULL) == 0);
+
+        request_exit(path);
+
+        assert_se(pthread_join(server, NULL) == 0);
+
+        return 0;
+}
index c13c9b7d31b3ffbb7120900db83942f590074b92..62f10b632244b10f340605a468b351371418a853 100644 (file)
@@ -150,8 +150,10 @@ int sd_bus_set_allow_interactive_authorization(sd_bus *bus, int b);
 int sd_bus_get_allow_interactive_authorization(sd_bus *bus);
 int sd_bus_set_exit_on_disconnect(sd_bus *bus, int b);
 int sd_bus_get_exit_on_disconnect(sd_bus *bus);
+int sd_bus_set_watch_bind(sd_bus *bus, int b);
+int sd_bus_get_watch_bind(sd_bus *bus);
 
-int sd_bus_start(sd_bus *ret);
+int sd_bus_start(sd_bus *bus);
 
 int sd_bus_try_close(sd_bus *bus);
 void sd_bus_close(sd_bus *bus);
index ac8467ee693debdec864ffda3895b2dc53dc1f22..72fb33d9aec69cee1d0da4fc8e150cd3f378d228 100644 (file)
@@ -821,7 +821,11 @@ tests += [
          [threads]],
 
 #if 0 /// UNNEEDED in elogind
-#         [['src/libelogind/sd-bus/test-bus-chat.c'],
+#         [['src/libsystemd/sd-bus/test-bus-watch-bind.c'],
+#          [],
+#          [threads], '', 'timeout=120'],
+# 
+#         [['src/libsystemd/sd-bus/test-bus-chat.c'],
 #          [],
 #          [threads]],
 #