From 7d22c717329c6317f97ccd0f68040a3a2b98e760 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 29 Nov 2013 21:29:16 +0100 Subject: [PATCH] bus: add the ability for backends to queue to input messages at the same time We need this so that one incoming kernel message can result in two high-level bus messages, for the case where we synthesize NameAcquired and NameOwnerChanged in the same instance. --- src/libsystemd-bus/bus-internal.h | 5 +- src/libsystemd-bus/bus-kernel.c | 64 +++++++++-------- src/libsystemd-bus/bus-kernel.h | 2 +- src/libsystemd-bus/bus-socket.c | 21 ++++-- src/libsystemd-bus/bus-socket.h | 2 +- src/libsystemd-bus/sd-bus.c | 116 +++++++++++++++++------------- 6 files changed, 122 insertions(+), 88 deletions(-) diff --git a/src/libsystemd-bus/bus-internal.h b/src/libsystemd-bus/bus-internal.h index 7a7d8f5ff..f21cf87ea 100644 --- a/src/libsystemd-bus/bus-internal.h +++ b/src/libsystemd-bus/bus-internal.h @@ -166,7 +166,7 @@ struct sd_bus { size_t rbuffer_size; sd_bus_message **rqueue; - unsigned rqueue_size; + unsigned rqueue_size, rqueue_allocated; sd_bus_message **wqueue; unsigned wqueue_size; @@ -296,6 +296,9 @@ int bus_next_address(sd_bus *bus); int bus_seal_message(sd_bus *b, sd_bus_message *m); +int bus_rqueue_make_room(sd_bus *bus, unsigned n); +int bus_rqueue_push(sd_bus *bus, sd_bus_message *m); + bool bus_pid_changed(sd_bus *bus); char *bus_address_escape(const char *v); diff --git a/src/libsystemd-bus/bus-kernel.c b/src/libsystemd-bus/bus-kernel.c index cffdf0995..54e85c944 100644 --- a/src/libsystemd-bus/bus-kernel.c +++ b/src/libsystemd-bus/bus-kernel.c @@ -425,12 +425,11 @@ static void close_kdbus_msg(sd_bus *bus, struct kdbus_msg *k) { } } -static int return_name_owner_changed(sd_bus *bus, const char *name, const char *old_owner, const char *new_owner, sd_bus_message **ret) { +static int push_name_owner_changed(sd_bus *bus, const char *name, const char *old_owner, const char *new_owner) { _cleanup_bus_message_unref_ sd_bus_message *m = NULL; int r; assert(bus); - assert(ret); r = sd_bus_message_new_signal( bus, @@ -451,19 +450,20 @@ static int return_name_owner_changed(sd_bus *bus, const char *name, const char * if (r < 0) return r; - *ret = m; - m = NULL; + r = bus_rqueue_push(bus, m); + if (r < 0) + return r; + m = NULL; return 1; } -static int translate_name_change(sd_bus *bus, struct kdbus_msg *k, struct kdbus_item *d, sd_bus_message **ret) { +static int translate_name_change(sd_bus *bus, struct kdbus_msg *k, struct kdbus_item *d) { char new_owner[UNIQUE_NAME_MAX], old_owner[UNIQUE_NAME_MAX]; assert(bus); assert(k); assert(d); - assert(ret); if (d->name_change.flags != 0) return 0; @@ -478,34 +478,31 @@ static int translate_name_change(sd_bus *bus, struct kdbus_msg *k, struct kdbus_ else sprintf(new_owner, ":1.%llu", (unsigned long long) d->name_change.new_id); - return return_name_owner_changed(bus, d->name_change.name, old_owner, new_owner, ret); + return push_name_owner_changed(bus, d->name_change.name, old_owner, new_owner); } -static int translate_id_change(sd_bus *bus, struct kdbus_msg *k, struct kdbus_item *d, sd_bus_message **ret) { +static int translate_id_change(sd_bus *bus, struct kdbus_msg *k, struct kdbus_item *d) { char owner[UNIQUE_NAME_MAX]; assert(bus); assert(k); assert(d); - assert(ret); sprintf(owner, ":1.%llu", d->id_change.id); - return return_name_owner_changed( + return push_name_owner_changed( bus, owner, d->type == KDBUS_ITEM_ID_ADD ? NULL : owner, - d->type == KDBUS_ITEM_ID_ADD ? owner : NULL, - ret); + d->type == KDBUS_ITEM_ID_ADD ? owner : NULL); } -static int translate_reply(sd_bus *bus, struct kdbus_msg *k, struct kdbus_item *d, sd_bus_message **ret) { +static int translate_reply(sd_bus *bus, struct kdbus_msg *k, struct kdbus_item *d) { _cleanup_bus_message_unref_ sd_bus_message *m = NULL; int r; assert(bus); assert(k); assert(d); - assert(ret); r = bus_message_new_synthetic_error( bus, @@ -523,16 +520,18 @@ static int translate_reply(sd_bus *bus, struct kdbus_msg *k, struct kdbus_item * if (r < 0) return r; - *ret = m; - m = NULL; + r = bus_rqueue_push(bus, m); + if (r < 0) + return r; + m = NULL; return 1; } -static int bus_kernel_translate_message(sd_bus *bus, struct kdbus_msg *k, sd_bus_message **ret) { +static int bus_kernel_translate_message(sd_bus *bus, struct kdbus_msg *k) { struct kdbus_item *d, *found = NULL; - static int (* const translate[])(sd_bus *bus, struct kdbus_msg *k, struct kdbus_item *d, sd_bus_message **ret) = { + static int (* const translate[])(sd_bus *bus, struct kdbus_msg *k, struct kdbus_item *d) = { [KDBUS_ITEM_NAME_ADD - _KDBUS_ITEM_KERNEL_BASE] = translate_name_change, [KDBUS_ITEM_NAME_REMOVE - _KDBUS_ITEM_KERNEL_BASE] = translate_name_change, [KDBUS_ITEM_NAME_CHANGE - _KDBUS_ITEM_KERNEL_BASE] = translate_name_change, @@ -546,7 +545,6 @@ static int bus_kernel_translate_message(sd_bus *bus, struct kdbus_msg *k, sd_bus assert(bus); assert(k); - assert(ret); assert(k->payload_type == KDBUS_PAYLOAD_KERNEL); KDBUS_PART_FOREACH(d, k, items) { @@ -563,7 +561,7 @@ static int bus_kernel_translate_message(sd_bus *bus, struct kdbus_msg *k, sd_bus return 0; } - return translate[found->type](bus, k, d, ret); + return translate[found->type](bus, k, d); } int kdbus_translate_attach_flags(uint64_t mask, uint64_t *kdbus_mask) { @@ -599,7 +597,7 @@ int kdbus_translate_attach_flags(uint64_t mask, uint64_t *kdbus_mask) { return 0; } -static int bus_kernel_make_message(sd_bus *bus, struct kdbus_msg *k, sd_bus_message **ret) { +static int bus_kernel_make_message(sd_bus *bus, struct kdbus_msg *k) { sd_bus_message *m = NULL; struct kdbus_item *d; unsigned n_fds = 0; @@ -611,7 +609,6 @@ static int bus_kernel_make_message(sd_bus *bus, struct kdbus_msg *k, sd_bus_mess assert(bus); assert(k); - assert(ret); assert(k->payload_type == KDBUS_PAYLOAD_DBUS1); KDBUS_PART_FOREACH(d, k, items) { @@ -833,7 +830,10 @@ static int bus_kernel_make_message(sd_bus *bus, struct kdbus_msg *k, sd_bus_mess fds = NULL; - *ret = m; + r = bus_rqueue_push(bus, m); + if (r < 0) + goto fail; + return 1; fail: @@ -852,13 +852,21 @@ fail: return r; } -int bus_kernel_read_message(sd_bus *bus, sd_bus_message **m) { - uint64_t off; +int bus_kernel_read_message(sd_bus *bus) { struct kdbus_msg *k; + uint64_t off; int r; assert(bus); - assert(m); + + /* Kernel messages might result in 2 new queued messages in + * the worst case (NameOwnerChange and LostName for the same + * well-known name, for example). Let's make room in + * advance. */ + + r = bus_rqueue_make_room(bus, 2); + if (r < 0) + return r; r = ioctl(bus->input_fd, KDBUS_CMD_MSG_RECV, &off); if (r < 0) { @@ -870,9 +878,9 @@ int bus_kernel_read_message(sd_bus *bus, sd_bus_message **m) { k = (struct kdbus_msg *)((uint8_t *)bus->kdbus_buffer + off); if (k->payload_type == KDBUS_PAYLOAD_DBUS1) - r = bus_kernel_make_message(bus, k, m); + r = bus_kernel_make_message(bus, k); else if (k->payload_type == KDBUS_PAYLOAD_KERNEL) - r = bus_kernel_translate_message(bus, k, m); + r = bus_kernel_translate_message(bus, k); else r = 0; diff --git a/src/libsystemd-bus/bus-kernel.h b/src/libsystemd-bus/bus-kernel.h index 4b63108e8..746ab227a 100644 --- a/src/libsystemd-bus/bus-kernel.h +++ b/src/libsystemd-bus/bus-kernel.h @@ -58,7 +58,7 @@ int bus_kernel_connect(sd_bus *b); int bus_kernel_take_fd(sd_bus *b); int bus_kernel_write_message(sd_bus *bus, sd_bus_message *m); -int bus_kernel_read_message(sd_bus *bus, sd_bus_message **m); +int bus_kernel_read_message(sd_bus *bus); int bus_kernel_create(const char *name, char **s); diff --git a/src/libsystemd-bus/bus-socket.c b/src/libsystemd-bus/bus-socket.c index 6845f9d1b..355f31c56 100644 --- a/src/libsystemd-bus/bus-socket.c +++ b/src/libsystemd-bus/bus-socket.c @@ -891,13 +891,12 @@ static int bus_socket_read_message_need(sd_bus *bus, size_t *need) { return 0; } -static int bus_socket_make_message(sd_bus *bus, size_t size, sd_bus_message **m) { +static int bus_socket_make_message(sd_bus *bus, size_t size) { sd_bus_message *t; void *b; int r; assert(bus); - assert(m); assert(bus->rbuffer_size >= size); assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO); @@ -926,11 +925,16 @@ static int bus_socket_make_message(sd_bus *bus, size_t size, sd_bus_message **m) bus->fds = NULL; bus->n_fds = 0; - *m = t; + r = bus_rqueue_push(bus, t); + if (r < 0) { + sd_bus_message_unref(t); + return r; + } + return 1; } -int bus_socket_read_message(sd_bus *bus, sd_bus_message **m) { +int bus_socket_read_message(sd_bus *bus) { struct msghdr mh; struct iovec iov; ssize_t k; @@ -947,15 +951,18 @@ int bus_socket_read_message(sd_bus *bus, sd_bus_message **m) { bool handle_cmsg = false; assert(bus); - assert(m); assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO); + r = bus_rqueue_make_room(bus, 1); + if (r < 0) + return r; + r = bus_socket_read_message_need(bus, &need); if (r < 0) return r; if (bus->rbuffer_size >= need) - return bus_socket_make_message(bus, need, m); + return bus_socket_make_message(bus, need); b = realloc(bus->rbuffer, need); if (!b) @@ -1045,7 +1052,7 @@ int bus_socket_read_message(sd_bus *bus, sd_bus_message **m) { return r; if (bus->rbuffer_size >= need) - return bus_socket_make_message(bus, need, m); + return bus_socket_make_message(bus, need); return 1; } diff --git a/src/libsystemd-bus/bus-socket.h b/src/libsystemd-bus/bus-socket.h index c61b90f47..f959787e7 100644 --- a/src/libsystemd-bus/bus-socket.h +++ b/src/libsystemd-bus/bus-socket.h @@ -30,7 +30,7 @@ int bus_socket_take_fd(sd_bus *b); int bus_socket_start_auth(sd_bus *b); int bus_socket_write_message(sd_bus *bus, sd_bus_message *m, size_t *idx); -int bus_socket_read_message(sd_bus *bus, sd_bus_message **m); +int bus_socket_read_message(sd_bus *bus); int bus_socket_process_opening(sd_bus *b); int bus_socket_process_authenticating(sd_bus *b); diff --git a/src/libsystemd-bus/sd-bus.c b/src/libsystemd-bus/sd-bus.c index a894af085..86265955b 100644 --- a/src/libsystemd-bus/sd-bus.c +++ b/src/libsystemd-bus/sd-bus.c @@ -1299,50 +1299,78 @@ static int dispatch_wqueue(sd_bus *bus) { return ret; } -static int bus_read_message(sd_bus *bus, sd_bus_message **m) { +static int bus_read_message(sd_bus *bus) { + assert(bus); + + if (bus->is_kernel) + return bus_kernel_read_message(bus); + else + return bus_socket_read_message(bus); +} + +int bus_rqueue_make_room(sd_bus *bus, unsigned n) { + sd_bus_message **q; + unsigned x; + + x = bus->rqueue_size + n; + + if (bus->rqueue_allocated >= x) + return 0; + + if (x > BUS_RQUEUE_MAX) + return -ENOBUFS; + + q = realloc(bus->rqueue, x * sizeof(sd_bus_message*)); + if (!q) + return -ENOMEM; + + bus->rqueue = q; + bus->rqueue_allocated = x; + + return 0; +} + +int bus_rqueue_push(sd_bus *bus, sd_bus_message *m) { int r; assert(bus); assert(m); - if (bus->is_kernel) - r = bus_kernel_read_message(bus, m); - else - r = bus_socket_read_message(bus, m); + r = bus_rqueue_make_room(bus, 1); + if (r < 0) + return r; - return r; + bus->rqueue[bus->rqueue_size++] = m; + + return 0; } static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) { - sd_bus_message *z = NULL; int r, ret = 0; assert(bus); assert(m); assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO); - if (bus->rqueue_size > 0) { - /* Dispatch a queued message */ + for (;;) { + if (bus->rqueue_size > 0) { + /* Dispatch a queued message */ - *m = bus->rqueue[0]; - bus->rqueue_size --; - memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size); - return 1; - } + *m = bus->rqueue[0]; + bus->rqueue_size --; + memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size); + return 1; + } - /* Try to read a new message */ - do { - r = bus_read_message(bus, &z); + /* Try to read a new message */ + r = bus_read_message(bus); if (r < 0) return r; if (r == 0) return ret; ret = 1; - } while (!z); - - *m = z; - return ret; + } } _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) { @@ -1578,10 +1606,10 @@ _public_ int sd_bus_call( sd_bus_error *error, sd_bus_message **reply) { - int r; usec_t timeout; uint64_t serial; - bool room = false; + unsigned i; + int r; assert_return(bus, -EINVAL); assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN); @@ -1600,37 +1628,26 @@ _public_ int sd_bus_call( return r; timeout = calc_elapse(usec); + i = bus->rqueue_size; for (;;) { usec_t left; - sd_bus_message *incoming = NULL; - - if (!room) { - sd_bus_message **q; - - if (bus->rqueue_size >= BUS_RQUEUE_MAX) - return -ENOBUFS; - /* Make sure there's room for queuing this - * locally, before we read the message */ - - q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*)); - if (!q) - return -ENOMEM; - - bus->rqueue = q; - room = true; - } - - r = bus_read_message(bus, &incoming); + r = bus_read_message(bus); if (r < 0) return r; - if (incoming) { + while (i < bus->rqueue_size) { + sd_bus_message *incoming = NULL; + + incoming = bus->rqueue[i]; if (incoming->reply_serial == serial) { /* Found a match! */ + memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1)); + bus->rqueue_size--; + if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) { if (reply) @@ -1663,6 +1680,9 @@ _public_ int sd_bus_call( incoming->sender && streq(bus->unique_name, incoming->sender)) { + memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1)); + bus->rqueue_size--; + /* Our own message? Somebody is trying * to send its own client a message, * let's not dead-lock, let's fail @@ -1672,15 +1692,11 @@ _public_ int sd_bus_call( return -ELOOP; } - /* There's already guaranteed to be room for - * this, so need to resize things here */ - bus->rqueue[bus->rqueue_size ++] = incoming; - room = false; - /* Try to read more, right-away */ - continue; + i++; } - if (r != 0) + + if (r > 0) continue; if (timeout > 0) { -- 2.30.2