From 766c580959336be16f7c724b158f8e8fbaba2e9a Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Wed, 22 Jan 2014 20:26:58 +0100 Subject: [PATCH] bus: add sd_bus_process_priority() to support prioq mode of kdbus --- src/libsystemd/libsystemd.sym | 1 + src/libsystemd/sd-bus/bus-kernel.c | 7 +++++- src/libsystemd/sd-bus/bus-kernel.h | 2 +- src/libsystemd/sd-bus/sd-bus.c | 30 +++++++++++++++++-------- src/libsystemd/sd-bus/test-bus-kernel.c | 3 +++ src/systemd/sd-bus.h | 1 + 6 files changed, 33 insertions(+), 11 deletions(-) diff --git a/src/libsystemd/libsystemd.sym b/src/libsystemd/libsystemd.sym index fc9c4f830..984bc155a 100644 --- a/src/libsystemd/libsystemd.sym +++ b/src/libsystemd/libsystemd.sym @@ -46,6 +46,7 @@ global: sd_bus_get_events; sd_bus_get_timeout; sd_bus_process; + sd_bus_process_priority; sd_bus_wait; sd_bus_flush; sd_bus_get_current; diff --git a/src/libsystemd/sd-bus/bus-kernel.c b/src/libsystemd/sd-bus/bus-kernel.c index 77ad5ca50..27c2e0efc 100644 --- a/src/libsystemd/sd-bus/bus-kernel.c +++ b/src/libsystemd/sd-bus/bus-kernel.c @@ -1039,7 +1039,7 @@ static int bus_kernel_translate_message(sd_bus *bus, struct kdbus_msg *k) { return translate[found->type - _KDBUS_ITEM_KERNEL_BASE](bus, k, found); } -int bus_kernel_read_message(sd_bus *bus) { +int bus_kernel_read_message(sd_bus *bus, bool hint_priority, int64_t priority) { struct kdbus_cmd_recv recv = {}; struct kdbus_msg *k; int r; @@ -1050,6 +1050,11 @@ int bus_kernel_read_message(sd_bus *bus) { if (r < 0) return r; + if (hint_priority) { + recv.flags |= KDBUS_RECV_USE_PRIORITY; + recv.priority = priority; + } + r = ioctl(bus->input_fd, KDBUS_CMD_MSG_RECV, &recv); if (r < 0) { if (errno == EAGAIN) diff --git a/src/libsystemd/sd-bus/bus-kernel.h b/src/libsystemd/sd-bus/bus-kernel.h index b5163964e..63df63e4b 100644 --- a/src/libsystemd/sd-bus/bus-kernel.h +++ b/src/libsystemd/sd-bus/bus-kernel.h @@ -61,7 +61,7 @@ int bus_kernel_connect(sd_bus *b); int bus_kernel_take_fd(sd_bus *b); int bus_kernel_write_message(sd_bus *bus, sd_bus_message *m, bool hint_sync_call); -int bus_kernel_read_message(sd_bus *bus); +int bus_kernel_read_message(sd_bus *bus, bool hint_priority, int64_t priority); int bus_kernel_create_bus(const char *name, bool world, char **s); int bus_kernel_create_namespace(const char *name, char **s); diff --git a/src/libsystemd/sd-bus/sd-bus.c b/src/libsystemd/sd-bus/sd-bus.c index a8295b277..9f8c244bf 100644 --- a/src/libsystemd/sd-bus/sd-bus.c +++ b/src/libsystemd/sd-bus/sd-bus.c @@ -1458,11 +1458,11 @@ static int dispatch_wqueue(sd_bus *bus) { return ret; } -static int bus_read_message(sd_bus *bus) { +static int bus_read_message(sd_bus *bus, bool hint_priority, int64_t priority) { assert(bus); if (bus->is_kernel) - return bus_kernel_read_message(bus); + return bus_kernel_read_message(bus, hint_priority, priority); else return bus_socket_read_message(bus); } @@ -1479,13 +1479,17 @@ int bus_rqueue_make_room(sd_bus *bus) { return 0; } -static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) { +static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **m) { int r, ret = 0; assert(bus); assert(m); assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO); + /* Note that the priority logic is only available on kdbus, + * where the rqueue is unused. We check the rqueue here + * anyway, because it's simple... */ + for (;;) { if (bus->rqueue_size > 0) { /* Dispatch a queued message */ @@ -1497,7 +1501,7 @@ static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) { } /* Try to read a new message */ - r = bus_read_message(bus); + r = bus_read_message(bus, hint_priority, priority); if (r < 0) return r; if (r == 0) @@ -1837,7 +1841,7 @@ _public_ int sd_bus_call( i++; } - r = bus_read_message(bus); + r = bus_read_message(bus, false, 0); if (r < 0) { if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) { bus_enter_closing(bus); @@ -2203,7 +2207,7 @@ finish: return r; } -static int process_running(sd_bus *bus, sd_bus_message **ret) { +static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) { _cleanup_bus_message_unref_ sd_bus_message *m = NULL; int r; @@ -2218,7 +2222,7 @@ static int process_running(sd_bus *bus, sd_bus_message **ret) { if (r != 0) goto null_message; - r = dispatch_rqueue(bus, &m); + r = dispatch_rqueue(bus, hint_priority, priority, &m); if (r < 0) return r; if (!m) @@ -2344,7 +2348,7 @@ finish: return r; } -_public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) { +static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) { BUS_DONT_DESTROY(bus); int r; @@ -2393,7 +2397,7 @@ _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) { case BUS_RUNNING: case BUS_HELLO: - r = process_running(bus, ret); + r = process_running(bus, hint_priority, priority, ret); if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) { bus_enter_closing(bus); r = 1; @@ -2411,6 +2415,14 @@ _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) { assert_not_reached("Unknown state"); } +_public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) { + return bus_process_internal(bus, false, 0, ret); +} + +_public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_message **ret) { + return bus_process_internal(bus, true, priority, ret); +} + static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) { struct pollfd p[2] = {}; int r, e, n; diff --git a/src/libsystemd/sd-bus/test-bus-kernel.c b/src/libsystemd/sd-bus/test-bus-kernel.c index 0fab88021..30cc313af 100644 --- a/src/libsystemd/sd-bus/test-bus-kernel.c +++ b/src/libsystemd/sd-bus/test-bus-kernel.c @@ -110,6 +110,9 @@ int main(int argc, char *argv[]) { r = sd_bus_try_close(b); assert_se(r == -EBUSY); + r = sd_bus_process_priority(b, -10, &m); + assert_se(r == -ENOMSG); + r = sd_bus_process(b, &m); assert_se(r > 0); assert_se(m); diff --git a/src/systemd/sd-bus.h b/src/systemd/sd-bus.h index 6f9443aa3..01f6275c4 100644 --- a/src/systemd/sd-bus.h +++ b/src/systemd/sd-bus.h @@ -139,6 +139,7 @@ int sd_bus_get_fd(sd_bus *bus); int sd_bus_get_events(sd_bus *bus); int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec); int sd_bus_process(sd_bus *bus, sd_bus_message **r); +int sd_bus_process_priority(sd_bus *bus, int64_t max_priority, sd_bus_message **r); int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec); int sd_bus_flush(sd_bus *bus); -- 2.30.2