1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
38 #include "cgroup-util.h"
39 #include "bus-label.h"
42 #include "bus-internal.h"
43 #include "bus-message.h"
45 #include "bus-socket.h"
46 #include "bus-kernel.h"
47 #include "bus-control.h"
48 #include "bus-introspect.h"
49 #include "bus-signature.h"
50 #include "bus-objects.h"
52 #include "bus-container.h"
53 #include "bus-protocol.h"
54 #include "bus-track.h"
/* Forward declarations of file-local (static) helpers. */
57 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
58 static int attach_io_events(sd_bus *b);
59 static void detach_io_events(sd_bus *b);
/* Closes the bus's input fd and, when the output fd is valid and is a
 * different descriptor, the output fd too; afterwards both fields are set
 * to -1 so the bus no longer refers to any descriptor. */
61 static void bus_close_fds(sd_bus *b) {
67 safe_close(b->input_fd);
69 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
70 safe_close(b->output_fd);
72 b->input_fd = b->output_fd = -1;
/* Empties the read queue (rqueue) and the write queue (wqueue): every queued
 * message is unreferenced, last entry first, and both allocation counters
 * are reset to 0. The queue size is decremented before each unref; the note
 * below explains why that order is mandatory. */
75 static void bus_reset_queues(sd_bus *b) {
78 /* NOTE: We _must_ decrement b->Xqueue_size before calling
79 * sd_bus_message_unref() for _each_ message. Otherwise the
80 * self-reference checks in sd_bus_unref() will fire for each message.
81 * We would thus recurse into sd_bus_message_unref() and trigger the
82 * assert(m->n_ref > 0) */
84 while (b->rqueue_size > 0)
85 sd_bus_message_unref(b->rqueue[--b->rqueue_size]);
89 b->rqueue_allocated = 0;
91 while (b->wqueue_size > 0)
92 sd_bus_message_unref(b->wqueue[--b->wqueue_size]);
96 b->wqueue_allocated = 0;
/* Destroys a bus object. Visible steps, in order: asserts that no track
 * queue is pending, marks the bus BUS_CLOSED, calls sd_bus_detach_event(),
 * disconnects and unrefs every slot still linked to the bus, clears the
 * caller's default-bus pointer if one was registered, unmaps the kdbus pool,
 * and releases the strings, argv vector, fds, reply-callback containers,
 * match tree, vtable hashmaps, node map (which must already be empty), memfd
 * cache and the memfd cache mutex. */
99 static void bus_free(sd_bus *b) {
103 assert(!b->track_queue);
105 b->state = BUS_CLOSED;
107 sd_bus_detach_event(b);
109 while ((s = b->slots)) {
110 /* At this point only floating slots can still be
111 * around, because the non-floating ones keep a
112 * reference to the bus, and we thus couldn't be
113 * destructing right now... We forcibly disconnect the
114 * slots here, so that they still can be referenced by
115 * apps, but are dead. */
118 bus_slot_disconnect(s);
119 sd_bus_slot_unref(s);
122 if (b->default_bus_ptr)
123 *b->default_bus_ptr = NULL;
128 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
131 free(b->unique_name);
132 free(b->auth_buffer);
137 free(b->cgroup_root);
138 free(b->connection_name);
141 strv_free(b->exec_argv);
143 close_many(b->fds, b->n_fds);
148 hashmap_free_free(b->reply_callbacks);
149 prioq_free(b->reply_callbacks_prioq);
151 bus_match_free(&b->match_callbacks);
153 hashmap_free_free(b->vtable_methods);
154 hashmap_free_free(b->vtable_properties);
156 assert(hashmap_isempty(b->nodes));
157 hashmap_free(b->nodes);
159 bus_kernel_flush_memfd(b);
161 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
/* Creates a new, unconnected bus object (the allocation itself is on lines
 * not shown here). ret must be non-NULL, otherwise -EINVAL is returned.
 * Visible defaults: reference count REFCNT_INIT, both fds -1, message
 * version 1, creds mask with well-known names and unique name, fd passing
 * accepted in the hello flags, name attachment requested, the creating
 * process's pid recorded, the memfd cache mutex initialized, and room
 * reserved for one wqueue entry. */
166 _public_ int sd_bus_new(sd_bus **ret) {
169 assert_return(ret, -EINVAL);
175 r->n_ref = REFCNT_INIT;
176 r->input_fd = r->output_fd = -1;
177 r->message_version = 1;
178 r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
179 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
180 r->attach_flags |= KDBUS_ATTACH_NAMES;
181 r->original_pid = getpid();
183 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
185 /* We guarantee that wqueue always has space for at least one
187 if (!GREEDY_REALLOC(r->wqueue, r->wqueue_allocated, 1)) {
/* Sets the address string the bus will connect to. Rejected with -EINVAL
 * for a NULL bus or address, -EPERM unless the bus is still BUS_UNSET, and
 * -ECHILD if bus_pid_changed() reports true. The storing of the address is
 * on lines not shown here. */
196 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
199 assert_return(bus, -EINVAL);
200 assert_return(bus->state == BUS_UNSET, -EPERM);
201 assert_return(address, -EINVAL);
202 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Records caller-supplied input and output file descriptors on the bus.
 * Both must be >= 0 (-EINVAL otherwise); the bus must be non-NULL, still
 * BUS_UNSET (-EPERM otherwise) and bus_pid_changed() must be false (-ECHILD
 * otherwise). */
214 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
215 assert_return(bus, -EINVAL);
216 assert_return(bus->state == BUS_UNSET, -EPERM);
217 assert_return(input_fd >= 0, -EINVAL);
218 assert_return(output_fd >= 0, -EINVAL);
219 assert_return(!bus_pid_changed(bus), -ECHILD);
221 bus->input_fd = input_fd;
222 bus->output_fd = output_fd;
/* Configures the bus to be reached through an executed program. path must be
 * non-NULL and argv non-empty (-EINVAL otherwise); the bus must still be
 * BUS_UNSET (-EPERM) and bus_pid_changed() false (-ECHILD). The previously
 * stored exec_path and exec_argv are freed here; presumably copies of the new
 * values are installed on the elided lines -- confirm against the full file. */
226 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
229 assert_return(bus, -EINVAL);
230 assert_return(bus->state == BUS_UNSET, -EPERM);
231 assert_return(path, -EINVAL);
232 assert_return(!strv_isempty(argv), -EINVAL);
233 assert_return(!bus_pid_changed(bus), -ECHILD);
245 free(bus->exec_path);
246 strv_free(bus->exec_argv);
/* Stores b, normalized to 0/1, in bus->bus_client. Allowed only on a
 * non-NULL bus (-EINVAL) that is still BUS_UNSET (-EPERM) and for which
 * bus_pid_changed() is false (-ECHILD). */
254 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
255 assert_return(bus, -EINVAL);
256 assert_return(bus->state == BUS_UNSET, -EPERM);
257 assert_return(!bus_pid_changed(bus), -ECHILD);
259 bus->bus_client = !!b;
/* Sets or clears KDBUS_HELLO_MONITOR in bus->hello_flags according to b.
 * Allowed only on a non-NULL bus (-EINVAL) that is still BUS_UNSET (-EPERM)
 * and for which bus_pid_changed() is false (-ECHILD). */
263 _public_ int sd_bus_set_monitor(sd_bus *bus, int b) {
264 assert_return(bus, -EINVAL);
265 assert_return(bus->state == BUS_UNSET, -EPERM);
266 assert_return(!bus_pid_changed(bus), -ECHILD);
268 SET_FLAG(bus->hello_flags, KDBUS_HELLO_MONITOR, b);
/* Sets or clears KDBUS_HELLO_ACCEPT_FD in bus->hello_flags according to b.
 * Allowed only on a non-NULL bus (-EINVAL) that is still BUS_UNSET (-EPERM)
 * and for which bus_pid_changed() is false (-ECHILD). */
272 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
273 assert_return(bus, -EINVAL);
274 assert_return(bus->state == BUS_UNSET, -EPERM);
275 assert_return(!bus_pid_changed(bus), -ECHILD);
277 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
/* Sets or clears KDBUS_ATTACH_TIMESTAMP in bus->attach_flags according to b.
 * Allowed only on a non-NULL bus (-EINVAL) that is still BUS_UNSET (-EPERM)
 * and for which bus_pid_changed() is false (-ECHILD). */
281 _public_ int sd_bus_negotiate_timestamp(sd_bus *bus, int b) {
282 assert_return(bus, -EINVAL);
283 assert_return(bus->state == BUS_UNSET, -EPERM);
284 assert_return(!bus_pid_changed(bus), -ECHILD);
286 SET_FLAG(bus->attach_flags, KDBUS_ATTACH_TIMESTAMP, b);
/* Replaces the bus's credential mask with mask, always adding the well-known
 * names and unique name bits, then returns the result of translating that
 * mask into bus->attach_flags via kdbus_translate_attach_flags().
 * mask must not exceed _SD_BUS_CREDS_ALL (-EINVAL); the bus must be non-NULL
 * (-EINVAL), still BUS_UNSET (-EPERM), and bus_pid_changed() must be false
 * (-ECHILD). */
290 _public_ int sd_bus_negotiate_creds(sd_bus *bus, uint64_t mask) {
291 assert_return(bus, -EINVAL);
292 assert_return(mask <= _SD_BUS_CREDS_ALL, -EINVAL);
293 assert_return(bus->state == BUS_UNSET, -EPERM);
294 assert_return(!bus_pid_changed(bus), -ECHILD);
296 /* The well knowns we need unconditionally, so that matches can work */
297 bus->creds_mask = mask | SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
299 return kdbus_translate_attach_flags(bus->creds_mask, &bus->attach_flags);
/* Stores the server flag (b normalized to 0/1) and the server id on the bus.
 * A non-null server_id is accepted only together with a true b (-EINVAL
 * otherwise). The bus must be non-NULL (-EINVAL), still BUS_UNSET (-EPERM),
 * and bus_pid_changed() must be false (-ECHILD). */
302 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
303 assert_return(bus, -EINVAL);
304 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
305 assert_return(bus->state == BUS_UNSET, -EPERM);
306 assert_return(!bus_pid_changed(bus), -ECHILD);
308 bus->is_server = !!b;
309 bus->server_id = server_id;
/* Stores b, normalized to 0/1, in bus->anonymous_auth. Allowed only on a
 * non-NULL bus (-EINVAL) that is still BUS_UNSET (-EPERM) and for which
 * bus_pid_changed() is false (-ECHILD). */
313 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
314 assert_return(bus, -EINVAL);
315 assert_return(bus->state == BUS_UNSET, -EPERM);
316 assert_return(!bus_pid_changed(bus), -ECHILD);
318 bus->anonymous_auth = !!b;
/* Performs the same precondition checks as the other pre-start setters:
 * non-NULL bus (-EINVAL), state BUS_UNSET (-EPERM), bus_pid_changed() false
 * (-ECHILD). The assignment that uses b is on lines not shown here. */
322 _public_ int sd_bus_set_trusted(sd_bus *bus, int b) {
323 assert_return(bus, -EINVAL);
324 assert_return(bus->state == BUS_UNSET, -EPERM);
325 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Replaces bus->connection_name: the old string is freed and n is stored in
 * its place (n is produced on lines not shown; presumably a copy of name --
 * confirm). bus and name must be non-NULL (-EINVAL), the bus still BUS_UNSET
 * (-EPERM), and bus_pid_changed() false (-ECHILD). */
331 _public_ int sd_bus_set_name(sd_bus *bus, const char *name) {
334 assert_return(bus, -EINVAL);
335 assert_return(name, -EINVAL);
336 assert_return(bus->state == BUS_UNSET, -EPERM);
337 assert_return(!bus_pid_changed(bus), -ECHILD);
343 free(bus->connection_name);
344 bus->connection_name = n;
/* Reply handler that bus_send_hello() registers for its method call.
 * Expects the bus to be in BUS_HELLO or BUS_CLOSING. It queries the reply's
 * errno, reads one string from the reply, requires that string to be a valid
 * service name beginning with ':', stores a copy as bus->unique_name, and
 * advances a bus that is in BUS_HELLO to BUS_RUNNING. The error branches are
 * on lines not shown here. */
349 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
354 assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
357 r = sd_bus_message_get_errno(reply);
363 r = sd_bus_message_read(reply, "s", &s);
367 if (!service_name_is_valid(s) || s[0] != ':')
370 bus->unique_name = strdup(s);
371 if (!bus->unique_name)
374 if (bus->state == BUS_HELLO)
375 bus->state = BUS_RUNNING;
/* Builds a method call addressed to the service "org.freedesktop.DBus",
 * object "/org/freedesktop/DBus", interface "org.freedesktop.DBus" (the
 * member name is on an elided line) and sends it asynchronously with
 * hello_callback as the reply handler. The check on bus_client/is_kernel
 * gates this; the body of that branch is not shown. */
380 static int bus_send_hello(sd_bus *bus) {
381 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
386 if (!bus->bus_client || bus->is_kernel)
389 r = sd_bus_message_new_method_call(
392 "org.freedesktop.DBus",
393 "/org/freedesktop/DBus",
394 "org.freedesktop.DBus",
399 return sd_bus_call_async(bus, NULL, m, hello_callback, NULL, 0);
/* Moves the bus into its post-connect state: a bus client that is not a
 * kernel bus enters BUS_HELLO, and the remaining visible path sets
 * BUS_RUNNING. The return statements are on lines not shown here. */
402 int bus_start_running(sd_bus *bus) {
405 if (bus->bus_client && !bus->is_kernel) {
406 bus->state = BUS_HELLO;
410 bus->state = BUS_RUNNING;
/* Parses one item of a bus address string starting at *p. Visible steps:
 * the text at *p is compared against key over l bytes, the value is scanned
 * until ',' , ';' or the end of the string, two 4-bit values x and y are
 * combined into one byte (presumably decoding of an escaped byte -- confirm
 * against the elided lines), and the output buffer r is grown with
 * GREEDY_REALLOC. How *p and *value are updated is not shown here. */
414 static int parse_address_key(const char **p, const char *key, char **value) {
415 size_t l, n = 0, allocated = 0;
425 if (strncmp(*p, key, l) != 0)
438 while (*a != ';' && *a != ',' && *a != 0) {
456 c = (char) ((x << 4) | y);
463 if (!GREEDY_REALLOC(r, allocated, n + 2))
/* Advances *p to the next ',' in the string, or to the terminating NUL when
 * there is none. Any further adjustment of *p is on lines not shown here. */
487 static void skip_address_key(const char **p) {
491 *p += strcspn(*p, ",");
/* Parses the key list of a "unix:" address, accepting "guid", "path" and
 * "abstract" until the end of the string or ';'. The checks on path and
 * abstract test for neither and for both being given; their outcomes are on
 * elided lines. For a path, a length l larger than sun_path is tested, the
 * path is copied into sun_path and the socket address size is the sun_path
 * offset plus l. For an abstract name, a length above sizeof(sun_path) - 1 is
 * tested, sun_path[0] is set to 0, the name is copied after it, and the size
 * is the sun_path offset plus 1 plus the name length.
 * NOTE(review): a path of exactly sizeof(sun_path) bytes is copied without a
 * trailing NUL by strncpy(); confirm that all consumers rely on sockaddr_size
 * rather than on NUL termination. */
497 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
498 _cleanup_free_ char *path = NULL, *abstract = NULL;
507 while (**p != 0 && **p != ';') {
508 r = parse_address_key(p, "guid", guid);
514 r = parse_address_key(p, "path", &path);
520 r = parse_address_key(p, "abstract", &abstract);
529 if (!path && !abstract)
532 if (path && abstract)
537 if (l > sizeof(b->sockaddr.un.sun_path))
540 b->sockaddr.un.sun_family = AF_UNIX;
541 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
542 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
543 } else if (abstract) {
544 l = strlen(abstract);
545 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
548 b->sockaddr.un.sun_family = AF_UNIX;
549 b->sockaddr.un.sun_path[0] = 0;
550 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
551 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the key list of a "tcp:" address, accepting "guid", "host", "port"
 * and "family" until the end of the string or ';'. A family of "ipv4" or
 * "ipv6" selects AF_INET or AF_INET6 in the getaddrinfo() hints, which
 * otherwise request a stream socket with AI_ADDRCONFIG. The first resolved
 * address is copied into b->sockaddr with its length in b->sockaddr_size, and
 * the result list is freed. -EADDRNOTAVAIL is returned on a path whose
 * condition is not shown here.
 * NOTE(review): on the visible lines family is passed to streq() and
 * ai_addrlen bytes are copied without a visible NULL or size check; confirm
 * that the elided lines guard both. */
557 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
558 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
560 struct addrinfo *result, hints = {
561 .ai_socktype = SOCK_STREAM,
562 .ai_flags = AI_ADDRCONFIG,
570 while (**p != 0 && **p != ';') {
571 r = parse_address_key(p, "guid", guid);
577 r = parse_address_key(p, "host", &host);
583 r = parse_address_key(p, "port", &port);
589 r = parse_address_key(p, "family", &family);
602 if (streq(family, "ipv4"))
603 hints.ai_family = AF_INET;
604 else if (streq(family, "ipv6"))
605 hints.ai_family = AF_INET6;
610 r = getaddrinfo(host, port, &hints, &result);
614 return -EADDRNOTAVAIL;
616 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
617 b->sockaddr_size = result->ai_addrlen;
619 freeaddrinfo(result);
/* Parses the key list of a "unixexec:" address, accepting "guid", "path" and
 * numbered "argv<N>" keys until the end of the string or ';'. For an argv
 * key the index is read with strtoul() directly after the "argv" prefix; the
 * check that follows tests for a conversion error, a missing '=' and an
 * index above 256. The argv array is grown (zero-filled) to hold index + 2
 * entries and the value is parsed into argv[index]. Afterwards entries 1 and
 * up are checked for holes, and a missing argv[0] is filled with a copy of
 * path.
 * NOTE(review): the errno test after strtoul() is only meaningful if errno
 * is cleared beforehand; that assignment is not among the visible lines. */
624 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
626 unsigned n_argv = 0, j;
628 size_t allocated = 0;
636 while (**p != 0 && **p != ';') {
637 r = parse_address_key(p, "guid", guid);
643 r = parse_address_key(p, "path", &path);
649 if (startswith(*p, "argv")) {
653 ul = strtoul(*p + 4, (char**) p, 10);
654 if (errno > 0 || **p != '=' || ul > 256) {
662 if (!GREEDY_REALLOC0(argv, allocated, ul + 2)) {
670 r = parse_address_key(p, NULL, argv + ul);
685 /* Make sure there are no holes in the array, with the
686 * exception of argv[0] */
687 for (j = 1; j < n_argv; j++)
693 if (argv && argv[0] == NULL) {
694 argv[0] = strdup(path);
706 for (j = 0; j < n_argv; j++)
/* Parses the key list of a "kernel:" address, accepting "guid" and "path"
 * until the end of the string or ';'. What is done with the parsed path is
 * on lines not shown here. */
714 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
715 _cleanup_free_ char *path = NULL;
723 while (**p != 0 && **p != ';') {
724 r = parse_address_key(p, "guid", guid);
730 r = parse_address_key(p, "path", &path);
/* Parses the key list of an "x-container-unix:" address, accepting "guid"
 * and "machine" until the end of the string or ';'. The machine name is
 * checked with filename_is_safe() and stored in b->machine, and the socket
 * address is set to the AF_UNIX path "/var/run/dbus/system_bus_socket" with a
 * matching sockaddr_size. */
749 static int parse_container_unix_address(sd_bus *b, const char **p, char **guid) {
750 _cleanup_free_ char *machine = NULL;
758 while (**p != 0 && **p != ';') {
759 r = parse_address_key(p, "guid", guid);
765 r = parse_address_key(p, "machine", &machine);
777 if (!filename_is_safe(machine))
781 b->machine = machine;
784 b->sockaddr.un.sun_family = AF_UNIX;
785 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
786 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + strlen("/var/run/dbus/system_bus_socket");
/* Parses the key list of an "x-container-kernel:" address, accepting "guid"
 * and "machine" until the end of the string or ';'. The machine name is
 * checked with filename_is_safe() and stored in b->machine, and b->kernel is
 * set to a copy of "/dev/kdbus/0-system/bus". */
791 static int parse_container_kernel_address(sd_bus *b, const char **p, char **guid) {
792 _cleanup_free_ char *machine = NULL;
800 while (**p != 0 && **p != ';') {
801 r = parse_address_key(p, "guid", guid);
807 r = parse_address_key(p, "machine", &machine);
819 if (!filename_is_safe(machine))
823 b->machine = machine;
827 b->kernel = strdup("/dev/kdbus/0-system/bus");
/* Discards the result of a previous address parse. Visible effects:
 * sockaddr_size is zeroed, the exec argv vector is freed and the server id is
 * reset to SD_ID128_NULL; other fields are handled on lines not shown here. */
834 static void bus_reset_parsed_address(sd_bus *b) {
838 b->sockaddr_size = 0;
839 strv_free(b->exec_argv);
843 b->server_id = SD_ID128_NULL;
/* Parses the next entry of b->address, starting at b->address_index. After
 * an end-of-string check (outcome not shown) the previous parse result is
 * reset, then the entry is dispatched on its scheme prefix: "unix:", "tcp:",
 * "unixexec:", "kernel:", "x-container-unix:" or "x-container-kernel:". A
 * parsed guid is converted into b->server_id, and address_index is advanced
 * to where parsing stopped. */
850 static int bus_parse_next_address(sd_bus *b) {
851 _cleanup_free_ char *guid = NULL;
859 if (b->address[b->address_index] == 0)
862 bus_reset_parsed_address(b);
864 a = b->address + b->address_index;
873 if (startswith(a, "unix:")) {
876 r = parse_unix_address(b, &a, &guid);
881 } else if (startswith(a, "tcp:")) {
884 r = parse_tcp_address(b, &a, &guid);
890 } else if (startswith(a, "unixexec:")) {
893 r = parse_exec_address(b, &a, &guid);
899 } else if (startswith(a, "kernel:")) {
902 r = parse_kernel_address(b, &a, &guid);
907 } else if (startswith(a, "x-container-unix:")) {
910 r = parse_container_unix_address(b, &a, &guid);
915 } else if (startswith(a, "x-container-kernel:")) {
918 r = parse_container_kernel_address(b, &a, &guid);
931 r = sd_id128_from_string(guid, &b->server_id);
936 b->address_index = a - b->address;
/* Tries to connect using the currently parsed address. The transport is
 * chosen from what the parse produced: exec, container kernel bus, container
 * socket, kernel bus or plain socket. attach_io_events() is called on a
 * visible path, a failure code is remembered (negated) in
 * b->last_connect_error, and bus_parse_next_address() fetches the next
 * candidate. The final visible return yields the remembered error, or
 * -ECONNREFUSED when none was recorded. Loop structure and branch conditions
 * are partly on lines not shown here. */
940 static int bus_start_address(sd_bus *b) {
946 bool skipped = false;
951 r = bus_socket_exec(b);
952 else if (b->machine && b->kernel)
953 r = bus_container_connect_kernel(b);
954 else if (b->machine && b->sockaddr.sa.sa_family != AF_UNSPEC)
955 r = bus_container_connect_socket(b);
957 r = bus_kernel_connect(b);
958 else if (b->sockaddr.sa.sa_family != AF_UNSPEC)
959 r = bus_socket_connect(b);
965 r = attach_io_events(b);
970 b->last_connect_error = -r;
973 r = bus_parse_next_address(b);
977 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Drops the currently parsed address and restarts connection establishment
 * via bus_start_address(), returning its result. */
981 int bus_next_address(sd_bus *b) {
984 bus_reset_parsed_address(b);
985 return bus_start_address(b);
/* Prepares the caller-supplied fds and hands them to the matching transport.
 *
 * Both fds must already be set (>= 0). The input fd, and the output fd when
 * it is a different descriptor, are switched to non-blocking and
 * close-on-exec mode. The input fd is then fstat()ed and its file type
 * selects the transport: bus_kernel_take_fd() for a character device,
 * bus_socket_take_fd() otherwise.
 *
 * S_ISCHR() classifies an st_mode value, so it has to be applied to the mode
 * that fstat() stored in st, not to the file descriptor number. */
988 static int bus_start_fd(sd_bus *b) {
993 assert(b->input_fd >= 0);
994 assert(b->output_fd >= 0);
996 r = fd_nonblock(b->input_fd, true);
1000 r = fd_cloexec(b->input_fd, true);
1004 if (b->input_fd != b->output_fd) {
1005 r = fd_nonblock(b->output_fd, true);
1009 r = fd_cloexec(b->output_fd, true);
1014 if (fstat(b->input_fd, &st) < 0)
1017 if (S_ISCHR(st.st_mode))
1018 return bus_kernel_take_fd(b);
1020 return bus_socket_take_fd(b);
/* Starts connecting a configured bus. Requires a non-NULL bus (-EINVAL) in
 * state BUS_UNSET (-EPERM) with bus_pid_changed() false (-ECHILD). The state
 * becomes BUS_OPENING; the combination is_server && bus_client is tested
 * (outcome on an elided line). A preset input fd selects bus_start_fd();
 * otherwise any of address, socket address, exec path, kernel path or
 * machine selects bus_start_address(). The visible final step returns the
 * result of bus_send_hello(). */
1023 _public_ int sd_bus_start(sd_bus *bus) {
1026 assert_return(bus, -EINVAL);
1027 assert_return(bus->state == BUS_UNSET, -EPERM);
1028 assert_return(!bus_pid_changed(bus), -ECHILD);
1030 bus->state = BUS_OPENING;
1032 if (bus->is_server && bus->bus_client)
1035 if (bus->input_fd >= 0)
1036 r = bus_start_fd(bus);
1037 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1038 r = bus_start_address(bus);
1045 return bus_send_hello(bus);
/* Opens a bus connection chosen from the environment. ret must be non-NULL
 * (-EINVAL). DBUS_STARTER_BUS_TYPE of "system" delegates to
 * sd_bus_open_system(), "session" or "user" to sd_bus_open_user().
 * DBUS_STARTER_ADDRESS is read next; on a path whose condition is not shown,
 * the choice between user and system bus depends on whether
 * cg_pid_get_owner_uid(0, NULL) succeeds. Otherwise the bus (created on
 * elided lines) gets that address, is marked a bus client, requests caps and
 * creds attachment, and is started.
 * NOTE(review): e is passed to streq() on the visible lines; confirm that an
 * elided line guards against an unset variable. */
1048 _public_ int sd_bus_open(sd_bus **ret) {
1053 assert_return(ret, -EINVAL);
1055 /* Let's connect to the starter bus if it is set, and
1056 * otherwise to the bus that is appropriate for the scope
1057 * we are running in */
1059 e = secure_getenv("DBUS_STARTER_BUS_TYPE");
1061 if (streq(e, "system"))
1062 return sd_bus_open_system(ret);
1063 else if (STR_IN_SET(e, "session", "user"))
1064 return sd_bus_open_user(ret);
1067 e = secure_getenv("DBUS_STARTER_ADDRESS");
1069 if (cg_pid_get_owner_uid(0, NULL) >= 0)
1070 return sd_bus_open_user(ret);
1072 return sd_bus_open_system(ret);
1079 r = sd_bus_set_address(b, e);
1083 b->bus_client = true;
1085 /* We don't know whether the bus is trusted or not, so better
1086 * be safe, and authenticate everything */
1088 b->attach_flags |= KDBUS_ATTACH_CAPS | KDBUS_ATTACH_CREDS;
1090 r = sd_bus_start(b);
/* Sets the system bus address on b: the value of DBUS_SYSTEM_BUS_ADDRESS on
 * one path, DEFAULT_SYSTEM_BUS_PATH on the other (the condition selecting
 * between them is on an elided line). Returns sd_bus_set_address()'s result. */
1102 int bus_set_address_system(sd_bus *b) {
1106 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1108 return sd_bus_set_address(b, e);
1110 return sd_bus_set_address(b, DEFAULT_SYSTEM_BUS_PATH);
/* Opens a connection to the system bus. ret must be non-NULL (-EINVAL). The
 * bus (created on elided lines) gets the system address, is flagged as bus
 * client and system bus, requests caps and creds attachment, and is started. */
1113 _public_ int sd_bus_open_system(sd_bus **ret) {
1117 assert_return(ret, -EINVAL);
1123 r = bus_set_address_system(b);
1127 b->bus_client = true;
1128 b->is_system = true;
1130 /* Let's do per-method access control on the system bus. We
1131 * need the caller's UID and capability set for that. */
1133 b->attach_flags |= KDBUS_ATTACH_CAPS | KDBUS_ATTACH_CREDS;
1135 r = sd_bus_start(b);
/* Sets the user bus address on b. DBUS_SESSION_BUS_ADDRESS is passed to
 * sd_bus_set_address() on one path. Otherwise XDG_RUNTIME_DIR is escaped with
 * bus_address_escape() and formatted into b->address using the kernel and/or
 * unix user bus format strings; a kernel-only format using getuid() and a
 * -ECONNREFUSED return are also visible. The conditions selecting between
 * these variants are on lines not shown here.
 * NOTE(review): the asprintf() return values are not checked on the visible
 * lines; verify how allocation failure is detected. */
1147 int bus_set_address_user(sd_bus *b) {
1152 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1154 return sd_bus_set_address(b, e);
1156 e = secure_getenv("XDG_RUNTIME_DIR");
1158 _cleanup_free_ char *ee = NULL;
1160 ee = bus_address_escape(e);
1165 asprintf(&b->address, KERNEL_USER_BUS_FMT ";" UNIX_USER_BUS_FMT, getuid(), ee);
1167 asprintf(&b->address, UNIX_USER_BUS_FMT, ee);
1171 asprintf(&b->address, KERNEL_USER_BUS_FMT, getuid());
1173 return -ECONNREFUSED;
/* Opens a connection to the user bus. ret must be non-NULL (-EINVAL). The
 * bus (created on elided lines) gets the user address, is flagged as bus
 * client, and is started. */
1183 _public_ int sd_bus_open_user(sd_bus **ret) {
1187 assert_return(ret, -EINVAL);
1193 r = bus_set_address_user(b);
1197 b->bus_client = true;
1200 /* We don't do any per-method access control on the user
1204 r = sd_bus_start(b);
/* Builds a "unixexec:" address that runs ssh with the arguments -xT, the
 * escaped host and systemd-stdio-bridge, and stores it in b->address. The
 * host is escaped with bus_address_escape() first. */
1216 int bus_set_address_system_remote(sd_bus *b, const char *host) {
1217 _cleanup_free_ char *e = NULL;
1222 e = bus_address_escape(host);
1226 b->address = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
/* Opens a connection to the system bus of a remote host. host and ret must
 * be non-NULL (-EINVAL). A new bus gets the remote address from
 * bus_set_address_system_remote(), is flagged as bus client and as not
 * trusted, and is started. */
1233 _public_ int sd_bus_open_system_remote(sd_bus **ret, const char *host) {
1237 assert_return(host, -EINVAL);
1238 assert_return(ret, -EINVAL);
1240 r = sd_bus_new(&bus);
1244 r = bus_set_address_system_remote(bus, host);
1248 bus->bus_client = true;
1249 bus->trusted = false;
1251 r = sd_bus_start(bus);
/* Builds the address for a container's system bus from the escaped machine
 * name and stores it in b->address. Two variants are visible: one listing
 * "x-container-kernel:" followed by "x-container-unix:", and one with
 * "x-container-unix:" only; what selects between them is not shown here. */
1263 int bus_set_address_system_container(sd_bus *b, const char *machine) {
1264 _cleanup_free_ char *e = NULL;
1269 e = bus_address_escape(machine);
1274 b->address = strjoin("x-container-kernel:machine=", e, ";x-container-unix:machine=", e, NULL);
1276 b->address = strjoin("x-container-unix:machine=", e, NULL);
/* Opens a connection to the system bus of a local container. machine and ret
 * must be non-NULL and machine must pass filename_is_safe() (-EINVAL
 * otherwise). A new bus gets the container address, is flagged as bus client
 * and as not trusted, and is started. */
1284 _public_ int sd_bus_open_system_container(sd_bus **ret, const char *machine) {
1288 assert_return(machine, -EINVAL);
1289 assert_return(ret, -EINVAL);
1290 assert_return(filename_is_safe(machine), -EINVAL);
1292 r = sd_bus_new(&bus);
1296 r = bus_set_address_system_container(bus, machine);
1300 bus->bus_client = true;
1301 bus->trusted = false;
1303 r = sd_bus_start(bus);
/* Closes a bus connection. An already closed bus and a changed pid are both
 * tested first (outcomes on elided lines). The state then becomes
 * BUS_CLOSED, sd_bus_detach_event() is called and all queued messages are
 * dropped. A statement guarded by !is_kernel follows on an elided line; per
 * the comment below, the fd of a kernel bus is deliberately left open. */
1315 _public_ void sd_bus_close(sd_bus *bus) {
1319 if (bus->state == BUS_CLOSED)
1321 if (bus_pid_changed(bus))
1324 bus->state = BUS_CLOSED;
1326 sd_bus_detach_event(bus);
1328 /* Drop all queued messages so that they drop references to
1329 * the bus object and the bus may be freed */
1330 bus_reset_queues(bus);
1332 if (!bus->is_kernel)
1335 /* We'll leave the fd open in case this is a kernel bus, since
1336 * there might still be memblocks around that reference this
1337 * bus, and they might need to invoke the KDBUS_CMD_FREE
1338 * ioctl on the fd when they are freed. */
/* Puts the bus into BUS_CLOSING. The preceding test singles out buses that
 * are in none of BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO or BUS_RUNNING;
 * its outcome is on an elided line. */
1341 static void bus_enter_closing(sd_bus *bus) {
1344 if (bus->state != BUS_OPENING &&
1345 bus->state != BUS_AUTHENTICATING &&
1346 bus->state != BUS_HELLO &&
1347 bus->state != BUS_RUNNING)
1350 bus->state = BUS_CLOSING;
/* Takes an additional reference on bus. Returns NULL for a NULL bus. The
 * incremented count is asserted to be at least 2, i.e. the caller must
 * already have held a reference. */
1353 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1354 assert_return(bus, NULL);
1356 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drops one reference on bus. Before decrementing, it checks whether the
 * reference count equals the number of queued messages plus one. If so, both
 * queues are scanned for a message with n_ref > 1 (the reaction is on elided
 * lines), and on the visible path the queues are flushed with
 * bus_reset_queues(). The count is then decremented with REFCNT_DEC(); what
 * happens at zero is not shown here. See the long note below for the known
 * thread-safety and ownership caveats of this scheme. */
1361 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1367 /* TODO/FIXME: It's naive to think REFCNT_GET() is thread-safe in any
1368 * way but exclusive REFCNT_DEC(). The current logic _must_ lock around
1369 * REFCNT_GET() until REFCNT_DEC() or two threads might end up in
1370 * parallel in bus_reset_queues(). But locking would totally break the
1371 * recursion we introduce by bus_reset_queues()...
1372 * (Imagine one thread in sd_bus_message_unref() setting n_ref to 0 and
1373 * thus calling into sd_bus_unref(). If at the same time the real
1374 * thread calls sd_bus_unref(), both end up with "q == true" and will
1375 * call into bus_reset_queues().
1376 * If we require the main bus to be alive until all dispatch threads
1377 * are done, there is no need to do ref-counts at all. So in both ways,
1378 * the REFCNT thing is humbug.)
1380 * On a second note: messages are *not* required to have ->bus set nor
1381 * does it have to be _this_ bus that they're assigned to. This whole
1382 * ref-cnt checking breaks apart if a message is not assigned to us.
1383 * (which is _very_ easy to trigger with the current API). */
1385 if (REFCNT_GET(bus->n_ref) == bus->rqueue_size + bus->wqueue_size + 1) {
1388 for (i = 0; i < bus->rqueue_size; i++)
1389 if (bus->rqueue[i]->n_ref > 1) {
1395 for (i = 0; i < bus->wqueue_size; i++)
1396 if (bus->wqueue[i]->n_ref > 1) {
1402 /* We are the only holders on the messages, and the
1403 * messages are the only holders on us, so let's drop
1404 * the messages and thus implicitly also kill our own
1406 * bus_reset_queues() decrements the queue-size before
1407 * calling into sd_bus_message_unref(). Thus, it
1408 * protects us from recursion. */
1411 bus_reset_queues(bus);
1414 i = REFCNT_DEC(bus->n_ref);
/* Returns the value of BUS_IS_OPEN() for the bus's current state. Fails with
 * -EINVAL for a NULL bus and -ECHILD if bus_pid_changed() reports true. */
1422 _public_ int sd_bus_is_open(sd_bus *bus) {
1424 assert_return(bus, -EINVAL);
1425 assert_return(!bus_pid_changed(bus), -ECHILD);
1427 return BUS_IS_OPEN(bus->state);
/* Reports whether a value of the given D-Bus type can be sent on this bus.
 * Requires a non-NULL bus (-EINVAL) that is not BUS_UNSET (-ENOTCONN) with
 * bus_pid_changed() false (-ECHILD). A monitor connection is tested first
 * (outcome on an elided line). For SD_BUS_TYPE_UNIX_FD the hello flags must
 * include KDBUS_HELLO_ACCEPT_FD, the bus is driven to the running state, and
 * bus->can_fds is returned. For any other type the result of
 * bus_type_is_valid() is returned. */
1430 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1433 assert_return(bus, -EINVAL);
1434 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1435 assert_return(!bus_pid_changed(bus), -ECHILD);
1437 if (bus->hello_flags & KDBUS_HELLO_MONITOR)
1440 if (type == SD_BUS_TYPE_UNIX_FD) {
1441 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1444 r = bus_ensure_running(bus);
1448 return bus->can_fds;
1451 return bus_type_is_valid(type);
/* Copies the bus's server id into *server_id after bus_ensure_running() has
 * been called. bus and server_id must be non-NULL (-EINVAL) and
 * bus_pid_changed() must be false (-ECHILD). */
1454 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1457 assert_return(bus, -EINVAL);
1458 assert_return(server_id, -EINVAL);
1459 assert_return(!bus_pid_changed(bus), -ECHILD);
1461 r = bus_ensure_running(bus);
1465 *server_id = bus->server_id;
/* Seals message m for sending on bus b. On one path the bus cookie counter
 * is raised to at least the message's existing cookie; on another the
 * timeout is replaced by BUS_DEFAULT_TIMEOUT (both conditions are on elided
 * lines). The visible final step seals the message with the next cookie
 * (pre-incremented b->cookie) and the timeout. */
1469 static int bus_seal_message(sd_bus *b, sd_bus_message *m, usec_t timeout) {
1474 /* If we copy the same message to multiple
1475 * destinations, avoid using the same cookie
1477 b->cookie = MAX(b->cookie, BUS_MESSAGE_COOKIE(m));
1482 timeout = BUS_DEFAULT_TIMEOUT;
1484 return bus_message_seal(m, ++b->cookie, timeout);
/* Converts *m to the bus's wire format when needed. A bus setting of 0 for
 * message_version or message_endian matches any message; otherwise the
 * setting must equal the value in the message header. When either differs,
 * bus_message_remarshal() is called and its result returned. */
1487 static int bus_remarshal_message(sd_bus *b, sd_bus_message **m) {
1490 /* Do packet version and endianness already match? */
1491 if ((b->message_version == 0 || b->message_version == (*m)->header->version) &&
1492 (b->message_endian == 0 || b->message_endian == (*m)->header->endian))
1495 /* No? Then remarshal! */
1496 return bus_message_remarshal(b, m);
/* Seals a locally generated (synthetic) message with the fixed cookie
 * 0xFFFFFFFF and a timeout of 0; the comment below gives the rationale for
 * that cookie value. */
1499 int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
1503 /* The bus specification says the serial number cannot be 0,
1504 * hence let's fill something in for synthetic messages. Since
1505 * synthetic messages might have a fake sender and we don't
1506 * want to interfere with the real sender's serial numbers we
1507 * pick a fixed, artificial one. We use (uint32_t) -1 rather
1508 * than (uint64_t) -1 since dbus1 only had 32bit identifiers,
1509 * even though kdbus can do 64bit. */
1511 return bus_message_seal(m, 0xFFFFFFFFULL, 0);
/* Writes message m through the kernel transport or the socket transport
 * (the selecting condition is on an elided line); the socket path tracks
 * progress in *idx. Once the message is complete -- always on a kernel bus,
 * or when *idx has reached the message size -- a debug line describing the
 * message is logged. */
1514 static int bus_write_message(sd_bus *bus, sd_bus_message *m, bool hint_sync_call, size_t *idx) {
1521 r = bus_kernel_write_message(bus, m, hint_sync_call);
1523 r = bus_socket_write_message(bus, m, idx);
1528 if (bus->is_kernel || *idx >= BUS_MESSAGE_SIZE(m))
1529 log_debug("Sent message type=%s sender=%s destination=%s object=%s interface=%s member=%s cookie=%" PRIu64 " reply_cookie=%" PRIu64 " error=%s",
1530 bus_message_type_to_string(m->header->type),
1531 strna(sd_bus_message_get_sender(m)),
1532 strna(sd_bus_message_get_destination(m)),
1533 strna(sd_bus_message_get_path(m)),
1534 strna(sd_bus_message_get_interface(m)),
1535 strna(sd_bus_message_get_member(m)),
1536 BUS_MESSAGE_COOKIE(m),
1538 strna(m->error.message));
/* Flushes the write queue while the bus is BUS_RUNNING or BUS_HELLO. The
 * head message is written with the saved offset bus->windex. When it is
 * complete (always on a kernel bus, or once windex reaches the message size)
 * the entry is removed: the size is decremented, the message unreferenced
 * and the remaining entries shifted down by one. */
1543 static int dispatch_wqueue(sd_bus *bus) {
1547 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1549 while (bus->wqueue_size > 0) {
1551 r = bus_write_message(bus, bus->wqueue[0], false, &bus->windex);
1555 /* Didn't do anything this time */
1557 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1558 /* Fully written. Let's drop the entry from
1561 * This isn't particularly optimized, but
1562 * well, this is supposed to be our worst-case
1563 * buffer only, and the socket buffer is
1564 * supposed to be our primary buffer, and if
1565 * it got full, then all bets are off
1568 bus->wqueue_size --;
1569 sd_bus_message_unref(bus->wqueue[0]);
1570 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Reads one message through the kernel transport (which receives the
 * priority hint) or the socket transport; the selecting condition is on an
 * elided line. */
1580 static int bus_read_message(sd_bus *bus, bool hint_priority, int64_t priority) {
1584 return bus_kernel_read_message(bus, hint_priority, priority);
1586 return bus_socket_read_message(bus);
/* Ensures the read queue can take one more entry. A queue that has reached
 * BUS_RQUEUE_MAX is tested first, then the array is grown to hold
 * rqueue_size + 1 entries; the error returns are on lines not shown here. */
1589 int bus_rqueue_make_room(sd_bus *bus) {
1592 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1595 if (!GREEDY_REALLOC(bus->rqueue, bus->rqueue_allocated, bus->rqueue_size + 1))
/* Produces the next incoming message in *m while the bus is BUS_RUNNING or
 * BUS_HELLO. A queued message is handed out first: the head of rqueue is
 * stored in *m, the size decremented and the rest shifted down. Otherwise a
 * read from the transport is attempted with the given priority hint. */
1601 static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **m) {
1606 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1608 /* Note that the priority logic is only available on kdbus,
1609 * where the rqueue is unused. We check the rqueue here
1610 * anyway, because it's simple... */
1613 if (bus->rqueue_size > 0) {
1614 /* Dispatch a queued message */
1616 *m = bus->rqueue[0];
1617 bus->rqueue_size --;
1618 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1622 /* Try to read a new message */
1623 r = bus_read_message(bus, hint_priority, priority);
/* Common send path. Takes its own reference on the message for the duration
 * of the call. Requires non-NULL bus and message (-EINVAL), bus_pid_changed()
 * false (-ECHILD) and, on a kernel bus, no monitor flag (-EROFS); the bus
 * must be open. When the caller does not ask for the cookie and the message
 * is not yet sealed, it is flagged as expecting no reply. The message is then
 * sealed and remarshalled if needed. With an empty write queue on a running
 * (or HELLO) bus a direct write is attempted: the listed disconnect errors
 * move the bus to closing, and a partial socket write parks the message in
 * wqueue[0]. Otherwise the message is appended to the write queue, subject
 * to BUS_WQUEUE_MAX. The message cookie is stored in *cookie on a visible
 * path. */
1633 static int bus_send_internal(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie, bool hint_sync_call) {
1634 _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1637 assert_return(bus, -EINVAL);
1638 assert_return(m, -EINVAL);
1639 assert_return(!bus_pid_changed(bus), -ECHILD);
1640 assert_return(!bus->is_kernel || !(bus->hello_flags & KDBUS_HELLO_MONITOR), -EROFS);
1642 if (!BUS_IS_OPEN(bus->state))
1646 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1653 /* If the cookie number isn't kept, then we know that no reply
1655 if (!cookie && !m->sealed)
1656 m->header->flags |= BUS_MESSAGE_NO_REPLY_EXPECTED;
1658 r = bus_seal_message(bus, m, 0);
1662 /* Remarshal if we have to. This will possibly unref the
1663 * message and place a replacement in m */
1664 r = bus_remarshal_message(bus, &m);
1668 /* If this is a reply and no reply was requested, then let's
1669 * suppress this, if we can */
1670 if (m->dont_send && !cookie)
1673 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1676 r = bus_write_message(bus, m, hint_sync_call, &idx);
1678 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
1679 bus_enter_closing(bus);
1684 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1685 /* Wasn't fully written. So let's remember how
1686 * much was written. Note that the first entry
1687 * of the wqueue array is always allocated so
1688 * that we always can remember how much was
1690 bus->wqueue[0] = sd_bus_message_ref(m);
1691 bus->wqueue_size = 1;
1695 /* Just append it to the queue. */
1697 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1700 if (!GREEDY_REALLOC(bus->wqueue, bus->wqueue_allocated, bus->wqueue_size + 1))
1703 bus->wqueue[bus->wqueue_size ++] = sd_bus_message_ref(m);
1707 *cookie = BUS_MESSAGE_COOKIE(m);
/* Public send entry point: forwards to bus_send_internal() with the
 * synchronous-call hint set to false and returns its result. */
1712 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *cookie) {
1713 return bus_send_internal(bus, m, cookie, false);
/* Sends m to an explicit destination. Requires non-NULL bus and message
 * (-EINVAL), bus_pid_changed() false (-ECHILD) and an open bus. When the
 * message's current destination differs from the requested one,
 * sd_bus_message_set_destination() is called (surrounding steps are on
 * elided lines). The message is then passed to sd_bus_send(). */
1716 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *cookie) {
1719 assert_return(bus, -EINVAL);
1720 assert_return(m, -EINVAL);
1721 assert_return(!bus_pid_changed(bus), -ECHILD);
1723 if (!BUS_IS_OPEN(bus->state))
1726 if (!streq_ptr(m->destination, destination)) {
1731 r = sd_bus_message_set_destination(m, destination);
1736 return sd_bus_send(bus, m, cookie);
/* Converts a relative timeout into an absolute CLOCK_MONOTONIC time by
 * adding it to the current time. The value (uint64_t) -1 is special-cased;
 * the value returned for it is on an elided line. */
1739 static usec_t calc_elapse(uint64_t usec) {
1740 if (usec == (uint64_t) -1)
1743 return now(CLOCK_MONOTONIC) + usec;
/* Comparison function for reply callbacks, handed to the reply-callback
 * priority queue by sd_bus_call_async(). It distinguishes entries with a
 * non-zero timeout from entries with a zero timeout, then compares the
 * timeout values; the values returned for each case are on elided lines. */
1746 static int timeout_compare(const void *a, const void *b) {
1747 const struct reply_callback *x = a, *y = b;
1749 if (x->timeout != 0 && y->timeout == 0)
1752 if (x->timeout == 0 && y->timeout != 0)
1755 if (x->timeout < y->timeout)
1758 if (x->timeout > y->timeout)
/* Sends a method call and registers callback to be invoked with the reply.
 * Requires non-NULL bus, message and callback, a METHOD_CALL message without
 * the no-reply flag (all -EINVAL), bus_pid_changed() false (-ECHILD) and, on
 * a kernel bus, no monitor flag (-EROFS); the bus must be open. The reply
 * hashmap and the timeout priority queue are created on demand, the message
 * is sealed with the timeout and remarshalled if needed, and a
 * BUS_REPLY_CALLBACK slot is allocated (floating when the caller passes no
 * slot pointer). The slot is indexed by the message cookie and, for a
 * non-zero deadline, also queued by timeout. On the visible failure paths
 * the cookie or timeout field is reset to 0. Finally the message is sent. */
1764 _public_ int sd_bus_call_async(
1768 sd_bus_message_handler_t callback,
1772 _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1773 _cleanup_bus_slot_unref_ sd_bus_slot *s = NULL;
1776 assert_return(bus, -EINVAL);
1777 assert_return(m, -EINVAL);
1778 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1779 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1780 assert_return(callback, -EINVAL);
1781 assert_return(!bus_pid_changed(bus), -ECHILD);
1782 assert_return(!bus->is_kernel || !(bus->hello_flags & KDBUS_HELLO_MONITOR), -EROFS);
1784 if (!BUS_IS_OPEN(bus->state))
1787 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1791 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1795 r = bus_seal_message(bus, m, usec);
1799 r = bus_remarshal_message(bus, &m);
1803 s = bus_slot_allocate(bus, !slot, BUS_REPLY_CALLBACK, sizeof(struct reply_callback), userdata);
1807 s->reply_callback.callback = callback;
1809 s->reply_callback.cookie = BUS_MESSAGE_COOKIE(m);
1810 r = hashmap_put(bus->reply_callbacks, &s->reply_callback.cookie, &s->reply_callback);
1812 s->reply_callback.cookie = 0;
1816 s->reply_callback.timeout = calc_elapse(m->timeout);
1817 if (s->reply_callback.timeout != 0) {
1818 r = prioq_put(bus->reply_callbacks_prioq, &s->reply_callback, &s->reply_callback.prioq_idx);
1820 s->reply_callback.timeout = 0;
1825 r = sd_bus_send(bus, m, &s->reply_callback.cookie);
/* Drives the bus until it reaches BUS_RUNNING. The states BUS_UNSET,
 * BUS_CLOSED and BUS_CLOSING are tested first, and an already running bus is
 * tested separately (outcomes on elided lines). Otherwise the bus is
 * processed with sd_bus_process(), the state is re-checked, and
 * sd_bus_wait() is called with no timeout; the enclosing loop is on lines
 * not shown here. */
1836 int bus_ensure_running(sd_bus *bus) {
1841 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1843 if (bus->state == BUS_RUNNING)
1847 r = sd_bus_process(bus, NULL);
1850 if (bus->state == BUS_RUNNING)
1855 r = sd_bus_wait(bus, (uint64_t) -1);
/* Synchronous method call: sends the message and waits for its reply.
 * Requires non-NULL bus and message, a METHOD_CALL message without the
 * no-reply flag, a clean error structure (all -EINVAL), bus_pid_changed()
 * false (-ECHILD) and, on a kernel bus, no monitor flag (-EROFS); the bus
 * must be open and is first driven to the running state.
 *
 * The current read queue size is remembered in i so that only messages
 * arriving afterwards are examined. After sealing, remarshalling and sending,
 * new queue entries are scanned. An entry whose reply_cookie matches is
 * removed from the queue: a METHOD_RETURN is accepted when it carries no fds
 * or fd passing was negotiated, otherwise an inconsistent-message error is
 * set; a METHOD_ERROR is copied into error. An entry that is our own call
 * coming back (same cookie, sender equal to our unique name) is removed too,
 * to avoid dead-locking on a call to ourselves. When nothing matches, more
 * input is read, then the bus is polled for the remaining time and the write
 * queue flushed; the listed disconnect errors move the bus to closing. */
1861 _public_ int sd_bus_call(
1865 sd_bus_error *error,
1866 sd_bus_message **reply) {
1868 _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1874 assert_return(bus, -EINVAL);
1875 assert_return(m, -EINVAL);
1876 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1877 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1878 assert_return(!bus_error_is_dirty(error), -EINVAL);
1879 assert_return(!bus_pid_changed(bus), -ECHILD);
1880 assert_return(!bus->is_kernel || !(bus->hello_flags & KDBUS_HELLO_MONITOR), -EROFS);
1882 if (!BUS_IS_OPEN(bus->state))
1885 r = bus_ensure_running(bus);
1889 i = bus->rqueue_size;
1891 r = bus_seal_message(bus, m, usec);
1895 r = bus_remarshal_message(bus, &m);
1899 r = bus_send_internal(bus, m, &cookie, true);
1903 timeout = calc_elapse(m->timeout);
1908 while (i < bus->rqueue_size) {
1909 sd_bus_message *incoming = NULL;
1911 incoming = bus->rqueue[i];
1913 if (incoming->reply_cookie == cookie) {
1914 /* Found a match! */
1916 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1919 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1921 if (incoming->n_fds <= 0 || (bus->hello_flags & KDBUS_HELLO_ACCEPT_FD)) {
1925 sd_bus_message_unref(incoming);
1930 r = sd_bus_error_setf(error, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Reply message contained file descriptors which I couldn't accept. Sorry.");
1932 } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
1933 r = sd_bus_error_copy(error, &incoming->error);
1937 sd_bus_message_unref(incoming);
1940 } else if (BUS_MESSAGE_COOKIE(incoming) == cookie &&
1943 streq(bus->unique_name, incoming->sender)) {
1945 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1948 /* Our own message? Somebody is trying
1949 * to send its own client a message,
1950 * let's not dead-lock, let's fail
1953 sd_bus_message_unref(incoming);
1957 /* Try to read more, right-away */
1961 r = bus_read_message(bus, false, 0);
1963 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
1964 bus_enter_closing(bus);
1976 n = now(CLOCK_MONOTONIC);
1982 left = (uint64_t) -1;
1984 r = bus_poll(bus, true, left);
1990 r = dispatch_wqueue(bus);
1992 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
1993 bus_enter_closing(bus);
/* Return the file descriptor of the connection (bus->input_fd).
 *
 * This is only permitted when input and output use the same descriptor;
 * the check is made with assert_return() and -EPERM. A NULL bus is checked
 * with -EINVAL and a changed PID with -ECHILD. */
2002 _public_ int sd_bus_get_fd(sd_bus *bus) {
2004 assert_return(bus, -EINVAL);
2005 assert_return(bus->input_fd == bus->output_fd, -EPERM);
2006 assert_return(!bus_pid_changed(bus), -ECHILD);
2008 return bus->input_fd;
/* Work out which poll events the bus currently wants to wait for.
 *
 * The decision depends on the connection state: opening, authenticating
 * (where bus_socket_auth_needs_write() is consulted), or running/hello
 * (where an empty read queue and a non-empty write queue are the relevant
 * conditions). Callers in this file mask the result with POLLIN and
 * POLLOUT. */
2011 _public_ int sd_bus_get_events(sd_bus *bus) {
2014 assert_return(bus, -EINVAL);
2015 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Only open or closing connections are handled beyond this point. */
2017 if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING)
2020 if (bus->state == BUS_OPENING)
2022 else if (bus->state == BUS_AUTHENTICATING) {
2024 if (bus_socket_auth_needs_write(bus))
2029 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
/* Nothing queued for processing: more input is of interest. */
2030 if (bus->rqueue_size <= 0)
/* Messages are waiting to be written out. */
2032 if (bus->wqueue_size > 0)
/* Report through *timeout_usec when the bus next needs to be processed.
 *
 * Values stored by the visible code:
 *  - the authentication timeout while authenticating,
 *  - (uint64_t) -1 when the bus is neither running nor in hello state,
 *  - (uint64_t) -1 when the earliest reply callback has a zero timeout,
 *  - otherwise the timeout of the earliest pending reply callback.
 * Pending track queue entries, the closing state and a non-empty read
 * queue are checked first.
 * NOTE(review): prepare_callback() enables the timer when the return value
 * is positive - confirm the exact return convention. */
2039 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
2040 struct reply_callback *c;
2042 assert_return(bus, -EINVAL);
2043 assert_return(timeout_usec, -EINVAL);
2044 assert_return(!bus_pid_changed(bus), -ECHILD);
2046 if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING)
2049 if (bus->track_queue) {
2054 if (bus->state == BUS_CLOSING) {
2059 if (bus->state == BUS_AUTHENTICATING) {
2060 *timeout_usec = bus->auth_timeout;
2064 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
2065 *timeout_usec = (uint64_t) -1;
2069 if (bus->rqueue_size > 0) {
/* The head of the prioq is the reply callback that is looked at next. */
2074 c = prioq_peek(bus->reply_callbacks_prioq);
2076 *timeout_usec = (uint64_t) -1;
2080 if (c->timeout == 0) {
2081 *timeout_usec = (uint64_t) -1;
2085 *timeout_usec = c->timeout;
/* Handle the reply callback at the head of the timeout queue.
 *
 * A synthetic NO_REPLY error message with the text Method call timed out
 * is built, given org.freedesktop.DBus as sender and sealed. The callback
 * is removed from both the prioq and the cookie hashmap, then invoked with
 * the synthetic message while bus->current_message and bus->current_slot
 * point at it. The callback result goes through bus_maybe_reply_error(). */
2089 static int process_timeout(sd_bus *bus) {
2090 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2091 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
2092 struct reply_callback *c;
2098 c = prioq_peek(bus->reply_callbacks_prioq);
2102 n = now(CLOCK_MONOTONIC);
2106 r = bus_message_new_synthetic_error(
2109 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
2114 m->sender = "org.freedesktop.DBus";
2116 r = bus_seal_synthetic_message(bus, m);
/* The entry peeked above must still be the head of the queue. */
2120 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
2123 hashmap_remove(bus->reply_callbacks, &c->cookie);
/* Expose the message and slot as current for the duration of the callback. */
2126 bus->current_message = m;
2127 bus->current_slot = container_of(c, sd_bus_slot, reply_callback);
2129 bus->iteration_counter ++;
2131 r = c->callback(bus, m, bus->current_slot->userdata, &error_buffer);
2132 r = bus_maybe_reply_error(m, r, &error_buffer);
2134 bus->current_message = NULL;
2135 bus->current_slot = NULL;
/* Gatekeeper used while the bus is in the BUS_HELLO state: checks that the
 * message m is a method return or method error with reply_cookie 1. The
 * message content itself is not parsed here. */
2140 static int process_hello(sd_bus *bus, sd_bus_message *m) {
2144 if (bus->state != BUS_HELLO)
2147 /* Let's make sure the first message on the bus is the HELLO
2148 * reply. But note that we don't actually parse the message
2149 * here (we leave that to the usual handling), we just verify
2150 * we don't let any earlier msg through. */
2152 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
2153 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
/* NOTE(review): presumably the Hello call is the first message sent and
 * therefore carries cookie 1 - confirm. */
2156 if (m->reply_cookie != 1)
/* Deliver a method return or method error to the reply callback that was
 * registered for its reply_cookie.
 *
 * Other message types, kdbus monitor connections and messages addressed to
 * a different unique name are checked first. The callback is removed from
 * the cookie hashmap (and from the timeout prioq when it has a timeout).
 * If the reply carries fds although fd passing was not negotiated, a
 * synthetic INCONSISTENT_MESSAGE error is handed to the callback instead.
 * Floating slots are disconnected and unreferenced afterwards. */
2162 static int process_reply(sd_bus *bus, sd_bus_message *m) {
2163 _cleanup_bus_message_unref_ sd_bus_message *synthetic_reply = NULL;
2164 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2166 struct reply_callback *c;
2172 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
2173 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
2176 if (bus->is_kernel && (bus->hello_flags & KDBUS_HELLO_MONITOR))
2179 if (m->destination && bus->unique_name && !streq_ptr(m->destination, bus->unique_name))
/* Look up and unregister the callback waiting for this reply cookie. */
2182 c = hashmap_remove(bus->reply_callbacks, &m->reply_cookie);
2187 slot = container_of(c, sd_bus_slot, reply_callback);
/* Only callbacks with a non-zero timeout are taken out of the prioq. */
2189 if (c->timeout != 0) {
2190 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2194 if (m->n_fds > 0 && !(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD)) {
2196 /* If the reply contained a file descriptor which we
2197 * didn't want we pass an error instead. */
2199 r = bus_message_new_synthetic_error(
2202 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Reply message contained file descriptor"),
2207 r = bus_seal_synthetic_message(bus, synthetic_reply);
/* The callback below receives the synthetic error in place of the reply. */
2211 m = synthetic_reply;
2213 r = sd_bus_message_rewind(m, true);
2218 bus->current_slot = slot;
2219 r = c->callback(bus, m, bus->current_slot->userdata, &error_buffer);
2220 bus->current_slot = NULL;
2222 r = bus_maybe_reply_error(m, r, &error_buffer);
2225 if (slot->floating) {
2226 bus_slot_disconnect(slot);
2227 sd_bus_slot_unref(slot);
/* Run the registered filter callbacks on message m.
 *
 * The list walk is repeated for as long as a callback modified the filter
 * list (bus->filter_callbacks_modified). Each filter records the iteration
 * counter it last ran in, so it is invoked at most once per iteration even
 * when the walk restarts. The message is rewound before every callback and
 * each result goes through bus_maybe_reply_error(). */
2233 static int process_filter(sd_bus *bus, sd_bus_message *m) {
2234 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2235 struct filter_callback *l;
/* Cleared before each pass; a callback that changes the list sets it again. */
2242 bus->filter_callbacks_modified = false;
2244 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
2246 if (bus->filter_callbacks_modified)
2249 /* Don't run this more than once per iteration */
2250 if (l->last_iteration == bus->iteration_counter)
2253 l->last_iteration = bus->iteration_counter;
2255 r = sd_bus_message_rewind(m, true);
2259 bus->current_slot = container_of(l, sd_bus_slot, filter_callback);
2260 r = l->callback(bus, m, bus->current_slot->userdata, &error_buffer);
2261 bus->current_slot = NULL;
2263 r = bus_maybe_reply_error(m, r, &error_buffer);
2269 } while (bus->filter_callbacks_modified);
/* Run the match callbacks on message m via bus_match_run(), repeating the
 * run for as long as the match tree was modified during it. */
2274 static int process_match(sd_bus *bus, sd_bus_message *m) {
2281 bus->match_callbacks_modified = false;
2283 r = bus_match_run(bus, &bus->match_callbacks, m);
2287 } while (bus->match_callbacks_modified);
/* Built-in implementation of the org.freedesktop.DBus.Peer interface.
 *
 * Preconditions checked first: monitor connections, a manually handled peer
 * interface, non method call messages, other interfaces and calls that do
 * not expect a reply. Ping is answered with an empty method return,
 * GetMachineId with the machine ID formatted as a string, anything else
 * with an UNKNOWN_METHOD error. The reply is sent with sd_bus_send(). */
2292 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
2293 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2299 if (bus->hello_flags & KDBUS_HELLO_MONITOR)
2302 if (bus->manual_peer_interface)
2305 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2308 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2311 if (m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED)
2314 if (streq_ptr(m->member, "Ping"))
2315 r = sd_bus_message_new_method_return(m, &reply);
2316 else if (streq_ptr(m->member, "GetMachineId")) {
2320 r = sd_id128_get_machine(&id);
2324 r = sd_bus_message_new_method_return(m, &reply);
2328 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2330 r = sd_bus_message_new_method_errorf(
2332 SD_BUS_ERROR_UNKNOWN_METHOD,
2333 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2339 r = sd_bus_send(bus, reply, NULL);
/* Deal with messages that carry file descriptors although this connection
 * did not negotiate fd passing. Monitor connections and connections that
 * accept fds are checked first. Method calls are answered with an
 * INCONSISTENT_MESSAGE error reply; other message types are consumed. */
2346 static int process_fd_check(sd_bus *bus, sd_bus_message *m) {
2350 /* If we got a message with a file descriptor which we didn't
2351 * want to accept, then let's drop it. How can this even
2352 * happen? For example, when the kernel queues a message into
2353 * an activatable name's queue which allows fds, and then is
2354 * delivered to us later even though we ourselves did not
2357 if (bus->hello_flags & KDBUS_HELLO_MONITOR)
2363 if (bus->hello_flags & KDBUS_HELLO_ACCEPT_FD)
2366 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2367 return 1; /* just eat it up */
2369 return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Message contains file descriptors, which I cannot accept. Sorry.");
/* Run one incoming message through the processing pipeline.
 *
 * The message becomes bus->current_message, the iteration counter is
 * bumped and a debug line describing the message is logged. The handlers
 * are then tried in this order: hello check, reply callbacks, fd check,
 * filters, matches, built-in peer interface, object tree. Finally
 * bus->current_message is reset to NULL. */
2372 static int process_message(sd_bus *bus, sd_bus_message *m) {
2378 bus->current_message = m;
2379 bus->iteration_counter++;
2381 log_debug("Got message type=%s sender=%s destination=%s object=%s interface=%s member=%s cookie=%" PRIu64 " reply_cookie=%" PRIu64 " error=%s",
2382 bus_message_type_to_string(m->header->type),
2383 strna(sd_bus_message_get_sender(m)),
2384 strna(sd_bus_message_get_destination(m)),
2385 strna(sd_bus_message_get_path(m)),
2386 strna(sd_bus_message_get_interface(m)),
2387 strna(sd_bus_message_get_member(m)),
2388 BUS_MESSAGE_COOKIE(m),
2390 strna(m->error.message));
2392 r = process_hello(bus, m);
2396 r = process_reply(bus, m);
2400 r = process_fd_check(bus, m);
2404 r = process_filter(bus, m);
2408 r = process_match(bus, m);
2412 r = process_builtin(bus, m);
2416 r = bus_process_object(bus, m);
2419 bus->current_message = NULL;
/* Dispatch the head of the pending track queue via bus_track_dispatch();
 * an empty queue is checked for first. */
2423 static int dispatch_track(sd_bus *bus) {
2426 if (!bus->track_queue)
2429 bus_track_dispatch(bus->track_queue);
/* One processing step for a bus in the BUS_RUNNING or BUS_HELLO state.
 *
 * Work is attempted in this order: reply timeouts, the write queue, the
 * track queue, then one message from the read queue (optionally selected by
 * priority), which is passed to process_message(). A method call that is
 * still unprocessed afterwards is logged and answered with an
 * UNKNOWN_OBJECT error. */
2433 static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
2434 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2438 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2440 r = process_timeout(bus);
2444 r = dispatch_wqueue(bus);
2448 r = dispatch_track(bus);
2452 r = dispatch_rqueue(bus, hint_priority, priority, &m);
2458 r = process_message(bus, m);
/* Reset the read position of the message after the handlers ran. */
2463 r = sd_bus_message_rewind(m, true);
2472 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2474 log_debug("Unprocessed message call sender=%s object=%s interface=%s member=%s",
2475 strna(sd_bus_message_get_sender(m)),
2476 strna(sd_bus_message_get_path(m)),
2477 strna(sd_bus_message_get_interface(m)),
2478 strna(sd_bus_message_get_member(m)));
2480 r = sd_bus_reply_method_errorf(
2482 SD_BUS_ERROR_UNKNOWN_OBJECT,
2483 "Unknown object '%s'.", m->path);
/* One processing step for a bus in the BUS_CLOSING state.
 *
 * If a reply callback is still registered, the first one is failed with a
 * synthetic NO_REPLY error (Connection terminated): it is removed from the
 * prioq (when it has a timeout) and from the hashmap, then invoked with the
 * synthetic message. Otherwise a signal on /org/freedesktop/DBus/Local,
 * interface org.freedesktop.DBus.Local, is synthesized and passed through
 * the filter and match callbacks. */
2497 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2498 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2499 struct reply_callback *c;
2503 assert(bus->state == BUS_CLOSING);
2505 c = hashmap_first(bus->reply_callbacks);
2507 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2509 /* First, fail all outstanding method calls */
2510 r = bus_message_new_synthetic_error(
2513 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2518 r = bus_seal_synthetic_message(bus, m);
2522 if (c->timeout != 0) {
2523 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2527 hashmap_remove(bus->reply_callbacks, &c->cookie);
/* Expose the message and slot as current while the callback runs. */
2530 bus->current_message = m;
2531 bus->current_slot = container_of(c, sd_bus_slot, reply_callback);
2533 bus->iteration_counter++;
2535 r = c->callback(bus, m, bus->current_slot->userdata, &error_buffer);
2536 r = bus_maybe_reply_error(m, r, &error_buffer);
2538 bus->current_slot = NULL;
2543 /* Then, synthesize a Disconnected message */
2544 r = sd_bus_message_new_signal(
2547 "/org/freedesktop/DBus/Local",
2548 "org.freedesktop.DBus.Local",
2553 m->sender = "org.freedesktop.DBus.Local";
2555 r = bus_seal_synthetic_message(bus, m);
2561 bus->current_message = m;
2562 bus->iteration_counter++;
2564 r = process_filter(bus, m);
2568 r = process_match(bus, m);
2580 bus->current_message = NULL;
/* Common implementation behind sd_bus_process() and
 * sd_bus_process_priority(): performs one processing step that depends on
 * the connection state.
 *
 * Recursive invocation is refused with -EBUSY while a message is being
 * dispatched. In the opening, authenticating and running branches,
 * disconnect style errors (-ENOTCONN, -ECONNRESET, -EPIPE, -ESHUTDOWN)
 * switch the bus to the closing state. A closing bus is handled by
 * process_closing(). */
2585 static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
2586 BUS_DONT_DESTROY(bus);
2589 /* Returns 0 when we didn't do anything. This should cause the
2590 * caller to invoke sd_bus_wait() before returning the next
2591 * time. Returns > 0 when we did something, which possibly
2592 * means *ret is filled in with an unprocessed message. */
2594 assert_return(bus, -EINVAL);
2595 assert_return(!bus_pid_changed(bus), -ECHILD);
2597 /* We don't allow recursively invoking sd_bus_process(). */
2598 assert_return(!bus->current_message, -EBUSY);
2599 assert(!bus->current_slot);
2601 switch (bus->state) {
2610 r = bus_socket_process_opening(bus);
2611 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2612 bus_enter_closing(bus);
2620 case BUS_AUTHENTICATING:
2621 r = bus_socket_process_authenticating(bus);
2622 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2623 bus_enter_closing(bus);
2635 r = process_running(bus, hint_priority, priority, ret);
2636 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2637 bus_enter_closing(bus);
2647 return process_closing(bus, ret);
2650 assert_not_reached("Unknown state");
/* Perform one processing step without a priority hint. Thin wrapper around
 * bus_process_internal(); ret is passed through unchanged. */
2653 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2654 return bus_process_internal(bus, false, 0, ret);
/* Perform one processing step with the priority hint enabled. Thin wrapper
 * around bus_process_internal(); priority and ret are passed through. */
2657 _public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_message **ret) {
2658 return bus_process_internal(bus, true, priority, ret);
/* Wait with ppoll() until the bus fds become ready or a timeout expires.
 *
 * The events of interest come from sd_bus_get_events(). need_more selects
 * between waiting for new data only and also honouring the timeout reported
 * by sd_bus_get_timeout(). timeout_usec is an additional upper bound, with
 * (uint64_t) -1 meaning no bound. When input and output use different fds,
 * two pollfd entries are used (POLLIN on input, POLLOUT on output).
 *
 * Returns 1 when ppoll() reported ready descriptors, 0 otherwise. */
2661 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2662 struct pollfd p[2] = {};
/* m is the effective wait time; (usec_t) -1 stands for waiting forever. */
2665 usec_t m = (usec_t) -1;
2669 if (bus->state == BUS_CLOSING)
2672 if (!BUS_IS_OPEN(bus->state))
2675 e = sd_bus_get_events(bus);
2680 /* The caller really needs some more data, he doesn't
2681 * care about what's already read, or any timeouts
2686 /* The caller wants to process if there's something to
2687 * process, but doesn't care otherwise */
2689 r = sd_bus_get_timeout(bus, &until);
/* Turn the absolute deadline into a relative wait, clamped at zero. */
2694 nw = now(CLOCK_MONOTONIC);
2695 m = until > nw ? until - nw : 0;
/* Condition for the caller limit to take precedence over m. */
2699 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2702 p[0].fd = bus->input_fd;
2703 if (bus->output_fd == bus->input_fd) {
2707 p[0].events = e & POLLIN;
2708 p[1].fd = bus->output_fd;
2709 p[1].events = e & POLLOUT;
/* A NULL timespec makes ppoll() wait without time limit. */
2713 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2717 return r > 0 ? 1 : 0;
/* Wait until there is something to process on the bus or timeout_usec has
 * passed. The closing state, a connection that is not open and a non-empty
 * read queue are checked before bus_poll() is called with need_more set to
 * false. */
2720 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2722 assert_return(bus, -EINVAL);
2723 assert_return(!bus_pid_changed(bus), -ECHILD);
2725 if (bus->state == BUS_CLOSING)
2728 if (!BUS_IS_OPEN(bus->state))
2731 if (bus->rqueue_size > 0)
2734 return bus_poll(bus, false, timeout_usec);
/* Write out the messages queued in the write queue.
 *
 * The connection is first brought into the running state. The write queue
 * is then dispatched and the bus polled without time limit; the queue size
 * is checked before and after dispatching. Disconnect style errors from
 * dispatch_wqueue() switch the bus to the closing state.
 * NOTE(review): dispatch and poll presumably sit inside a loop that ends
 * once the queue is empty - confirm. */
2737 _public_ int sd_bus_flush(sd_bus *bus) {
2740 assert_return(bus, -EINVAL);
2741 assert_return(!bus_pid_changed(bus), -ECHILD);
2743 if (bus->state == BUS_CLOSING)
2746 if (!BUS_IS_OPEN(bus->state))
2749 r = bus_ensure_running(bus);
2753 if (bus->wqueue_size <= 0)
2757 r = dispatch_wqueue(bus);
2759 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2760 bus_enter_closing(bus);
2767 if (bus->wqueue_size <= 0)
2770 r = bus_poll(bus, false, (uint64_t) -1);
/* Register callback as a message filter on the bus.
 *
 * A slot of type BUS_FILTER_CALLBACK is allocated; it is created floating
 * when the caller passes no slot pointer (the !slot argument). The filter
 * is prepended to bus->filter_callbacks and the list is flagged as
 * modified, which makes a running process_filter() restart its walk. */
2776 _public_ int sd_bus_add_filter(
2779 sd_bus_message_handler_t callback,
2784 assert_return(bus, -EINVAL);
2785 assert_return(callback, -EINVAL);
2786 assert_return(!bus_pid_changed(bus), -ECHILD);
2788 s = bus_slot_allocate(bus, !slot, BUS_FILTER_CALLBACK, sizeof(struct filter_callback), userdata);
2792 s->filter_callback.callback = callback;
2794 bus->filter_callbacks_modified = true;
2795 LIST_PREPEND(callbacks, bus->filter_callbacks, &s->filter_callback);
/* Install a match rule together with the callback to run for matching
 * messages.
 *
 * The match string is parsed into components, a BUS_MATCH_CALLBACK slot is
 * allocated and given a fresh match cookie. For bus clients the match is
 * also installed through bus_add_match_internal(); on non-kernel transports
 * a copy of the match string is kept for later removal. The callback is
 * then added to the local match tree. The parsed components are released
 * with bus_match_parse_free() at the end. */
2803 _public_ int sd_bus_add_match(
2807 sd_bus_message_handler_t callback,
2810 struct bus_match_component *components = NULL;
2811 unsigned n_components = 0;
2815 assert_return(bus, -EINVAL);
2816 assert_return(match, -EINVAL);
2817 assert_return(!bus_pid_changed(bus), -ECHILD);
2819 r = bus_match_parse(match, &components, &n_components);
2823 s = bus_slot_allocate(bus, !slot, BUS_MATCH_CALLBACK, sizeof(struct match_callback), userdata);
2829 s->match_callback.callback = callback;
/* Every match gets its own cookie from a per-bus counter. */
2830 s->match_callback.cookie = ++bus->match_cookie;
2832 if (bus->bus_client) {
2834 if (!bus->is_kernel) {
2835 /* When this is not a kernel transport, we
2836 * store the original match string, so that we
2837 * can use it to remove the match again */
2839 s->match_callback.match_string = strdup(match);
2840 if (!s->match_callback.match_string) {
2846 r = bus_add_match_internal(bus, s->match_callback.match_string, components, n_components, s->match_callback.cookie);
/* Flag the change so a running process_match() repeats its run. */
2851 bus->match_callbacks_modified = true;
2852 r = bus_match_add(&bus->match_callbacks, components, n_components, &s->match_callback);
2861 bus_match_parse_free(components, n_components);
2862 sd_bus_slot_unref(s);
/* Remove a previously installed match identified by its match string.
 *
 * The string is parsed into components, the corresponding match callback
 * is looked up with bus_match_find(), and the slot that embeds it is
 * unreferenced. The parsed components are freed afterwards. */
2867 int bus_remove_match_by_string(
2870 sd_bus_message_handler_t callback,
2873 struct bus_match_component *components = NULL;
2874 unsigned n_components = 0;
2875 struct match_callback *c;
2878 assert_return(bus, -EINVAL);
2879 assert_return(match, -EINVAL);
2880 assert_return(!bus_pid_changed(bus), -ECHILD);
2882 r = bus_match_parse(match, &components, &n_components);
2886 r = bus_match_find(&bus->match_callbacks, components, n_components, NULL, NULL, &c);
2890 sd_bus_slot_unref(container_of(c, sd_bus_slot, match_callback));
2893 bus_match_parse_free(components, n_components);
/* Return true when the calling process is not the one recorded in
 * bus->original_pid, which is the case in a child after fork(). */
2898 bool bus_pid_changed(sd_bus *bus) {
2901 /* We don't support people creating a bus connection and
2902 * keeping it around over a fork(). Let's complain. */
2904 return bus->original_pid != getpid();
/* Event loop I/O handler for the bus fds: userdata is the bus, and one
 * processing step is performed with sd_bus_process(). */
2907 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2908 sd_bus *bus = userdata;
2913 r = sd_bus_process(bus, NULL);
/* Event loop timer handler for the bus: userdata is the bus, and one
 * processing step is performed with sd_bus_process(). */
2920 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2921 sd_bus *bus = userdata;
2926 r = sd_bus_process(bus, NULL);
/* Prepare handler of the input I/O event source: refreshes what the event
 * loop waits for on behalf of the bus.
 *
 * The wanted poll events are fetched with sd_bus_get_events(). With
 * separate fds the input source gets the POLLIN part and the output source
 * the POLLOUT part; with a shared fd the input source gets the full mask.
 * The timer source is set to the deadline from sd_bus_get_timeout() and is
 * enabled when that call returned a positive value. */
2933 static int prepare_callback(sd_event_source *s, void *userdata) {
2934 sd_bus *bus = userdata;
2941 e = sd_bus_get_events(bus);
2945 if (bus->output_fd != bus->input_fd) {
2947 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2951 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2955 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2960 r = sd_bus_get_timeout(bus, &until);
2966 j = sd_event_source_set_time(bus->time_event_source, until);
2971 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
/* Exit handler registered through sd_event_add_exit() in
 * sd_bus_attach_event(); userdata is the bus. */
2978 static int quit_callback(sd_event_source *event, void *userdata) {
2979 sd_bus *bus = userdata;
/* Create or update the I/O event sources that watch the bus fds.
 *
 * The input source is created on first use with io_callback as handler,
 * prepare_callback as prepare hook and the bus event priority; an existing
 * source is pointed at the current input fd instead. When the output fd
 * differs from the input fd, a second source is maintained for it in the
 * same way. Sources are created with an empty event mask. */
2988 static int attach_io_events(sd_bus *bus) {
2993 if (bus->input_fd < 0)
2999 if (!bus->input_io_event_source) {
3000 r = sd_event_add_io(bus->event, &bus->input_io_event_source, bus->input_fd, 0, io_callback, bus);
3004 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
3008 r = sd_event_source_set_priority(bus->input_io_event_source, bus->event_priority);
3010 r = sd_event_source_set_io_fd(bus->input_io_event_source, bus->input_fd);
3015 if (bus->output_fd != bus->input_fd) {
3016 assert(bus->output_fd >= 0);
3018 if (!bus->output_io_event_source) {
3019 r = sd_event_add_io(bus->event, &bus->output_io_event_source, bus->output_fd, 0, io_callback, bus);
3023 r = sd_event_source_set_priority(bus->output_io_event_source, bus->event_priority);
3025 r = sd_event_source_set_io_fd(bus->output_io_event_source, bus->output_fd);
/* Disable and release the input and output I/O event sources, if present.
 * Each pointer is overwritten with the result of sd_event_source_unref(). */
3034 static void detach_io_events(sd_bus *bus) {
3037 if (bus->input_io_event_source) {
3038 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
3039 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
3042 if (bus->output_io_event_source) {
3043 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
3044 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
/* Hook the bus into an sd-event loop so that it is processed from there.
 *
 * Fails with -EBUSY when an event loop is already attached. The bus takes
 * a reference on event or obtains one from sd_event_default()
 * (NOTE(review): presumably the latter when event is NULL - confirm). A
 * monotonic timer source (time_callback), an exit source (quit_callback)
 * and the I/O sources are then set up with the given priority.
 * NOTE(review): the trailing sd_bus_detach_event() call looks like the
 * failure path - confirm. */
3048 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
3051 assert_return(bus, -EINVAL);
3052 assert_return(!bus->event, -EBUSY);
3054 assert(!bus->input_io_event_source);
3055 assert(!bus->output_io_event_source);
3056 assert(!bus->time_event_source);
3059 bus->event = sd_event_ref(event);
3061 r = sd_event_default(&bus->event);
3066 bus->event_priority = priority;
3068 r = sd_event_add_time(bus->event, &bus->time_event_source, CLOCK_MONOTONIC, 0, 0, time_callback, bus);
3072 r = sd_event_source_set_priority(bus->time_event_source, priority);
3076 r = sd_event_add_exit(bus->event, &bus->quit_event_source, quit_callback, bus);
3080 r = attach_io_events(bus);
3087 sd_bus_detach_event(bus);
/* Undo sd_bus_attach_event(): the I/O sources are detached, the timer and
 * exit sources are disabled and released, and the reference on the event
 * loop is dropped. */
3091 _public_ int sd_bus_detach_event(sd_bus *bus) {
3092 assert_return(bus, -EINVAL);
3097 detach_io_events(bus);
3099 if (bus->time_event_source) {
3100 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
3101 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
3104 if (bus->quit_event_source) {
3105 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
3106 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
3109 bus->event = sd_event_unref(bus->event);
/* Accessor for the event loop of the bus; a NULL bus is checked with
 * assert_return() and NULL. */
3113 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
3114 assert_return(bus, NULL);
/* Return the message that is currently being dispatched
 * (bus->current_message), which is NULL outside of dispatching. No
 * reference is taken by this accessor. */
3119 _public_ sd_bus_message* sd_bus_get_current_message(sd_bus *bus) {
3120 assert_return(bus, NULL);
3122 return bus->current_message;
/* Return the slot whose callback is currently being invoked
 * (bus->current_slot), which is NULL outside of callbacks. No reference is
 * taken by this accessor. */
3125 _public_ sd_bus_slot* sd_bus_get_current_slot(sd_bus *bus) {
3126 assert_return(bus, NULL);
3128 return bus->current_slot;
/* Shared helper behind the sd_bus_default*() functions.
 *
 * default_bus points at a cached connection. One visible path only reports
 * whether a cached connection exists; another hands out a new reference to
 * the cached bus through ret. A newly opened bus records the address of its
 * cache slot in default_bus_ptr.
 * NOTE(review): bus_open is presumably called when the cache is empty -
 * confirm. */
3131 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
3136 assert(default_bus);
3139 return !!*default_bus;
3142 *ret = sd_bus_ref(*default_bus);
3150 b->default_bus_ptr = default_bus;
/* Return the default system bus connection of the calling thread, using a
 * thread-local cache slot and sd_bus_open_system() as opener. */
3158 _public_ int sd_bus_default_system(sd_bus **ret) {
3159 static thread_local sd_bus *default_system_bus = NULL;
3161 return bus_default(sd_bus_open_system, &default_system_bus, ret);
/* Return the default user bus connection of the calling thread, using a
 * thread-local cache slot and sd_bus_open_user() as opener. */
3164 _public_ int sd_bus_default_user(sd_bus **ret) {
3165 static thread_local sd_bus *default_user_bus = NULL;
3167 return bus_default(sd_bus_open_user, &default_user_bus, ret);
/* Return the default bus connection that fits the calling context.
 *
 * Selection order in the visible code:
 *  1. DBUS_STARTER_BUS_TYPE set to system, user or session selects the
 *     matching cached default bus.
 *  2. Otherwise DBUS_STARTER_ADDRESS selects a separately cached starter
 *     bus opened with sd_bus_open().
 *  3. Otherwise the user bus is used when cg_pid_get_owner_uid() succeeds
 *     for the calling process, and the system bus if it does not. */
3170 _public_ int sd_bus_default(sd_bus **ret) {
3174 /* Let's try our best to reuse another cached connection. If
3175 * the starter bus type is set, connect via our normal
3176 * connection logic, ignoring $DBUS_STARTER_ADDRESS, so that
3177 * we can share the connection with the user/system default
3180 e = secure_getenv("DBUS_STARTER_BUS_TYPE");
3182 if (streq(e, "system"))
3183 return sd_bus_default_system(ret);
3184 else if (STR_IN_SET(e, "user", "session"))
3185 return sd_bus_default_user(ret);
3188 /* No type is specified, so we have no other option than to
3189 * use the starter address if it is set. */
3191 e = secure_getenv("DBUS_STARTER_ADDRESS");
3193 static thread_local sd_bus *default_starter_bus = NULL;
3195 return bus_default(sd_bus_open, &default_starter_bus, ret);
3198 /* Finally, if nothing is set use the cached connection for
3199 * the right scope */
3201 if (cg_pid_get_owner_uid(0, NULL) >= 0)
3202 return sd_bus_default_user(ret);
3204 return sd_bus_default_system(ret);
/* Report the thread ID associated with the bus through tid. The visible
 * path delegates to sd_event_get_tid() on the attached event loop.
 * NOTE(review): other paths may exist - confirm. */
3207 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
3208 assert_return(b, -EINVAL);
3209 assert_return(tid, -EINVAL);
3210 assert_return(!bus_pid_changed(b), -ECHILD);
3218 return sd_event_get_tid(b->event, tid);
/* Build an object path from prefix and an arbitrary external identifier.
 *
 * prefix must be a valid object path. external_id is escaped with
 * bus_label_escape() and appended to prefix after a slash. The escaped
 * label is a temporary that is freed automatically on return. */
3223 _public_ int sd_bus_path_encode(const char *prefix, const char *external_id, char **ret_path) {
3224 _cleanup_free_ char *e = NULL;
3227 assert_return(object_path_is_valid(prefix), -EINVAL);
3228 assert_return(external_id, -EINVAL);
3229 assert_return(ret_path, -EINVAL);
3231 e = bus_label_escape(external_id);
3235 ret = strjoin(prefix, "/", e, NULL);
/* Counterpart of sd_bus_path_encode(): recover the external identifier
 * from an object path.
 *
 * path and prefix must both be valid object paths. The part of path that
 * follows prefix is obtained with object_path_startswith() and unescaped
 * with bus_label_unescape(). One visible path stores NULL in *external_id.
 * NOTE(review): presumably that is the case where path does not start with
 * prefix - confirm. */
3243 _public_ int sd_bus_path_decode(const char *path, const char *prefix, char **external_id) {
3247 assert_return(object_path_is_valid(path), -EINVAL);
3248 assert_return(object_path_is_valid(prefix), -EINVAL);
3249 assert_return(external_id, -EINVAL);
3251 e = object_path_startswith(path, prefix);
3253 *external_id = NULL;
3257 ret = bus_label_unescape(e);
/* Collect the credentials of the peer of a direct connection.
 *
 * mask selects the wanted fields; values above _SD_BUS_CREDS_ALL are
 * rejected with -ENOTSUP. A new credentials object is filled from the
 * socket credentials (pid, uid, gid) when those are valid and from the
 * stored security label when the SELinux context is requested. Further
 * fields are then added by bus_creds_add_more() based on the peer PID. */
3265 _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **ret) {
3270 assert_return(bus, -EINVAL);
3271 assert_return(mask <= _SD_BUS_CREDS_ALL, -ENOTSUP);
3272 assert_return(ret, -EINVAL);
3273 assert_return(!bus_pid_changed(bus), -ECHILD);
3278 if (!BUS_IS_OPEN(bus->state))
3281 if (!bus->ucred_valid && !isempty(bus->label))
3284 c = bus_creds_new();
3288 if (bus->ucred_valid) {
3289 pid = c->pid = bus->ucred.pid;
3290 c->uid = bus->ucred.uid;
3291 c->gid = bus->ucred.gid;
/* Only flag the fields that the caller actually asked for. */
3293 c->mask |= (SD_BUS_CREDS_UID | SD_BUS_CREDS_PID | SD_BUS_CREDS_GID) & mask;
3296 if (!isempty(bus->label) && (mask & SD_BUS_CREDS_SELINUX_CONTEXT)) {
3297 c->label = strdup(bus->label);
3299 sd_bus_creds_unref(c);
3303 c->mask |= SD_BUS_CREDS_SELINUX_CONTEXT;
3306 r = bus_creds_add_more(c, mask, pid, 0);
/* Try to close the connection through bus_kernel_try_close(). Before that
 * the code checks for a kernel transport, an open connection and for
 * messages still held in the read or write queue. */
3314 _public_ int sd_bus_try_close(sd_bus *bus) {
3317 assert_return(bus, -EINVAL);
3318 assert_return(!bus_pid_changed(bus), -ECHILD);
3320 if (!bus->is_kernel)
3323 if (!BUS_IS_OPEN(bus->state))
3326 if (bus->rqueue_size > 0)
3329 if (bus->wqueue_size > 0)
3332 r = bus_kernel_try_close(bus);
/* Store the connection name of the bus (bus->connection_name) in *name.
 * The pointer refers to the string held by the bus; it is not copied
 * here. */
3340 _public_ int sd_bus_get_name(sd_bus *bus, const char **name) {
3341 assert_return(bus, -EINVAL);
3342 assert_return(name, -EINVAL);
3343 assert_return(!bus_pid_changed(bus), -ECHILD);
3345 *name = bus->connection_name;