From 069f5e61eb128aa08b6fdff12fc6ac71c3897b48 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Tue, 21 Jan 2014 15:02:07 +0100 Subject: [PATCH] bus: implement synchronous message calls via kernel ioctl --- src/libsystemd/sd-bus/bus-kernel.c | 75 +++++++++++++++++++++--------- src/libsystemd/sd-bus/bus-kernel.h | 2 +- src/libsystemd/sd-bus/sd-bus.c | 16 ++++--- 3 files changed, 65 insertions(+), 28 deletions(-) diff --git a/src/libsystemd/sd-bus/bus-kernel.c b/src/libsystemd/sd-bus/bus-kernel.c index f85b4d567..99ac5b1ed 100644 --- a/src/libsystemd/sd-bus/bus-kernel.c +++ b/src/libsystemd/sd-bus/bus-kernel.c @@ -206,6 +206,8 @@ static int bus_message_setup_kmsg(sd_bus *b, sd_bus_message *m) { assert(m); assert(m->sealed); + /* We put this together only once, if this message is reused + * we reuse the earlier-built version */ if (m->kdbus) return 0; @@ -722,7 +724,26 @@ int bus_kernel_connect(sd_bus *b) { return bus_kernel_take_fd(b); } -int bus_kernel_write_message(sd_bus *bus, sd_bus_message *m) { +static void close_kdbus_msg(sd_bus *bus, struct kdbus_msg *k) { + uint64_t off; + struct kdbus_item *d; + + assert(bus); + assert(k); + + off = (uint8_t *)k - (uint8_t *)bus->kdbus_buffer; + ioctl(bus->input_fd, KDBUS_CMD_FREE, &off); + + KDBUS_ITEM_FOREACH(d, k, items) { + + if (d->type == KDBUS_ITEM_FDS) + close_many(d->fds, (d->size - offsetof(struct kdbus_item, fds)) / sizeof(int)); + else if (d->type == KDBUS_ITEM_PAYLOAD_MEMFD) + close_nointr_nofail(d->memfd.fd); + } +} + +int bus_kernel_write_message(sd_bus *bus, sd_bus_message *m, bool hint_sync_call) { int r; assert(bus); @@ -738,6 +759,14 @@ int bus_kernel_write_message(sd_bus *bus, sd_bus_message *m) { if (r < 0) return r; + /* If this is a synchronous method call, then let's tell the + * kernel, so that it can pass CPU time/scheduling to the + * destination for the time, if it wants to. If we + * synchronously wait for the result anyway, we won't need CPU + * anyway. */ + if (hint_sync_call) + m->kdbus->flags |= KDBUS_MSG_FLAGS_EXPECT_REPLY|KDBUS_MSG_FLAGS_SYNC_REPLY; + r = ioctl(bus->output_fd, KDBUS_CMD_MSG_SEND, m->kdbus); if (r < 0) { _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL; @@ -785,29 +814,31 @@ int bus_kernel_write_message(sd_bus *bus, sd_bus_message *m) { bus->rqueue[bus->rqueue_size++] = reply; - return 0; - } + } else if (hint_sync_call) { + struct kdbus_msg *k; - return 1; -} + k = (struct kdbus_msg *)((uint8_t *)bus->kdbus_buffer + m->kdbus->offset_reply); + assert(k); -static void close_kdbus_msg(sd_bus *bus, struct kdbus_msg *k) { - uint64_t off; - struct kdbus_item *d; - - assert(bus); - assert(k); + if (k->payload_type == KDBUS_PAYLOAD_DBUS) { - off = (uint8_t *)k - (uint8_t *)bus->kdbus_buffer; - ioctl(bus->input_fd, KDBUS_CMD_FREE, &off); - - KDBUS_ITEM_FOREACH(d, k, items) { + r = bus_kernel_make_message(bus, k); + if (r < 0) { + close_kdbus_msg(bus, k); - if (d->type == KDBUS_ITEM_FDS) - close_many(d->fds, (d->size - offsetof(struct kdbus_item, fds)) / sizeof(int)); - else if (d->type == KDBUS_ITEM_PAYLOAD_MEMFD) - close_nointr_nofail(d->memfd.fd); + /* Anybody can send us invalid messages, let's just drop them. */ + if (r == -EBADMSG || r == -EPROTOTYPE) + log_debug("Ignoring invalid message: %s", strerror(-r)); + else + return r; + } + } else { + log_debug("Ignoring message with unknown payload type %llu.", (unsigned long long) k->payload_type); + close_kdbus_msg(bus, k); + } } + + return 1; } static int push_name_owner_changed(sd_bus *bus, const char *name, const char *old_owner, const char *new_owner) { @@ -964,8 +995,8 @@ int bus_kernel_read_message(sd_bus *bus) { return -errno; } - k = (struct kdbus_msg *)((uint8_t *)bus->kdbus_buffer + recv.offset); + k = (struct kdbus_msg *)((uint8_t *)bus->kdbus_buffer + recv.offset); if (k->payload_type == KDBUS_PAYLOAD_DBUS) { r = bus_kernel_make_message(bus, k); @@ -977,8 +1008,10 @@ int bus_kernel_read_message(sd_bus *bus) { } else if (k->payload_type == KDBUS_PAYLOAD_KERNEL) r = bus_kernel_translate_message(bus, k); - else + else { + log_debug("Ignoring message with unknown payload type %llu.", (unsigned long long) k->payload_type); r = 0; + } if (r <= 0) close_kdbus_msg(bus, k); diff --git a/src/libsystemd/sd-bus/bus-kernel.h b/src/libsystemd/sd-bus/bus-kernel.h index 2aba0bbe0..b5163964e 100644 --- a/src/libsystemd/sd-bus/bus-kernel.h +++ b/src/libsystemd/sd-bus/bus-kernel.h @@ -60,7 +60,7 @@ struct memfd_cache { int bus_kernel_connect(sd_bus *b); int bus_kernel_take_fd(sd_bus *b); -int bus_kernel_write_message(sd_bus *bus, sd_bus_message *m); +int bus_kernel_write_message(sd_bus *bus, sd_bus_message *m, bool hint_sync_call); int bus_kernel_read_message(sd_bus *bus); int bus_kernel_create_bus(const char *name, bool world, char **s); diff --git a/src/libsystemd/sd-bus/sd-bus.c b/src/libsystemd/sd-bus/sd-bus.c index 95d80db24..2c458f311 100644 --- a/src/libsystemd/sd-bus/sd-bus.c +++ b/src/libsystemd/sd-bus/sd-bus.c @@ -1371,14 +1371,14 @@ int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) { return bus_message_seal(m, 0xFFFFFFFFULL, 0); } -static int bus_write_message(sd_bus *bus, sd_bus_message *m, size_t *idx) { +static int bus_write_message(sd_bus *bus, sd_bus_message *m, bool hint_sync_call, size_t *idx) { int r; assert(bus); assert(m); if (bus->is_kernel) - r = bus_kernel_write_message(bus, m); + r = bus_kernel_write_message(bus, m, hint_sync_call); else r = bus_socket_write_message(bus, m, idx); @@ -1408,7 +1408,7 @@ static int dispatch_wqueue(sd_bus *bus) { while (bus->wqueue_size > 0) { - r = bus_write_message(bus, bus->wqueue[0], &bus->windex); + r = bus_write_message(bus, bus->wqueue[0], false, &bus->windex); if (r < 0) return r; else if (r == 0) @@ -1486,7 +1486,7 @@ static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) { } } -_public_ int sd_bus_send(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie) { +static int bus_send_internal(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie, bool hint_sync_call) { _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m); int r; @@ -1526,7 +1526,7 @@ _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie) { if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) { size_t idx = 0; - r = bus_write_message(bus, m, &idx); + r = bus_write_message(bus, m, hint_sync_call, &idx); if (r < 0) { if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) { bus_enter_closing(bus); @@ -1562,6 +1562,10 @@ _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie) { return 1; } +_public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *cookie) { + return bus_send_internal(bus, m, cookie, false); +} + _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *cookie) { int r; @@ -1755,7 +1759,7 @@ _public_ int sd_bus_call( if (r < 0) return r; - r = sd_bus_send(bus, m, &cookie); + r = bus_send_internal(bus, m, &cookie, true); if (r < 0) return r; -- 2.30.2