chiark / gitweb /
dbus: send signals about jobs to the clients having created them unconditionally...
authorLennart Poettering <lennart@poettering.net>
Sun, 4 Jul 2010 22:58:07 +0000 (00:58 +0200)
committerLennart Poettering <lennart@poettering.net>
Sun, 4 Jul 2010 22:58:07 +0000 (00:58 +0200)
12 files changed:
fixme
src/dbus-job.c
src/dbus-manager.c
src/dbus-unit.c
src/dbus.c
src/dbus.h
src/job.c
src/job.h
src/manager.c
src/manager.h
src/systemctl.c
src/unit.c

diff --git a/fixme b/fixme
index 49b69a629f1b05c64490b797147b5cbf5cad5ba0..2f133d43a6a931e083158d1b014584b44e401ec6 100644 (file)
--- a/fixme
+++ b/fixme
@@ -39,8 +39,6 @@
 
 * systemctl daemon-reload is kaputt
 
-* get rid of Subscribe() in systemctl
-
 * Turn around negative options
 
 * Add missing man pages: update systemd.1, finish daemon.7
index 48b1588deea8b133c8343f9ef25187cbd643941a..0956dcff6f4c6cdb1d78d6b6ae51de63dcbd2e03 100644 (file)
@@ -146,6 +146,32 @@ const DBusObjectPathVTable bus_job_vtable = {
         .message_function = bus_job_message_handler
 };
 
+static int job_send_message(Job *j, DBusMessage *m) {
+        int r;
+
+        assert(j);
+        assert(m);
+
+        if (bus_has_subscriber(j->manager)) {
+                if ((r = bus_broadcast(j->manager, m)) < 0)
+                        return r;
+
+        } else  if (j->bus_client) {
+                /* If nobody is subscribed, we just send the message
+                 * to the client which created the job */
+
+                assert(j->bus);
+
+                if (!dbus_message_set_destination(m, j->bus_client))
+                        return -ENOMEM;
+
+                if (!dbus_connection_send(j->bus, m, NULL))
+                        return -ENOMEM;
+        }
+
+        return 0;
+}
+
 void bus_job_send_change_signal(Job *j) {
         char *p = NULL;
         DBusMessage *m = NULL;
@@ -156,7 +182,7 @@ void bus_job_send_change_signal(Job *j) {
         LIST_REMOVE(Job, dbus_queue, j->manager->dbus_job_queue, j);
         j->in_dbus_queue = false;
 
-        if (set_isempty(j->manager->subscribed)) {
+        if (!bus_has_subscriber(j->manager) && !j->bus_client) {
                 j->sent_dbus_new_signal = true;
                 return;
         }
@@ -182,7 +208,7 @@ void bus_job_send_change_signal(Job *j) {
                         goto oom;
         }
 
-        if (bus_broadcast(j->manager, m) < 0)
+        if (job_send_message(j, m) < 0)
                 goto oom;
 
         free(p);
@@ -208,7 +234,7 @@ void bus_job_send_removed_signal(Job *j, bool success) {
 
         assert(j);
 
-        if (set_isempty(j->manager->subscribed))
+        if (!bus_has_subscriber(j->manager) && !j->bus_client)
                 return;
 
         if (!j->sent_dbus_new_signal)
@@ -227,7 +253,7 @@ void bus_job_send_removed_signal(Job *j, bool success) {
                                       DBUS_TYPE_INVALID))
                 goto oom;
 
-        if (bus_broadcast(j->manager, m) < 0)
+        if (job_send_message(j, m) < 0)
                 goto oom;
 
         free(p);
index 705a4dc974e857eed44d60f32627fd0eb820c65f..704e5fa0184ca34fa2e0043f10bf14f4a4171e55 100644 (file)
@@ -452,14 +452,25 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection,
 
         } else if (dbus_message_is_method_call(message, "org.freedesktop.systemd1.Manager", "Subscribe")) {
                 char *client;
+                Set *s;
+
+                if (!(s = BUS_CONNECTION_SUBSCRIBED(m, connection))) {
+                        if (!(s = set_new(string_hash_func, string_compare_func)))
+                                goto oom;
+
+                        if (!(dbus_connection_set_data(connection, m->subscribed_data_slot, s, NULL))) {
+                                set_free(s);
+                                goto oom;
+                        }
+                }
 
                 if (!(client = strdup(dbus_message_get_sender(message))))
                         goto oom;
 
-                r = set_put(m->subscribed, client);
-
-                if (r < 0)
+                if ((r = set_put(s, client)) < 0) {
+                        free(client);
                         return bus_send_error_reply(m, connection, message, NULL, r);
+                }
 
                 if (!(reply = dbus_message_new_method_return(message)))
                         goto oom;
@@ -467,7 +478,7 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection,
         } else if (dbus_message_is_method_call(message, "org.freedesktop.systemd1.Manager", "Unsubscribe")) {
                 char *client;
 
-                if (!(client = set_remove(m->subscribed, (char*) dbus_message_get_sender(message))))
+                if (!(client = set_remove(BUS_CONNECTION_SUBSCRIBED(m, connection), (char*) dbus_message_get_sender(message))))
                         return bus_send_error_reply(m, connection, message, NULL, -ENOENT);
 
                 free(client);
@@ -702,6 +713,11 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection,
                 if ((r = manager_add_job(m, job_type, u, mode, true, &j)) < 0)
                         return bus_send_error_reply(m, connection, message, NULL, r);
 
+                if (!(j->bus_client = strdup(dbus_message_get_sender(message))))
+                        goto oom;
+
+                j->bus = connection;
+
                 if (!(reply = dbus_message_new_method_return(message)))
                         goto oom;
 
@@ -713,6 +729,7 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection,
                                     DBUS_TYPE_OBJECT_PATH, &path,
                                     DBUS_TYPE_INVALID))
                         goto oom;
+
         }
 
         free(path);
index 17ca7bdd8965bc51dd5da46fc5e60496c9457195..1f74a2a7ec905df8f2a674c88b3705b0fba18b79 100644 (file)
@@ -376,7 +376,7 @@ void bus_unit_send_change_signal(Unit *u) {
         LIST_REMOVE(Meta, dbus_queue, u->meta.manager->dbus_unit_queue, &u->meta);
         u->meta.in_dbus_queue = false;
 
-        if (set_isempty(u->meta.manager->subscribed)) {
+        if (!bus_has_subscriber(u->meta.manager)) {
                 u->meta.sent_dbus_new_signal = true;
                 return;
         }
@@ -427,7 +427,7 @@ void bus_unit_send_removed_signal(Unit *u) {
 
         assert(u);
 
-        if (set_isempty(u->meta.manager->subscribed))
+        if (!bus_has_subscriber(u->meta.manager))
                 return;
 
         if (!u->meta.sent_dbus_new_signal)
index 385bf6a93720b2ef737a5e62d4c2bebfe898b438..6660cf0c5a7ed77d0ecd956fbe327f24905a2640 100644 (file)
@@ -355,8 +355,8 @@ static DBusHandlerResult api_bus_message_filter(DBusConnection *connection, DBus
                                            DBUS_TYPE_INVALID))
                         log_error("Failed to parse NameOwnerChanged message: %s", error.message);
                 else  {
-                        if (set_remove(m->subscribed, (char*) name))
-                                log_debug("Subscription client vanished: %s (left: %u)", name, set_size(m->subscribed));
+                        if (set_remove(BUS_CONNECTION_SUBSCRIBED(m, connection), (char*) name))
+                                log_debug("Subscription client vanished: %s (left: %u)", name, set_size(BUS_CONNECTION_SUBSCRIBED(m, connection)));
 
                         if (old_owner[0] == 0)
                                 old_owner = NULL;
@@ -882,13 +882,6 @@ static int bus_init_api(Manager *m) {
                   strnull(dbus_bus_get_unique_name(m->api_bus)));
         dbus_free(id);
 
-        if (!m->subscribed)
-                if (!(m->subscribed = set_new(string_hash_func, string_compare_func))) {
-                        log_error("Not enough memory");
-                        r = -ENOMEM;
-                        goto fail;
-                }
-
         return 0;
 
 fail:
@@ -959,6 +952,12 @@ int bus_init(Manager *m) {
                         return -ENOMEM;
                 }
 
+        if (m->subscribed_data_slot < 0)
+                if (!dbus_pending_call_allocate_data_slot(&m->subscribed_data_slot)) {
+                        log_error("Not enough memory");
+                        return -ENOMEM;
+                }
+
         if ((r = bus_init_system(m)) < 0 ||
             (r = bus_init_api(m)) < 0 ||
             (r = bus_init_private(m)) < 0)
@@ -968,9 +967,30 @@ int bus_init(Manager *m) {
 }
 
 static void shutdown_connection(Manager *m, DBusConnection *c) {
+        Set *s;
+        Job *j;
+        Iterator i;
+
+        HASHMAP_FOREACH(j, m->jobs, i)
+                if (j->bus == c) {
+                        free(j->bus_client);
+                        j->bus_client = NULL;
+
+                        j->bus = NULL;
+                }
+
         set_remove(m->bus_connections, c);
         set_remove(m->bus_connections_for_dispatch, c);
 
+        if ((s = BUS_CONNECTION_SUBSCRIBED(m, c))) {
+                char *t;
+
+                while ((t = set_steal_first(s)))
+                        free(t);
+
+                set_free(s);
+        }
+
         dbus_connection_set_dispatch_status_function(c, NULL, NULL, NULL);
         dbus_connection_flush(c);
         dbus_connection_close(c);
@@ -988,15 +1008,6 @@ static void bus_done_api(Manager *m) {
                 m->api_bus = NULL;
         }
 
-        if (m->subscribed) {
-                char *c;
-
-                while ((c = set_steal_first(m->subscribed)))
-                        free(c);
-
-                set_free(m->subscribed);
-                m->subscribed = NULL;
-        }
 
        if (m->queued_message) {
                dbus_message_unref(m->queued_message);
@@ -1043,6 +1054,9 @@ void bus_done(Manager *m) {
 
         if (m->name_data_slot >= 0)
                dbus_pending_call_free_data_slot(&m->name_data_slot);
+
+        if (m->subscribed_data_slot >= 0)
+                dbus_pending_call_free_data_slot(&m->subscribed_data_slot);
 }
 
 static void query_pid_pending_cb(DBusPendingCall *pending, void *userdata) {
@@ -1053,7 +1067,7 @@ static void query_pid_pending_cb(DBusPendingCall *pending, void *userdata) {
 
         dbus_error_init(&error);
 
-        assert_se(name = dbus_pending_call_get_data(pending, m->name_data_slot));
+        assert_se(name = BUS_PENDING_CALL_NAME(m, pending));
         assert_se(reply = dbus_pending_call_steal_reply(pending));
 
         switch (dbus_message_get_type(reply)) {
@@ -1538,3 +1552,27 @@ int bus_parse_strv(DBusMessage *m, char ***_l) {
 
         return 0;
 }
+
+bool bus_has_subscriber(Manager *m) {
+        Iterator i;
+        DBusConnection *c;
+
+        assert(m);
+
+        SET_FOREACH(c, m->bus_connections_for_dispatch, i)
+                if (bus_connection_has_subscriber(m, c))
+                        return true;
+
+        SET_FOREACH(c, m->bus_connections, i)
+                if (bus_connection_has_subscriber(m, c))
+                        return true;
+
+        return false;
+}
+
+bool bus_connection_has_subscriber(Manager *m, DBusConnection *c) {
+        assert(m);
+        assert(c);
+
+        return !set_isempty(BUS_CONNECTION_SUBSCRIBED(m, c));
+}
index ccee74f99fdfc8fddf649b007e60fdad77813a44..01ab2fcb8d5239bdd49bb0bb5951ba7cbbc94d69 100644 (file)
@@ -105,6 +105,12 @@ int bus_property_append_ul(Manager *m, DBusMessageIter *i, const char *property,
 
 int bus_parse_strv(DBusMessage *m, char ***_l);
 
+bool bus_has_subscriber(Manager *m);
+bool bus_connection_has_subscriber(Manager *m, DBusConnection *c);
+
+#define BUS_CONNECTION_SUBSCRIBED(m, c) dbus_connection_get_data((c), (m)->subscribed_data_slot)
+#define BUS_PENDING_CALL_NAME(m, p) dbus_pending_call_get_data((p), (m)->name_data_slot)
+
 extern const char * const bus_interface_table[];
 
 #endif
index 31e9cfe8d6f937ddf7973548c599f2c0d64283cb..a090ec9b113cc2e5c3c111c55d6b97628e98d0db 100644 (file)
--- a/src/job.c
+++ b/src/job.c
@@ -76,6 +76,7 @@ void job_free(Job *j) {
         if (j->in_dbus_queue)
                 LIST_REMOVE(Job, dbus_queue, j->manager->dbus_job_queue, j);
 
+        free(j->bus_client);
         free(j);
 }
 
@@ -544,10 +545,9 @@ void job_add_to_dbus_queue(Job *j) {
         if (j->in_dbus_queue)
                 return;
 
-        if (set_isempty(j->manager->subscribed)) {
-                j->sent_dbus_new_signal = true;
-                return;
-        }
+        /* We don't check if anybody is subscribed here, since this
+         * job might just have been created and not yet assigned to a
+         * connection/client. */
 
         LIST_PREPEND(Job, dbus_queue, j->manager->dbus_job_queue, j);
         j->in_dbus_queue = true;
index 054aa530c6c23419c423265084aea53b105b7255..9c685f1dad9c6f0d532b308cc2d23269b17eaadc 100644 (file)
--- a/src/job.h
+++ b/src/job.h
@@ -102,6 +102,10 @@ struct Job {
         JobType type;
         JobState state;
 
+        /* Note that this bus object is not ref counted here. */
+        DBusConnection *bus;
+        char *bus_client;
+
         bool installed:1;
         bool in_run_queue:1;
         bool matters_to_anchor:1;
index 74a414aab11405b7a4e953686bebce3270066e95..6e571ea8b5e61702ceee3609311682656ddc6917 100644 (file)
@@ -198,7 +198,7 @@ int manager_new(ManagerRunningAs running_as, bool confirm_spawn, Manager **_m) {
 
         m->running_as = running_as;
         m->confirm_spawn = confirm_spawn;
-        m->name_data_slot = -1;
+        m->name_data_slot = m->subscribed_data_slot = -1;
         m->exit_code = _MANAGER_EXIT_CODE_INVALID;
         m->pin_cgroupfs_fd = -1;
 
index e856f5384b8ab3770e3697fabef72b00577c0784..aff4cb82efe328a19efad9963cb6985612d463ef 100644 (file)
@@ -156,7 +156,6 @@ struct Manager {
         DBusServer *private_bus;
         Set *bus_connections, *bus_connections_for_dispatch;
 
-        Set *subscribed;
         DBusMessage *queued_message; /* This is used during reloading:
                                       * before the reload we queue the
                                       * reply message here, and
@@ -164,6 +163,7 @@ struct Manager {
 
         Hashmap *watch_bus;  /* D-Bus names => Unit object n:1 */
         int32_t name_data_slot;
+        int32_t subscribed_data_slot;
 
         /* Data specific to the Automount subsystem */
         int dev_autofs_fd;
index 226ecee3adfaeb82d603864c59850a534b3fd8ec..1ad0c48a89f8eef669ab8867960295d2b2373955 100644 (file)
@@ -583,13 +583,10 @@ static DBusHandlerResult wait_filter(DBusConnection *connection, DBusMessage *me
 
 static int enable_wait_for_jobs(DBusConnection *bus) {
         DBusError error;
-        DBusMessage *m = NULL, *reply = NULL;
-        int r;
 
         assert(bus);
 
         dbus_error_init(&error);
-
         dbus_bus_add_match(bus,
                            "type='signal',"
                            "sender='org.freedesktop.systemd1',"
@@ -600,40 +597,12 @@ static int enable_wait_for_jobs(DBusConnection *bus) {
 
         if (dbus_error_is_set(&error)) {
                 log_error("Failed to add match: %s", error.message);
-                r = -EIO;
-                goto finish;
-        }
-
-        if (!(m = dbus_message_new_method_call(
-                              "org.freedesktop.systemd1",
-                              "/org/freedesktop/systemd1",
-                              "org.freedesktop.systemd1.Manager",
-                              "Subscribe"))) {
-                log_error("Could not allocate message.");
-                r = -ENOMEM;
-                goto finish;
-        }
-
-        if (!(reply = dbus_connection_send_with_reply_and_block(bus, m, -1, &error))) {
-                log_error("Failed to issue method call: %s", error.message);
-                r = -EIO;
-                goto finish;
+                dbus_error_free(&error);
+                return -EIO;
         }
 
-        r = 0;
-
-finish:
         /* This is slightly dirty, since we don't undo the match registrations. */
-
-        if (m)
-                dbus_message_unref(m);
-
-        if (reply)
-                dbus_message_unref(reply);
-
-        dbus_error_free(&error);
-
-        return r;
+        return 0;
 }
 
 static int wait_for_jobs(DBusConnection *bus, Set *s) {
index 8f5ae8af3f8610ce10abdea7c8a85c853baaf141..8b5714838dd8c6ccfcd42fad07e49e3a7702fa1c 100644 (file)
@@ -282,7 +282,8 @@ void unit_add_to_dbus_queue(Unit *u) {
         if (u->meta.load_state == UNIT_STUB || u->meta.in_dbus_queue)
                 return;
 
-        if (set_isempty(u->meta.manager->subscribed)) {
+        /* Shortcut things if nobody cares */
+        if (!bus_has_subscriber(u->meta.manager)) {
                 u->meta.sent_dbus_new_signal = true;
                 return;
         }