From a567261a29b4e19c0c195240411b7562063d99f8 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Mon, 5 Jul 2010 00:58:07 +0200 Subject: [PATCH] dbus: send signals about jobs to the clients having created them unconditionally, and thus get rid of broadcast signals in most cases --- fixme | 2 -- src/dbus-job.c | 34 ++++++++++++++++++--- src/dbus-manager.c | 25 ++++++++++++--- src/dbus-unit.c | 4 +-- src/dbus.c | 76 ++++++++++++++++++++++++++++++++++------------ src/dbus.h | 6 ++++ src/job.c | 8 ++--- src/job.h | 4 +++ src/manager.c | 2 +- src/manager.h | 2 +- src/systemctl.c | 37 ++-------------------- src/unit.c | 3 +- 12 files changed, 131 insertions(+), 72 deletions(-) diff --git a/fixme b/fixme index 49b69a629..2f133d43a 100644 --- a/fixme +++ b/fixme @@ -39,8 +39,6 @@ * systemctl daemon-reload is kaputt -* get rid of Subscribe() in systemctl - * Turn around negative options * Add missing man pages: update systemd.1, finish daemon.7 diff --git a/src/dbus-job.c b/src/dbus-job.c index 48b1588de..0956dcff6 100644 --- a/src/dbus-job.c +++ b/src/dbus-job.c @@ -146,6 +146,32 @@ const DBusObjectPathVTable bus_job_vtable = { .message_function = bus_job_message_handler }; +static int job_send_message(Job *j, DBusMessage *m) { + int r; + + assert(j); + assert(m); + + if (bus_has_subscriber(j->manager)) { + if ((r = bus_broadcast(j->manager, m)) < 0) + return r; + + } else if (j->bus_client) { + /* If nobody is subscribed, we just send the message + * to the client which created the job */ + + assert(j->bus); + + if (!dbus_message_set_destination(m, j->bus_client)) + return -ENOMEM; + + if (!dbus_connection_send(j->bus, m, NULL)) + return -ENOMEM; + } + + return 0; +} + void bus_job_send_change_signal(Job *j) { char *p = NULL; DBusMessage *m = NULL; @@ -156,7 +182,7 @@ void bus_job_send_change_signal(Job *j) { LIST_REMOVE(Job, dbus_queue, j->manager->dbus_job_queue, j); j->in_dbus_queue = false; - if (set_isempty(j->manager->subscribed)) { + if (!bus_has_subscriber(j->manager) && !j->bus_client) { j->sent_dbus_new_signal = true; return; } @@ -182,7 +208,7 @@ void bus_job_send_change_signal(Job *j) { goto oom; } - if (bus_broadcast(j->manager, m) < 0) + if (job_send_message(j, m) < 0) goto oom; free(p); @@ -208,7 +234,7 @@ void bus_job_send_removed_signal(Job *j, bool success) { assert(j); - if (set_isempty(j->manager->subscribed)) + if (!bus_has_subscriber(j->manager) && !j->bus_client) return; if (!j->sent_dbus_new_signal) @@ -227,7 +253,7 @@ void bus_job_send_removed_signal(Job *j, bool success) { DBUS_TYPE_INVALID)) goto oom; - if (bus_broadcast(j->manager, m) < 0) + if (job_send_message(j, m) < 0) goto oom; free(p); diff --git a/src/dbus-manager.c b/src/dbus-manager.c index 705a4dc97..704e5fa01 100644 --- a/src/dbus-manager.c +++ b/src/dbus-manager.c @@ -452,14 +452,25 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection, } else if (dbus_message_is_method_call(message, "org.freedesktop.systemd1.Manager", "Subscribe")) { char *client; + Set *s; + + if (!(s = BUS_CONNECTION_SUBSCRIBED(m, connection))) { + if (!(s = set_new(string_hash_func, string_compare_func))) + goto oom; + + if (!(dbus_connection_set_data(connection, m->subscribed_data_slot, s, NULL))) { + set_free(s); + goto oom; + } + } if (!(client = strdup(dbus_message_get_sender(message)))) goto oom; - r = set_put(m->subscribed, client); - - if (r < 0) + if ((r = set_put(s, client)) < 0) { + free(client); return bus_send_error_reply(m, connection, message, NULL, r); + } if (!(reply = dbus_message_new_method_return(message))) goto oom; @@ -467,7 +478,7 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection, } else if (dbus_message_is_method_call(message, "org.freedesktop.systemd1.Manager", "Unsubscribe")) { char *client; - if (!(client = set_remove(m->subscribed, (char*) dbus_message_get_sender(message)))) + if (!(client = set_remove(BUS_CONNECTION_SUBSCRIBED(m, connection), (char*) dbus_message_get_sender(message)))) return bus_send_error_reply(m, connection, message, NULL, -ENOENT); free(client); @@ -702,6 +713,11 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection, if ((r = manager_add_job(m, job_type, u, mode, true, &j)) < 0) return bus_send_error_reply(m, connection, message, NULL, r); + if (!(j->bus_client = strdup(dbus_message_get_sender(message)))) + goto oom; + + j->bus = connection; + if (!(reply = dbus_message_new_method_return(message))) goto oom; @@ -713,6 +729,7 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection *connection, DBUS_TYPE_OBJECT_PATH, &path, DBUS_TYPE_INVALID)) goto oom; + } free(path); diff --git a/src/dbus-unit.c b/src/dbus-unit.c index 17ca7bdd8..1f74a2a7e 100644 --- a/src/dbus-unit.c +++ b/src/dbus-unit.c @@ -376,7 +376,7 @@ void bus_unit_send_change_signal(Unit *u) { LIST_REMOVE(Meta, dbus_queue, u->meta.manager->dbus_unit_queue, &u->meta); u->meta.in_dbus_queue = false; - if (set_isempty(u->meta.manager->subscribed)) { + if (!bus_has_subscriber(u->meta.manager)) { u->meta.sent_dbus_new_signal = true; return; } @@ -427,7 +427,7 @@ void bus_unit_send_removed_signal(Unit *u) { assert(u); - if (set_isempty(u->meta.manager->subscribed)) + if (!bus_has_subscriber(u->meta.manager)) return; if (!u->meta.sent_dbus_new_signal) diff --git a/src/dbus.c b/src/dbus.c index 385bf6a93..6660cf0c5 100644 --- a/src/dbus.c +++ b/src/dbus.c @@ -355,8 +355,8 @@ static DBusHandlerResult api_bus_message_filter(DBusConnection *connection, DBus DBUS_TYPE_INVALID)) log_error("Failed to parse NameOwnerChanged message: %s", error.message); else { - if (set_remove(m->subscribed, (char*) name)) - log_debug("Subscription client vanished: %s (left: %u)", name, set_size(m->subscribed)); + if (set_remove(BUS_CONNECTION_SUBSCRIBED(m, connection), (char*) name)) + log_debug("Subscription client vanished: %s (left: %u)", name, set_size(BUS_CONNECTION_SUBSCRIBED(m, connection))); if (old_owner[0] == 0) old_owner = NULL; @@ -882,13 +882,6 @@ static int bus_init_api(Manager *m) { strnull(dbus_bus_get_unique_name(m->api_bus))); dbus_free(id); - if (!m->subscribed) - if (!(m->subscribed = set_new(string_hash_func, string_compare_func))) { - log_error("Not enough memory"); - r = -ENOMEM; - goto fail; - } - return 0; fail: @@ -959,6 +952,12 @@ int bus_init(Manager *m) { return -ENOMEM; } + if (m->subscribed_data_slot < 0) + if (!dbus_pending_call_allocate_data_slot(&m->subscribed_data_slot)) { + log_error("Not enough memory"); + return -ENOMEM; + } + if ((r = bus_init_system(m)) < 0 || (r = bus_init_api(m)) < 0 || (r = bus_init_private(m)) < 0) @@ -968,9 +967,30 @@ int bus_init(Manager *m) { } static void shutdown_connection(Manager *m, DBusConnection *c) { + Set *s; + Job *j; + Iterator i; + + HASHMAP_FOREACH(j, m->jobs, i) + if (j->bus == c) { + free(j->bus_client); + j->bus_client = NULL; + + j->bus = NULL; + } + set_remove(m->bus_connections, c); set_remove(m->bus_connections_for_dispatch, c); + if ((s = BUS_CONNECTION_SUBSCRIBED(m, c))) { + char *t; + + while ((t = set_steal_first(s))) + free(t); + + set_free(s); + } + dbus_connection_set_dispatch_status_function(c, NULL, NULL, NULL); dbus_connection_flush(c); dbus_connection_close(c); @@ -988,15 +1008,6 @@ static void bus_done_api(Manager *m) { m->api_bus = NULL; } - if (m->subscribed) { - char *c; - - while ((c = set_steal_first(m->subscribed))) - free(c); - - set_free(m->subscribed); - m->subscribed = NULL; - } if (m->queued_message) { dbus_message_unref(m->queued_message); @@ -1043,6 +1054,9 @@ void bus_done(Manager *m) { if (m->name_data_slot >= 0) dbus_pending_call_free_data_slot(&m->name_data_slot); + + if (m->subscribed_data_slot >= 0) + dbus_pending_call_free_data_slot(&m->subscribed_data_slot); } static void query_pid_pending_cb(DBusPendingCall *pending, void *userdata) { @@ -1053,7 +1067,7 @@ static void query_pid_pending_cb(DBusPendingCall *pending, void *userdata) { dbus_error_init(&error); - assert_se(name = dbus_pending_call_get_data(pending, m->name_data_slot)); + assert_se(name = BUS_PENDING_CALL_NAME(m, pending)); assert_se(reply = dbus_pending_call_steal_reply(pending)); switch (dbus_message_get_type(reply)) { @@ -1538,3 +1552,27 @@ int bus_parse_strv(DBusMessage *m, char ***_l) { return 0; } + +bool bus_has_subscriber(Manager *m) { + Iterator i; + DBusConnection *c; + + assert(m); + + SET_FOREACH(c, m->bus_connections_for_dispatch, i) + if (bus_connection_has_subscriber(m, c)) + return true; + + SET_FOREACH(c, m->bus_connections, i) + if (bus_connection_has_subscriber(m, c)) + return true; + + return false; +} + +bool bus_connection_has_subscriber(Manager *m, DBusConnection *c) { + assert(m); + assert(c); + + return !set_isempty(BUS_CONNECTION_SUBSCRIBED(m, c)); +} diff --git a/src/dbus.h b/src/dbus.h index ccee74f99..01ab2fcb8 100644 --- a/src/dbus.h +++ b/src/dbus.h @@ -105,6 +105,12 @@ int bus_property_append_ul(Manager *m, DBusMessageIter *i, const char *property, int bus_parse_strv(DBusMessage *m, char ***_l); +bool bus_has_subscriber(Manager *m); +bool bus_connection_has_subscriber(Manager *m, DBusConnection *c); + +#define BUS_CONNECTION_SUBSCRIBED(m, c) dbus_connection_get_data((c), (m)->subscribed_data_slot) +#define BUS_PENDING_CALL_NAME(m, p) dbus_pending_call_get_data((p), (m)->name_data_slot) + extern const char * const bus_interface_table[]; #endif diff --git a/src/job.c b/src/job.c index 31e9cfe8d..a090ec9b1 100644 --- a/src/job.c +++ b/src/job.c @@ -76,6 +76,7 @@ void job_free(Job *j) { if (j->in_dbus_queue) LIST_REMOVE(Job, dbus_queue, j->manager->dbus_job_queue, j); + free(j->bus_client); free(j); } @@ -544,10 +545,9 @@ void job_add_to_dbus_queue(Job *j) { if (j->in_dbus_queue) return; - if (set_isempty(j->manager->subscribed)) { - j->sent_dbus_new_signal = true; - return; - } + /* We don't check if anybody is subscribed here, since this + * job might just have been created and not yet assigned to a + * connection/client. */ LIST_PREPEND(Job, dbus_queue, j->manager->dbus_job_queue, j); j->in_dbus_queue = true; diff --git a/src/job.h b/src/job.h index 054aa530c..9c685f1da 100644 --- a/src/job.h +++ b/src/job.h @@ -102,6 +102,10 @@ struct Job { JobType type; JobState state; + /* Note that this bus object is not ref counted here. */ + DBusConnection *bus; + char *bus_client; + bool installed:1; bool in_run_queue:1; bool matters_to_anchor:1; diff --git a/src/manager.c b/src/manager.c index 74a414aab..6e571ea8b 100644 --- a/src/manager.c +++ b/src/manager.c @@ -198,7 +198,7 @@ int manager_new(ManagerRunningAs running_as, bool confirm_spawn, Manager **_m) { m->running_as = running_as; m->confirm_spawn = confirm_spawn; - m->name_data_slot = -1; + m->name_data_slot = m->subscribed_data_slot = -1; m->exit_code = _MANAGER_EXIT_CODE_INVALID; m->pin_cgroupfs_fd = -1; diff --git a/src/manager.h b/src/manager.h index e856f5384..aff4cb82e 100644 --- a/src/manager.h +++ b/src/manager.h @@ -156,7 +156,6 @@ struct Manager { DBusServer *private_bus; Set *bus_connections, *bus_connections_for_dispatch; - Set *subscribed; DBusMessage *queued_message; /* This is used during reloading: * before the reload we queue the * reply message here, and @@ -164,6 +163,7 @@ struct Manager { Hashmap *watch_bus; /* D-Bus names => Unit object n:1 */ int32_t name_data_slot; + int32_t subscribed_data_slot; /* Data specific to the Automount subsystem */ int dev_autofs_fd; diff --git a/src/systemctl.c b/src/systemctl.c index 226ecee3a..1ad0c48a8 100644 --- a/src/systemctl.c +++ b/src/systemctl.c @@ -583,13 +583,10 @@ static DBusHandlerResult wait_filter(DBusConnection *connection, DBusMessage *me static int enable_wait_for_jobs(DBusConnection *bus) { DBusError error; - DBusMessage *m = NULL, *reply = NULL; - int r; assert(bus); dbus_error_init(&error); - dbus_bus_add_match(bus, "type='signal'," "sender='org.freedesktop.systemd1'," @@ -600,40 +597,12 @@ static int enable_wait_for_jobs(DBusConnection *bus) { if (dbus_error_is_set(&error)) { log_error("Failed to add match: %s", error.message); - r = -EIO; - goto finish; - } - - if (!(m = dbus_message_new_method_call( - "org.freedesktop.systemd1", - "/org/freedesktop/systemd1", - "org.freedesktop.systemd1.Manager", - "Subscribe"))) { - log_error("Could not allocate message."); - r = -ENOMEM; - goto finish; - } - - if (!(reply = dbus_connection_send_with_reply_and_block(bus, m, -1, &error))) { - log_error("Failed to issue method call: %s", error.message); - r = -EIO; - goto finish; + dbus_error_free(&error); + return -EIO; } - r = 0; - -finish: /* This is slightly dirty, since we don't undo the match registrations. */ - - if (m) - dbus_message_unref(m); - - if (reply) - dbus_message_unref(reply); - - dbus_error_free(&error); - - return r; + return 0; } static int wait_for_jobs(DBusConnection *bus, Set *s) { diff --git a/src/unit.c b/src/unit.c index 8f5ae8af3..8b5714838 100644 --- a/src/unit.c +++ b/src/unit.c @@ -282,7 +282,8 @@ void unit_add_to_dbus_queue(Unit *u) { if (u->meta.load_state == UNIT_STUB || u->meta.in_dbus_queue) return; - if (set_isempty(u->meta.manager->subscribed)) { + /* Shortcut things if nobody cares */ + if (!bus_has_subscriber(u->meta.manager)) { u->meta.sent_dbus_new_signal = true; return; } -- 2.30.2