1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
38 #include "cgroup-util.h"
39 #include "bus-label.h"
42 #include "bus-internal.h"
43 #include "bus-message.h"
45 #include "bus-socket.h"
46 #include "bus-kernel.h"
47 #include "bus-control.h"
48 #include "bus-introspect.h"
49 #include "bus-signature.h"
50 #include "bus-objects.h"
52 #include "bus-container.h"
53 #include "bus-protocol.h"
54 #include "bus-track.h"
57 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
58 static int attach_io_events(sd_bus *b);
59 static void detach_io_events(sd_bus *b);
61 static void bus_close_fds(sd_bus *b) {
67 safe_close(b->input_fd);
69 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
70 safe_close(b->output_fd);
72 b->input_fd = b->output_fd = -1;
75 static void bus_reset_queues(sd_bus *b) {
78 /* NOTE: We _must_ decrement b->Xqueue_size before calling
79 * sd_bus_message_unref() for _each_ message. Otherwise the
80 * self-reference checks in sd_bus_unref() will fire for each message.
81 * We would thus recurse into sd_bus_message_unref() and trigger the
82 * assert(m->n_ref > 0) */
84 while (b->rqueue_size > 0)
85 sd_bus_message_unref(b->rqueue[--b->rqueue_size]);
89 b->rqueue_allocated = 0;
91 while (b->wqueue_size > 0)
92 sd_bus_message_unref(b->wqueue[--b->wqueue_size]);
96 b->wqueue_allocated = 0;
99 static void bus_free(sd_bus *b) {
103 assert(!b->track_queue);
105 b->state = BUS_CLOSED;
107 sd_bus_detach_event(b);
109 while ((s = b->slots)) {
110 /* At this point only floating slots can still be
111 * around, because the non-floating ones keep a
112 * reference to the bus, and we thus couldn't be
113 * destructing right now... We forcibly disconnect the
114 * slots here, so that they still can be referenced by
115 * apps, but are dead. */
118 bus_slot_disconnect(s);
119 sd_bus_slot_unref(s);
122 if (b->default_bus_ptr)
123 *b->default_bus_ptr = NULL;
128 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
131 free(b->unique_name);
132 free(b->auth_buffer);
137 free(b->cgroup_root);
138 free(b->connection_name);
141 strv_free(b->exec_argv);
143 close_many(b->fds, b->n_fds);
148 hashmap_free_free(b->reply_callbacks);
149 prioq_free(b->reply_callbacks_prioq);
151 bus_match_free(&b->match_callbacks);
153 hashmap_free_free(b->vtable_methods);
154 hashmap_free_free(b->vtable_properties);
156 assert(hashmap_isempty(b->nodes));
157 hashmap_free(b->nodes);
159 bus_kernel_flush_memfd(b);
161 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
166 _public_ int sd_bus_new(sd_bus **ret) {
169 assert_return(ret, -EINVAL);
175 r->n_ref = REFCNT_INIT;
176 r->input_fd = r->output_fd = -1;
177 r->message_version = 1;
178 r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
179 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
180 r->attach_flags |= KDBUS_ATTACH_NAMES;
181 r->original_pid = getpid();
183 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
185 /* We guarantee that wqueue always has space for at least one
187 if (!GREEDY_REALLOC(r->wqueue, r->wqueue_allocated, 1)) {
196 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
199 assert_return(bus, -EINVAL);
200 assert_return(bus->state == BUS_UNSET, -EPERM);
201 assert_return(address, -EINVAL);
202 assert_return(!bus_pid_changed(bus), -ECHILD);
214 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
215 assert_return(bus, -EINVAL);
216 assert_return(bus->state == BUS_UNSET, -EPERM);
217 assert_return(input_fd >= 0, -EINVAL);
218 assert_return(output_fd >= 0, -EINVAL);
219 assert_return(!bus_pid_changed(bus), -ECHILD);
221 bus->input_fd = input_fd;
222 bus->output_fd = output_fd;
226 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
229 assert_return(bus, -EINVAL);
230 assert_return(bus->state == BUS_UNSET, -EPERM);
231 assert_return(path, -EINVAL);
232 assert_return(!strv_isempty(argv), -EINVAL);
233 assert_return(!bus_pid_changed(bus), -ECHILD);
245 free(bus->exec_path);
246 strv_free(bus->exec_argv);
254 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
255 assert_return(bus, -EINVAL);
256 assert_return(bus->state == BUS_UNSET, -EPERM);
257 assert_return(!bus_pid_changed(bus), -ECHILD);
259 bus->bus_client = !!b;
263 _public_ int sd_bus_set_monitor(sd_bus *bus, int b) {
264 assert_return(bus, -EINVAL);
265 assert_return(bus->state == BUS_UNSET, -EPERM);
266 assert_return(!bus_pid_changed(bus), -ECHILD);
268 SET_FLAG(bus->hello_flags, KDBUS_HELLO_MONITOR, b);
272 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
273 assert_return(bus, -EINVAL);
274 assert_return(bus->state == BUS_UNSET, -EPERM);
275 assert_return(!bus_pid_changed(bus), -ECHILD);
277 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
281 _public_ int sd_bus_negotiate_timestamp(sd_bus *bus, int b) {
282 assert_return(bus, -EINVAL);
283 assert_return(bus->state == BUS_UNSET, -EPERM);
284 assert_return(!bus_pid_changed(bus), -ECHILD);
286 SET_FLAG(bus->attach_flags, KDBUS_ATTACH_TIMESTAMP, b);
290 _public_ int sd_bus_negotiate_creds(sd_bus *bus, uint64_t mask) {
291 assert_return(bus, -EINVAL);
292 assert_return(mask <= _SD_BUS_CREDS_ALL, -EINVAL);
293 assert_return(bus->state == BUS_UNSET, -EPERM);
294 assert_return(!bus_pid_changed(bus), -ECHILD);
296 /* The well knowns we need unconditionally, so that matches can work */
297 bus->creds_mask = mask | SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
299 return kdbus_translate_attach_flags(bus->creds_mask, &bus->attach_flags);
302 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
303 assert_return(bus, -EINVAL);
304 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
305 assert_return(bus->state == BUS_UNSET, -EPERM);
306 assert_return(!bus_pid_changed(bus), -ECHILD);
308 bus->is_server = !!b;
309 bus->server_id = server_id;
313 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
314 assert_return(bus, -EINVAL);
315 assert_return(bus->state == BUS_UNSET, -EPERM);
316 assert_return(!bus_pid_changed(bus), -ECHILD);
318 bus->anonymous_auth = !!b;
322 _public_ int sd_bus_set_trusted(sd_bus *bus, int b) {
323 assert_return(bus, -EINVAL);
324 assert_return(bus->state == BUS_UNSET, -EPERM);
325 assert_return(!bus_pid_changed(bus), -ECHILD);
331 _public_ int sd_bus_set_name(sd_bus *bus, const char *name) {
334 assert_return(bus, -EINVAL);
335 assert_return(name, -EINVAL);
336 assert_return(bus->state == BUS_UNSET, -EPERM);
337 assert_return(!bus_pid_changed(bus), -ECHILD);
343 free(bus->connection_name);
344 bus->connection_name = n;
349 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
354 assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
357 r = sd_bus_message_get_errno(reply);
363 r = sd_bus_message_read(reply, "s", &s);
367 if (!service_name_is_valid(s) || s[0] != ':')
370 bus->unique_name = strdup(s);
371 if (!bus->unique_name)
374 if (bus->state == BUS_HELLO)
375 bus->state = BUS_RUNNING;
380 static int bus_send_hello(sd_bus *bus) {
381 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
386 if (!bus->bus_client || bus->is_kernel)
389 r = sd_bus_message_new_method_call(
392 "org.freedesktop.DBus",
393 "/org/freedesktop/DBus",
394 "org.freedesktop.DBus",
399 return sd_bus_call_async(bus, NULL, m, hello_callback, NULL, 0);
402 int bus_start_running(sd_bus *bus) {
405 if (bus->bus_client && !bus->is_kernel) {
406 bus->state = BUS_HELLO;
410 bus->state = BUS_RUNNING;
414 static int parse_address_key(const char **p, const char *key, char **value) {
415 size_t l, n = 0, allocated = 0;
425 if (strncmp(*p, key, l) != 0)
438 while (*a != ';' && *a != ',' && *a != 0) {
456 c = (char) ((x << 4) | y);
463 if (!GREEDY_REALLOC(r, allocated, n + 2))
487 static void skip_address_key(const char **p) {
491 *p += strcspn(*p, ",");
497 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
498 _cleanup_free_ char *path = NULL, *abstract = NULL;
507 while (**p != 0 && **p != ';') {
508 r = parse_address_key(p, "guid", guid);
514 r = parse_address_key(p, "path", &path);
520 r = parse_address_key(p, "abstract", &abstract);
529 if (!path && !abstract)
532 if (path && abstract)
537 if (l > sizeof(b->sockaddr.un.sun_path))
540 b->sockaddr.un.sun_family = AF_UNIX;
541 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
542 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
543 } else if (abstract) {
544 l = strlen(abstract);
545 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
548 b->sockaddr.un.sun_family = AF_UNIX;
549 b->sockaddr.un.sun_path[0] = 0;
550 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
551 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
557 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
558 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
560 struct addrinfo *result, hints = {
561 .ai_socktype = SOCK_STREAM,
562 .ai_flags = AI_ADDRCONFIG,
570 while (**p != 0 && **p != ';') {
571 r = parse_address_key(p, "guid", guid);
577 r = parse_address_key(p, "host", &host);
583 r = parse_address_key(p, "port", &port);
589 r = parse_address_key(p, "family", &family);
602 if (streq(family, "ipv4"))
603 hints.ai_family = AF_INET;
604 else if (streq(family, "ipv6"))
605 hints.ai_family = AF_INET6;
610 r = getaddrinfo(host, port, &hints, &result);
614 return -EADDRNOTAVAIL;
616 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
617 b->sockaddr_size = result->ai_addrlen;
619 freeaddrinfo(result);
624 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
626 unsigned n_argv = 0, j;
628 size_t allocated = 0;
636 while (**p != 0 && **p != ';') {
637 r = parse_address_key(p, "guid", guid);
643 r = parse_address_key(p, "path", &path);
649 if (startswith(*p, "argv")) {
653 ul = strtoul(*p + 4, (char**) p, 10);
654 if (errno > 0 || **p != '=' || ul > 256) {
662 if (!GREEDY_REALLOC0(argv, allocated, ul + 2)) {
670 r = parse_address_key(p, NULL, argv + ul);
685 /* Make sure there are no holes in the array, with the
686 * exception of argv[0] */
687 for (j = 1; j < n_argv; j++)
693 if (argv && argv[0] == NULL) {
694 argv[0] = strdup(path);
706 for (j = 0; j < n_argv; j++)
714 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
715 _cleanup_free_ char *path = NULL;
723 while (**p != 0 && **p != ';') {
724 r = parse_address_key(p, "guid", guid);
730 r = parse_address_key(p, "path", &path);
749 static int parse_container_unix_address(sd_bus *b, const char **p, char **guid) {
750 _cleanup_free_ char *machine = NULL;
758 while (**p != 0 && **p != ';') {
759 r = parse_address_key(p, "guid", guid);
765 r = parse_address_key(p, "machine", &machine);
777 if (!filename_is_safe(machine))
781 b->machine = machine;
784 b->sockaddr.un.sun_family = AF_UNIX;
785 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
786 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + strlen("/var/run/dbus/system_bus_socket");
791 static int parse_container_kernel_address(sd_bus *b, const char **p, char **guid) {
792 _cleanup_free_ char *machine = NULL;
800 while (**p != 0 && **p != ';') {
801 r = parse_address_key(p, "guid", guid);
807 r = parse_address_key(p, "machine", &machine);
819 if (!filename_is_safe(machine))
823 b->machine = machine;
827 b->kernel = strdup("/dev/kdbus/0-system/bus");
834 static void bus_reset_parsed_address(sd_bus *b) {
838 b->sockaddr_size = 0;
839 strv_free(b->exec_argv);
843 b->server_id = SD_ID128_NULL;
850 static int bus_parse_next_address(sd_bus *b) {
851 _cleanup_free_ char *guid = NULL;
859 if (b->address[b->address_index] == 0)
862 bus_reset_parsed_address(b);
864 a = b->address + b->address_index;
873 if (startswith(a, "unix:")) {
876 r = parse_unix_address(b, &a, &guid);
881 } else if (startswith(a, "tcp:")) {
884 r = parse_tcp_address(b, &a, &guid);
890 } else if (startswith(a, "unixexec:")) {
893 r = parse_exec_address(b, &a, &guid);
899 } else if (startswith(a, "kernel:")) {
902 r = parse_kernel_address(b, &a, &guid);
907 } else if (startswith(a, "x-container-unix:")) {
910 r = parse_container_unix_address(b, &a, &guid);
915 } else if (startswith(a, "x-container-kernel:")) {
918 r = parse_container_kernel_address(b, &a, &guid);
931 r = sd_id128_from_string(guid, &b->server_id);
936 b->address_index = a - b->address;
940 static int bus_start_address(sd_bus *b) {
946 bool skipped = false;
951 r = bus_socket_exec(b);
952 else if (b->machine && b->kernel)
953 r = bus_container_connect_kernel(b);
954 else if (b->machine && b->sockaddr.sa.sa_family != AF_UNSPEC)
955 r = bus_container_connect_socket(b);
957 r = bus_kernel_connect(b);
958 else if (b->sockaddr.sa.sa_family != AF_UNSPEC)
959 r = bus_socket_connect(b);
965 r = attach_io_events(b);
970 b->last_connect_error = -r;
973 r = bus_parse_next_address(b);
977 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
981 int bus_next_address(sd_bus *b) {
984 bus_reset_parsed_address(b);
985 return bus_start_address(b);
988 static int bus_start_fd(sd_bus *b) {
993 assert(b->input_fd >= 0);
994 assert(b->output_fd >= 0);
996 r = fd_nonblock(b->input_fd, true);
1000 r = fd_cloexec(b->input_fd, true);
1004 if (b->input_fd != b->output_fd) {
1005 r = fd_nonblock(b->output_fd, true);
1009 r = fd_cloexec(b->output_fd, true);
1014 if (fstat(b->input_fd, &st) < 0)
1017 if (S_ISCHR(b->input_fd))
1018 return bus_kernel_take_fd(b);
1020 return bus_socket_take_fd(b);
1023 _public_ int sd_bus_start(sd_bus *bus) {
1026 assert_return(bus, -EINVAL);
1027 assert_return(bus->state == BUS_UNSET, -EPERM);
1028 assert_return(!bus_pid_changed(bus), -ECHILD);
1030 bus->state = BUS_OPENING;
1032 if (bus->is_server && bus->bus_client)
1035 if (bus->input_fd >= 0)
1036 r = bus_start_fd(bus);
1037 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1038 r = bus_start_address(bus);
1045 return bus_send_hello(bus);
1048 _public_ int sd_bus_open(sd_bus **ret) {
1053 assert_return(ret, -EINVAL);
1055 /* Let's connect to the starter bus if it is set, and
1056 * otherwise to the bus that is appropropriate for the scope
1057 * we are running in */
1059 e = secure_getenv("DBUS_STARTER_BUS_TYPE");
1061 if (streq(e, "system"))
1062 return sd_bus_open_system(ret);
1063 else if (STR_IN_SET(e, "session", "user"))
1064 return sd_bus_open_user(ret);
1067 e = secure_getenv("DBUS_STARTER_ADDRESS");
1069 if (cg_pid_get_owner_uid(0, NULL) >= 0)
1070 return sd_bus_open_user(ret);
1072 return sd_bus_open_system(ret);
1079 r = sd_bus_set_address(b, e);
1083 b->bus_client = true;
1085 /* We don't know whether the bus is trusted or not, so better
1086 * be safe, and authenticate everything */
1088 b->attach_flags |= KDBUS_ATTACH_CAPS | KDBUS_ATTACH_CREDS;
1090 r = sd_bus_start(b);
1102 int bus_set_address_system(sd_bus *b) {
1106 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1108 return sd_bus_set_address(b, e);
1110 return sd_bus_set_address(b, DEFAULT_SYSTEM_BUS_PATH);
1113 _public_ int sd_bus_open_system(sd_bus **ret) {
1117 assert_return(ret, -EINVAL);
1123 r = bus_set_address_system(b);
1127 b->bus_client = true;
1128 b->is_system = true;
1130 /* Let's do per-method access control on the system bus. We
1131 * need the caller's UID and capability set for that. */
1133 b->attach_flags |= KDBUS_ATTACH_CAPS | KDBUS_ATTACH_CREDS;
1135 r = sd_bus_start(b);
1147 int bus_set_address_user(sd_bus *b) {
1152 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1154 return sd_bus_set_address(b, e);
1156 e = secure_getenv("XDG_RUNTIME_DIR");
1158 _cleanup_free_ char *ee = NULL;
1160 ee = bus_address_escape(e);
1165 asprintf(&b->address, KERNEL_USER_BUS_FMT ";" UNIX_USER_BUS_FMT, getuid(), ee);
1167 asprintf(&b->address, UNIX_USER_BUS_FMT, ee);
1171 asprintf(&b->address, KERNEL_USER_BUS_FMT, getuid());
1173 return -ECONNREFUSED;
1183 _public_ int sd_bus_open_user(sd_bus **ret) {
1187 assert_return(ret, -EINVAL);
1193 r = bus_set_address_user(b);
1197 b->bus_client = true;
1200 /* We don't do any per-method access control on the user
1204 r = sd_bus_start(b);
1216 int bus_set_address_system_remote(sd_bus *b, const char *host) {
1217 _cleanup_free_ char *e = NULL;
1222 e = bus_address_escape(host);
1226 b->address = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1233 _public_ int sd_bus_open_system_remote(sd_bus **ret, const char *host) {
1237 assert_return(host, -EINVAL);
1238 assert_return(ret, -EINVAL);
1240 r = sd_bus_new(&bus);
1244 r = bus_set_address_system_remote(bus, host);
1248 bus->bus_client = true;
1249 bus->trusted = false;
1251 r = sd_bus_start(bus);
1263 int bus_set_address_system_container(sd_bus *b, const char *machine) {
1264 _cleanup_free_ char *e = NULL;
1269 e = bus_address_escape(machine);
1274 b->address = strjoin("x-container-kernel:machine=", e, ";x-container-unix:machine=", e, NULL);
1276 b->address = strjoin("x-container-unix:machine=", e, NULL);
1284 _public_ int sd_bus_open_system_container(sd_bus **ret, const char *machine) {
1288 assert_return(machine, -EINVAL);
1289 assert_return(ret, -EINVAL);
1290 assert_return(filename_is_safe(machine), -EINVAL);
1292 r = sd_bus_new(&bus);
1296 r = bus_set_address_system_container(bus, machine);
1300 bus->bus_client = true;
1301 bus->trusted = false;
1303 r = sd_bus_start(bus);
1315 _public_ void sd_bus_close(sd_bus *bus) {
1319 if (bus->state == BUS_CLOSED)
1321 if (bus_pid_changed(bus))
1324 bus->state = BUS_CLOSED;
1326 sd_bus_detach_event(bus);
1328 /* Drop all queued messages so that they drop references to
1329 * the bus object and the bus may be freed */
1330 bus_reset_queues(bus);
1332 if (!bus->is_kernel)
1335 /* We'll leave the fd open in case this is a kernel bus, since
1336 * there might still be memblocks around that reference this
1337 * bus, and they might need to invoke the KDBUS_CMD_FREE
1338 * ioctl on the fd when they are freed. */
1341 static void bus_enter_closing(sd_bus *bus) {
1344 if (bus->state != BUS_OPENING &&
1345 bus->state != BUS_AUTHENTICATING &&
1346 bus->state != BUS_HELLO &&
1347 bus->state != BUS_RUNNING)
1350 bus->state = BUS_CLOSING;
1353 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1354 assert_return(bus, NULL);
1356 assert_se(REFCNT_INC(bus->n_ref) >= 2);
1361 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1367 /* TODO/FIXME: It's naive to think REFCNT_GET() is thread-safe in any
1368 * way but exclusive REFCNT_DEC(). The current logic _must_ lock around
1369 * REFCNT_GET() until REFCNT_DEC() or two threads might end up in
1370 * parallel in bus_reset_queues(). But locking would totally break the
1371 * recursion we introduce by bus_reset_queues()...
1372 * (Imagine one thread in sd_bus_message_unref() setting n_ref to 0 and
1373 * thus calling into sd_bus_unref(). If at the same time the real
1374 * thread calls sd_bus_unref(), both end up with "q == true" and will
1375 * call into bus_reset_queues().
1376 * If we require the main bus to be alive until all dispatch threads
1377 * are done, there is no need to do ref-counts at all. So in both ways,
1378 * the REFCNT thing is humbug.)
1380 * On a second note: messages are *not* required to have ->bus set nor
1381 * does it have to be _this_ bus that they're assigned to. This whole
1382 * ref-cnt checking breaks apart if a message is not assigned to us.
1383 * (which is _very_ easy to trigger with the current API). */
1385 if (REFCNT_GET(bus->n_ref) == bus->rqueue_size + bus->wqueue_size + 1) {
1388 for (i = 0; i < bus->rqueue_size; i++)
1389 if (bus->rqueue[i]->n_ref > 1) {
1395 for (i = 0; i < bus->wqueue_size; i++)
1396 if (bus->wqueue[i]->n_ref > 1) {
1402 /* We are the only holders on the messages, and the
1403 * messages are the only holders on us, so let's drop
1404 * the messages and thus implicitly also kill our own
1406 * bus_reset_queues() decrements the queue-size before
1407 * calling into sd_bus_message_unref(). Thus, it
1408 * protects us from recursion. */
1411 bus_reset_queues(bus);
1414 i = REFCNT_DEC(bus->n_ref);
1422 _public_ int sd_bus_is_open(sd_bus *bus) {
1424 assert_return(bus, -EINVAL);
1425 assert_return(!bus_pid_changed(bus), -ECHILD);
1427 return BUS_IS_OPEN(bus->state);
1430 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1433 assert_return(bus, -EINVAL);
1434 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1435 assert_return(!bus_pid_changed(bus), -ECHILD);
1437 if (bus->hello_flags & KDBUS_HELLO_MONITOR)
1440 if (type == SD_BUS_TYPE_UNIX_FD) {
1441 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1444 r = bus_ensure_running(bus);
1448 return bus->can_fds;
1451 return bus_type_is_valid(type);
1454 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1457 assert_return(bus, -EINVAL);
1458 assert_return(server_id, -EINVAL);
1459 assert_return(!bus_pid_changed(bus), -ECHILD);
1461 r = bus_ensure_running(bus);
1465 *server_id = bus->server_id;
1469 static int bus_seal_message(sd_bus *b, sd_bus_message *m, usec_t timeout) {
1474 /* If we copy the same message to multiple
1475 * destinations, avoid using the same cookie
1477 b->cookie = MAX(b->cookie, BUS_MESSAGE_COOKIE(m));
1482 timeout = BUS_DEFAULT_TIMEOUT;
1484 return bus_message_seal(m, ++b->cookie, timeout);
1487 static int bus_remarshal_message(sd_bus *b, sd_bus_message **m) {
1490 /* Do packet version and endianness already match? */
1491 if ((b->message_version == 0 || b->message_version == (*m)->header->version) &&
1492 (b->message_endian == 0 || b->message_endian == (*m)->header->endian))
1495 /* No? Then remarshal! */
1496 return bus_message_remarshal(b, m);
1499 int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
1503 /* The bus specification says the serial number cannot be 0,
1504 * hence let's fill something in for synthetic messages. Since
1505 * synthetic messages might have a fake sender and we don't
1506 * want to interfere with the real sender's serial numbers we
1507 * pick a fixed, artificial one. We use (uint32_t) -1 rather
1508 * than (uint64_t) -1 since dbus1 only had 32bit identifiers,
1509 * even though kdbus can do 64bit. */
1511 return bus_message_seal(m, 0xFFFFFFFFULL, 0);
1514 static int bus_write_message(sd_bus *bus, sd_bus_message *m, bool hint_sync_call, size_t *idx) {
1521 r = bus_kernel_write_message(bus, m, hint_sync_call);
1523 r = bus_socket_write_message(bus, m, idx);
1528 if (bus->is_kernel || *idx >= BUS_MESSAGE_SIZE(m))
1529 log_debug("Sent message type=%s sender=%s destination=%s object=%s interface=%s member=%s cookie=%" PRIu64 " reply_cookie=%" PRIu64 " error=%s",
1530 bus_message_type_to_string(m->header->type),
1531 strna(sd_bus_message_get_sender(m)),
1532 strna(sd_bus_message_get_destination(m)),
1533 strna(sd_bus_message_get_path(m)),
1534 strna(sd_bus_message_get_interface(m)),
1535 strna(sd_bus_message_get_member(m)),
1536 BUS_MESSAGE_COOKIE(m),
1538 strna(m->error.message));
1543 static int dispatch_wqueue(sd_bus *bus) {
1547 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1549 while (bus->wqueue_size > 0) {
1551 r = bus_write_message(bus, bus->wqueue[0], false, &bus->windex);
1555 /* Didn't do anything this time */
1557 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1558 /* Fully written. Let's drop the entry from
1561 * This isn't particularly optimized, but
1562 * well, this is supposed to be our worst-case
1563 * buffer only, and the socket buffer is
1564 * supposed to be our primary buffer, and if
1565 * it got full, then all bets are off
1568 bus->wqueue_size --;
1569 sd_bus_message_unref(bus->wqueue[0]);
1570 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1580 static int bus_read_message(sd_bus *bus, bool hint_priority, int64_t priority) {
1584 return bus_kernel_read_message(bus, hint_priority, priority);
1586 return bus_socket_read_message(bus);
1589 int bus_rqueue_make_room(sd_bus *bus) {
1592 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1595 if (!GREEDY_REALLOC(bus->rqueue, bus->rqueue_allocated, bus->rqueue_size + 1))
1601 static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **m) {
1606 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1608 /* Note that the priority logic is only available on kdbus,
1609 * where the rqueue is unused. We check the rqueue here
1610 * anyway, because it's simple... */
1613 if (bus->rqueue_size > 0) {
1614 /* Dispatch a queued message */
1616 *m = bus->rqueue[0];
1617 bus->rqueue_size --;
1618 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1622 /* Try to read a new message */
1623 r = bus_read_message(bus, hint_priority, priority);
1633 static int bus_send_internal(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie, bool hint_sync_call) {
1634 _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1637 assert_return(bus, -EINVAL);
1638 assert_return(m, -EINVAL);
1639 assert_return(!bus_pid_changed(bus), -ECHILD);
1640 assert_return(!bus->is_kernel || !(bus->hello_flags & KDBUS_HELLO_MONITOR), -EROFS);
1642 if (!BUS_IS_OPEN(bus->state))
1646 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1653 /* If the cookie number isn't kept, then we know that no reply
1655 if (!cookie && !m->sealed)
1656 m->header->flags |= BUS_MESSAGE_NO_REPLY_EXPECTED;
1658 r = bus_seal_message(bus, m, 0);
1662 /* Remarshall if we have to. This will possibly unref the
1663 * message and place a replacement in m */
1664 r = bus_remarshal_message(bus, &m);
1668 /* If this is a reply and no reply was requested, then let's
1669 * suppress this, if we can */
1670 if (m->dont_send && !cookie)
1673 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1676 r = bus_write_message(bus, m, hint_sync_call, &idx);
1678 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
1679 bus_enter_closing(bus);
1684 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1685 /* Wasn't fully written. So let's remember how
1686 * much was written. Note that the first entry
1687 * of the wqueue array is always allocated so
1688 * that we always can remember how much was
1690 bus->wqueue[0] = sd_bus_message_ref(m);
1691 bus->wqueue_size = 1;
1695 /* Just append it to the queue. */
1697 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1700 if (!GREEDY_REALLOC(bus->wqueue, bus->wqueue_allocated, bus->wqueue_size + 1))
1703 bus->wqueue[bus->wqueue_size ++] = sd_bus_message_ref(m);
1707 *cookie = BUS_MESSAGE_COOKIE(m);
1712 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *cookie) {
1713 return bus_send_internal(bus, m, cookie, false);
1716 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *cookie) {
1719 assert_return(bus, -EINVAL);
1720 assert_return(m, -EINVAL);
1721 assert_return(!bus_pid_changed(bus), -ECHILD);
1723 if (!BUS_IS_OPEN(bus->state))
1726 if (!streq_ptr(m->destination, destination)) {
1731 r = sd_bus_message_set_destination(m, destination);
1736 return sd_bus_send(bus, m, cookie);
1739 static usec_t calc_elapse(uint64_t usec) {
1740 if (usec == (uint64_t) -1)
1743 return now(CLOCK_MONOTONIC) + usec;
1746 static int timeout_compare(const void *a, const void *b) {
1747 const struct reply_callback *x = a, *y = b;
1749 if (x->timeout != 0 && y->timeout == 0)
1752 if (x->timeout == 0 && y->timeout != 0)
1755 if (x->timeout < y->timeout)
1758 if (x->timeout > y->timeout)
1764 _public_ int sd_bus_call_async(
1768 sd_bus_message_handler_t callback,
1772 _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1773 _cleanup_bus_slot_unref_ sd_bus_slot *s = NULL;
1776 assert_return(bus, -EINVAL);
1777 assert_return(m, -EINVAL);
1778 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1779 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1780 assert_return(callback, -EINVAL);
1781 assert_return(!bus_pid_changed(bus), -ECHILD);
1782 assert_return(!bus->is_kernel || !(bus->hello_flags & KDBUS_HELLO_MONITOR), -EROFS);
1784 if (!BUS_IS_OPEN(bus->state))
1787 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1791 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1795 r = bus_seal_message(bus, m, usec);
1799 r = bus_remarshal_message(bus, &m);
1803 s = bus_slot_allocate(bus, !slot, BUS_REPLY_CALLBACK, sizeof(struct reply_callback), userdata);
1807 s->reply_callback.callback = callback;
1809 s->reply_callback.cookie = BUS_MESSAGE_COOKIE(m);
1810 r = hashmap_put(bus->reply_callbacks, &s->reply_callback.cookie, &s->reply_callback);
1812 s->reply_callback.cookie = 0;
1816 s->reply_callback.timeout = calc_elapse(m->timeout);
1817 if (s->reply_callback.timeout != 0) {
1818 r = prioq_put(bus->reply_callbacks_prioq, &s->reply_callback, &s->reply_callback.prioq_idx);
1820 s->reply_callback.timeout = 0;
1825 r = sd_bus_send(bus, m, &s->reply_callback.cookie);
1836 int bus_ensure_running(sd_bus *bus) {
1841 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1843 if (bus->state == BUS_RUNNING)
1847 r = sd_bus_process(bus, NULL);
1850 if (bus->state == BUS_RUNNING)
1855 r = sd_bus_wait(bus, (uint64_t) -1);
1861 _public_ int sd_bus_call(
1865 sd_bus_error *error,
1866 sd_bus_message **reply) {
1868 _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1874 assert_return(bus, -EINVAL);
1875 assert_return(m, -EINVAL);
1876 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1877 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1878 assert_return(!bus_error_is_dirty(error), -EINVAL);
1879 assert_return(!bus_pid_changed(bus), -ECHILD);
1880 assert_return(!bus->is_kernel || !(bus->hello_flags & KDBUS_HELLO_MONITOR), -EROFS);
1882 if (!BUS_IS_OPEN(bus->state))
1885 r = bus_ensure_running(bus);
1889 i = bus->rqueue_size;
1891 r = bus_seal_message(bus, m, usec);
1895 r = bus_remarshal_message(bus, &m);
1899 r = bus_send_internal(bus, m, &cookie, true);
1903 timeout = calc_elapse(m->timeout);
1908 while (i < bus->rqueue_size) {
1909 sd_bus_message *incoming = NULL;
1911 incoming = bus->rqueue[i];
1913 if (incoming->reply_cookie == cookie) {
1914 /* Found a match! */
1916 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1919 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1921 if (incoming->n_fds <= 0 || (bus->hello_flags & KDBUS_HELLO_ACCEPT_FD)) {
1925 sd_bus_message_unref(incoming);
1930 r = sd_bus_error_setf(error, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Reply message contained file descriptors which I couldn't accept. Sorry.");
1932 } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
1933 r = sd_bus_error_copy(error, &incoming->error);
1937 sd_bus_message_unref(incoming);
1940 } else if (BUS_MESSAGE_COOKIE(incoming) == cookie &&
1943 streq(bus->unique_name, incoming->sender)) {
1945 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1948 /* Our own message? Somebody is trying
1949 * to send its own client a message,
1950 * let's not dead-lock, let's fail
1953 sd_bus_message_unref(incoming);
1957 /* Try to read more, right-away */
1961 r = bus_read_message(bus, false, 0);
1963 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
1964 bus_enter_closing(bus);
1976 n = now(CLOCK_MONOTONIC);
1982 left = (uint64_t) -1;
1984 r = bus_poll(bus, true, left);
1990 r = dispatch_wqueue(bus);
1992 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
1993 bus_enter_closing(bus);
2002 _public_ int sd_bus_get_fd(sd_bus *bus) {
2004 assert_return(bus, -EINVAL);
2005 assert_return(bus->input_fd == bus->output_fd, -EPERM);
2006 assert_return(!bus_pid_changed(bus), -ECHILD);
2008 return bus->input_fd;
2011 _public_ int sd_bus_get_events(sd_bus *bus) {
2014 assert_return(bus, -EINVAL);
2015 assert_return(!bus_pid_changed(bus), -ECHILD);
2017 if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING)
2020 if (bus->state == BUS_OPENING)
2022 else if (bus->state == BUS_AUTHENTICATING) {
2024 if (bus_socket_auth_needs_write(bus))
2029 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
2030 if (bus->rqueue_size <= 0)
2032 if (bus->wqueue_size > 0)
2039 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
2040 struct reply_callback *c;
2042 assert_return(bus, -EINVAL);
2043 assert_return(timeout_usec, -EINVAL);
2044 assert_return(!bus_pid_changed(bus), -ECHILD);
2046 if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING)
2049 if (bus->track_queue) {
2054 if (bus->state == BUS_CLOSING) {
2059 if (bus->state == BUS_AUTHENTICATING) {
2060 *timeout_usec = bus->auth_timeout;
2064 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
2065 *timeout_usec = (uint64_t) -1;
2069 if (bus->rqueue_size > 0) {
2074 c = prioq_peek(bus->reply_callbacks_prioq);
2076 *timeout_usec = (uint64_t) -1;
2080 if (c->timeout == 0) {
2081 *timeout_usec = (uint64_t) -1;
2085 *timeout_usec = c->timeout;
2089 static int process_timeout(sd_bus *bus) {
2090 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2091 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
2092 struct reply_callback *c;
2099 c = prioq_peek(bus->reply_callbacks_prioq);
2103 n = now(CLOCK_MONOTONIC);
2107 r = bus_message_new_synthetic_error(
2110 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
2115 m->sender = "org.freedesktop.DBus";
2117 r = bus_seal_synthetic_message(bus, m);
2121 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
2124 hashmap_remove(bus->reply_callbacks, &c->cookie);
2127 slot = container_of(c, sd_bus_slot, reply_callback);
2129 bus->iteration_counter ++;
2131 bus->current_message = m;
2132 bus->current_slot = sd_bus_slot_ref(slot);
2133 r = c->callback(bus, m, slot->userdata, &error_buffer);
2134 bus->current_slot = sd_bus_slot_unref(slot);
2135 bus->current_message = NULL;
2137 if (slot->floating) {
2138 bus_slot_disconnect(slot);
2139 sd_bus_slot_unref(slot);
2142 return bus_maybe_reply_error(m, r, &error_buffer);
2145 static int process_hello(sd_bus *bus, sd_bus_message *m) {
2149 if (bus->state != BUS_HELLO)
2152 /* Let's make sure the first message on the bus is the HELLO
2153 * reply. But note that we don't actually parse the message
2154 * here (we leave that to the usual handling), we just verify
2155 * we don't let any earlier msg through. */
2157 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
2158 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
2161 if (m->reply_cookie != 1)
2167 static int process_reply(sd_bus *bus, sd_bus_message *m) {
2168 _cleanup_bus_message_unref_ sd_bus_message *synthetic_reply = NULL;
2169 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2170 struct reply_callback *c;
2177 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
2178 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
2181 if (bus->is_kernel && (bus->hello_flags & KDBUS_HELLO_MONITOR))
2184 if (m->destination && bus->unique_name && !streq_ptr(m->destination, bus->unique_name))
2187 c = hashmap_remove(bus->reply_callbacks, &m->reply_cookie);
2193 slot = container_of(c, sd_bus_slot, reply_callback);
2195 if (m->n_fds > 0 && !(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD)) {
2197 /* If the reply contained a file descriptor which we
2198 * didn't want we pass an error instead. */
2200 r = bus_message_new_synthetic_error(
2203 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Reply message contained file descriptor"),
2208 r = bus_seal_synthetic_message(bus, synthetic_reply);
2212 m = synthetic_reply;
2214 r = sd_bus_message_rewind(m, true);
2219 if (c->timeout != 0) {
2220 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2224 bus->current_slot = sd_bus_slot_ref(slot);
2225 r = c->callback(bus, m, slot->userdata, &error_buffer);
2226 bus->current_slot = sd_bus_slot_unref(slot);
2228 if (slot->floating) {
2229 bus_slot_disconnect(slot);
2230 sd_bus_slot_unref(slot);
2233 return bus_maybe_reply_error(m, r, &error_buffer);
2236 static int process_filter(sd_bus *bus, sd_bus_message *m) {
2237 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2238 struct filter_callback *l;
2245 bus->filter_callbacks_modified = false;
2247 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
2250 if (bus->filter_callbacks_modified)
2253 /* Don't run this more than once per iteration */
2254 if (l->last_iteration == bus->iteration_counter)
2257 l->last_iteration = bus->iteration_counter;
2259 r = sd_bus_message_rewind(m, true);
2263 slot = container_of(l, sd_bus_slot, filter_callback);
2265 bus->current_slot = sd_bus_slot_ref(slot);
2266 r = l->callback(bus, m, slot->userdata, &error_buffer);
2267 bus->current_slot = sd_bus_slot_unref(slot);
2269 r = bus_maybe_reply_error(m, r, &error_buffer);
2275 } while (bus->filter_callbacks_modified);
2280 static int process_match(sd_bus *bus, sd_bus_message *m) {
2287 bus->match_callbacks_modified = false;
2289 r = bus_match_run(bus, &bus->match_callbacks, m);
2293 } while (bus->match_callbacks_modified);
2298 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
2299 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2305 if (bus->hello_flags & KDBUS_HELLO_MONITOR)
2308 if (bus->manual_peer_interface)
2311 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2314 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2317 if (m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED)
2320 if (streq_ptr(m->member, "Ping"))
2321 r = sd_bus_message_new_method_return(m, &reply);
2322 else if (streq_ptr(m->member, "GetMachineId")) {
2326 r = sd_id128_get_machine(&id);
2330 r = sd_bus_message_new_method_return(m, &reply);
2334 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2336 r = sd_bus_message_new_method_errorf(
2338 SD_BUS_ERROR_UNKNOWN_METHOD,
2339 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2345 r = sd_bus_send(bus, reply, NULL);
2352 static int process_fd_check(sd_bus *bus, sd_bus_message *m) {
2356 /* If we got a message with a file descriptor which we didn't
2357 * want to accept, then let's drop it. How can this even
2358 * happen? For example, when the kernel queues a message into
2359 * an activatable names's queue which allows fds, and then is
2360 * delivered to us later even though we ourselves did not
2363 if (bus->hello_flags & KDBUS_HELLO_MONITOR)
2369 if (bus->hello_flags & KDBUS_HELLO_ACCEPT_FD)
2372 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2373 return 1; /* just eat it up */
2375 return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Message contains file descriptors, which I cannot accept. Sorry.");
2378 static int process_message(sd_bus *bus, sd_bus_message *m) {
2384 bus->current_message = m;
2385 bus->iteration_counter++;
2387 log_debug("Got message type=%s sender=%s destination=%s object=%s interface=%s member=%s cookie=%" PRIu64 " reply_cookie=%" PRIu64 " error=%s",
2388 bus_message_type_to_string(m->header->type),
2389 strna(sd_bus_message_get_sender(m)),
2390 strna(sd_bus_message_get_destination(m)),
2391 strna(sd_bus_message_get_path(m)),
2392 strna(sd_bus_message_get_interface(m)),
2393 strna(sd_bus_message_get_member(m)),
2394 BUS_MESSAGE_COOKIE(m),
2396 strna(m->error.message));
2398 r = process_hello(bus, m);
2402 r = process_reply(bus, m);
2406 r = process_fd_check(bus, m);
2410 r = process_filter(bus, m);
2414 r = process_match(bus, m);
2418 r = process_builtin(bus, m);
2422 r = bus_process_object(bus, m);
2425 bus->current_message = NULL;
2429 static int dispatch_track(sd_bus *bus) {
2432 if (!bus->track_queue)
2435 bus_track_dispatch(bus->track_queue);
2439 static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
2440 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2444 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2446 r = process_timeout(bus);
2450 r = dispatch_wqueue(bus);
2454 r = dispatch_track(bus);
2458 r = dispatch_rqueue(bus, hint_priority, priority, &m);
2464 r = process_message(bus, m);
2469 r = sd_bus_message_rewind(m, true);
2478 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2480 log_debug("Unprocessed message call sender=%s object=%s interface=%s member=%s",
2481 strna(sd_bus_message_get_sender(m)),
2482 strna(sd_bus_message_get_path(m)),
2483 strna(sd_bus_message_get_interface(m)),
2484 strna(sd_bus_message_get_member(m)));
2486 r = sd_bus_reply_method_errorf(
2488 SD_BUS_ERROR_UNKNOWN_OBJECT,
2489 "Unknown object '%s'.", m->path);
2503 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2504 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2505 struct reply_callback *c;
2509 assert(bus->state == BUS_CLOSING);
2511 c = hashmap_first(bus->reply_callbacks);
2513 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2516 /* First, fail all outstanding method calls */
2517 r = bus_message_new_synthetic_error(
2520 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2525 r = bus_seal_synthetic_message(bus, m);
2529 if (c->timeout != 0) {
2530 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2534 hashmap_remove(bus->reply_callbacks, &c->cookie);
2537 slot = container_of(c, sd_bus_slot, reply_callback);
2539 bus->iteration_counter++;
2541 bus->current_message = m;
2542 bus->current_slot = sd_bus_slot_ref(slot);
2543 r = c->callback(bus, m, slot->userdata, &error_buffer);
2544 bus->current_slot = sd_bus_slot_unref(slot);
2545 bus->current_message = NULL;
2547 if (slot->floating) {
2548 bus_slot_disconnect(slot);
2549 sd_bus_slot_unref(slot);
2552 return bus_maybe_reply_error(m, r, &error_buffer);
2555 /* Then, synthesize a Disconnected message */
2556 r = sd_bus_message_new_signal(
2559 "/org/freedesktop/DBus/Local",
2560 "org.freedesktop.DBus.Local",
2565 m->sender = "org.freedesktop.DBus.Local";
2567 r = bus_seal_synthetic_message(bus, m);
2573 bus->current_message = m;
2574 bus->iteration_counter++;
2576 r = process_filter(bus, m);
2580 r = process_match(bus, m);
2592 bus->current_message = NULL;
2597 static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
2598 BUS_DONT_DESTROY(bus);
2601 /* Returns 0 when we didn't do anything. This should cause the
2602 * caller to invoke sd_bus_wait() before returning the next
2603 * time. Returns > 0 when we did something, which possibly
2604 * means *ret is filled in with an unprocessed message. */
2606 assert_return(bus, -EINVAL);
2607 assert_return(!bus_pid_changed(bus), -ECHILD);
2609 /* We don't allow recursively invoking sd_bus_process(). */
2610 assert_return(!bus->current_message, -EBUSY);
2611 assert(!bus->current_slot);
2613 switch (bus->state) {
2622 r = bus_socket_process_opening(bus);
2623 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2624 bus_enter_closing(bus);
2632 case BUS_AUTHENTICATING:
2633 r = bus_socket_process_authenticating(bus);
2634 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2635 bus_enter_closing(bus);
2647 r = process_running(bus, hint_priority, priority, ret);
2648 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2649 bus_enter_closing(bus);
2659 return process_closing(bus, ret);
2662 assert_not_reached("Unknown state");
2665 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2666 return bus_process_internal(bus, false, 0, ret);
2669 _public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_message **ret) {
2670 return bus_process_internal(bus, true, priority, ret);
2673 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2674 struct pollfd p[2] = {};
2677 usec_t m = (usec_t) -1;
2681 if (bus->state == BUS_CLOSING)
2684 if (!BUS_IS_OPEN(bus->state))
2687 e = sd_bus_get_events(bus);
2692 /* The caller really needs some more data, he doesn't
2693 * care about what's already read, or any timeouts
2698 /* The caller wants to process if there's something to
2699 * process, but doesn't care otherwise */
2701 r = sd_bus_get_timeout(bus, &until);
2706 nw = now(CLOCK_MONOTONIC);
2707 m = until > nw ? until - nw : 0;
2711 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2714 p[0].fd = bus->input_fd;
2715 if (bus->output_fd == bus->input_fd) {
2719 p[0].events = e & POLLIN;
2720 p[1].fd = bus->output_fd;
2721 p[1].events = e & POLLOUT;
2725 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2729 return r > 0 ? 1 : 0;
2732 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2734 assert_return(bus, -EINVAL);
2735 assert_return(!bus_pid_changed(bus), -ECHILD);
2737 if (bus->state == BUS_CLOSING)
2740 if (!BUS_IS_OPEN(bus->state))
2743 if (bus->rqueue_size > 0)
2746 return bus_poll(bus, false, timeout_usec);
2749 _public_ int sd_bus_flush(sd_bus *bus) {
2752 assert_return(bus, -EINVAL);
2753 assert_return(!bus_pid_changed(bus), -ECHILD);
2755 if (bus->state == BUS_CLOSING)
2758 if (!BUS_IS_OPEN(bus->state))
2761 r = bus_ensure_running(bus);
2765 if (bus->wqueue_size <= 0)
2769 r = dispatch_wqueue(bus);
2771 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2772 bus_enter_closing(bus);
2779 if (bus->wqueue_size <= 0)
2782 r = bus_poll(bus, false, (uint64_t) -1);
2788 _public_ int sd_bus_add_filter(
2791 sd_bus_message_handler_t callback,
2796 assert_return(bus, -EINVAL);
2797 assert_return(callback, -EINVAL);
2798 assert_return(!bus_pid_changed(bus), -ECHILD);
2800 s = bus_slot_allocate(bus, !slot, BUS_FILTER_CALLBACK, sizeof(struct filter_callback), userdata);
2804 s->filter_callback.callback = callback;
2806 bus->filter_callbacks_modified = true;
2807 LIST_PREPEND(callbacks, bus->filter_callbacks, &s->filter_callback);
2815 _public_ int sd_bus_add_match(
2819 sd_bus_message_handler_t callback,
2822 struct bus_match_component *components = NULL;
2823 unsigned n_components = 0;
2824 sd_bus_slot *s = NULL;
2827 assert_return(bus, -EINVAL);
2828 assert_return(match, -EINVAL);
2829 assert_return(!bus_pid_changed(bus), -ECHILD);
2831 r = bus_match_parse(match, &components, &n_components);
2835 s = bus_slot_allocate(bus, !slot, BUS_MATCH_CALLBACK, sizeof(struct match_callback), userdata);
2841 s->match_callback.callback = callback;
2842 s->match_callback.cookie = ++bus->match_cookie;
2844 if (bus->bus_client) {
2846 if (!bus->is_kernel) {
2847 /* When this is not a kernel transport, we
2848 * store the original match string, so that we
2849 * can use it to remove the match again */
2851 s->match_callback.match_string = strdup(match);
2852 if (!s->match_callback.match_string) {
2858 r = bus_add_match_internal(bus, s->match_callback.match_string, components, n_components, s->match_callback.cookie);
2863 bus->match_callbacks_modified = true;
2864 r = bus_match_add(&bus->match_callbacks, components, n_components, &s->match_callback);
2873 bus_match_parse_free(components, n_components);
2874 sd_bus_slot_unref(s);
2879 int bus_remove_match_by_string(
2882 sd_bus_message_handler_t callback,
2885 struct bus_match_component *components = NULL;
2886 unsigned n_components = 0;
2887 struct match_callback *c;
2890 assert_return(bus, -EINVAL);
2891 assert_return(match, -EINVAL);
2892 assert_return(!bus_pid_changed(bus), -ECHILD);
2894 r = bus_match_parse(match, &components, &n_components);
2898 r = bus_match_find(&bus->match_callbacks, components, n_components, NULL, NULL, &c);
2902 sd_bus_slot_unref(container_of(c, sd_bus_slot, match_callback));
2905 bus_match_parse_free(components, n_components);
2910 bool bus_pid_changed(sd_bus *bus) {
2913 /* We don't support people creating a bus connection and
2914 * keeping it around over a fork(). Let's complain. */
2916 return bus->original_pid != getpid();
2919 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2920 sd_bus *bus = userdata;
2925 r = sd_bus_process(bus, NULL);
2932 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2933 sd_bus *bus = userdata;
2938 r = sd_bus_process(bus, NULL);
2945 static int prepare_callback(sd_event_source *s, void *userdata) {
2946 sd_bus *bus = userdata;
2953 e = sd_bus_get_events(bus);
2957 if (bus->output_fd != bus->input_fd) {
2959 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2963 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2967 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2972 r = sd_bus_get_timeout(bus, &until);
2978 j = sd_event_source_set_time(bus->time_event_source, until);
2983 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2990 static int quit_callback(sd_event_source *event, void *userdata) {
2991 sd_bus *bus = userdata;
3000 static int attach_io_events(sd_bus *bus) {
3005 if (bus->input_fd < 0)
3011 if (!bus->input_io_event_source) {
3012 r = sd_event_add_io(bus->event, &bus->input_io_event_source, bus->input_fd, 0, io_callback, bus);
3016 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
3020 r = sd_event_source_set_priority(bus->input_io_event_source, bus->event_priority);
3022 r = sd_event_source_set_io_fd(bus->input_io_event_source, bus->input_fd);
3027 if (bus->output_fd != bus->input_fd) {
3028 assert(bus->output_fd >= 0);
3030 if (!bus->output_io_event_source) {
3031 r = sd_event_add_io(bus->event, &bus->output_io_event_source, bus->output_fd, 0, io_callback, bus);
3035 r = sd_event_source_set_priority(bus->output_io_event_source, bus->event_priority);
3037 r = sd_event_source_set_io_fd(bus->output_io_event_source, bus->output_fd);
3046 static void detach_io_events(sd_bus *bus) {
3049 if (bus->input_io_event_source) {
3050 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
3051 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
3054 if (bus->output_io_event_source) {
3055 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
3056 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
3060 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
3063 assert_return(bus, -EINVAL);
3064 assert_return(!bus->event, -EBUSY);
3066 assert(!bus->input_io_event_source);
3067 assert(!bus->output_io_event_source);
3068 assert(!bus->time_event_source);
3071 bus->event = sd_event_ref(event);
3073 r = sd_event_default(&bus->event);
3078 bus->event_priority = priority;
3080 r = sd_event_add_time(bus->event, &bus->time_event_source, CLOCK_MONOTONIC, 0, 0, time_callback, bus);
3084 r = sd_event_source_set_priority(bus->time_event_source, priority);
3088 r = sd_event_add_exit(bus->event, &bus->quit_event_source, quit_callback, bus);
3092 r = attach_io_events(bus);
3099 sd_bus_detach_event(bus);
3103 _public_ int sd_bus_detach_event(sd_bus *bus) {
3104 assert_return(bus, -EINVAL);
3109 detach_io_events(bus);
3111 if (bus->time_event_source) {
3112 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
3113 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
3116 if (bus->quit_event_source) {
3117 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
3118 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
3121 bus->event = sd_event_unref(bus->event);
3125 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
3126 assert_return(bus, NULL);
3131 _public_ sd_bus_message* sd_bus_get_current_message(sd_bus *bus) {
3132 assert_return(bus, NULL);
3134 return bus->current_message;
3137 _public_ sd_bus_slot* sd_bus_get_current_slot(sd_bus *bus) {
3138 assert_return(bus, NULL);
3140 return bus->current_slot;
3143 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
3148 assert(default_bus);
3151 return !!*default_bus;
3154 *ret = sd_bus_ref(*default_bus);
3162 b->default_bus_ptr = default_bus;
3170 _public_ int sd_bus_default_system(sd_bus **ret) {
3171 static thread_local sd_bus *default_system_bus = NULL;
3173 return bus_default(sd_bus_open_system, &default_system_bus, ret);
3176 _public_ int sd_bus_default_user(sd_bus **ret) {
3177 static thread_local sd_bus *default_user_bus = NULL;
3179 return bus_default(sd_bus_open_user, &default_user_bus, ret);
3182 _public_ int sd_bus_default(sd_bus **ret) {
3186 /* Let's try our best to reuse another cached connection. If
3187 * the starter bus type is set, connect via our normal
3188 * connection logic, ignoring $DBUS_STARTER_ADDRESS, so that
3189 * we can share the connection with the user/system default
3192 e = secure_getenv("DBUS_STARTER_BUS_TYPE");
3194 if (streq(e, "system"))
3195 return sd_bus_default_system(ret);
3196 else if (STR_IN_SET(e, "user", "session"))
3197 return sd_bus_default_user(ret);
3200 /* No type is specified, so we have not other option than to
3201 * use the starter address if it is set. */
3203 e = secure_getenv("DBUS_STARTER_ADDRESS");
3205 static thread_local sd_bus *default_starter_bus = NULL;
3207 return bus_default(sd_bus_open, &default_starter_bus, ret);
3210 /* Finally, if nothing is set use the cached connection for
3211 * the right scope */
3213 if (cg_pid_get_owner_uid(0, NULL) >= 0)
3214 return sd_bus_default_user(ret);
3216 return sd_bus_default_system(ret);
3219 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
3220 assert_return(b, -EINVAL);
3221 assert_return(tid, -EINVAL);
3222 assert_return(!bus_pid_changed(b), -ECHILD);
3230 return sd_event_get_tid(b->event, tid);
3235 _public_ int sd_bus_path_encode(const char *prefix, const char *external_id, char **ret_path) {
3236 _cleanup_free_ char *e = NULL;
3239 assert_return(object_path_is_valid(prefix), -EINVAL);
3240 assert_return(external_id, -EINVAL);
3241 assert_return(ret_path, -EINVAL);
3243 e = bus_label_escape(external_id);
3247 ret = strjoin(prefix, "/", e, NULL);
3255 _public_ int sd_bus_path_decode(const char *path, const char *prefix, char **external_id) {
3259 assert_return(object_path_is_valid(path), -EINVAL);
3260 assert_return(object_path_is_valid(prefix), -EINVAL);
3261 assert_return(external_id, -EINVAL);
3263 e = object_path_startswith(path, prefix);
3265 *external_id = NULL;
3269 ret = bus_label_unescape(e);
3277 _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **ret) {
3282 assert_return(bus, -EINVAL);
3283 assert_return(mask <= _SD_BUS_CREDS_ALL, -ENOTSUP);
3284 assert_return(ret, -EINVAL);
3285 assert_return(!bus_pid_changed(bus), -ECHILD);
3290 if (!BUS_IS_OPEN(bus->state))
3293 if (!bus->ucred_valid && !isempty(bus->label))
3296 c = bus_creds_new();
3300 if (bus->ucred_valid) {
3301 pid = c->pid = bus->ucred.pid;
3302 c->uid = bus->ucred.uid;
3303 c->gid = bus->ucred.gid;
3305 c->mask |= (SD_BUS_CREDS_UID | SD_BUS_CREDS_PID | SD_BUS_CREDS_GID) & mask;
3308 if (!isempty(bus->label) && (mask & SD_BUS_CREDS_SELINUX_CONTEXT)) {
3309 c->label = strdup(bus->label);
3311 sd_bus_creds_unref(c);
3315 c->mask |= SD_BUS_CREDS_SELINUX_CONTEXT;
3318 r = bus_creds_add_more(c, mask, pid, 0);
3326 _public_ int sd_bus_try_close(sd_bus *bus) {
3329 assert_return(bus, -EINVAL);
3330 assert_return(!bus_pid_changed(bus), -ECHILD);
3332 if (!bus->is_kernel)
3335 if (!BUS_IS_OPEN(bus->state))
3338 if (bus->rqueue_size > 0)
3341 if (bus->wqueue_size > 0)
3344 r = bus_kernel_try_close(bus);
3352 _public_ int sd_bus_get_name(sd_bus *bus, const char **name) {
3353 assert_return(bus, -EINVAL);
3354 assert_return(name, -EINVAL);
3355 assert_return(!bus_pid_changed(bus), -ECHILD);
3357 *name = bus->connection_name;