chiark / gitweb /
bus: add sd_bus_process_priority() to support prioq mode of kdbus
authorLennart Poettering <lennart@poettering.net>
Wed, 22 Jan 2014 19:26:58 +0000 (20:26 +0100)
committerLennart Poettering <lennart@poettering.net>
Wed, 22 Jan 2014 19:26:58 +0000 (20:26 +0100)
src/libsystemd/libsystemd.sym
src/libsystemd/sd-bus/bus-kernel.c
src/libsystemd/sd-bus/bus-kernel.h
src/libsystemd/sd-bus/sd-bus.c
src/libsystemd/sd-bus/test-bus-kernel.c
src/systemd/sd-bus.h

index fc9c4f830c4ed6e6e54130e5e3a8aa9a64ab4913..984bc155a9080237a3daa70fc2eaa225e2957554 100644 (file)
@@ -46,6 +46,7 @@ global:
         sd_bus_get_events;
         sd_bus_get_timeout;
         sd_bus_process;
+        sd_bus_process_priority;
         sd_bus_wait;
         sd_bus_flush;
         sd_bus_get_current;
index 77ad5ca506869ea8db5ea0b965636bed4cc931b9..27c2e0efcf71c11629aa7e15920ddbb0b1a43eb5 100644 (file)
@@ -1039,7 +1039,7 @@ static int bus_kernel_translate_message(sd_bus *bus, struct kdbus_msg *k) {
         return translate[found->type - _KDBUS_ITEM_KERNEL_BASE](bus, k, found);
 }
 
-int bus_kernel_read_message(sd_bus *bus) {
+int bus_kernel_read_message(sd_bus *bus, bool hint_priority, int64_t priority) {
         struct kdbus_cmd_recv recv = {};
         struct kdbus_msg *k;
         int r;
@@ -1050,6 +1050,11 @@ int bus_kernel_read_message(sd_bus *bus) {
         if (r < 0)
                 return r;
 
+        if (hint_priority) {
+                recv.flags |= KDBUS_RECV_USE_PRIORITY;
+                recv.priority = priority;
+        }
+
         r = ioctl(bus->input_fd, KDBUS_CMD_MSG_RECV, &recv);
         if (r < 0) {
                 if (errno == EAGAIN)
index b5163964e7fdcb2ff2ef50e5b3c73545bd8a97b6..63df63e4baa9833febd91229ad1bc9a597436c6d 100644 (file)
@@ -61,7 +61,7 @@ int bus_kernel_connect(sd_bus *b);
 int bus_kernel_take_fd(sd_bus *b);
 
 int bus_kernel_write_message(sd_bus *bus, sd_bus_message *m, bool hint_sync_call);
-int bus_kernel_read_message(sd_bus *bus);
+int bus_kernel_read_message(sd_bus *bus, bool hint_priority, int64_t priority);
 
 int bus_kernel_create_bus(const char *name, bool world, char **s);
 int bus_kernel_create_namespace(const char *name, char **s);
index a8295b2778b7b78b86b70abf407fd063af8b09ea..9f8c244bf55c45085b995af3c6d864f53b9ef386 100644 (file)
@@ -1458,11 +1458,11 @@ static int dispatch_wqueue(sd_bus *bus) {
         return ret;
 }
 
-static int bus_read_message(sd_bus *bus) {
+static int bus_read_message(sd_bus *bus, bool hint_priority, int64_t priority) {
         assert(bus);
 
         if (bus->is_kernel)
-                return bus_kernel_read_message(bus);
+                return bus_kernel_read_message(bus, hint_priority, priority);
         else
                 return bus_socket_read_message(bus);
 }
@@ -1479,13 +1479,17 @@ int bus_rqueue_make_room(sd_bus *bus) {
         return 0;
 }
 
-static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
+static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **m) {
         int r, ret = 0;
 
         assert(bus);
         assert(m);
         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
 
+        /* Note that the priority logic is only available on kdbus,
+         * where the rqueue is unused. We check the rqueue here
+         * anyway, because it's simple... */
+
         for (;;) {
                 if (bus->rqueue_size > 0) {
                         /* Dispatch a queued message */
@@ -1497,7 +1501,7 @@ static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
                 }
 
                 /* Try to read a new message */
-                r = bus_read_message(bus);
+                r = bus_read_message(bus, hint_priority, priority);
                 if (r < 0)
                         return r;
                 if (r == 0)
@@ -1837,7 +1841,7 @@ _public_ int sd_bus_call(
                         i++;
                 }
 
-                r = bus_read_message(bus);
+                r = bus_read_message(bus, false, 0);
                 if (r < 0) {
                         if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
                                 bus_enter_closing(bus);
@@ -2203,7 +2207,7 @@ finish:
         return r;
 }
 
-static int process_running(sd_bus *bus, sd_bus_message **ret) {
+static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
         int r;
 
@@ -2218,7 +2222,7 @@ static int process_running(sd_bus *bus, sd_bus_message **ret) {
         if (r != 0)
                 goto null_message;
 
-        r = dispatch_rqueue(bus, &m);
+        r = dispatch_rqueue(bus, hint_priority, priority, &m);
         if (r < 0)
                 return r;
         if (!m)
@@ -2344,7 +2348,7 @@ finish:
         return r;
 }
 
-_public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
+static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
         BUS_DONT_DESTROY(bus);
         int r;
 
@@ -2393,7 +2397,7 @@ _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
 
         case BUS_RUNNING:
         case BUS_HELLO:
-                r = process_running(bus, ret);
+                r = process_running(bus, hint_priority, priority, ret);
                 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
                         bus_enter_closing(bus);
                         r = 1;
@@ -2411,6 +2415,14 @@ _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
         assert_not_reached("Unknown state");
 }
 
+_public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
+        return bus_process_internal(bus, false, 0, ret);
+}
+
+_public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_message **ret) {
+        return bus_process_internal(bus, true, priority, ret);
+}
+
 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
         struct pollfd p[2] = {};
         int r, e, n;
index 0fab88021d1fbc1cf08d8db8a34cd6a938c36078..30cc313af498702d7ab1831ada261c255b6a724d 100644 (file)
@@ -110,6 +110,9 @@ int main(int argc, char *argv[]) {
         r = sd_bus_try_close(b);
         assert_se(r == -EBUSY);
 
+        r = sd_bus_process_priority(b, -10, &m);
+        assert_se(r == -ENOMSG);
+
         r = sd_bus_process(b, &m);
         assert_se(r > 0);
         assert_se(m);
index 6f9443aa37a365bb2dc027448e3c3feccdcfceab..01f6275c4b700afcb3da5cdeb71655a9ec72506f 100644 (file)
@@ -139,6 +139,7 @@ int sd_bus_get_fd(sd_bus *bus);
 int sd_bus_get_events(sd_bus *bus);
 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec);
 int sd_bus_process(sd_bus *bus, sd_bus_message **r);
+int sd_bus_process_priority(sd_bus *bus, int64_t max_priority, sd_bus_message **r);
 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec);
 int sd_bus_flush(sd_bus *bus);