chiark / gitweb /
dbus: send out signals when units/jobs come, go and change
authorLennart Poettering <lennart@poettering.net>
Thu, 4 Feb 2010 23:38:41 +0000 (00:38 +0100)
committerLennart Poettering <lennart@poettering.net>
Thu, 4 Feb 2010 23:38:41 +0000 (00:38 +0100)
18 files changed:
Makefile.am
dbus-job.c
dbus-manager.c
dbus-unit.c
dbus.c
dbus.h
device.c
fixme
job.c
job.h
manager.c
manager.h
mount.c
systemadm.vala
systemctl.vala
systemd-interfaces.vala
unit.c
unit.h

index 8ac2c2d..8b481a6 100644 (file)
@@ -110,3 +110,8 @@ systemadm_SOURCES = \
 
 systemadm_CPPFLAGS = $(AM_CPPFLAGS) $(DBUSGLIB_CFLAGS) $(GTK_CFLAGS)
 systemadm_LDADD = $(DBUSGLIB_LIBS) $(GTK_LIBS)
+
+CLEANFILES = \
+       systemd-interfaces.c \
+       systemctl.c \
+       systemadm.c
index 544868f..e0c1153 100644 (file)
@@ -29,6 +29,7 @@ static const char introspection[] =
         "<node>"
         " <interface name=\"org.freedesktop.systemd1.Job\">"
         "  <method name=\"Cancel\"/>"
+        "  <signal name=\"Changed\"/>"
         "  <property name=\"Id\" type=\"u\" access=\"read\"/>"
         "  <property name=\"Unit\" type=\"(so)\" access=\"read\"/>"
         "  <property name=\"JobType\" type=\"s\" access=\"read\"/>"
@@ -173,3 +174,94 @@ static DBusHandlerResult bus_job_message_handler(DBusConnection  *connection, DB
 const DBusObjectPathVTable bus_job_vtable = {
         .message_function = bus_job_message_handler
 };
+
+void bus_job_send_change_signal(Job *j) {
+        char *p = NULL;
+        DBusMessage *m = NULL;
+
+        assert(j);
+        assert(j->in_dbus_queue);
+
+        LIST_REMOVE(Job, dbus_queue, j->manager->dbus_job_queue, j);
+        j->in_dbus_queue = false;
+
+        if (set_isempty(j->manager->subscribed))
+                return;
+
+        if (!(p = job_dbus_path(j)))
+                goto oom;
+
+        if (j->sent_dbus_new_signal) {
+                /* Send a change signal */
+
+                if (!(m = dbus_message_new_signal(p, "org.freedesktop.systemd1.Job", "Changed")))
+                        goto oom;
+        } else {
+                /* Send a new signal */
+
+                if (!(m = dbus_message_new_signal("/org/freedesktop/systemd1", "org.freedesktop.systemd1", "JobNew")))
+                        goto oom;
+
+                if (!dbus_message_append_args(m,
+                                              DBUS_TYPE_UINT32, &j->id,
+                                              DBUS_TYPE_OBJECT_PATH, &p,
+                                              DBUS_TYPE_INVALID))
+                        goto oom;
+        }
+
+        if (!dbus_connection_send(j->manager->bus, m, NULL))
+                goto oom;
+
+        free(p);
+        dbus_message_unref(m);
+
+        j->sent_dbus_new_signal = true;
+
+        return;
+
+oom:
+        free(p);
+
+        if (m)
+                dbus_message_unref(m);
+
+        log_error("Failed to allocate job change signal.");
+}
+
+void bus_job_send_removed_signal(Job *j) {
+        char *p = NULL;
+        DBusMessage *m = NULL;
+
+        assert(j);
+
+        if (set_isempty(j->manager->subscribed) || !j->sent_dbus_new_signal)
+                return;
+
+        if (!(p = job_dbus_path(j)))
+                goto oom;
+
+        if (!(m = dbus_message_new_signal("/org/freedesktop/systemd1", "org.freedesktop.systemd1", "JobRemoved")))
+                goto oom;
+
+        if (!dbus_message_append_args(m,
+                                      DBUS_TYPE_UINT32, &j->id,
+                                      DBUS_TYPE_OBJECT_PATH, &p,
+                                      DBUS_TYPE_INVALID))
+                goto oom;
+
+        if (!dbus_connection_send(j->manager->bus, m, NULL))
+                goto oom;
+
+        free(p);
+        dbus_message_unref(m);
+
+        return;
+
+oom:
+        free(p);
+
+        if (m)
+                dbus_message_unref(m);
+
+        log_error("Failed to allocate job remove signal.");
+}
index bdc82df..732e586 100644 (file)
         "  <method name=\"ListJobs\">"                                  \
         "   <arg name=\"jobs\" type=\"a(usssoo)\" direction=\"out\"/>"  \
         "  </method>"                                                   \
+        "  <method name=\"Subscribe\"/>"                                \
+        "  <method name=\"Unsubscribe\"/>"                              \
+        "  <signal name=\"UnitNew\">"                                   \
+        "   <arg name=\"id\" type=\"s\"/>"                              \
+        "   <arg name=\"unit\" type=\"o\"/>"                            \
+        "  </signal>"                                                   \
+        "  <signal name=\"UnitRemoved\">"                               \
+        "   <arg name=\"id\" type=\"s\"/>"                              \
+        "   <arg name=\"unit\" type=\"o\"/>"                            \
+        "  </signal>"                                                   \
+        "  <signal name=\"JobNew\">"                                    \
+        "   <arg name=\"id\" type=\"u\"/>"                              \
+        "   <arg name=\"job\" type=\"o\"/>"                             \
+        "  </signal>"                                                   \
+        "  <signal name=\"JobRemoved\">"                                \
+        "   <arg name=\"id\" type=\"u\"/>"                              \
+        "   <arg name=\"job\" type=\"o\"/>"                             \
+        "  </signal>"                                                   \
         " </interface>"                                                 \
         BUS_PROPERTIES_INTERFACE                                        \
         BUS_INTROSPECTABLE_INTERFACE
@@ -287,6 +305,31 @@ static DBusHandlerResult bus_manager_message_handler(DBusConnection  *connection
                 if (!dbus_message_iter_close_container(&iter, &sub))
                         goto oom;
 
+        } else if (dbus_message_is_method_call(message, "org.freedesktop.systemd1",  "Subscribe")) {
+                char *client;
+
+                if (!(client = strdup(dbus_message_get_sender(message))))
+                        goto oom;
+
+                r = set_put(m->subscribed, client);
+
+                if (r < 0)
+                        return bus_send_error_reply(m, message, NULL, r);
+
+                if (!(reply = dbus_message_new_method_return(message)))
+                        goto oom;
+
+        } else if (dbus_message_is_method_call(message, "org.freedesktop.systemd1",  "Unsubscribe")) {
+                char *client;
+
+                if (!(client = set_remove(m->subscribed, (char*) dbus_message_get_sender(message))))
+                        return bus_send_error_reply(m, message, NULL, -ENOENT);
+
+                free(client);
+
+                if (!(reply = dbus_message_new_method_return(message)))
+                        goto oom;
+
         } else if (dbus_message_is_method_call(message, "org.freedesktop.DBus.Introspectable", "Introspect")) {
                 char *introspection = NULL;
                 FILE *f;
index 1c6b405..f48433b 100644 (file)
@@ -44,6 +44,7 @@ static const char introspection[] =
         "   <arg name=\"mode\" type=\"s\" direction=\"in\"/>"
         "   <arg name=\"job\" type=\"o\" direction=\"out\"/>"
         "  </method>"
+        "  <signal name=\"Changed\"/>"
         "  <property name=\"Id\" type=\"s\" access=\"read\"/>"
         "  <property name=\"Description\" type=\"s\" access=\"read\"/>"
         "  <property name=\"LoadState\" type=\"s\" access=\"read\"/>"
@@ -325,3 +326,98 @@ static DBusHandlerResult bus_unit_message_handler(DBusConnection  *connection, D
 const DBusObjectPathVTable bus_unit_vtable = {
         .message_function = bus_unit_message_handler
 };
+
+void bus_unit_send_change_signal(Unit *u) {
+        char *p = NULL;
+        DBusMessage *m = NULL;
+
+        assert(u);
+        assert(u->meta.in_dbus_queue);
+
+        LIST_REMOVE(Meta, dbus_queue, u->meta.manager->dbus_unit_queue, &u->meta);
+        u->meta.in_dbus_queue = false;
+
+        if (set_isempty(u->meta.manager->subscribed))
+                return;
+
+        if (!(p = unit_dbus_path(u)))
+                goto oom;
+
+        if (u->meta.sent_dbus_new_signal) {
+                /* Send a change signal */
+
+                if (!(m = dbus_message_new_signal(p, "org.freedesktop.systemd1.Unit", "Changed")))
+                        goto oom;
+        } else {
+                const char *id;
+                /* Send a new signal */
+
+                if (!(m = dbus_message_new_signal("/org/freedesktop/systemd1", "org.freedesktop.systemd1", "UnitNew")))
+                        goto oom;
+
+                id = unit_id(u);
+                if (!dbus_message_append_args(m,
+                                              DBUS_TYPE_STRING, &id,
+                                              DBUS_TYPE_OBJECT_PATH, &p,
+                                              DBUS_TYPE_INVALID))
+                        goto oom;
+        }
+
+        if (!dbus_connection_send(u->meta.manager->bus, m, NULL))
+                goto oom;
+
+        free(p);
+        dbus_message_unref(m);
+
+        u->meta.sent_dbus_new_signal = true;
+
+        return;
+
+oom:
+        free(p);
+
+        if (m)
+                dbus_message_unref(m);
+
+        log_error("Failed to allocate unit change/new signal.");
+}
+
+void bus_unit_send_removed_signal(Unit *u) {
+        char *p = NULL;
+        DBusMessage *m = NULL;
+        const char *id;
+
+        assert(u);
+
+        if (set_isempty(u->meta.manager->subscribed) || !u->meta.sent_dbus_new_signal)
+                return;
+
+        if (!(p = unit_dbus_path(u)))
+                goto oom;
+
+        if (!(m = dbus_message_new_signal("/org/freedesktop/systemd1", "org.freedesktop.systemd1", "UnitRemoved")))
+                goto oom;
+
+        id = unit_id(u);
+        if (!dbus_message_append_args(m,
+                                      DBUS_TYPE_STRING, &id,
+                                      DBUS_TYPE_OBJECT_PATH, &p,
+                                      DBUS_TYPE_INVALID))
+                goto oom;
+
+        if (!dbus_connection_send(u->meta.manager->bus, m, NULL))
+                goto oom;
+
+        free(p);
+        dbus_message_unref(m);
+
+        return;
+
+oom:
+        free(p);
+
+        if (m)
+                dbus_message_unref(m);
+
+        log_error("Failed to allocate unit remove signal.");
+}
diff --git a/dbus.c b/dbus.c
index 24d16a6..0088b2c 100644 (file)
--- a/dbus.c
+++ b/dbus.c
@@ -276,11 +276,55 @@ static void bus_toggle_timeout(DBusTimeout *timeout, void *data) {
                 log_error("Failed to rearm timer: %s", strerror(-r));
 }
 
-void bus_dispatch(Manager *m) {
+static DBusHandlerResult bus_message_filter(DBusConnection  *connection, DBusMessage  *message, void *data) {
+        Manager *m = data;
+        DBusError error;
+
+        assert(connection);
+        assert(message);
+        assert(m);
+
+        dbus_error_init(&error);
+
+        /* log_debug("Got D-Bus request: %s.%s() on %s", */
+        /*           dbus_message_get_interface(message), */
+        /*           dbus_message_get_member(message), */
+        /*           dbus_message_get_path(message)); */
+
+        if (dbus_message_is_signal(message, DBUS_INTERFACE_LOCAL, "Disconnected")) {
+                log_error("Warning! D-Bus connection terminated.");
+
+                /* FIXME: we probably should restart D-Bus here */
+
+        } else if (dbus_message_is_signal(message, DBUS_INTERFACE_DBUS, "NameOwnerChanged")) {
+                const char *name, *old, *new;
+
+                if (!dbus_message_get_args(message, &error,
+                                           DBUS_TYPE_STRING, &name,
+                                           DBUS_TYPE_STRING, &old,
+                                           DBUS_TYPE_STRING, &new,
+                                           DBUS_TYPE_INVALID))
+                        log_error("Failed to parse NameOwnerChanged message: %s", error.message);
+                else  {
+                        if (set_remove(m->subscribed, (char*) name))
+                                log_debug("Subscription client vanished: %s (left: %u)", name, set_size(m->subscribed));
+                }
+        }
+
+        dbus_error_free(&error);
+        return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
+}
+
+unsigned bus_dispatch(Manager *m) {
         assert(m);
 
+        if (!m->request_bus_dispatch)
+                return 0;
+
         if (dbus_connection_dispatch(m->bus) == DBUS_DISPATCH_COMPLETE)
                 m->request_bus_dispatch = false;
+
+        return 1;
 }
 
 static int request_name(Manager *m) {
@@ -328,6 +372,9 @@ int bus_init(Manager *m) {
         if (m->bus)
                 return 0;
 
+        if (!(m->subscribed = set_new(string_hash_func, string_compare_func)))
+                return -ENOMEM;
+
         dbus_connection_set_change_sigpipe(FALSE);
 
         dbus_error_init(&error);
@@ -343,7 +390,22 @@ int bus_init(Manager *m) {
             !dbus_connection_set_timeout_functions(m->bus, bus_add_timeout, bus_remove_timeout, bus_toggle_timeout, m, NULL) ||
             !dbus_connection_register_object_path(m->bus, "/org/freedesktop/systemd1", &bus_manager_vtable, m) ||
             !dbus_connection_register_fallback(m->bus, "/org/freedesktop/systemd1/unit", &bus_unit_vtable, m) ||
-            !dbus_connection_register_fallback(m->bus, "/org/freedesktop/systemd1/job", &bus_job_vtable, m)) {
+            !dbus_connection_register_fallback(m->bus, "/org/freedesktop/systemd1/job", &bus_job_vtable, m) ||
+            !dbus_connection_add_filter(m->bus, bus_message_filter, m, NULL)) {
+                bus_done(m);
+                return -ENOMEM;
+        }
+
+        dbus_bus_add_match(m->bus,
+                           "type='signal',"
+                           "sender='"DBUS_SERVICE_DBUS"',"
+                           "interface='"DBUS_INTERFACE_DBUS"',"
+                           "path='"DBUS_PATH_DBUS"'",
+                           &error);
+
+        if (dbus_error_is_set(&error)) {
+                log_error("Failed to register match: %s", error.message);
+                dbus_error_free(&error);
                 bus_done(m);
                 return -ENOMEM;
         }
@@ -371,6 +433,16 @@ void bus_done(Manager *m) {
                 dbus_connection_unref(m->bus);
                 m->bus = NULL;
         }
+
+        if (m->subscribed) {
+                char *c;
+
+                while ((c = set_steal_first(m->subscribed)))
+                        free(c);
+
+                set_free(m->subscribed);
+                m->subscribed = NULL;
+        }
 }
 
 DBusHandlerResult bus_default_message_handler(Manager *m, DBusMessage *message, const char*introspection, const BusProperty *properties) {
diff --git a/dbus.h b/dbus.h
index 64d4815..aaae89d 100644 (file)
--- a/dbus.h
+++ b/dbus.h
@@ -59,7 +59,7 @@ typedef struct BusProperty {
 int bus_init(Manager *m);
 void bus_done(Manager *m);
 
-void bus_dispatch(Manager *m);
+unsigned bus_dispatch(Manager *m);
 
 void bus_watch_event(Manager *m, Watch *w, int events);
 void bus_timeout_event(Manager *m, Watch *w, int events);
@@ -78,4 +78,10 @@ extern const DBusObjectPathVTable bus_manager_vtable;
 extern const DBusObjectPathVTable bus_job_vtable;
 extern const DBusObjectPathVTable bus_unit_vtable;
 
+void bus_unit_send_change_signal(Unit *u);
+void bus_unit_send_removed_signal(Unit *u);
+
+void bus_job_send_change_signal(Job *j);
+void bus_job_send_removed_signal(Job *j);
+
 #endif
index e51d2f3..e3814dd 100644 (file)
--- a/device.c
+++ b/device.c
@@ -212,6 +212,8 @@ static int device_process_new_device(Manager *m, struct udev_device *dev, bool u
                 device_set_state(DEVICE(u), DEVICE_AVAILABLE);
         }
 
+        unit_add_to_dbus_queue(u);
+
         return 0;
 
 fail:
diff --git a/fixme b/fixme
index 2cce6ac..40563ef 100644 (file)
--- a/fixme
+++ b/fixme
@@ -25,9 +25,7 @@
 
 - implement timer
 
-- implement mount/automount
-
-- more process attributes: cpu affinity, cpu scheduling
+- implement automount
 
 - create session/pgroup for child processes? handle input on console properly? interactive fsck? interactive luks password?
 
diff --git a/job.c b/job.c
index 4784d9c..be42eaf 100644 (file)
--- a/job.c
+++ b/job.c
@@ -55,6 +55,8 @@ void job_free(Job *j) {
 
         /* Detach from next 'bigger' objects */
         if (j->installed) {
+                bus_job_send_removed_signal(j);
+
                 if (j->unit->meta.job == j)
                         j->unit->meta.job = NULL;
 
@@ -65,6 +67,12 @@ void job_free(Job *j) {
         /* Detach from next 'smaller' objects */
         manager_transaction_unlink_job(j->manager, j);
 
+        if (j->in_run_queue)
+                LIST_REMOVE(Job, run_queue, j->manager->run_queue, j);
+
+        if (j->in_dbus_queue)
+                LIST_REMOVE(Job, dbus_queue, j->manager->dbus_job_queue, j);
+
         free(j);
 }
 
@@ -326,6 +334,7 @@ int job_run_and_invalidate(Job *j) {
                 return -EAGAIN;
 
         j->state = JOB_RUNNING;
+        job_add_to_dbus_queue(j);
 
         switch (j->type) {
 
@@ -408,6 +417,7 @@ int job_finish_and_invalidate(Job *j, bool success) {
         assert(j->installed);
 
         log_debug("Job %s/%s finished, success=%s", unit_id(j->unit), job_type_to_string(j->type), yes_no(success));
+        job_add_to_dbus_queue(j);
 
         /* Patch restart jobs so that they become normal start jobs */
         if (success && (j->type == JOB_RESTART || j->type == JOB_TRY_RESTART)) {
@@ -419,7 +429,7 @@ int job_finish_and_invalidate(Job *j, bool success) {
                 j->state = JOB_RUNNING;
                 j->type = JOB_START;
 
-                job_schedule_run(j);
+                job_add_to_run_queue(j);
                 return 0;
         }
 
@@ -463,15 +473,15 @@ int job_finish_and_invalidate(Job *j, bool success) {
         /* Try to start the next jobs that can be started */
         SET_FOREACH(other, u->meta.dependencies[UNIT_AFTER], i)
                 if (other->meta.job)
-                        job_schedule_run(other->meta.job);
+                        job_add_to_run_queue(other->meta.job);
         SET_FOREACH(other, u->meta.dependencies[UNIT_BEFORE], i)
                 if (other->meta.job)
-                        job_schedule_run(other->meta.job);
+                        job_add_to_run_queue(other->meta.job);
 
         return 0;
 }
 
-void job_schedule_run(Job *j) {
+void job_add_to_run_queue(Job *j) {
         assert(j);
         assert(j->installed);
 
@@ -482,6 +492,17 @@ void job_schedule_run(Job *j) {
         j->in_run_queue = true;
 }
 
+void job_add_to_dbus_queue(Job *j) {
+        assert(j);
+        assert(j->installed);
+
+        if (j->in_dbus_queue)
+                return;
+
+        LIST_PREPEND(Job, dbus_queue, j->manager->dbus_job_queue, j);
+        j->in_dbus_queue = true;
+}
+
 char *job_dbus_path(Job *j) {
         char *p;
 
diff --git a/job.h b/job.h
index 554f9fc..bfe2033 100644 (file)
--- a/job.h
+++ b/job.h
@@ -94,9 +94,12 @@ struct Job {
         bool in_run_queue:1;
         bool matters_to_anchor:1;
         bool forced:1;
+        bool in_dbus_queue:1;
+        bool sent_dbus_new_signal:1;
 
         LIST_FIELDS(Job, transaction);
         LIST_FIELDS(Job, run_queue);
+        LIST_FIELDS(Job, dbus_queue);
 
         LIST_HEAD(JobDependency, subject_list);
         LIST_HEAD(JobDependency, object_list);
@@ -126,7 +129,9 @@ bool job_type_is_conflicting(JobType a, JobType b);
 
 bool job_is_runnable(Job *j);
 
-void job_schedule_run(Job *j);
+void job_add_to_run_queue(Job *j);
+void job_add_to_dbus_queue(Job *j);
+
 int job_run_and_invalidate(Job *j);
 int job_finish_and_invalidate(Job *j, bool success);
 
index 5490178..57b64ad 100644 (file)
--- a/manager.c
+++ b/manager.c
@@ -665,7 +665,8 @@ static int transaction_apply(Manager *m, JobMode mode) {
                 assert(!j->transaction_next);
                 assert(!j->transaction_prev);
 
-                job_schedule_run(j);
+                job_add_to_run_queue(j);
+                job_add_to_dbus_queue(j);
         }
 
         /* As last step, kill all remaining job dependencies. */
@@ -946,14 +947,15 @@ Unit *manager_get_unit(Manager *m, const char *name) {
         return hashmap_get(m->units, name);
 }
 
-void manager_dispatch_load_queue(Manager *m) {
+unsigned manager_dispatch_load_queue(Manager *m) {
         Meta *meta;
+        unsigned n = 0;
 
         assert(m);
 
         /* Make sure we are not run recursively */
         if (m->dispatching_load_queue)
-                return;
+                return 0;
 
         m->dispatching_load_queue = true;
 
@@ -964,9 +966,11 @@ void manager_dispatch_load_queue(Manager *m) {
                 assert(meta->in_load_queue);
 
                 unit_load(UNIT(meta));
+                n++;
         }
 
         m->dispatching_load_queue = false;
+        return n;
 }
 
 int manager_load_unit(Manager *m, const char *path, Unit **_ret) {
@@ -1004,6 +1008,8 @@ int manager_load_unit(Manager *m, const char *path, Unit **_ret) {
         }
 
         unit_add_to_load_queue(ret);
+        unit_add_to_dbus_queue(ret);
+
         manager_dispatch_load_queue(m);
 
         *_ret = ret;
@@ -1045,11 +1051,12 @@ void manager_clear_jobs(Manager *m) {
                 job_free(j);
 }
 
-void manager_dispatch_run_queue(Manager *m) {
+unsigned manager_dispatch_run_queue(Manager *m) {
         Job *j;
+        unsigned n = 0;
 
         if (m->dispatching_run_queue)
-                return;
+                return 0;
 
         m->dispatching_run_queue = true;
 
@@ -1058,9 +1065,42 @@ void manager_dispatch_run_queue(Manager *m) {
                 assert(j->in_run_queue);
 
                 job_run_and_invalidate(j);
+                n++;
         }
 
         m->dispatching_run_queue = false;
+        return n;
+}
+
+unsigned manager_dispatch_dbus_queue(Manager *m) {
+        Job *j;
+        Meta *meta;
+        unsigned n = 0;
+
+        assert(m);
+
+        if (m->dispatching_dbus_queue)
+                return 0;
+
+        m->dispatching_dbus_queue = true;
+
+        while ((meta = m->dbus_unit_queue)) {
+                Unit *u = (Unit*) meta;
+                assert(u->meta.in_dbus_queue);
+
+                bus_unit_send_change_signal(u);
+                n++;
+        }
+
+        while ((j = m->dbus_job_queue)) {
+                assert(j->in_dbus_queue);
+
+                bus_job_send_change_signal(j);
+                n++;
+        }
+
+        m->dispatching_dbus_queue = false;
+        return n;
 }
 
 static int manager_dispatch_sigchld(Manager *m) {
@@ -1227,12 +1267,17 @@ int manager_loop(Manager *m) {
                         sleep(1);
                 }
 
-                manager_dispatch_run_queue(m);
+                if (manager_dispatch_load_queue(m) > 0)
+                        continue;
 
-                if (m->request_bus_dispatch) {
-                        bus_dispatch(m);
+                if (manager_dispatch_run_queue(m) > 0)
+                        continue;
+
+                if (bus_dispatch(m) > 0)
+                        continue;
+
+                if (manager_dispatch_dbus_queue(m) > 0)
                         continue;
-                }
 
                 if ((n = epoll_wait(m->epoll_fd, &event, 1, -1)) < 0) {
 
index ea4e0a4..b92680d 100644 (file)
--- a/manager.h
+++ b/manager.h
@@ -89,12 +89,20 @@ struct Manager {
         /* Jobs that need to be run */
         LIST_HEAD(Job, run_queue);   /* more a stack than a queue, too */
 
+        /* Units and jobs that have not yet been announced via
+         * D-Bus. When something about a job changes it is added here
+         * if it is not in there yet. This allows easy coalescing of
+         * D-Bus change signals. */
+        LIST_HEAD(Meta, dbus_unit_queue);
+        LIST_HEAD(Job, dbus_job_queue);
+
         /* Jobs to be added */
         Hashmap *transaction_jobs;      /* Unit object => Job object list 1:1 */
         JobDependency *transaction_anchor;
 
         bool dispatching_load_queue:1;
         bool dispatching_run_queue:1;
+        bool dispatching_dbus_queue:1;
 
         bool is_init:1;
 
@@ -117,6 +125,7 @@ struct Manager {
 
         /* Data specific to the D-Bus subsystem */
         DBusConnection *bus;
+        Set *subscribed;
 };
 
 Manager* manager_new(void);
@@ -140,8 +149,9 @@ void manager_transaction_unlink_job(Manager *m, Job *j);
 
 void manager_clear_jobs(Manager *m);
 
-void manager_dispatch_load_queue(Manager *m);
-void manager_dispatch_run_queue(Manager *m);
+unsigned manager_dispatch_load_queue(Manager *m);
+unsigned manager_dispatch_run_queue(Manager *m);
+unsigned manager_dispatch_dbus_queue(Manager *m);
 
 int manager_loop(Manager *m);
 
diff --git a/mount.c b/mount.c
index 7394fb9..4d05081 100644 (file)
--- a/mount.c
+++ b/mount.c
@@ -252,6 +252,8 @@ static int mount_add_one(Manager *m, const char *what, const char *where, bool l
         if ((r = mount_add_path_links(MOUNT(u))) < 0)
                 goto fail;
 
+        unit_add_to_dbus_queue(u);
+
         return 0;
 
 fail:
index fabf8b2..edb3a38 100644 (file)
@@ -83,7 +83,7 @@ public class MainWindow : Window {
                 position = WindowPosition.CENTER;
                 set_default_size(1000, 700);
                 set_border_width(12);
-                destroy.connect(Gtk.main_quit);
+                destroy += Gtk.main_quit;
 
                 Notebook notebook = new Notebook();
                 add(notebook);
@@ -97,14 +97,14 @@ public class MainWindow : Window {
                 job_vbox.set_border_width(12);
 
 
-                unit_model = new ListStore(6, typeof(string), typeof(string), typeof(string), typeof(string), typeof(string), typeof(string), typeof(string));
-                job_model = new ListStore(5, typeof(string), typeof(string), typeof(string), typeof(string), typeof(string), typeof(string));
+                unit_model = new ListStore(6, typeof(string), typeof(string), typeof(string), typeof(string), typeof(string), typeof(Unit));
+                job_model = new ListStore(5, typeof(string), typeof(string), typeof(string), typeof(string), typeof(Job));
 
                 unit_view = new TreeView.with_model(unit_model);
                 job_view = new TreeView.with_model(job_model);
 
-                unit_view.cursor_changed.connect(unit_changed);
-                job_view.cursor_changed.connect(job_changed);
+                unit_view.cursor_changed += unit_changed;
+                job_view.cursor_changed += job_changed;
 
                 unit_view.insert_column_with_attributes(-1, "Unit", new CellRendererText(), "text", 0);
                 unit_view.insert_column_with_attributes(-1, "Description", new CellRendererText(), "text", 1);
@@ -189,10 +189,10 @@ public class MainWindow : Window {
                 reload_button = new Button.with_mnemonic("_Reload");
                 restart_button = new Button.with_mnemonic("Res_tart");
 
-                start_button.clicked.connect(on_start);
-                stop_button.clicked.connect(on_stop);
-                reload_button.clicked.connect(on_reload);
-                restart_button.clicked.connect(on_restart);
+                start_button.clicked += on_start;
+                stop_button.clicked += on_stop;
+                reload_button.clicked += on_reload;
+                restart_button.clicked += on_restart;
 
                 bbox.pack_start(start_button, false, true, 0);
                 bbox.pack_start(stop_button, false, true, 0);
@@ -206,17 +206,24 @@ public class MainWindow : Window {
 
                 cancel_button = new Button.with_mnemonic("_Cancel");
 
-                cancel_button.clicked.connect(on_cancel);
+                cancel_button.clicked += on_cancel;
 
                 bbox.pack_start(cancel_button, false, true, 0);
 
                 bus = Bus.get(BusType.SESSION);
 
-                manager = bus.get_object (
+                manager = bus.get_object(
                                 "org.freedesktop.systemd1",
                                 "/org/freedesktop/systemd1",
                                 "org.freedesktop.systemd1") as Manager;
 
+                manager.unit_new += on_unit_new;
+                manager.job_new += on_job_new;
+                manager.unit_removed += on_unit_removed;
+                manager.job_removed += on_job_removed;
+
+                manager.subscribe();
+
                 clear_unit();
                 populate_unit_model();
                 populate_job_model();
@@ -230,6 +237,11 @@ public class MainWindow : Window {
                 foreach (var i in list) {
                         TreeIter iter;
 
+                        Unit u = bus.get_object(
+                                        "org.freedesktop.systemd1",
+                                        i.unit_path,
+                                        "org.freedesktop.systemd1.Unit") as Unit;
+
                         unit_model.append(out iter);
                         unit_model.set(iter,
                                        0, i.id,
@@ -237,7 +249,7 @@ public class MainWindow : Window {
                                        2, i.load_state,
                                        3, i.active_state,
                                        4, i.job_type != "" ? "→ %s".printf(i.job_type) : "",
-                                       5, i.unit_path);
+                                       5, u);
                 }
         }
 
@@ -249,13 +261,18 @@ public class MainWindow : Window {
                 foreach (var i in list) {
                         TreeIter iter;
 
+                        Job j = bus.get_object(
+                                        "org.freedesktop.systemd1",
+                                        i.job_path,
+                                        "org.freedesktop.systemd1.Job") as Job;
+
                         job_model.append(out iter);
                         job_model.set(iter,
                                       0, "%u".printf(i.id),
                                       1, i.name,
                                       2, "→ %s".printf(i.type),
                                       3, i.state,
-                                      4, i.job_path);
+                                      4, j);
                 }
         }
 
@@ -267,15 +284,12 @@ public class MainWindow : Window {
                         return null;
 
                 TreeIter iter;
-                string path;
+                Unit u;
 
                 unit_model.get_iter(out iter, p);
-                unit_model.get(iter, 5, out path);
+                unit_model.get(iter, 5, out u);
 
-                return bus.get_object (
-                                "org.freedesktop.systemd1",
-                                path,
-                                "org.freedesktop.systemd1.Unit") as Unit;
+                return u;
         }
 
         public void unit_changed() {
@@ -344,15 +358,12 @@ public class MainWindow : Window {
                         return null;
 
                 TreeIter iter;
-                string path;
+                Job *j;
 
                 job_model.get_iter(out iter, p);
-                job_model.get(iter, 4, out path);
+                job_model.get(iter, 4, out j);
 
-                return bus.get_object (
-                                "org.freedesktop.systemd1",
-                                path,
-                                "org.freedesktop.systemd1.Job") as Job;
+                return j;
         }
 
         public void job_changed() {
@@ -440,6 +451,67 @@ public class MainWindow : Window {
                         message("%s", e.message);
                 }
         }
+
+        public void on_unit_new(string id, ObjectPath path) {
+                stderr.printf("new path %s", path);
+
+                Unit u = bus.get_object(
+                                "org.freedesktop.systemd1",
+                                path,
+                                "org.freedesktop.systemd1.Unit") as Unit;
+
+                string t = "";
+                Unit.JobLink jl = u.job;
+
+                if (jl.id != 0) {
+                        Job j = bus.get_object(
+                                        "org.freedesktop.systemd1",
+                                        jl.path,
+                                        "org.freedesktop.systemd1.Job") as Job;
+
+                        t = j.job_type;
+                }
+
+                TreeIter iter;
+                unit_model.append(out iter);
+                unit_model.set(iter,
+                               0, u.id,
+                               1, u.description,
+                               2, u.load_state,
+                               3, u.active_state,
+                               4, t != "" ? "→ %s".printf(t) : "",
+                               5, u);
+        }
+
+        public void on_job_new(uint32 id, ObjectPath path) {
+                stderr.printf("new path %s", path);
+
+                Job j = bus.get_object(
+                                "org.freedesktop.systemd1",
+                                path,
+                                "org.freedesktop.systemd1.Job") as Job;
+
+                TreeIter iter;
+                job_model.append(out iter);
+                job_model.set(iter,
+                              0, "%u".printf(j.id),
+                              1, j.unit.id,
+                              2, "→ %s".printf(j.job_type),
+                              3, j.state,
+                              4, j);
+        }
+
+        public void on_unit_removed(string id, ObjectPath path) {
+                stdout.printf("Unit %s removed.\n", id);
+
+                /* FIXME */
+        }
+
+        public void on_job_removed(uint32 id, ObjectPath path) {
+                stdout.printf("Job %u removed.\n", id);
+
+                /* FIXME */
+        }
 }
 
 int main (string[] args) {
index 5ffceca..fbcdefe 100644 (file)
@@ -42,6 +42,22 @@ public static int unit_info_compare(void* key1, void* key2) {
         return Posix.strcmp(u1->id, u2->id);
 }
 
+public void on_unit_new(string id, ObjectPath path) {
+        stdout.printf("Unit %s added.\n", id);
+}
+
+public void on_job_new(uint32 id, ObjectPath path) {
+        stdout.printf("Job %u added.\n", id);
+}
+
+public void on_unit_removed(string id, ObjectPath path) {
+        stdout.printf("Unit %s removed.\n", id);
+}
+
+public void on_job_removed(uint32 id, ObjectPath path) {
+        stdout.printf("Job %u removed.\n", id);
+}
+
 static const OptionEntry entries[] = {
         { "type",    't', 0, OptionArg.STRING, out type,    "List only particular type of units", "TYPE" },
         { "all",     'a', 0, OptionArg.NONE,   out all,     "Show all units, including dead ones", null  },
@@ -51,8 +67,20 @@ static const OptionEntry entries[] = {
 
 int main (string[] args) {
 
-        OptionContext context = new OptionContext(" -- Control systemd");
+        OptionContext context = new OptionContext(" [COMMAND [ARGUMENT...]]");
         context.add_main_entries(entries, null);
+        context.set_description(
+                        "Commands:\n" +
+                        "  list-units          List units\n" +
+                        "  list-jobs           List jobs\n" +
+                        "  clear-jobs          Cancel all jobs\n" +
+                        "  load [NAME...]      Load one or more units\n" +
+                        "  cancel [JOB...]     Cancel one or more jobs\n" +
+                        "  start [NAME...]     Start on or more units\n" +
+                        "  stop [NAME...]      Stop on or more units\n" +
+                        "  restart [NAME...]   Restart on or more units\n" +
+                        "  reload [NAME...]    Reload on or more units\n" +
+                        "  monitor             Monitor unit/job changes\n");
 
         try {
                 context.parse(ref args);
@@ -179,6 +207,18 @@ int main (string[] args) {
                                         u.reload(mode);
                         }
 
+                } else if (args[1] == "monitor") {
+
+                        manager.subscribe();
+
+                        manager.unit_new += on_unit_new;
+                        manager.unit_removed += on_unit_removed;
+                        manager.job_new += on_job_new;
+                        manager.job_removed += on_job_removed;
+
+                        MainLoop l = new MainLoop();
+                        l.run();
+
                 } else {
                         stderr.printf("Unknown command %s.\n", args[1]);
                         return 1;
index 2196366..6eba8a5 100644 (file)
@@ -50,10 +50,23 @@ public interface Manager : DBus.Object {
         public abstract ObjectPath get_job(uint32 id) throws DBus.Error;
 
         public abstract void clear_jobs() throws DBus.Error;
+
+        public abstract void subscribe() throws DBus.Error;
+        public abstract void unsubscribe() throws DBus.Error;
+
+        public abstract signal void unit_new(string id, ObjectPath path);
+        public abstract signal void unit_removed(string id, ObjectPath path);
+        public abstract signal void job_new(uint32 id, ObjectPath path);
+        public abstract signal void job_removed(uint32 id, ObjectPath path);
 }
 
 [DBus (name = "org.freedesktop.systemd1.Unit")]
 public interface Unit : DBus.Object {
+        public struct JobLink {
+                uint32 id;
+                ObjectPath path;
+        }
+
         public abstract string id { owned get; }
         public abstract string description { owned get; }
         public abstract string load_state { owned get; }
@@ -63,18 +76,29 @@ public interface Unit : DBus.Object {
         public abstract uint64 active_exit_timestamp { owned get; }
         public abstract bool can_reload { owned get; }
         public abstract bool can_start { owned get; }
+        public abstract JobLink job { owned get; /* FIXME: this setter is a temporary fix to make valac not segfault */ set; }
 
         public abstract ObjectPath start(string mode) throws DBus.Error;
         public abstract ObjectPath stop(string mode) throws DBus.Error;
         public abstract ObjectPath restart(string mode) throws DBus.Error;
         public abstract ObjectPath reload(string mode) throws DBus.Error;
+
+        public abstract signal void changed();
 }
 
 [DBus (name = "org.freedesktop.systemd1.Job")]
 public interface Job : DBus.Object {
+        public struct UnitLink {
+                string id;
+                ObjectPath path;
+        }
+
         public abstract uint32 id { owned get; }
         public abstract string state { owned get; }
         public abstract string job_type { owned get; }
+        public abstract UnitLink unit { owned get; /* FIXME: this setter is a temporary fix to make valac not segfault */ set; }
 
         public abstract void cancel() throws DBus.Error;
+
+        public abstract signal void changed();
 }
diff --git a/unit.c b/unit.c
index a14e18e..c27e4ec 100644 (file)
--- a/unit.c
+++ b/unit.c
@@ -174,6 +174,7 @@ int unit_add_name(Unit *u, const char *text) {
         if (!u->meta.id)
                 u->meta.id = s;
 
+        unit_add_to_dbus_queue(u);
         return 0;
 }
 
@@ -189,6 +190,8 @@ int unit_choose_id(Unit *u, const char *name) {
                 return -ENOENT;
 
         u->meta.id = s;
+
+        unit_add_to_dbus_queue(u);
         return 0;
 }
 
@@ -202,6 +205,8 @@ int unit_set_description(Unit *u, const char *description) {
 
         free(u->meta.description);
         u->meta.description = s;
+
+        unit_add_to_dbus_queue(u);
         return 0;
 }
 
@@ -215,6 +220,16 @@ void unit_add_to_load_queue(Unit *u) {
         u->meta.in_load_queue = true;
 }
 
+void unit_add_to_dbus_queue(Unit *u) {
+        assert(u);
+
+        if (u->meta.load_state == UNIT_STUB || u->meta.in_dbus_queue || set_isempty(u->meta.manager->subscribed))
+                return;
+
+        LIST_PREPEND(Meta, dbus_queue, u->meta.manager->dbus_unit_queue, &u->meta);
+        u->meta.in_dbus_queue = true;
+}
+
 static void bidi_set_free(Unit *u, Set *s) {
         Iterator i;
         Unit *other;
@@ -241,6 +256,8 @@ void unit_free(Unit *u) {
 
         assert(u);
 
+        bus_unit_send_removed_signal(u);
+
         /* Detach from next 'bigger' objects */
 
         SET_FOREACH(t, u->meta.names, i)
@@ -252,6 +269,9 @@ void unit_free(Unit *u) {
         if (u->meta.in_load_queue)
                 LIST_REMOVE(Meta, load_queue, u->meta.manager->load_queue, &u->meta);
 
+        if (u->meta.in_dbus_queue)
+                LIST_REMOVE(Meta, dbus_queue, u->meta.manager->dbus_unit_queue, &u->meta);
+
         if (u->meta.load_state == UNIT_LOADED)
                 if (UNIT_VTABLE(u)->done)
                         UNIT_VTABLE(u)->done(u);
@@ -325,6 +345,8 @@ int unit_merge(Unit *u, Unit *other) {
                 if ((r = ensure_merge(&u->meta.dependencies[d], other->meta.dependencies[d])) < 0)
                         return r;
 
+        unit_add_to_dbus_queue(u);
+
         return 0;
 }
 
@@ -437,10 +459,12 @@ int unit_load(Unit *u) {
                         goto fail;
 
         u->meta.load_state = UNIT_LOADED;
+        unit_add_to_dbus_queue(u);
         return 0;
 
 fail:
         u->meta.load_state = UNIT_FAILED;
+        unit_add_to_dbus_queue(u);
         return r;
 }
 
@@ -467,6 +491,7 @@ int unit_start(Unit *u) {
          * restart" state where it waits for a holdoff timer to elapse
          * before it will start again. */
 
+        unit_add_to_dbus_queue(u);
         return UNIT_VTABLE(u)->start(u);
 }
 
@@ -496,6 +521,7 @@ int unit_stop(Unit *u) {
         if (state == UNIT_DEACTIVATING)
                 return 0;
 
+        unit_add_to_dbus_queue(u);
         return UNIT_VTABLE(u)->stop(u);
 }
 
@@ -519,6 +545,7 @@ int unit_reload(Unit *u) {
         if (unit_active_state(u) != UNIT_ACTIVE)
                 return -ENOEXEC;
 
+        unit_add_to_dbus_queue(u);
         return UNIT_VTABLE(u)->reload(u);
 }
 
@@ -649,7 +676,7 @@ void unit_notify(Unit *u, UnitActiveState os, UnitActiveState ns) {
                         /* So we reached a different state for this
                          * job. Let's see if we can run it now if it
                          * failed previously due to EAGAIN. */
-                        job_schedule_run(u->meta.job);
+                        job_add_to_run_queue(u->meta.job);
 
                 else {
                         assert(u->meta.job->state == JOB_RUNNING);
@@ -718,6 +745,8 @@ void unit_notify(Unit *u, UnitActiveState os, UnitActiveState ns) {
         /* Maybe we finished startup and are now ready for being
          * stopped because unneeded? */
         unit_check_uneeded(u);
+
+        unit_add_to_dbus_queue(u);
 }
 
 int unit_watch_fd(Unit *u, int fd, uint32_t events, Watch *w) {
@@ -921,6 +950,7 @@ int unit_add_dependency(Unit *u, UnitDependency d, Unit *other) {
                 return r;
         }
 
+        unit_add_to_dbus_queue(u);
         return 0;
 }
 
diff --git a/unit.h b/unit.h
index 3b7f699..e4034fd 100644 (file)
--- a/unit.h
+++ b/unit.h
@@ -131,6 +131,8 @@ struct Meta {
         Job *job;
 
         bool in_load_queue:1;
+        bool in_dbus_queue:1;
+        bool sent_dbus_new_signal:1;
 
         /* If we go down, pull down everything that depends on us, too */
         bool recursive_stop;
@@ -146,6 +148,9 @@ struct Meta {
 
         /* Per type list */
         LIST_FIELDS(Meta, units_per_type);
+
+        /* D-Bus queue */
+        LIST_FIELDS(Meta, dbus_queue);
 };
 
 #include "service.h"
@@ -243,6 +248,7 @@ int unit_choose_id(Unit *u, const char *name);
 int unit_set_description(Unit *u, const char *description);
 
 void unit_add_to_load_queue(Unit *u);
+void unit_add_to_dbus_queue(Unit *u);
 
 int unit_merge(Unit *u, Unit *other);