1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
38 #include "bus-internal.h"
39 #include "bus-message.h"
41 #include "bus-socket.h"
42 #include "bus-kernel.h"
43 #include "bus-control.h"
/* Forward declaration: bus_poll() is defined near the end of this file but
 * is called earlier, from the blocking send path. */
45 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Closes the bus's file descriptors with close_nointr_nofail(). The output
 * fd is closed on its own only when it is valid and differs from the input
 * fd, so a descriptor shared by both directions is not closed twice.
 * Afterwards both fields are reset to -1. */
47 static void bus_close_fds(sd_bus *b) {
51 close_nointr_nofail(b->input_fd);
53 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
54 close_nointr_nofail(b->output_fd);
56 b->input_fd = b->output_fd = -1;
/* Tears down a bus object. The steps in this function unmap the kdbus pool
 * buffer (KDBUS_POOL_SIZE bytes), free the exec argv vector, close the fds
 * in b->fds, drop one reference on every message in the read and write
 * queues, free the reply-callback hashmap and priority queue, drain the
 * filter-callback list and the object-callback hashmap, free the match
 * tree and finally call bus_kernel_flush_memfd(). */
59 static void bus_free(sd_bus *b) {
60 struct filter_callback *f;
61 struct object_callback *c;
69 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
78 strv_free(b->exec_argv);
80 close_many(b->fds, b->n_fds);
83 for (i = 0; i < b->rqueue_size; i++)
84 sd_bus_message_unref(b->rqueue[i]);
87 for (i = 0; i < b->wqueue_size; i++)
88 sd_bus_message_unref(b->wqueue[i]);
91 hashmap_free_free(b->reply_callbacks);
92 prioq_free(b->reply_callbacks_prioq);
94 while ((f = b->filter_callbacks)) {
95 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
99 while ((c = hashmap_steal_first(b->object_callbacks))) {
104 hashmap_free(b->object_callbacks);
105 bus_match_free(&b->match_callbacks);
107 bus_kernel_flush_memfd(b);
/* Creates a new, unconnected bus object. The object `r` starts with its
 * reference count at REFCNT_INIT, both fds invalid (-1), message version 1
 * and fd negotiation enabled. The write queue is allocated up front with
 * room for a single message pointer. */
112 int sd_bus_new(sd_bus **ret) {
122 r->n_ref = REFCNT_INIT;
123 r->input_fd = r->output_fd = -1;
124 r->message_version = 1;
125 r->negotiate_fds = true;
127 /* We guarantee that wqueue always has space for at least one
129 r->wqueue = new(sd_bus_message*, 1);
/* Configures the address string of a bus. The bus state is compared against
 * BUS_UNSET first, i.e. this is meant for buses that were not started yet.
 * NOTE(review): a bus in any other state is presumably rejected with an
 * error - confirm. */
139 int sd_bus_set_address(sd_bus *bus, const char *address) {
144 if (bus->state != BUS_UNSET)
/* Installs caller-supplied input and output file descriptors on a bus. The
 * bus state is compared against BUS_UNSET before the two descriptors are
 * stored in bus->input_fd and bus->output_fd. */
159 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
162 if (bus->state != BUS_UNSET)
169 bus->input_fd = input_fd;
170 bus->output_fd = output_fd;
/* Configures a program (path plus argument vector) for the bus. The bus
 * state is compared against BUS_UNSET and argv is tested for emptiness.
 * The previously stored exec_path and exec_argv are freed here.
 * NOTE(review): presumably they are then replaced by copies of the new
 * values - confirm. */
174 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
179 if (bus->state != BUS_UNSET)
183 if (strv_isempty(argv))
196 free(bus->exec_path);
197 strv_free(bus->exec_argv);
/* Sets the bus_client flag. The bus state is compared against BUS_UNSET
 * first; `b` is normalized to 0/1 with !! before it is stored. */
205 int sd_bus_set_bus_client(sd_bus *bus, int b) {
208 if (bus->state != BUS_UNSET)
211 bus->bus_client = !!b;
/* Sets the negotiate_fds flag. The bus state is compared against BUS_UNSET
 * first; `b` is normalized to 0/1 with !! before it is stored. */
215 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
218 if (bus->state != BUS_UNSET)
221 bus->negotiate_fds = !!b;
/* Sets the is_server flag and the server id of a bus. Two conditions are
 * checked before anything is stored: server mode being switched off while a
 * non-null server id is passed, and the bus state not being BUS_UNSET.
 * `b` is normalized to 0/1 with !!. */
225 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
228 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
230 if (bus->state != BUS_UNSET)
233 bus->is_server = !!b;
234 bus->server_id = server_id;
/* Sets the anonymous_auth flag. The bus state is compared against BUS_UNSET
 * first; `b` is normalized to 0/1 with !! before it is stored. */
238 int sd_bus_set_anonymous(sd_bus *bus, int b) {
241 if (bus->state != BUS_UNSET)
244 bus->anonymous_auth = !!b;
/* Reply handler registered by bus_send_hello(). It asserts that the bus is
 * in BUS_HELLO, runs the reply through bus_message_to_errno(), reads one
 * string from it, checks that the string is a valid service name beginning
 * with ':', stores a copy in bus->unique_name and finally switches the bus
 * to BUS_RUNNING. */
248 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
253 assert(bus->state == BUS_HELLO);
256 r = bus_message_to_errno(reply);
260 r = sd_bus_message_read(reply, "s", &s);
264 if (!service_name_is_valid(s) || s[0] != ':')
267 bus->unique_name = strdup(s);
268 if (!bus->unique_name)
271 bus->state = BUS_RUNNING;
/* Builds a method call addressed to org.freedesktop.DBus and sends it with
 * hello_callback() as the reply handler; the serial of the call is stored
 * in bus->hello_serial. The guard at the top singles out buses that are
 * not bus clients, or that are kernel buses.
 * NOTE(review): presumably nothing is sent for those - confirm. */
276 static int bus_send_hello(sd_bus *bus) {
277 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
282 if (!bus->bus_client || bus->is_kernel)
285 r = sd_bus_message_new_method_call(
287 "org.freedesktop.DBus",
289 "org.freedesktop.DBus",
295 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Picks the state a bus enters once its connection is set up: BUS_HELLO for
 * a bus client that is not a kernel bus, BUS_RUNNING otherwise. */
298 int bus_start_running(sd_bus *bus) {
301 if (bus->bus_client && !bus->is_kernel) {
302 bus->state = BUS_HELLO;
306 bus->state = BUS_RUNNING;
/* Parses one item of a bus address at *p into *value. The item is matched
 * against `key` with strncmp(); the value is scanned until ';', ',' or the
 * end of the string. Two hex nibbles (x, y) are combined into one byte,
 * and the result buffer is grown with realloc() as characters are added.
 * NOTE(review): the nibble handling looks like %XX unescaping - confirm. */
310 static int parse_address_key(const char **p, const char *key, char **value) {
321 if (strncmp(*p, key, l) != 0)
334 while (*a != ';' && *a != ',' && *a != 0) {
352 c = (char) ((x << 4) | y);
359 t = realloc(r, n + 2);
/* Advances *p over the current address item: strcspn() moves it to the next
 * ',' or, if there is none, to the end of the string. */
387 static void skip_address_key(const char **p) {
391 *p += strcspn(*p, ",");
/* Parses the parameters of a unix address up to the end of the string or
 * the next ';'. Recognized keys are guid, path and abstract. Having neither
 * path nor abstract, or having both, is checked for explicitly. The result
 * is an AF_UNIX address in b->sockaddr.un with b->sockaddr_size set to the
 * used length. For abstract sockets sun_path starts with a NUL byte and
 * the name follows at offset 1, so one byte less is available for it. */
397 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
398 _cleanup_free_ char *path = NULL, *abstract = NULL;
407 while (**p != 0 && **p != ';') {
408 r = parse_address_key(p, "guid", guid);
414 r = parse_address_key(p, "path", &path);
420 r = parse_address_key(p, "abstract", &abstract);
429 if (!path && !abstract)
432 if (path && abstract)
437 if (l > sizeof(b->sockaddr.un.sun_path))
440 b->sockaddr.un.sun_family = AF_UNIX;
441 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
442 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
443 } else if (abstract) {
444 l = strlen(abstract);
445 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
448 b->sockaddr.un.sun_family = AF_UNIX;
449 b->sockaddr.un.sun_path[0] = 0;
450 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
451 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the parameters of a tcp address up to the end of the string or the
 * next ';'. Recognized keys are guid, host, port and family. A family of
 * "ipv4" or "ipv6" narrows the lookup to AF_INET or AF_INET6. Host and
 * port are resolved with getaddrinfo() using SOCK_STREAM and AI_ADDRCONFIG;
 * the first result is copied into b->sockaddr and its length into
 * b->sockaddr_size, then the result list is released.
 * NOTE(review): -EADDRNOTAVAIL looks like the error for a failed lookup -
 * confirm. */
457 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
458 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
460 struct addrinfo *result, hints = {
461 .ai_socktype = SOCK_STREAM,
462 .ai_flags = AI_ADDRCONFIG,
470 while (**p != 0 && **p != ';') {
471 r = parse_address_key(p, "guid", guid);
477 r = parse_address_key(p, "host", &host);
483 r = parse_address_key(p, "port", &port);
489 r = parse_address_key(p, "family", &family);
502 if (streq(family, "ipv4"))
503 hints.ai_family = AF_INET;
504 else if (streq(family, "ipv6"))
505 hints.ai_family = AF_INET6;
510 r = getaddrinfo(host, port, &hints, &result);
514 return -EADDRNOTAVAIL;
516 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
517 b->sockaddr_size = result->ai_addrlen;
519 freeaddrinfo(result);
/* Parses the parameters of an exec-style address up to the end of the
 * string or the next ';'. Recognized keys are guid, path and numbered
 * "argv<N>" items. The index is read with strtoul(), must be followed by
 * '=' and may not exceed 256. The argv array is grown with realloc() to
 * hold index + 2 pointers and the new slots are zeroed. After parsing, the
 * array is scanned for holes (argv[0] excepted), and a missing argv[0] is
 * filled with a copy of path. */
524 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
526 unsigned n_argv = 0, j;
535 while (**p != 0 && **p != ';') {
536 r = parse_address_key(p, "guid", guid);
542 r = parse_address_key(p, "path", &path);
548 if (startswith(*p, "argv")) {
552 ul = strtoul(*p + 4, (char**) p, 10);
553 if (errno > 0 || **p != '=' || ul > 256) {
563 x = realloc(argv, sizeof(char*) * (ul + 2));
569 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
575 r = parse_address_key(p, NULL, argv + ul);
590 /* Make sure there are no holes in the array, with the
591 * exception of argv[0] */
592 for (j = 1; j < n_argv; j++)
598 if (argv && argv[0] == NULL) {
599 argv[0] = strdup(path);
611 for (j = 0; j < n_argv; j++)
/* Parses the parameters of a kernel address up to the end of the string or
 * the next ';'. Recognized keys are guid and path. */
619 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
620 _cleanup_free_ char *path = NULL;
628 while (**p != 0 && **p != ';') {
629 r = parse_address_key(p, "guid", guid);
635 r = parse_address_key(p, "path", &path);
/* Discards the result of a previous address parse: the socket address size
 * is zeroed, the exec argv vector is freed and the server id is reset to
 * SD_ID128_NULL. */
654 static void bus_reset_parsed_address(sd_bus *b) {
658 b->sockaddr_size = 0;
659 strv_free(b->exec_argv);
663 b->server_id = SD_ID128_NULL;
/* Parses the next entry of b->address, starting at b->address_index. The
 * end of the address string is checked for first. Previously parsed data
 * is reset, then the entry is dispatched on its prefix ("unix:", "tcp:",
 * "unixexec:" or "kernel:") to the matching parser. A guid is converted
 * into b->server_id with sd_id128_from_string(). Finally address_index is
 * moved to where parsing stopped. */
668 static int bus_parse_next_address(sd_bus *b) {
669 _cleanup_free_ char *guid = NULL;
677 if (b->address[b->address_index] == 0)
680 bus_reset_parsed_address(b);
682 a = b->address + b->address_index;
691 if (startswith(a, "unix:")) {
694 r = parse_unix_address(b, &a, &guid);
699 } else if (startswith(a, "tcp:")) {
702 r = parse_tcp_address(b, &a, &guid);
708 } else if (startswith(a, "unixexec:")) {
711 r = parse_exec_address(b, &a, &guid);
717 } else if (startswith(a, "kernel:")) {
720 r = parse_kernel_address(b, &a, &guid);
733 r = sd_id128_from_string(guid, &b->server_id);
738 b->address_index = a - b->address;
/* Attempts to connect using the currently parsed address. A socket address
 * family other than AF_UNSPEC selects bus_socket_connect(), a set exec_path
 * selects bus_socket_exec(), and a set kernel path selects
 * bus_kernel_connect(). The error of a failed attempt is remembered as a
 * positive value in b->last_connect_error, and the next address is parsed.
 * The final return yields the remembered error as a negative value, or
 * -ECONNREFUSED when none was recorded. */
742 static int bus_start_address(sd_bus *b) {
750 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
752 r = bus_socket_connect(b);
756 b->last_connect_error = -r;
758 } else if (b->exec_path) {
760 r = bus_socket_exec(b);
764 b->last_connect_error = -r;
765 } else if (b->kernel) {
767 r = bus_kernel_connect(b);
771 b->last_connect_error = -r;
774 r = bus_parse_next_address(b);
778 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Drops the currently parsed address and starts over with
 * bus_start_address(), whose result is returned. */
782 int bus_next_address(sd_bus *b) {
785 bus_reset_parsed_address(b);
786 return bus_start_address(b);
789 static int bus_start_fd(sd_bus *b) {
794 assert(b->input_fd >= 0);
795 assert(b->output_fd >= 0);
797 r = fd_nonblock(b->input_fd, true);
801 r = fd_cloexec(b->input_fd, true);
805 if (b->input_fd != b->output_fd) {
806 r = fd_nonblock(b->output_fd, true);
810 r = fd_cloexec(b->output_fd, true);
815 if (fstat(b->input_fd, &st) < 0)
818 if (S_ISCHR(b->input_fd))
819 return bus_kernel_take_fd(b);
821 return bus_socket_take_fd(b);
/* Starts a bus. The bus state is compared against BUS_UNSET and then set to
 * BUS_OPENING. A bus configured as both server and bus client is singled
 * out by a check. Startup goes through bus_start_fd() when an input fd is
 * installed, or through bus_start_address() when an address string, a
 * socket address, an exec path or a kernel path is configured. The final
 * step sends the hello message via bus_send_hello(). */
824 int sd_bus_start(sd_bus *bus) {
829 if (bus->state != BUS_UNSET)
832 bus->state = BUS_OPENING;
834 if (bus->is_server && bus->bus_client)
837 if (bus->input_fd >= 0)
838 r = bus_start_fd(bus);
839 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
840 r = bus_start_address(bus);
847 return bus_send_hello(bus);
/* Sets up a connection to the system bus. The DBUS_SYSTEM_BUS_ADDRESS
 * environment variable is read with secure_getenv() and passed to
 * sd_bus_set_address(); the alternative is the AF_UNIX socket
 * /run/dbus/system_bus_socket, whose address length is computed from the
 * string literal without its terminating NUL. The bus is marked as a bus
 * client. */
850 int sd_bus_open_system(sd_bus **ret) {
862 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
864 r = sd_bus_set_address(b, e);
868 b->sockaddr.un.sun_family = AF_UNIX;
869 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
870 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
873 b->bus_client = true;
/* Sets up a connection to the user bus. The DBUS_SESSION_BUS_ADDRESS
 * environment variable is read with secure_getenv() and passed to
 * sd_bus_set_address(); the alternative is an AF_UNIX socket whose path is
 * the value of XDG_RUNTIME_DIR followed by "/bus". The combined length is
 * checked against the size of sun_path before the two parts are copied.
 * The bus is marked as a bus client. */
887 int sd_bus_open_user(sd_bus **ret) {
900 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
902 r = sd_bus_set_address(b, e);
906 e = secure_getenv("XDG_RUNTIME_DIR");
913 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
918 b->sockaddr.un.sun_family = AF_UNIX;
919 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
920 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
923 b->bus_client = true;
937 void sd_bus_close(sd_bus *bus) {
941 if (bus->state != BUS_CLOSED)
944 bus->state = BUS_CLOSED;
949 /* We'll leave the fd open in case this is a kernel bus, since
950 * there might still be memblocks around that reference this
951 * bus, and they might need to invoke the
952 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
/* Takes an additional reference on a bus. The incremented count is asserted
 * to be at least 2, i.e. the caller must already have held a reference. */
956 sd_bus *sd_bus_ref(sd_bus *bus) {
960 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drops one reference on a bus and tests whether the count reached zero.
 * NOTE(review): presumably the bus is freed in that case - confirm. */
965 sd_bus *sd_bus_unref(sd_bus *bus) {
969 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Reports whether the bus is open, as decided by BUS_IS_OPEN() on the
 * current bus state. */
975 int sd_bus_is_open(sd_bus *bus) {
979 return BUS_IS_OPEN(bus->state);
/* Tells whether values of the given type can be sent over this bus. A bus
 * in BUS_UNSET is singled out first. For SD_BUS_TYPE_UNIX_FD the
 * negotiate_fds flag is consulted and the bus is brought into the running
 * state with bus_ensure_running(). For all other types the answer is
 * bus_type_is_valid(type). */
982 int sd_bus_can_send(sd_bus *bus, char type) {
987 if (bus->state == BUS_UNSET)
990 if (type == SD_BUS_TYPE_UNIX_FD) {
991 if (!bus->negotiate_fds)
994 r = bus_ensure_running(bus);
1001 return bus_type_is_valid(type);
/* Copies the server id of the bus into *server_id, after bringing the bus
 * into the running state with bus_ensure_running(). */
1004 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1012 r = bus_ensure_running(bus);
1016 *server_id = bus->server_id;
/* Seals a message for sending on bus `b`. The message header version is
 * compared against the bus's message_version first. The bus serial counter
 * is incremented and the new value is passed to bus_message_seal(). */
1020 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1023 if (m->header->version > b->message_version)
1029 return bus_message_seal(m, ++b->serial);
/* Writes out queued outgoing messages while the write queue is non-empty.
 * The bus must be in BUS_RUNNING or BUS_HELLO. The head of the queue is
 * written through the kernel or the socket transport; the socket path
 * tracks partial progress in bus->windex. On a kernel bus, or once windex
 * has reached the message size, the head entry is unreferenced and the
 * remaining entries are shifted down by one with memmove(). */
1032 static int dispatch_wqueue(sd_bus *bus) {
1036 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1038 while (bus->wqueue_size > 0) {
1041 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1043 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1049 /* Didn't do anything this time */
1051 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1052 /* Fully written. Let's drop the entry from
1055 * This isn't particularly optimized, but
1056 * well, this is supposed to be our worst-case
1057 * buffer only, and the socket buffer is
1058 * supposed to be our primary buffer, and if
1059 * it got full, then all bets are off
1062 sd_bus_message_unref(bus->wqueue[0]);
1063 bus->wqueue_size --;
1064 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetches the next incoming message. The bus must be in BUS_RUNNING or
 * BUS_HELLO. If the read queue holds messages, the head entry is handed out
 * through *m and the remaining entries are shifted down by one. Otherwise
 * a new message is read from the kernel or the socket transport. */
1074 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1075 sd_bus_message *z = NULL;
1080 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1082 if (bus->rqueue_size > 0) {
1083 /* Dispatch a queued message */
1085 *m = bus->rqueue[0];
1086 bus->rqueue_size --;
1087 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1091 /* Try to read a new message */
1094 r = bus_kernel_read_message(bus, &z);
1096 r = bus_socket_read_message(bus, &z);
/* Queues or directly writes a message on the bus. The bus must be open.
 * When the caller does not ask for the serial and the message is not yet
 * sealed, the NO_REPLY_EXPECTED header flag is set before sealing. If the
 * bus is in BUS_RUNNING or BUS_HELLO and the write queue is empty, a
 * direct write is attempted; on a socket bus a partially written message
 * is stored in the always-allocated first slot of the write queue. In all
 * other cases the message is appended to the write queue, which is limited
 * to BUS_WQUEUE_MAX entries and grown with realloc(). The queue holds its
 * own reference on each queued message. The message serial is reported
 * through *serial. */
1112 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1117 if (!BUS_IS_OPEN(bus->state))
1123 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1130 /* If the serial number isn't kept, then we know that no reply
1132 if (!serial && !m->sealed)
1133 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1135 r = bus_seal_message(bus, m);
1139 /* If this is a reply and no reply was requested, then let's
1140 * suppress this, if we can */
1141 if (m->dont_send && !serial)
1144 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1148 r = bus_kernel_write_message(bus, m);
1150 r = bus_socket_write_message(bus, m, &idx);
1155 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1156 /* Wasn't fully written. So let's remember how
1157 * much was written. Note that the first entry
1158 * of the wqueue array is always allocated so
1159 * that we always can remember how much was
1161 bus->wqueue[0] = sd_bus_message_ref(m);
1162 bus->wqueue_size = 1;
1168 /* Just append it to the queue. */
1170 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1173 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1178 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1182 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC timestamp. The value (uint64_t) -1 is treated specially,
 * and BUS_DEFAULT_TIMEOUT can be substituted for the given value.
 * NOTE(review): presumably -1 means "no timeout" and the default is used
 * for 0 - confirm. */
1187 static usec_t calc_elapse(uint64_t usec) {
1188 if (usec == (uint64_t) -1)
1192 usec = BUS_DEFAULT_TIMEOUT;
1194 return now(CLOCK_MONOTONIC) + usec;
/* Comparison function for reply callbacks, installed on the reply-callback
 * priority queue. Entries are compared by their timeout field: the cases
 * where exactly one of the two timeouts is zero are decided first, then
 * the timeouts are compared numerically. */
1197 static int timeout_compare(const void *a, const void *b) {
1198 const struct reply_callback *x = a, *y = b;
1200 if (x->timeout != 0 && y->timeout == 0)
1203 if (x->timeout == 0 && y->timeout != 0)
1206 if (x->timeout < y->timeout)
1209 if (x->timeout > y->timeout)
/* Sends a method call and registers a callback for its reply. The bus must
 * be open; the message type is compared against METHOD_CALL and the
 * NO_REPLY_EXPECTED flag is tested. The reply-callback hashmap is created
 * on demand, and so is the timeout priority queue unless usec is
 * (uint64_t) -1. After sealing the message, a reply_callback record is
 * filled in (callback, userdata, serial, absolute timeout) and stored in
 * the hashmap under its serial; records with a non-zero timeout are also
 * put into the priority queue. If a later step fails, the registration is
 * undone with sd_bus_send_with_reply_cancel(). */
1215 int sd_bus_send_with_reply(
1218 sd_bus_message_handler_t callback,
1223 struct reply_callback *c;
1228 if (!BUS_IS_OPEN(bus->state))
1234 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1236 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1239 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1243 if (usec != (uint64_t) -1) {
1244 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1249 r = bus_seal_message(bus, m);
1253 c = new0(struct reply_callback, 1);
1257 c->callback = callback;
1258 c->userdata = userdata;
1259 c->serial = BUS_MESSAGE_SERIAL(m);
1260 c->timeout = calc_elapse(usec);
1262 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1268 if (c->timeout != 0) {
1269 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1272 sd_bus_send_with_reply_cancel(bus, c->serial);
1277 r = sd_bus_send(bus, m, serial);
1279 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply callback. The record registered under `serial` is
 * removed from the reply-callback hashmap and, if it has a non-zero
 * timeout, from the timeout priority queue as well. */
1286 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1287 struct reply_callback *c;
1294 c = hashmap_remove(bus->reply_callbacks, &serial);
1298 if (c->timeout != 0)
1299 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the bus until it reaches BUS_RUNNING. Buses in BUS_UNSET or
 * BUS_CLOSED are singled out first, and a bus that is already running
 * needs no work. Otherwise the bus is processed with sd_bus_process() and,
 * while it is still not running, waited on with sd_bus_wait() without a
 * timeout. */
1305 int bus_ensure_running(sd_bus *bus) {
1310 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1312 if (bus->state == BUS_RUNNING)
1316 r = sd_bus_process(bus, NULL);
1319 if (bus->state == BUS_RUNNING)
1324 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and blocks until its reply arrives. The bus must be
 * open, the message type is compared against METHOD_CALL, the
 * NO_REPLY_EXPECTED flag is tested, and `error` is checked for being
 * dirty. After the bus is running and the message is sent, incoming
 * messages are read directly from the transport. Before each read the read
 * queue (limited to BUS_RQUEUE_MAX entries) is grown by one slot, so an
 * unrelated message can always be queued for later dispatch. A message
 * whose reply_serial equals the sent serial ends the wait: METHOD_RETURN
 * and METHOD_ERROR replies are told apart, and for errors the error is
 * copied into `error` and converted with bus_error_to_errno(). While no
 * reply is available the function polls the bus, using the time left until
 * the deadline computed by calc_elapse(), and flushes the write queue. */
1330 int sd_bus_send_with_reply_and_block(
1334 sd_bus_error *error,
1335 sd_bus_message **reply) {
1344 if (!BUS_IS_OPEN(bus->state))
1348 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1350 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1352 if (bus_error_is_dirty(error))
1355 r = bus_ensure_running(bus);
1359 r = sd_bus_send(bus, m, &serial);
1363 timeout = calc_elapse(usec);
1367 sd_bus_message *incoming = NULL;
1372 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1375 /* Make sure there's room for queuing this
1376 * locally, before we read the message */
1378 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1387 r = bus_kernel_read_message(bus, &incoming);
1389 r = bus_socket_read_message(bus, &incoming);
1394 if (incoming->reply_serial == serial) {
1395 /* Found a match! */
1397 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1402 sd_bus_message_unref(incoming);
1407 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1410 r = sd_bus_error_copy(error, &incoming->error);
1412 sd_bus_message_unref(incoming);
1416 k = bus_error_to_errno(&incoming->error);
1417 sd_bus_message_unref(incoming);
1421 sd_bus_message_unref(incoming);
1425 /* There's already guaranteed to be room for
1426 * this, so need to resize things here */
1427 bus->rqueue[bus->rqueue_size ++] = incoming;
1430 /* Try to read more, right-away */
1439 n = now(CLOCK_MONOTONIC);
1445 left = (uint64_t) -1;
1447 r = bus_poll(bus, true, left);
1451 r = dispatch_wqueue(bus);
/* Returns the file descriptor of the bus. The bus must be open, and the
 * case of distinct input and output descriptors is singled out by a check;
 * the value returned is the input fd. */
1457 int sd_bus_get_fd(sd_bus *bus) {
1460 if (!BUS_IS_OPEN(bus->state))
1462 if (bus->input_fd != bus->output_fd)
1465 return bus->input_fd;
/* Computes the poll events the bus is currently interested in, depending on
 * its state. BUS_OPENING and BUS_AUTHENTICATING are handled separately (the
 * latter asks bus_socket_auth_needs_write()). In BUS_RUNNING and BUS_HELLO
 * the decision depends on whether the read queue is empty and whether the
 * write queue holds pending messages. */
1468 int sd_bus_get_events(sd_bus *bus) {
1473 if (!BUS_IS_OPEN(bus->state))
1476 if (bus->state == BUS_OPENING)
1478 else if (bus->state == BUS_AUTHENTICATING) {
1480 if (bus_socket_auth_needs_write(bus))
1485 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1486 if (bus->rqueue_size <= 0)
1488 if (bus->wqueue_size > 0)
/* Reports through *timeout_usec the point in time at which the bus needs
 * attention. While authenticating this is bus->auth_timeout. In states
 * other than BUS_RUNNING and BUS_HELLO it is (uint64_t) -1. Otherwise the
 * head of the reply-callback priority queue is peeked and its timeout is
 * reported; (uint64_t) -1 is also used on that path when there is nothing
 * to report. */
1495 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1496 struct reply_callback *c;
1502 if (!BUS_IS_OPEN(bus->state))
1505 if (bus->state == BUS_AUTHENTICATING) {
1506 *timeout_usec = bus->auth_timeout;
1510 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1511 *timeout_usec = (uint64_t) -1;
1515 c = prioq_peek(bus->reply_callbacks_prioq);
1517 *timeout_usec = (uint64_t) -1;
1521 *timeout_usec = c->timeout;
/* Handles the reply callback at the head of the timeout priority queue. A
 * synthetic org.freedesktop.DBus.Error.Timeout error message is built, the
 * record is popped from the priority queue and removed from the
 * reply-callback hashmap, and its callback is invoked with the synthetic
 * message. Returns the callback's negative result, or 1 otherwise. */
1525 static int process_timeout(sd_bus *bus) {
1526 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1527 struct reply_callback *c;
1533 c = prioq_peek(bus->reply_callbacks_prioq);
1537 n = now(CLOCK_MONOTONIC);
1541 r = bus_message_new_synthetic_error(
1544 &SD_BUS_ERROR_MAKE("org.freedesktop.DBus.Error.Timeout", "Timed out"),
1549 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1550 hashmap_remove(bus->reply_callbacks, &c->serial);
1552 r = c->callback(bus, m, c->userdata);
1555 return r < 0 ? r : 1;
/* Gatekeeper used while the bus is in BUS_HELLO; in any other state the
 * state check at the top applies. The incoming message must be a method
 * return or method error, and its reply_serial is compared against
 * bus->hello_serial. */
1558 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1562 if (bus->state != BUS_HELLO)
1565 /* Let's make sure the first message on the bus is the HELLO
1566 * reply. But note that we don't actually parse the message
1567 * here (we leave that to the usual handling), we just verify
1568 * we don't let any earlier msg through. */
1570 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1571 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1574 if (m->reply_serial != bus->hello_serial)
/* Dispatches a method return or method error to the callback registered
 * for its reply_serial. The record is removed from the reply-callback
 * hashmap and, if it has a non-zero timeout, from the timeout priority
 * queue. The message is rewound before the callback is invoked. */
1580 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1581 struct reply_callback *c;
1587 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1588 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1591 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1595 if (c->timeout != 0)
1596 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1598 r = sd_bus_message_rewind(m, true);
1602 r = c->callback(bus, m, c->userdata);
/* Runs the registered filter callbacks on a message. The walk over the list
 * is repeated for as long as a callback modified the list (tracked through
 * filter_callbacks_modified). The last_iteration stamp on each entry,
 * compared with the bus's iteration_counter, keeps a filter from being run
 * twice for the same message. The message is rewound before each
 * callback. */
1608 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1609 struct filter_callback *l;
1616 bus->filter_callbacks_modified = false;
1618 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1620 if (bus->filter_callbacks_modified)
1623 /* Don't run this more than once per iteration */
1624 if (l->last_iteration == bus->iteration_counter)
1627 l->last_iteration = bus->iteration_counter;
1629 r = sd_bus_message_rewind(m, true);
1633 r = l->callback(bus, m, l->userdata);
1639 } while (bus->filter_callbacks_modified);
/* Runs the match callbacks on a message with bus_match_run(). The run is
 * repeated for as long as the set of match callbacks was modified in the
 * meantime (tracked through match_callbacks_modified). */
1644 static int process_match(sd_bus *bus, sd_bus_message *m) {
1651 bus->match_callbacks_modified = false;
1653 r = bus_match_run(bus, &bus->match_callbacks, m);
1657 } while (bus->match_callbacks_modified);
/* Answers method calls on the org.freedesktop.DBus.Peer interface. "Ping"
 * gets a plain method return, "GetMachineId" a method return carrying the
 * machine id as a string, and any other member an
 * org.freedesktop.DBus.Error.UnknownMethod error. Messages of other types
 * or interfaces, and calls flagged NO_REPLY_EXPECTED, are singled out by
 * the checks at the top. The reply is sent with sd_bus_send(). */
1662 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1663 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1669 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1672 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1675 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1678 if (streq_ptr(m->member, "Ping"))
1679 r = sd_bus_message_new_method_return(bus, m, &reply);
1680 else if (streq_ptr(m->member, "GetMachineId")) {
1684 r = sd_id128_get_machine(&id);
1688 r = sd_bus_message_new_method_return(bus, m, &reply);
1692 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1694 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1696 sd_bus_error_set(&error,
1697 "org.freedesktop.DBus.Error.UnknownMethod",
1698 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1700 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1706 r = sd_bus_send(bus, reply, NULL);
/* Dispatches a method call to the registered object callbacks. The callback
 * registered for the exact path is tried first. Then the path is shortened
 * at its last '/' step by step, and callbacks registered as fallbacks for
 * those prefixes are tried. The whole lookup is repeated while the set of
 * object callbacks was modified by a callback; last_iteration stamps keep
 * a callback from running twice for the same message. If handlers were
 * found but none took the message, and it is not an Introspect call, an
 * org.freedesktop.DBus.Error.UnknownMethod error is sent back. */
1713 static int process_object(sd_bus *bus, sd_bus_message *m) {
1714 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1715 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1716 struct object_callback *c;
1724 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1727 if (hashmap_isempty(bus->object_callbacks))
1730 pl = strlen(m->path);
1735 bus->object_callbacks_modified = false;
1737 c = hashmap_get(bus->object_callbacks, m->path);
1738 if (c && c->last_iteration != bus->iteration_counter) {
1740 c->last_iteration = bus->iteration_counter;
1742 r = sd_bus_message_rewind(m, true);
1746 r = c->callback(bus, m, c->userdata);
1753 /* Look for fallback prefixes */
1758 if (bus->object_callbacks_modified)
1761 e = strrchr(p, '/');
1767 c = hashmap_get(bus->object_callbacks, p);
1768 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1770 c->last_iteration = bus->iteration_counter;
1772 r = sd_bus_message_rewind(m, true);
1776 r = c->callback(bus, m, c->userdata);
1784 } while (bus->object_callbacks_modified);
1786 /* We found some handlers but none wanted to take this, then
1787 * return this -- with one exception, we can handle
1788 * introspection minimally ourselves */
1789 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1792 sd_bus_error_set(&error,
1793 "org.freedesktop.DBus.Error.UnknownMethod",
1794 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1796 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1800 r = sd_bus_send(bus, reply, NULL);
/* Provides a minimal answer to org.freedesktop.DBus.Introspectable
 * Introspect calls. The registered object paths are scanned for entries
 * below the requested path, and the resulting names are collected in a
 * set, which removes duplicates (-EEXIST from the set is tolerated). The
 * XML document is assembled in a memory stream: doctype, an enclosing
 * node, the Peer and Introspectable interface descriptions and one child
 * node element per collected name. The text is sent back as the string
 * argument of a method return. */
1807 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1808 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1809 _cleanup_free_ char *introspection = NULL;
1810 _cleanup_set_free_free_ Set *s = NULL;
1811 _cleanup_fclose_ FILE *f = NULL;
1812 struct object_callback *c;
1821 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1827 s = set_new(string_hash_func, string_compare_func);
1831 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1835 if (streq(c->path, "/"))
1838 if (streq(m->path, "/"))
1841 e = startswith(c->path, m->path);
1842 if (!e || *e != '/')
1854 r = set_consume(s, a);
1855 if (r < 0 && r != -EEXIST)
1859 f = open_memstream(&introspection, &size);
1863 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1864 fputs("<node>\n", f);
1865 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1866 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1868 while ((node = set_steal_first(s))) {
1869 fprintf(f, " <node name=\"%s\"/>\n", node);
1873 fputs("</node>\n", f);
1880 r = sd_bus_message_new_method_return(bus, m, &reply);
1884 r = sd_bus_message_append(reply, "s", introspection);
1888 r = sd_bus_send(bus, reply, NULL);
/* Runs one incoming message through the processing stages in a fixed order:
 * hello check, reply callbacks, filters, matches, built-in interfaces,
 * object callbacks and finally the introspection fallback. The bus's
 * iteration_counter is bumped first; the stages use it to avoid running a
 * callback twice for the same message. */
1895 static int process_message(sd_bus *bus, sd_bus_message *m) {
1901 bus->iteration_counter++;
1903 r = process_hello(bus, m);
1907 r = process_reply(bus, m);
1911 r = process_filter(bus, m);
1915 r = process_match(bus, m);
1919 r = process_builtin(bus, m);
1923 r = process_object(bus, m);
1927 return process_introspect(bus, m);
/* One processing step for a bus in BUS_RUNNING or BUS_HELLO: handle an
 * expired reply timeout, flush the write queue, fetch one incoming message
 * and run it through process_message(). The message is rewound afterwards.
 * For a method call, an org.freedesktop.DBus.Error.UnknownObject error
 * reply is built and sent.
 * NOTE(review): presumably this only happens when no stage handled the
 * call and the caller does not take the message through `ret` - confirm. */
1930 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1931 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1935 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1937 r = process_timeout(bus);
1941 r = dispatch_wqueue(bus);
1945 r = dispatch_rqueue(bus, &m);
1951 r = process_message(bus, m);
1956 r = sd_bus_message_rewind(m, true);
1965 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1966 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1967 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1969 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1971 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1975 r = sd_bus_send(bus, reply, NULL);
/* Main processing entry point: performs one step of work according to the
 * bus state. Connection setup and authentication are delegated to the
 * socket transport (bus_socket_process_opening() and
 * bus_socket_process_authenticating()); the running states go through
 * process_running(). The `processing` flag is set around
 * process_running() and checked on entry, which is how recursive
 * invocation is detected. */
1989 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1992 /* Returns 0 when we didn't do anything. This should cause the
1993 * caller to invoke sd_bus_wait() before returning the next
1994 * time. Returns > 0 when we did something, which possibly
1995 * means *ret is filled in with an unprocessed message. */
2000 /* We don't allow recursively invoking sd_bus_process(). */
2001 if (bus->processing)
2004 switch (bus->state) {
2011 r = bus_socket_process_opening(bus);
2018 case BUS_AUTHENTICATING:
2020 r = bus_socket_process_authenticating(bus);
2030 bus->processing = true;
2031 r = process_running(bus, ret);
2032 bus->processing = false;
2037 assert_not_reached("Unknown state");
/* Waits with ppoll() until the bus fds become ready or a timeout elapses.
 * The events come from sd_bus_get_events(). The bus's own deadline from
 * sd_bus_get_timeout() is turned into a relative wait `m` (0 if it already
 * passed) and combined with the caller's timeout_usec, where
 * (uint64_t) -1 stands for "no timeout". When input and output fd differ,
 * two pollfd entries are used: POLLIN on the input fd and POLLOUT on the
 * output fd. Returns 1 if ppoll() reported ready descriptors, 0
 * otherwise. */
2040 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2041 struct pollfd p[2] = {};
2048 if (!BUS_IS_OPEN(bus->state))
2051 e = sd_bus_get_events(bus);
2058 r = sd_bus_get_timeout(bus, &until);
2065 nw = now(CLOCK_MONOTONIC);
2066 m = until > nw ? until - nw : 0;
2069 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2072 p[0].fd = bus->input_fd;
2073 if (bus->output_fd == bus->input_fd) {
2077 p[0].events = e & POLLIN;
2078 p[1].fd = bus->output_fd;
2079 p[1].events = e & POLLOUT;
2083 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2087 return r > 0 ? 1 : 0;
/* Waits for activity on the bus for up to timeout_usec. The bus must be
 * open. A non-empty read queue is checked before polling, since queued
 * messages can be processed right away. The actual wait is done by
 * bus_poll(). */
2090 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2094 if (!BUS_IS_OPEN(bus->state))
2096 if (bus->rqueue_size > 0)
2099 return bus_poll(bus, false, timeout_usec);
/* Writes out all queued outgoing messages. The bus must be open and is
 * brought into the running state first. The write queue is dispatched, and
 * while messages remain queued the function polls the bus without a
 * timeout before trying again. */
2102 int sd_bus_flush(sd_bus *bus) {
2107 if (!BUS_IS_OPEN(bus->state))
2110 r = bus_ensure_running(bus);
2114 if (bus->wqueue_size <= 0)
2118 r = dispatch_wqueue(bus);
2122 if (bus->wqueue_size <= 0)
2125 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback. A zero-initialized record is allocated,
 * filled with callback and userdata and prepended to the bus's filter
 * list. filter_callbacks_modified is set so a filter run that is in
 * progress notices the change. */
2131 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2132 struct filter_callback *f;
2139 f = new0(struct filter_callback, 1);
2142 f->callback = callback;
2143 f->userdata = userdata;
2145 bus->filter_callbacks_modified = true;
2146 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregisters a filter callback. The filter list is searched for an entry
 * with the same callback and userdata; on a match
 * filter_callbacks_modified is set and the entry is unlinked from the
 * list. */
2150 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2151 struct filter_callback *f;
2158 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2159 if (f->callback == callback && f->userdata == userdata) {
2160 bus->filter_callbacks_modified = true;
2161 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Shared implementation for registering object and fallback callbacks. The
 * object-callback hashmap is created on demand. A zero-initialized record
 * is allocated and receives a copy of the path, the callback, the userdata
 * and the fallback flag; it is then stored in the hashmap under its path.
 * object_callbacks_modified is set so a dispatch that is in progress
 * notices the change. */
2170 static int bus_add_object(
2174 sd_bus_message_handler_t callback,
2177 struct object_callback *c;
2187 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2191 c = new0(struct object_callback, 1);
2195 c->path = strdup(path);
2201 c->callback = callback;
2202 c->userdata = userdata;
2203 c->is_fallback = fallback;
2205 bus->object_callbacks_modified = true;
2206 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Shared implementation for unregistering object and fallback callbacks.
 * The record stored for the path is looked up and compared against the
 * given callback, userdata and fallback flag; a record that differs in any
 * of them is singled out by the check. Otherwise
 * object_callbacks_modified is set and the record is removed from the
 * hashmap. */
2216 static int bus_remove_object(
2220 sd_bus_message_handler_t callback,
2223 struct object_callback *c;
2232 c = hashmap_get(bus->object_callbacks, path);
2236 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2239 bus->object_callbacks_modified = true;
2240 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Registers a callback for exactly the given object path (non-fallback
 * variant of bus_add_object()). */
2248 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2249 return bus_add_object(bus, false, path, callback, userdata);
/* Unregisters a callback registered for exactly the given object path
 * (non-fallback variant of bus_remove_object()). */
2252 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2253 return bus_remove_object(bus, false, path, callback, userdata);
/* Registers a fallback callback for the given path prefix (fallback variant
 * of bus_add_object()). */
2256 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2257 return bus_add_object(bus, true, prefix, callback, userdata);
/* Unregisters a fallback callback for the given path prefix (fallback
 * variant of bus_remove_object()). */
2260 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2261 return bus_remove_object(bus, true, prefix, callback, userdata);
/* Registers a match callback. For bus clients the match is first installed
 * with bus_add_match_internal(). The callback is then added to the local
 * match tree with match_callbacks_modified set. On the path following the
 * local registration, a bus client removes the match again with
 * bus_remove_match_internal().
 * NOTE(review): presumably that is the rollback when bus_match_add()
 * fails - confirm. */
2264 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2272 if (bus->bus_client) {
2273 r = bus_add_match_internal(bus, match);
2279 bus->match_callbacks_modified = true;
2280 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2283 if (bus->bus_client)
2284 bus_remove_match_internal(bus, match);
/* Unregisters a match callback. For bus clients the match is removed with
 * bus_remove_match_internal(). The callback is then removed from the local
 * match tree with match_callbacks_modified set. The two steps keep
 * separate results (r and q). */
2291 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2299 if (bus->bus_client)
2300 r = bus_remove_match_internal(bus, match);
2303 bus->match_callbacks_modified = true;
2304 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
/* Convenience helper that builds a signal message for the given path,
 * interface and member, appends the variadic arguments described by
 * `types`, and sends it without asking for the serial. */
2312 int sd_bus_emit_signal(
2315 const char *interface,
2317 const char *types, ...) {
2319 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2326 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2330 va_start(ap, types);
2331 r = bus_message_append_ap(m, types, ap);
2336 return sd_bus_send(bus, m, NULL);
/* Convenience helper that builds a method call for the given destination,
 * path, interface and member, appends the variadic arguments described by
 * `types`, and performs a blocking call with
 * sd_bus_send_with_reply_and_block(). A timeout argument of 0 is passed
 * along; error and reply are handed through to the caller. */
2339 int sd_bus_call_method(
2341 const char *destination,
2343 const char *interface,
2345 sd_bus_error *error,
2346 sd_bus_message **reply,
2347 const char *types, ...) {
2349 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2356 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2360 va_start(ap, types);
2361 r = bus_message_append_ap(m, types, ap);
2366 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
/* Convenience helper that answers a method call with a method return. The
 * type of `call` is compared against METHOD_CALL and its
 * NO_REPLY_EXPECTED flag is tested before the reply is built. The variadic
 * arguments described by `types` are appended, and the reply is sent
 * without asking for the serial. */
2369 int sd_bus_reply_method_return(
2371 sd_bus_message *call,
2372 const char *types, ...) {
2374 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2384 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2387 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2390 r = sd_bus_message_new_method_return(bus, call, &m);
2394 va_start(ap, types);
2395 r = bus_message_append_ap(m, types, ap);
2400 return sd_bus_send(bus, m, NULL);
/* Convenience helper that answers a method call with a method error. The
 * type of `call` is compared against METHOD_CALL, the error `e` is checked
 * for being set, and the NO_REPLY_EXPECTED flag of the call is tested
 * before the error reply is built. The reply is sent without asking for
 * the serial. */
2403 int sd_bus_reply_method_error(
2405 sd_bus_message *call,
2406 const sd_bus_error *e) {
2408 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2417 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2419 if (!sd_bus_error_is_set(e))
2422 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2425 r = sd_bus_message_new_method_error(bus, call, e, &m);
2429 return sd_bus_send(bus, m, NULL);