We want to allow clients to process an sd_bus_message on a different
thread than it was received on. Since unreffing a bus message might
readd some of its memfds to the memfd cache add some minimal locking
around the cache.
libsystemd-shared.la \
libsystemd-daemon.la
libsystemd-shared.la \
libsystemd-daemon.la
+libsystemd_bus_la_CFLAGS = \
+ $(AM_CFLAGS) \
+ -pthread
+
noinst_LTLIBRARIES += \
libsystemd-bus.la
noinst_LTLIBRARIES += \
libsystemd-bus.la
- port systemd to new library
- implement busname unit type in systemd
- move to gvariant
- port systemd to new library
- implement busname unit type in systemd
- move to gvariant
- - minimal locking around the memfd cache
- keep the connection fds around as long as the bus is open
- merge busctl into systemctl or so?
- synthesize sd_bus_message objects from kernel messages
- keep the connection fds around as long as the bus is open
- merge busctl into systemctl or so?
- synthesize sd_bus_message objects from kernel messages
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include "hashmap.h"
#include "prioq.h"
#include "hashmap.h"
#include "prioq.h"
+ /* We do locking around the memfd cache, since we want to
+ * allow people to process a sd_bus_message in a different
+ * thread then it was generated on and free it there. Since
+ * adding something to the memfd cache might happen when a
+ * message is released, we hence need to protect this bit with
+ * a mutex. */
+ pthread_mutex_t memfd_cache_mutex;
struct memfd_cache memfd_cache[MEMFD_CACHE_MAX];
unsigned n_memfd_cache;
struct memfd_cache memfd_cache[MEMFD_CACHE_MAX];
unsigned n_memfd_cache;
int bus_kernel_pop_memfd(sd_bus *bus, void **address, size_t *size) {
struct memfd_cache *c;
int bus_kernel_pop_memfd(sd_bus *bus, void **address, size_t *size) {
struct memfd_cache *c;
assert(address);
assert(size);
assert(address);
assert(size);
if (!bus || !bus->is_kernel)
return -ENOTSUP;
if (!bus || !bus->is_kernel)
return -ENOTSUP;
+ assert_se(pthread_mutex_lock(&bus->memfd_cache_mutex) == 0);
+
if (bus->n_memfd_cache <= 0) {
if (bus->n_memfd_cache <= 0) {
+ int r;
+
+ assert_se(pthread_mutex_unlock(&bus->memfd_cache_mutex) == 0);
r = ioctl(bus->input_fd, KDBUS_CMD_MEMFD_NEW, &fd);
if (r < 0)
r = ioctl(bus->input_fd, KDBUS_CMD_MEMFD_NEW, &fd);
if (r < 0)
*address = c->address;
*size = c->size;
*address = c->address;
*size = c->size;
+ fd = c->fd;
+
+ assert_se(pthread_mutex_unlock(&bus->memfd_cache_mutex) == 0);
+ return fd;
+}
+
+static void close_and_munmap(int fd, void *address, size_t size) {
+ if (size > 0)
+ assert_se(munmap(address, PAGE_ALIGN(size)) == 0);
+
+ close_nointr_nofail(fd);
}
void bus_kernel_push_memfd(sd_bus *bus, int fd, void *address, size_t size) {
}
void bus_kernel_push_memfd(sd_bus *bus, int fd, void *address, size_t size) {
assert(fd >= 0);
assert(size == 0 || address);
assert(fd >= 0);
assert(size == 0 || address);
- if (!bus || !bus->is_kernel ||
- bus->n_memfd_cache >= ELEMENTSOF(bus->memfd_cache)) {
+ if (!bus || !bus->is_kernel) {
+ close_and_munmap(fd, address, size);
+ return;
+ }
+
+ assert_se(pthread_mutex_lock(&bus->memfd_cache_mutex) == 0);
- if (size > 0)
- assert_se(munmap(address, PAGE_ALIGN(size)) == 0);
+ if (bus->n_memfd_cache >= ELEMENTSOF(bus->memfd_cache)) {
+ assert_se(pthread_mutex_unlock(&bus->memfd_cache_mutex) == 0);
- close_nointr_nofail(fd);
+ close_and_munmap(fd, address, size);
c->size = MEMFD_CACHE_ITEM_SIZE_MAX;
} else
c->size = size;
c->size = MEMFD_CACHE_ITEM_SIZE_MAX;
} else
c->size = size;
+
+ assert_se(pthread_mutex_unlock(&bus->memfd_cache_mutex) == 0);
}
void bus_kernel_flush_memfd(sd_bus *b) {
}
void bus_kernel_flush_memfd(sd_bus *b) {
#include <sys/poll.h>
#include <byteswap.h>
#include <sys/mman.h>
#include <sys/poll.h>
#include <byteswap.h>
#include <sys/mman.h>
#include "util.h"
#include "macro.h"
#include "util.h"
#include "macro.h"
bus_kernel_flush_memfd(b);
bus_kernel_flush_memfd(b);
+ assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
+
r->negotiate_fds = true;
r->original_pid = getpid();
r->negotiate_fds = true;
r->original_pid = getpid();
+ assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
+
/* We guarantee that wqueue always has space for at least one
* entry */
r->wqueue = new(sd_bus_message*, 1);
/* We guarantee that wqueue always has space for at least one
* entry */
r->wqueue = new(sd_bus_message*, 1);