1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
46 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Close the file descriptors held by the bus and mark both fd fields as
 * unset (-1). */
48 static void bus_close_fds(sd_bus *b) {
52 close_nointr_nofail(b->input_fd);
/* The output fd is closed separately only when it is valid and is not the
 * very same descriptor as the input fd. */
54 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
55 close_nointr_nofail(b->output_fd);
57 b->input_fd = b->output_fd = -1;
/* Release the resources owned by a bus object. The visible lines tear down,
 * in order: the kdbus pool mapping, the exec argv vector, queued fds, the
 * read and write message queues, reply/filter/object callbacks, the match
 * tree, the memfd cache and finally the memfd cache mutex. */
60 static void bus_free(sd_bus *b) {
61 struct filter_callback *f;
62 struct object_callback *c;
70 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
79 strv_free(b->exec_argv);
81 close_many(b->fds, b->n_fds);
/* Drop the references held on every message still sitting in the queues */
84 for (i = 0; i < b->rqueue_size; i++)
85 sd_bus_message_unref(b->rqueue[i]);
88 for (i = 0; i < b->wqueue_size; i++)
89 sd_bus_message_unref(b->wqueue[i]);
92 hashmap_free_free(b->reply_callbacks);
93 prioq_free(b->reply_callbacks_prioq);
/* Unlink filter callbacks one by one from the head of the list */
95 while ((f = b->filter_callbacks)) {
96 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
/* Steal object callbacks out of the hashmap until it is empty */
100 while ((c = hashmap_steal_first(b->object_callbacks))) {
105 hashmap_free(b->object_callbacks);
106 bus_match_free(&b->match_callbacks);
108 bus_kernel_flush_memfd(b);
110 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
/* Create a new bus object. The visible initialization sets the reference
 * count, marks both fds as unset (-1), selects message version 1, turns on
 * KDBUS_HELLO_ACCEPT_FD in the hello flags, records the PID of the creating
 * process, initializes the memfd cache mutex and allocates a write queue
 * with room for one message. */
115 int sd_bus_new(sd_bus **ret) {
125 r->n_ref = REFCNT_INIT;
126 r->input_fd = r->output_fd = -1;
127 r->message_version = 1;
128 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
/* Remembered so that bus_pid_changed() can compare against getpid() later */
129 r->original_pid = getpid();
131 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
133 /* We guarantee that wqueue always has space for at least one
135 r->wqueue = new(sd_bus_message*, 1);
/* Configure the address string of a bus. The visible lines check that the
 * bus is still in BUS_UNSET state and that the process has not changed since
 * creation; the outcome of those checks is not visible here (presumably
 * error returns -- confirm). */
145 int sd_bus_set_address(sd_bus *bus, const char *address) {
150 if (bus->state != BUS_UNSET)
154 if (bus_pid_changed(bus))
/* Assign caller-provided input and output file descriptors to a bus. Checks
 * first that the bus is in BUS_UNSET state and that the PID is unchanged
 * (outcome of the checks not visible here). */
167 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
170 if (bus->state != BUS_UNSET)
176 if (bus_pid_changed(bus))
179 bus->input_fd = input_fd;
180 bus->output_fd = output_fd;
/* Configure a program (path plus argument vector) for the bus to execute.
 * Checks the bus state, whether argv is empty and whether the PID changed
 * (outcomes not visible here), then releases any previously stored exec path
 * and argv; the assignment of the new values is outside the visible lines. */
184 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
189 if (bus->state != BUS_UNSET)
193 if (strv_isempty(argv))
195 if (bus_pid_changed(bus))
208 free(bus->exec_path);
209 strv_free(bus->exec_argv);
/* Set the bus_client flag of a bus. Checks the BUS_UNSET state and the PID
 * first (outcome of the checks not visible here). */
217 int sd_bus_set_bus_client(sd_bus *bus, int b) {
220 if (bus->state != BUS_UNSET)
222 if (bus_pid_changed(bus))
/* !! normalizes any non-zero int to 1 */
225 bus->bus_client = !!b;
/* Update the KDBUS_HELLO_ACCEPT_FD bit in the bus's hello flags according to
 * b (SET_FLAG presumably sets the bit for non-zero b and clears it otherwise
 * -- confirm against the macro). Checks the BUS_UNSET state and the PID
 * first (outcome of the checks not visible here). */
229 int sd_bus_negotiate_fds(sd_bus *bus, int b) {
232 if (bus->state != BUS_UNSET)
234 if (bus_pid_changed(bus))
237 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
/* Update the KDBUS_HELLO_ATTACH_COMM bit in the bus's hello flags according
 * to b (SET_FLAG presumably sets or clears the bit -- confirm). Checks the
 * BUS_UNSET state and the PID first (outcome not visible here). */
241 int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
244 if (bus->state != BUS_UNSET)
246 if (bus_pid_changed(bus))
249 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
/* Update the KDBUS_HELLO_ATTACH_EXE bit in the bus's hello flags according
 * to b (SET_FLAG presumably sets or clears the bit -- confirm). Checks the
 * BUS_UNSET state and the PID first (outcome not visible here). */
253 int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
256 if (bus->state != BUS_UNSET)
258 if (bus_pid_changed(bus))
261 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
/* Update the KDBUS_HELLO_ATTACH_CMDLINE bit in the bus's hello flags
 * according to b (SET_FLAG presumably sets or clears the bit -- confirm).
 * Checks the BUS_UNSET state and the PID first (outcome not visible here). */
265 int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
268 if (bus->state != BUS_UNSET)
270 if (bus_pid_changed(bus))
273 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
/* Update the KDBUS_HELLO_ATTACH_CGROUP bit in the bus's hello flags
 * according to b (SET_FLAG presumably sets or clears the bit -- confirm).
 * Checks the BUS_UNSET state and the PID first (outcome not visible here). */
277 int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
280 if (bus->state != BUS_UNSET)
282 if (bus_pid_changed(bus))
285 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
/* Update the KDBUS_HELLO_ATTACH_CAPS bit in the bus's hello flags according
 * to b (SET_FLAG presumably sets or clears the bit -- confirm). Checks the
 * BUS_UNSET state and the PID first (outcome not visible here). */
289 int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
292 if (bus->state != BUS_UNSET)
294 if (bus_pid_changed(bus))
297 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
/* Update the KDBUS_HELLO_ATTACH_SECLABEL bit in the bus's hello flags
 * according to b (SET_FLAG presumably sets or clears the bit -- confirm).
 * Checks the BUS_UNSET state and the PID first (outcome not visible here). */
301 int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
304 if (bus->state != BUS_UNSET)
306 if (bus_pid_changed(bus))
309 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
/* Update the KDBUS_HELLO_ATTACH_AUDIT bit in the bus's hello flags according
 * to b (SET_FLAG presumably sets or clears the bit -- confirm). Checks the
 * BUS_UNSET state and the PID first (outcome not visible here). */
313 int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
316 if (bus->state != BUS_UNSET)
318 if (bus_pid_changed(bus))
321 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
/* Switch server mode on or off for a bus and store the server ID. */
325 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
/* Detects a non-null server ID being passed while server mode is being
 * disabled (the reaction to this is not visible here) */
328 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
330 if (bus->state != BUS_UNSET)
332 if (bus_pid_changed(bus))
335 bus->is_server = !!b;
336 bus->server_id = server_id;
/* Set the anonymous_auth flag of a bus. Checks the BUS_UNSET state and the
 * PID first (outcome of the checks not visible here). */
340 int sd_bus_set_anonymous(sd_bus *bus, int b) {
343 if (bus->state != BUS_UNSET)
345 if (bus_pid_changed(bus))
348 bus->anonymous_auth = !!b;
/* Reply handler registered by bus_send_hello(). Expects the bus to be in
 * BUS_HELLO state, converts the reply into an errno-style result, reads one
 * string from it, validates it, stores a copy as the bus's unique name and
 * moves the bus to BUS_RUNNING. */
352 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
357 assert(bus->state == BUS_HELLO);
360 r = bus_message_to_errno(reply);
364 r = sd_bus_message_read(reply, "s", &s);
/* The name must be a valid service name and must begin with ':' */
368 if (!service_name_is_valid(s) || s[0] != ':')
371 bus->unique_name = strdup(s);
372 if (!bus->unique_name)
375 bus->state = BUS_RUNNING;
/* Build a method call addressed to "org.freedesktop.DBus" and send it with
 * hello_callback() as reply handler; the serial of the call is stored in
 * bus->hello_serial. */
380 static int bus_send_hello(sd_bus *bus) {
381 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
/* Tested for buses that are not bus clients or that use the kernel
 * transport (the reaction is not visible here; presumably an early return) */
386 if (!bus->bus_client || bus->is_kernel)
389 r = sd_bus_message_new_method_call(
391 "org.freedesktop.DBus",
393 "org.freedesktop.DBus",
399 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Advance the bus state once the connection is usable: a bus client on a
 * non-kernel transport enters BUS_HELLO; the other visible assignment sets
 * BUS_RUNNING. */
402 int bus_start_running(sd_bus *bus) {
405 if (bus->bus_client && !bus->is_kernel) {
406 bus->state = BUS_HELLO;
410 bus->state = BUS_RUNNING;
/* Parse one key/value item of a bus address string at *p into *value. Only
 * fragments of the body are visible: the key is compared against the input
 * with strncmp(), the value is scanned up to ';', ',' or the end of the
 * string, a character is assembled from two 4-bit quantities, and the result
 * buffer is grown with realloc(). */
414 static int parse_address_key(const char **p, const char *key, char **value) {
425 if (strncmp(*p, key, l) != 0)
438 while (*a != ';' && *a != ',' && *a != 0) {
/* x becomes the high nibble and y the low nibble of the character
 * (presumably decoded from a hex escape -- the decoding lines are not
 * visible here) */
456 c = (char) ((x << 4) | y);
463 t = realloc(r, n + 2);
/* Advance *p past the current address item: strcspn() moves it to the next
 * ',' or to the end of the string if there is none. */
491 static void skip_address_key(const char **p) {
495 *p += strcspn(*p, ",");
/* Parse the parameters of a unix address ("guid", "path", "abstract") and
 * fill in b->sockaddr / b->sockaddr_size accordingly. */
501 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
502 _cleanup_free_ char *path = NULL, *abstract = NULL;
/* Consume keys until the end of the string or the ';' ending this address */
511 while (**p != 0 && **p != ';') {
512 r = parse_address_key(p, "guid", guid);
518 r = parse_address_key(p, "path", &path);
524 r = parse_address_key(p, "abstract", &abstract);
/* The cases "neither given" and "both given" are tested explicitly (the
 * reaction is not visible here) */
533 if (!path && !abstract)
536 if (path && abstract)
541 if (l > sizeof(b->sockaddr.un.sun_path))
544 b->sockaddr.un.sun_family = AF_UNIX;
545 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
546 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
547 } else if (abstract) {
548 l = strlen(abstract);
/* One byte of sun_path is taken by the leading zero byte written below */
549 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
552 b->sockaddr.un.sun_family = AF_UNIX;
553 b->sockaddr.un.sun_path[0] = 0;
554 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
555 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parse the parameters of a tcp address ("guid", "host", "port", "family"),
 * resolve host/port with getaddrinfo() and copy the first result into
 * b->sockaddr / b->sockaddr_size. */
561 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
562 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
564 struct addrinfo *result, hints = {
565 .ai_socktype = SOCK_STREAM,
566 .ai_flags = AI_ADDRCONFIG,
574 while (**p != 0 && **p != ';') {
575 r = parse_address_key(p, "guid", guid);
581 r = parse_address_key(p, "host", &host);
587 r = parse_address_key(p, "port", &port);
593 r = parse_address_key(p, "family", &family);
/* "ipv4" and "ipv6" restrict the lookup to the matching address family */
606 if (streq(family, "ipv4"))
607 hints.ai_family = AF_INET;
608 else if (streq(family, "ipv6"))
609 hints.ai_family = AF_INET6;
614 r = getaddrinfo(host, port, &hints, &result);
618 return -EADDRNOTAVAIL;
620 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
621 b->sockaddr_size = result->ai_addrlen;
623 freeaddrinfo(result);
/* Parse the parameters of a unixexec address: "guid", "path" and numbered
 * "argv<N>" keys, building up an argument vector. */
628 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
630 unsigned n_argv = 0, j;
639 while (**p != 0 && **p != ';') {
640 r = parse_address_key(p, "guid", guid);
646 r = parse_address_key(p, "path", &path);
652 if (startswith(*p, "argv")) {
/* The digits following "argv" are the argument index; strtoul() also
 * advances *p to the first character after the number */
656 ul = strtoul(*p + 4, (char**) p, 10);
/* Tests for a conversion error, a missing '=' or an index above 256 */
657 if (errno > 0 || **p != '=' || ul > 256) {
/* Room for indices 0..ul plus one more slot */
667 x = realloc(argv, sizeof(char*) * (ul + 2));
/* Zero the slots from the old element count up to the new end */
673 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
679 r = parse_address_key(p, NULL, argv + ul);
694 /* Make sure there are no holes in the array, with the
695 * exception of argv[0] */
696 for (j = 1; j < n_argv; j++)
/* A missing argv[0] is filled in with a copy of the path */
702 if (argv && argv[0] == NULL) {
703 argv[0] = strdup(path);
715 for (j = 0; j < n_argv; j++)
/* Parse the parameters of a kernel address; the visible lines handle the
 * "guid" and "path" keys. */
723 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
724 _cleanup_free_ char *path = NULL;
732 while (**p != 0 && **p != ';') {
733 r = parse_address_key(p, "guid", guid);
739 r = parse_address_key(p, "path", &path);
/* Discard the results of a previously parsed address. The visible lines
 * reset the socket address size, free the exec argv vector and clear the
 * server ID. */
758 static void bus_reset_parsed_address(sd_bus *b) {
762 b->sockaddr_size = 0;
763 strv_free(b->exec_argv);
767 b->server_id = SD_ID128_NULL;
/* Parse the next entry of b->address, starting at b->address_index, and
 * dispatch to the parser matching its prefix ("unix:", "tcp:", "unixexec:"
 * or "kernel:"). A guid found by the parser is converted into b->server_id,
 * and address_index is advanced to where parsing stopped. */
772 static int bus_parse_next_address(sd_bus *b) {
773 _cleanup_free_ char *guid = NULL;
/* Tests whether the end of the address string has been reached */
781 if (b->address[b->address_index] == 0)
784 bus_reset_parsed_address(b);
786 a = b->address + b->address_index;
795 if (startswith(a, "unix:")) {
798 r = parse_unix_address(b, &a, &guid);
803 } else if (startswith(a, "tcp:")) {
806 r = parse_tcp_address(b, &a, &guid);
812 } else if (startswith(a, "unixexec:")) {
815 r = parse_exec_address(b, &a, &guid);
821 } else if (startswith(a, "kernel:")) {
824 r = parse_kernel_address(b, &a, &guid);
837 r = sd_id128_from_string(guid, &b->server_id);
/* Remember how far we got, as an offset into the address string */
842 b->address_index = a - b->address;
/* Try to connect using the currently parsed address: via a socket if a
 * socket address is set, via an executed program if an exec path is set, or
 * via the kernel transport if a kernel path is set. The negated result of an
 * attempt is recorded in last_connect_error, and the next address is parsed
 * with bus_parse_next_address(). */
846 static int bus_start_address(sd_bus *b) {
854 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
856 r = bus_socket_connect(b);
860 b->last_connect_error = -r;
862 } else if (b->exec_path) {
864 r = bus_socket_exec(b);
868 b->last_connect_error = -r;
869 } else if (b->kernel) {
871 r = bus_kernel_connect(b);
875 b->last_connect_error = -r;
878 r = bus_parse_next_address(b);
/* Report the last recorded connect error if there is one, and
 * -ECONNREFUSED otherwise */
882 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Drop the currently parsed address and start over with
 * bus_start_address(). */
886 int bus_next_address(sd_bus *b) {
889 bus_reset_parsed_address(b);
890 return bus_start_address(b);
/* Prepare caller-supplied file descriptors for use by the bus and hand them
 * over to the matching transport.
 *
 * Both fds must be valid. They are switched to non-blocking mode and marked
 * close-on-exec (the output fd only when it differs from the input fd). The
 * input fd is then examined with fstat(): a character device is handed to
 * bus_kernel_take_fd(), anything else to bus_socket_take_fd(). */
893 static int bus_start_fd(sd_bus *b) {
898 assert(b->input_fd >= 0);
899 assert(b->output_fd >= 0);
901 r = fd_nonblock(b->input_fd, true);
905 r = fd_cloexec(b->input_fd, true);
909 if (b->input_fd != b->output_fd) {
910 r = fd_nonblock(b->output_fd, true);
914 r = fd_cloexec(b->output_fd, true);
919 if (fstat(b->input_fd, &st) < 0)
/* S_ISCHR() has to be applied to the file mode that fstat() stored in st,
 * not to the file descriptor number itself */
922 if (S_ISCHR(st.st_mode))
923 return bus_kernel_take_fd(b);
925 return bus_socket_take_fd(b);
/* Start a configured bus: move it to BUS_OPENING, then bring up the
 * connection either from preset fds or from the configured address / socket
 * address / exec path / kernel path, and finally send the hello message via
 * bus_send_hello(). */
928 int sd_bus_start(sd_bus *bus) {
933 if (bus->state != BUS_UNSET)
935 if (bus_pid_changed(bus))
938 bus->state = BUS_OPENING;
/* Tests for the combination of server mode and bus-client mode (the reaction
 * is not visible here) */
940 if (bus->is_server && bus->bus_client)
943 if (bus->input_fd >= 0)
944 r = bus_start_fd(bus);
945 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
946 r = bus_start_address(bus);
953 return bus_send_hello(bus);
/* Open a connection to the system bus. The address is looked up in the
 * DBUS_SYSTEM_BUS_ADDRESS environment variable; the other visible path fills
 * in the AF_UNIX socket address "/run/dbus/system_bus_socket". The bus is
 * marked as a bus client. */
956 int sd_bus_open_system(sd_bus **ret) {
968 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
970 r = sd_bus_set_address(b, e);
974 b->sockaddr.un.sun_family = AF_UNIX;
975 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
/* sizeof() of the literal includes the trailing NUL, hence the -1 */
976 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
979 b->bus_client = true;
/* Open a connection to the user bus. The address is looked up in the
 * DBUS_SESSION_BUS_ADDRESS environment variable; the other visible path
 * builds the AF_UNIX socket path from XDG_RUNTIME_DIR with "/bus" appended.
 * The bus is marked as a bus client and started. */
993 int sd_bus_open_user(sd_bus **ret) {
1006 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1008 r = sd_bus_set_address(b, e);
1012 e = secure_getenv("XDG_RUNTIME_DIR");
/* The 4 accounts for the "/bus" suffix appended below */
1019 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1024 b->sockaddr.un.sun_family = AF_UNIX;
1025 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1026 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1029 b->bus_client = true;
1031 r = sd_bus_start(b);
/* Mark a bus as closed. The visible lines test whether the bus is already
 * BUS_CLOSED and whether the PID changed (reactions not visible here), set
 * the state to BUS_CLOSED and then distinguish non-kernel buses from kernel
 * buses. */
1043 void sd_bus_close(sd_bus *bus) {
1046 if (bus->state == BUS_CLOSED)
1048 if (bus_pid_changed(bus))
1051 bus->state = BUS_CLOSED;
1053 if (!bus->is_kernel)
1056 /* We'll leave the fd open in case this is a kernel bus, since
1057 * there might still be memblocks around that reference this
1058 * bus, and they might need to invoke the
1059 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
/* Take an additional reference on a bus. The incremented count is asserted
 * to be at least 2. */
1063 sd_bus *sd_bus_ref(sd_bus *bus) {
1067 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drop a reference on a bus. The visible line tests whether the decremented
 * count has reached zero or below (the reaction is not visible here;
 * presumably the bus is freed -- confirm). */
1072 sd_bus *sd_bus_unref(sd_bus *bus) {
1076 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Report whether the bus is open, as determined by BUS_IS_OPEN() on the bus
 * state. A PID change is tested first (reaction not visible here). */
1082 int sd_bus_is_open(sd_bus *bus) {
1085 if (bus_pid_changed(bus))
1088 return BUS_IS_OPEN(bus->state);
/* Report whether values of the given type can be sent on this bus. For
 * SD_BUS_TYPE_UNIX_FD the answer depends on the ACCEPT_FD hello flag and on
 * bus->can_fds once the bus is running; for every other type it is the
 * result of bus_type_is_valid(). */
1091 int sd_bus_can_send(sd_bus *bus, char type) {
1096 if (bus->state == BUS_UNSET)
1098 if (bus_pid_changed(bus))
1101 if (type == SD_BUS_TYPE_UNIX_FD) {
/* Tests whether fd passing was left disabled in the hello flags */
1102 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1105 r = bus_ensure_running(bus);
1109 return bus->can_fds;
1112 return bus_type_is_valid(type);
/* Copy the bus's server ID into *server_id after making sure the bus is
 * running via bus_ensure_running(). */
1115 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1122 if (bus_pid_changed(bus))
1125 r = bus_ensure_running(bus);
1129 *server_id = bus->server_id;
/* Seal a message for sending on this bus, assigning it the next serial
 * number (b->serial is incremented before use). */
1133 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
/* Tests for a message whose header version is newer than the bus's message
 * version (reaction not visible here) */
1136 if (m->header->version > b->message_version)
1142 return bus_message_seal(m, ++b->serial);
/* Write out queued messages, starting with the head of the write queue,
 * using the kernel or the socket transport. The bus must be in BUS_RUNNING
 * or BUS_HELLO state. For the socket transport bus->windex is passed along
 * to track write progress, and the head entry counts as done once windex
 * reaches the message size. A completed entry is unreferenced and the rest
 * of the queue is shifted down by one. */
1145 static int dispatch_wqueue(sd_bus *bus) {
1149 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1151 while (bus->wqueue_size > 0) {
1154 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1156 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1162 /* Didn't do anything this time */
1164 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1165 /* Fully written. Let's drop the entry from
1168 * This isn't particularly optimized, but
1169 * well, this is supposed to be our worst-case
1170 * buffer only, and the socket buffer is
1171 * supposed to be our primary buffer, and if
1172 * it got full, then all bets are off
1175 sd_bus_message_unref(bus->wqueue[0]);
1176 bus->wqueue_size --;
1177 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetch the next incoming message into *m: a message already waiting in the
 * read queue is taken first; otherwise a new one is read from the kernel or
 * socket transport. The bus must be in BUS_RUNNING or BUS_HELLO state. */
1187 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1188 sd_bus_message *z = NULL;
1193 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1195 if (bus->rqueue_size > 0) {
1196 /* Dispatch a queued message */
1198 *m = bus->rqueue[0];
1199 bus->rqueue_size --;
/* Shift the remaining queue entries down by one slot */
1200 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1204 /* Try to read a new message */
1207 r = bus_kernel_read_message(bus, &z);
1209 r = bus_socket_read_message(bus, &z);
/* Send a message on the bus. The message is sealed, then either written
 * right away (when the bus is running or in hello state and nothing is
 * queued) or appended to the write queue. If serial is non-NULL it receives
 * the serial number of the message. */
1225 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1230 if (!BUS_IS_OPEN(bus->state))
1234 if (bus_pid_changed(bus))
1238 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1245 /* If the serial number isn't kept, then we know that no reply
1247 if (!serial && !m->sealed)
1248 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1250 r = bus_seal_message(bus, m);
1254 /* If this is a reply and no reply was requested, then let's
1255 * suppress this, if we can */
1256 if (m->dont_send && !serial)
/* Direct write is attempted only when no earlier message is still queued */
1259 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1263 r = bus_kernel_write_message(bus, m);
1265 r = bus_socket_write_message(bus, m, &idx);
1270 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1271 /* Wasn't fully written. So let's remember how
1272 * much was written. Note that the first entry
1273 * of the wqueue array is always allocated so
1274 * that we always can remember how much was
1276 bus->wqueue[0] = sd_bus_message_ref(m);
1277 bus->wqueue_size = 1;
1283 /* Just append it to the queue. */
/* Tests whether the write queue has reached BUS_WQUEUE_MAX entries */
1285 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1288 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
/* The queue holds its own reference to the message */
1293 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1297 *serial = BUS_MESSAGE_SERIAL(m);
/* Turn a relative timeout in microseconds into a CLOCK_MONOTONIC timestamp.
 * The value (uint64_t) -1 is tested separately, and BUS_DEFAULT_TIMEOUT is
 * substituted on a path whose condition is not visible here (presumably for
 * usec == 0 -- confirm). */
1302 static usec_t calc_elapse(uint64_t usec) {
1303 if (usec == (uint64_t) -1)
1307 usec = BUS_DEFAULT_TIMEOUT;
1309 return now(CLOCK_MONOTONIC) + usec;
/* Comparison function for reply callbacks, ordering them by their timeout
 * field. A timeout of 0 is compared specially against non-zero timeouts; the
 * values returned in each case are not visible here. */
1312 static int timeout_compare(const void *a, const void *b) {
1313 const struct reply_callback *x = a, *y = b;
1315 if (x->timeout != 0 && y->timeout == 0)
1318 if (x->timeout == 0 && y->timeout != 0)
1321 if (x->timeout < y->timeout)
1324 if (x->timeout > y->timeout)
/* Send a method call and register a callback to be invoked for its reply.
 * The callback record is keyed by the message serial in the reply_callbacks
 * hashmap and, when it carries a timeout, is also placed in the timeout
 * priority queue. */
1330 int sd_bus_send_with_reply(
1333 sd_bus_message_handler_t callback,
1338 struct reply_callback *c;
1343 if (!BUS_IS_OPEN(bus->state))
/* Tests that the message is a method call and whether it is flagged as not
 * expecting a reply (reactions not visible here) */
1349 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1351 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1353 if (bus_pid_changed(bus))
1356 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
/* The priority queue is only set up when a timeout other than
 * (uint64_t) -1 was requested */
1360 if (usec != (uint64_t) -1) {
1361 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1366 r = bus_seal_message(bus, m);
1370 c = new0(struct reply_callback, 1);
1374 c->callback = callback;
1375 c->userdata = userdata;
1376 c->serial = BUS_MESSAGE_SERIAL(m);
1377 c->timeout = calc_elapse(usec);
1379 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1385 if (c->timeout != 0) {
1386 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* The registration is cancelled again on the visible failure paths */
1389 sd_bus_send_with_reply_cancel(bus, c->serial);
1394 r = sd_bus_send(bus, m, serial);
1396 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancel a pending reply callback identified by the serial of its method
 * call: the record is removed from the reply_callbacks hashmap and, if it
 * has a timeout, from the timeout priority queue as well. */
1403 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1404 struct reply_callback *c;
1410 if (bus_pid_changed(bus))
1413 c = hashmap_remove(bus->reply_callbacks, &serial);
1417 if (c->timeout != 0)
1418 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drive the bus until it reaches BUS_RUNNING state by calling
 * sd_bus_process() and, in between, sd_bus_wait() without a timeout. The
 * states BUS_UNSET and BUS_CLOSED are tested up front (reaction not visible
 * here). */
1424 int bus_ensure_running(sd_bus *bus) {
1429 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1431 if (bus->state == BUS_RUNNING)
1435 r = sd_bus_process(bus, NULL);
1438 if (bus->state == BUS_RUNNING)
/* (uint64_t) -1 requests an unbounded wait */
1443 r = sd_bus_wait(bus, (uint64_t) -1);
/* Send a method call and wait for the reply whose reply_serial matches the
 * call's serial. Other messages read in the meantime are appended to the
 * read queue. A method-error reply is copied into *error. Between read
 * attempts the function polls the bus and flushes the write queue. */
1449 int sd_bus_send_with_reply_and_block(
1453 sd_bus_error *error,
1454 sd_bus_message **reply) {
1463 if (!BUS_IS_OPEN(bus->state))
1467 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1469 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1471 if (bus_error_is_dirty(error))
1473 if (bus_pid_changed(bus))
1476 r = bus_ensure_running(bus);
1480 r = sd_bus_send(bus, m, &serial);
1484 timeout = calc_elapse(usec);
1488 sd_bus_message *incoming = NULL;
/* Tests whether the read queue has reached BUS_RQUEUE_MAX entries */
1493 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1496 /* Make sure there's room for queuing this
1497 * locally, before we read the message */
1499 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1508 r = bus_kernel_read_message(bus, &incoming);
1510 r = bus_socket_read_message(bus, &incoming);
1515 if (incoming->reply_serial == serial) {
1516 /* Found a match! */
1518 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1523 sd_bus_message_unref(incoming);
1528 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1531 r = sd_bus_error_copy(error, &incoming->error);
1533 sd_bus_message_unref(incoming);
/* The error carried by the reply is translated into an errno-style value */
1537 k = bus_error_to_errno(&incoming->error);
1538 sd_bus_message_unref(incoming);
1542 sd_bus_message_unref(incoming);
1546 /* There's already guaranteed to be room for
1547 * this, so need to resize things here */
1548 bus->rqueue[bus->rqueue_size ++] = incoming;
1551 /* Try to read more, right-away */
1560 n = now(CLOCK_MONOTONIC);
1566 left = (uint64_t) -1;
1568 r = bus_poll(bus, true, left);
1572 r = dispatch_wqueue(bus);
/* Return the bus's input fd. The visible checks test that the bus is open,
 * whether input and output fds differ, and whether the PID changed
 * (reactions not visible here). */
1578 int sd_bus_get_fd(sd_bus *bus) {
1581 if (!BUS_IS_OPEN(bus->state))
1583 if (bus->input_fd != bus->output_fd)
1585 if (bus_pid_changed(bus))
1588 return bus->input_fd;
/* Determine the poll events to wait for, depending on the bus state:
 * opening, authenticating (consults bus_socket_auth_needs_write()), or
 * running/hello (looks at whether the read queue is empty and whether the
 * write queue has entries). The event flag assignments themselves are not
 * part of the visible lines. */
1591 int sd_bus_get_events(sd_bus *bus) {
1596 if (!BUS_IS_OPEN(bus->state))
1598 if (bus_pid_changed(bus))
1601 if (bus->state == BUS_OPENING)
1603 else if (bus->state == BUS_AUTHENTICATING) {
1605 if (bus_socket_auth_needs_write(bus))
1610 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1611 if (bus->rqueue_size <= 0)
1613 if (bus->wqueue_size > 0)
/* Report in *timeout_usec the time relevant for the bus's next timeout:
 * the authentication timeout while authenticating, (uint64_t) -1 when the
 * bus is neither running nor in hello state, and otherwise a value derived
 * from the first entry of the reply-callback priority queue. */
1620 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1621 struct reply_callback *c;
1627 if (!BUS_IS_OPEN(bus->state))
1629 if (bus_pid_changed(bus))
1632 if (bus->state == BUS_AUTHENTICATING) {
1633 *timeout_usec = bus->auth_timeout;
1637 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1638 *timeout_usec = (uint64_t) -1;
1642 c = prioq_peek(bus->reply_callbacks_prioq);
/* Second place where (uint64_t) -1 is stored; its condition is not visible
 * here (presumably an empty queue -- confirm) */
1644 *timeout_usec = (uint64_t) -1;
1648 *timeout_usec = c->timeout;
/* Handle the first entry of the reply-callback priority queue: a synthetic
 * "org.freedesktop.DBus.Error.Timeout" error message is created, the entry
 * is removed from both the priority queue and the hashmap, and its callback
 * is invoked with the synthetic message. Returns the callback's negative
 * result, or 1 otherwise. */
1652 static int process_timeout(sd_bus *bus) {
1653 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1654 struct reply_callback *c;
1660 c = prioq_peek(bus->reply_callbacks_prioq);
1664 n = now(CLOCK_MONOTONIC);
1668 r = bus_message_new_synthetic_error(
1671 &SD_BUS_ERROR_MAKE("org.freedesktop.DBus.Error.Timeout", "Timed out"),
/* The popped entry must be the one peeked at above */
1676 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1677 hashmap_remove(bus->reply_callbacks, &c->serial);
1679 r = c->callback(bus, m, c->userdata);
1682 return r < 0 ? r : 1;
/* While the bus is in BUS_HELLO state, verify that the incoming message is
 * a method return or method error whose reply serial matches the serial of
 * the hello call. */
1685 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1689 if (bus->state != BUS_HELLO)
1692 /* Let's make sure the first message on the bus is the HELLO
1693 * reply. But note that we don't actually parse the message
1694 * here (we leave that to the usual handling), we just verify
1695 * we don't let any earlier msg through. */
1697 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1698 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1701 if (m->reply_serial != bus->hello_serial)
/* Deliver a method return or method error to the reply callback registered
 * for its reply serial. The callback record is removed from the hashmap
 * (and from the timeout queue if it has a timeout), the message is rewound
 * and the callback is invoked. */
1707 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1708 struct reply_callback *c;
1714 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1715 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1718 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1722 if (c->timeout != 0)
1723 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1725 r = sd_bus_message_rewind(m, true);
1729 r = c->callback(bus, m, c->userdata);
/* Run the registered filter callbacks on a message. The walk over the list
 * is repeated for as long as the list was modified during the walk; the
 * per-callback last_iteration stamp keeps each filter from being run twice
 * for the same message. */
1735 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1736 struct filter_callback *l;
1743 bus->filter_callbacks_modified = false;
1745 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1747 if (bus->filter_callbacks_modified)
1750 /* Don't run this more than once per iteration */
1751 if (l->last_iteration == bus->iteration_counter)
1754 l->last_iteration = bus->iteration_counter;
/* Each callback sees the message rewound */
1756 r = sd_bus_message_rewind(m, true);
1760 r = l->callback(bus, m, l->userdata);
1766 } while (bus->filter_callbacks_modified);
/* Run the match callbacks on a message via bus_match_run(), repeating for
 * as long as the match tree was modified during the run. */
1771 static int process_match(sd_bus *bus, sd_bus_message *m) {
1778 bus->match_callbacks_modified = false;
1780 r = bus_match_run(bus, &bus->match_callbacks, m);
1784 } while (bus->match_callbacks_modified);
/* Answer method calls on the "org.freedesktop.DBus.Peer" interface: "Ping"
 * gets an empty method return, "GetMachineId" a method return carrying the
 * machine ID as a string, and any other member an
 * "org.freedesktop.DBus.Error.UnknownMethod" error. The reply is sent with
 * sd_bus_send(). */
1789 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1790 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1796 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1799 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
/* Tests whether the caller asked for no reply (reaction not visible here) */
1802 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1805 if (streq_ptr(m->member, "Ping"))
1806 r = sd_bus_message_new_method_return(bus, m, &reply);
1807 else if (streq_ptr(m->member, "GetMachineId")) {
1811 r = sd_id128_get_machine(&id);
1815 r = sd_bus_message_new_method_return(bus, m, &reply);
1819 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1821 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1823 sd_bus_error_set(&error,
1824 "org.freedesktop.DBus.Error.UnknownMethod",
1825 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1827 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1833 r = sd_bus_send(bus, reply, NULL);
/* Dispatch a method call to the registered object callbacks: first the
 * callback registered for the exact object path, then fallback callbacks
 * registered for prefixes of the path. The lookup is repeated for as long
 * as the callback table was modified in the meantime; last_iteration stamps
 * keep a callback from being run twice for the same message. At the end an
 * "org.freedesktop.DBus.Error.UnknownMethod" error reply is built and
 * sent. */
1840 static int process_object(sd_bus *bus, sd_bus_message *m) {
1841 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1842 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1843 struct object_callback *c;
1851 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1854 if (hashmap_isempty(bus->object_callbacks))
1857 pl = strlen(m->path);
1862 bus->object_callbacks_modified = false;
/* Exact match on the object path */
1864 c = hashmap_get(bus->object_callbacks, m->path);
1865 if (c && c->last_iteration != bus->iteration_counter) {
1867 c->last_iteration = bus->iteration_counter;
1869 r = sd_bus_message_rewind(m, true);
1873 r = c->callback(bus, m, c->userdata);
1880 /* Look for fallback prefixes */
1885 if (bus->object_callbacks_modified)
/* Locate the last '/' of the remaining prefix */
1888 e = strrchr(p, '/');
1894 c = hashmap_get(bus->object_callbacks, p);
/* Only callbacks registered as fallback are eligible for prefixes */
1895 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1897 c->last_iteration = bus->iteration_counter;
1899 r = sd_bus_message_rewind(m, true);
1903 r = c->callback(bus, m, c->userdata);
1911 } while (bus->object_callbacks_modified);
1913 /* We found some handlers but none wanted to take this, then
1914 * return this -- with one exception, we can handle
1915 * introspection minimally ourselves */
1916 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1919 sd_bus_error_set(&error,
1920 "org.freedesktop.DBus.Error.UnknownMethod",
1921 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1923 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1927 r = sd_bus_send(bus, reply, NULL);
/* Minimal built-in handler for
 * org.freedesktop.DBus.Introspectable.Introspect: walks the registered
 * object callbacks, collects entries into a set based on how their paths
 * relate to the path of the call, writes an introspection document listing
 * the Peer and Introspectable interfaces plus one <node> element per set
 * entry into a memory stream, and sends it back as a string in a method
 * return. */
1934 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1935 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1936 _cleanup_free_ char *introspection = NULL;
1937 _cleanup_set_free_free_ Set *s = NULL;
1938 _cleanup_fclose_ FILE *f = NULL;
1939 struct object_callback *c;
1948 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1954 s = set_new(string_hash_func, string_compare_func);
1958 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1962 if (streq(c->path, "/"))
1965 if (streq(m->path, "/"))
/* e points just past the call's path within the callback's path, if the
 * latter starts with the former; the next character is tested for '/' */
1968 e = startswith(c->path, m->path);
1969 if (!e || *e != '/')
/* -EEXIST from set_consume() is not treated as a failure here */
1981 r = set_consume(s, a);
1982 if (r < 0 && r != -EEXIST)
1986 f = open_memstream(&introspection, &size);
1990 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1991 fputs("<node>\n", f);
1992 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1993 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1995 while ((node = set_steal_first(s))) {
1996 fprintf(f, " <node name=\"%s\"/>\n", node);
2000 fputs("</node>\n", f);
2007 r = sd_bus_message_new_method_return(bus, m, &reply);
2011 r = sd_bus_message_append(reply, "s", introspection);
2015 r = sd_bus_send(bus, reply, NULL);
/* Run an incoming message through the processing stages in order: hello
 * check, reply callbacks, filters, matches, built-in interfaces, object
 * callbacks and finally the built-in introspection handler. The iteration
 * counter is bumped once per message; the stages use it to avoid invoking
 * the same callback twice for one message. */
2022 static int process_message(sd_bus *bus, sd_bus_message *m) {
2028 bus->iteration_counter++;
2030 r = process_hello(bus, m);
2034 r = process_reply(bus, m);
2038 r = process_filter(bus, m);
2042 r = process_match(bus, m);
2046 r = process_builtin(bus, m);
2050 r = process_object(bus, m);
2054 return process_introspect(bus, m);
/* One processing step for a bus in BUS_RUNNING or BUS_HELLO state: handle
 * reply timeouts, flush the write queue, fetch one incoming message and run
 * it through process_message(). The last visible branch builds and sends an
 * "org.freedesktop.DBus.Error.UnknownObject" error reply for a method
 * call. */
2057 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2058 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2062 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2064 r = process_timeout(bus);
2068 r = dispatch_wqueue(bus);
2072 r = dispatch_rqueue(bus, &m);
2078 r = process_message(bus, m);
2083 r = sd_bus_message_rewind(m, true);
2092 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
2093 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2094 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
2096 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
2098 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
2102 r = sd_bus_send(bus, reply, NULL);
/* Main processing entry point: performs one step of work appropriate to the
 * current bus state (opening, authenticating, or running). */
2116 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2119 /* Returns 0 when we didn't do anything. This should cause the
2120 * caller to invoke sd_bus_wait() before returning the next
2121 * time. Returns > 0 when we did something, which possibly
2122 * means *ret is filled in with an unprocessed message. */
2126 if (bus_pid_changed(bus))
2129 /* We don't allow recursively invoking sd_bus_process(). */
2130 if (bus->processing)
2133 switch (bus->state) {
2140 r = bus_socket_process_opening(bus);
2147 case BUS_AUTHENTICATING:
2149 r = bus_socket_process_authenticating(bus);
/* The processing flag brackets process_running(); it is what the recursion
 * check above looks at */
2159 bus->processing = true;
2160 r = process_running(bus, ret);
2161 bus->processing = false;
2166 assert_not_reached("Unknown state");
/* Wait with ppoll() for activity on the bus fds. The events come from
 * sd_bus_get_events() and the wait time from sd_bus_get_timeout(), combined
 * with the caller's timeout_usec. Returns 1 if ppoll() reported ready
 * descriptors and 0 otherwise (error handling is not in the visible
 * lines). */
2169 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2170 struct pollfd p[2] = {};
2177 if (!BUS_IS_OPEN(bus->state))
2180 e = sd_bus_get_events(bus);
2187 r = sd_bus_get_timeout(bus, &until);
/* Convert the absolute time 'until' into a relative wait, clamped at 0 */
2194 nw = now(CLOCK_MONOTONIC);
2195 m = until > nw ? until - nw : 0;
/* Tests whether the caller's timeout is set and is shorter than m (or m is
 * unbounded) */
2198 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2201 p[0].fd = bus->input_fd;
2202 if (bus->output_fd == bus->input_fd) {
/* With distinct fds, POLLIN is watched on the input fd and POLLOUT on the
 * output fd */
2206 p[0].events = e & POLLIN;
2207 p[1].fd = bus->output_fd;
2208 p[1].events = e & POLLOUT;
/* A NULL timespec is passed when m is (uint64_t) -1 */
2212 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2216 return r > 0 ? 1 : 0;
/* Wait for activity on the bus for at most timeout_usec via bus_poll(). A
 * non-empty read queue is tested beforehand (reaction not visible here;
 * presumably an immediate return -- confirm). */
2219 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2223 if (!BUS_IS_OPEN(bus->state))
2225 if (bus_pid_changed(bus))
2228 if (bus->rqueue_size > 0)
2231 return bus_poll(bus, false, timeout_usec);
/* Flush the write queue: after making sure the bus is running, alternate
 * between dispatch_wqueue() and an unbounded bus_poll(), checking for an
 * empty write queue in between. */
2234 int sd_bus_flush(sd_bus *bus) {
2239 if (!BUS_IS_OPEN(bus->state))
2241 if (bus_pid_changed(bus))
2244 r = bus_ensure_running(bus);
2248 if (bus->wqueue_size <= 0)
2252 r = dispatch_wqueue(bus);
2256 if (bus->wqueue_size <= 0)
2259 r = bus_poll(bus, false, (uint64_t) -1);
/* Register a filter callback: a zero-initialized record is allocated,
 * filled with callback and userdata, and prepended to the bus's filter
 * list. The modified flag is raised so that a filter walk in progress
 * notices the change. */
2265 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2266 struct filter_callback *f;
2272 if (bus_pid_changed(bus))
2275 f = new0(struct filter_callback, 1);
2278 f->callback = callback;
2279 f->userdata = userdata;
2281 bus->filter_callbacks_modified = true;
2282 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Remove a filter callback: the filter list is searched for an entry with
 * the same callback and userdata; on a match the modified flag is raised
 * and the entry is unlinked from the list. */
2286 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2287 struct filter_callback *f;
2293 if (bus_pid_changed(bus))
2296 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2297 if (f->callback == callback && f->userdata == userdata) {
2298 bus->filter_callbacks_modified = true;
2299 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Register an object (or fallback) callback for a path. A record holding a
 * copy of the path, the callback, the userdata and the fallback flag is
 * stored in the object_callbacks hashmap, keyed by the copied path. */
2308 static int bus_add_object(
2312 sd_bus_message_handler_t callback,
2315 struct object_callback *c;
2324 if (bus_pid_changed(bus))
2327 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2331 c = new0(struct object_callback, 1);
2335 c->path = strdup(path);
2341 c->callback = callback;
2342 c->userdata = userdata;
2343 c->is_fallback = fallback;
/* Lets a dispatch in progress notice that the table changed */
2345 bus->object_callbacks_modified = true;
2346 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Remove an object (or fallback) callback. The record is looked up by path
 * and compared against the given callback, userdata and fallback flag
 * before it is removed from the hashmap. */
2356 static int bus_remove_object(
2360 sd_bus_message_handler_t callback,
2363 struct object_callback *c;
2371 if (bus_pid_changed(bus))
2374 c = hashmap_get(bus->object_callbacks, path);
/* Tests whether any of the three attributes differs from the registration */
2378 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2381 bus->object_callbacks_modified = true;
2382 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Register a callback for an object path; forwards to bus_add_object() with
 * the fallback flag set to false. */
2390 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2391 return bus_add_object(bus, false, path, callback, userdata);
/* Remove a callback for an object path; forwards to bus_remove_object()
 * with the fallback flag set to false. */
2394 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2395 return bus_remove_object(bus, false, path, callback, userdata);
/* Register a fallback callback for a path prefix; forwards to
 * bus_add_object() with the fallback flag set to true. */
2398 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2399 return bus_add_object(bus, true, prefix, callback, userdata);
/* Remove a fallback callback for a path prefix; forwards to
 * bus_remove_object() with the fallback flag set to true. */
2402 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2403 return bus_remove_object(bus, true, prefix, callback, userdata);
/* Install a match rule with a callback. The match string is parsed into
 * components; on a bus client the match is additionally registered via
 * bus_add_match_internal() under a freshly allocated cookie. The components
 * are then added to the local match tree and freed again. */
2406 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2407 struct bus_match_component *components = NULL;
2408 unsigned n_components = 0;
2409 uint64_t cookie = 0;
2416 if (bus_pid_changed(bus))
2419 r = bus_match_parse(match, &components, &n_components);
2423 if (bus->bus_client) {
2424 cookie = ++bus->match_cookie;
2426 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2431 bus->match_callbacks_modified = true;
2432 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
/* The bus-side registration is undone again on a path whose condition is
 * not visible here (presumably failure of bus_match_add() -- confirm) */
2434 if (bus->bus_client)
2435 bus_remove_match_internal(bus, match, cookie);
2439 bus_match_parse_free(components, n_components);
/* Remove a match rule and its callback. The match string is parsed, the
 * entry is removed from the local match tree (which yields its cookie) and,
 * on a bus client, the match is also removed via
 * bus_remove_match_internal(). A negative result of the local removal takes
 * precedence in the return value. */
2443 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2444 struct bus_match_component *components = NULL;
2445 unsigned n_components = 0;
2447 uint64_t cookie = 0;
2453 if (bus_pid_changed(bus))
2456 r = bus_match_parse(match, &components, &n_components);
2460 bus->match_callbacks_modified = true;
2461 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2463 if (bus->bus_client)
2464 q = bus_remove_match_internal(bus, match, cookie);
2466 bus_match_parse_free(components, n_components);
2468 return r < 0 ? r : q;
/* Convenience call: create a signal message for the given path, interface
 * and member, append the variadic arguments described by 'types', and send
 * it without requesting a serial. */
2471 int sd_bus_emit_signal(
2474 const char *interface,
2476 const char *types, ...) {
2478 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2484 if (!BUS_IS_OPEN(bus->state))
2486 if (bus_pid_changed(bus))
2489 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2493 va_start(ap, types);
2494 r = bus_message_append_ap(m, types, ap);
2499 return sd_bus_send(bus, m, NULL);
/* Convenience call: create a method call message, append the variadic
 * arguments described by 'types', send it and block for the reply via
 * sd_bus_send_with_reply_and_block(), passing 0 as timeout argument. */
2502 int sd_bus_call_method(
2504 const char *destination,
2506 const char *interface,
2508 sd_bus_error *error,
2509 sd_bus_message **reply,
2510 const char *types, ...) {
2512 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2519 if (!BUS_IS_OPEN(bus->state))
2521 if (bus_pid_changed(bus))
2524 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2528 va_start(ap, types);
2529 r = bus_message_append_ap(m, types, ap);
2534 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
/* Convenience call: reply to a method call with a method return carrying
 * the variadic arguments described by 'types'. */
2537 int sd_bus_reply_method_return(
2539 sd_bus_message *call,
2540 const char *types, ...) {
2542 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2552 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2554 if (!BUS_IS_OPEN(bus->state))
2556 if (bus_pid_changed(bus))
/* Tests whether the caller asked for no reply (reaction not visible here) */
2559 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2562 r = sd_bus_message_new_method_return(bus, call, &m);
2566 va_start(ap, types);
2567 r = bus_message_append_ap(m, types, ap);
2572 return sd_bus_send(bus, m, NULL);
/* Convenience call: reply to a method call with a method error built from
 * the error structure e. */
2575 int sd_bus_reply_method_error(
2577 sd_bus_message *call,
2578 const sd_bus_error *e) {
2580 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2589 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
/* Tests whether e actually carries an error (reaction not visible here) */
2591 if (!sd_bus_error_is_set(e))
2593 if (!BUS_IS_OPEN(bus->state))
2595 if (bus_pid_changed(bus))
2598 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2601 r = sd_bus_message_new_method_error(bus, call, e, &m);
2605 return sd_bus_send(bus, m, NULL);
/* Return true when the calling process is not the one recorded in
 * bus->original_pid, i.e. when getpid() differs from the stored PID. */
2608 bool bus_pid_changed(sd_bus *bus) {
2611 /* We don't support people creating a bus connection and
2612 * keeping it around over a fork(). Let's complain. */
2614 return bus->original_pid != getpid();