1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 #include "bus-container.h"
50 #include "bus-protocol.h"
/* Prototypes of file-local helpers that are used further down, e.g.
 * bus_poll() from sd_bus_call() and attach_io_events() from
 * bus_start_address(). */
52 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
53 static int attach_io_events(sd_bus *b);
54 static void detach_io_events(sd_bus *b);
/* Closes the bus's input descriptor and, if it is a distinct valid
 * descriptor, the output one as well. Both fields are reset to -1
 * afterwards. */
56 static void bus_close_fds(sd_bus *b) {
62 close_nointr_nofail(b->input_fd);
/* Input and output may refer to the same fd; never close it twice. */
64 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
65 close_nointr_nofail(b->output_fd);
67 b->input_fd = b->output_fd = -1;
/* Tears down object-tree node n of bus b: recurses into n->child,
 * unlinks every entry of the node's callbacks, vtables and enumerators
 * lists, unlinks n from its parent's child list and finally removes it
 * from b->nodes, asserting that the entry stored under n->path is n. */
70 static void bus_node_destroy(sd_bus *b, struct node *n) {
71 struct node_callback *c;
72 struct node_vtable *v;
73 struct node_enumerator *e;
81 bus_node_destroy(b, n->child);
83 while ((c = n->callbacks)) {
84 LIST_REMOVE(callbacks, n->callbacks, c);
88 while ((v = n->vtables)) {
89 LIST_REMOVE(vtables, n->vtables, v);
94 while ((e = n->enumerators)) {
95 LIST_REMOVE(enumerators, n->enumerators, e);
100 LIST_REMOVE(siblings, n->parent->child, n);
102 assert_se(hashmap_remove(b->nodes, n->path) == n);
/* Drops the reference held on every message still sitting in the read
 * queue and the write queue, then puts both queues back into the empty
 * state (NULL array, nothing allocated, size 0). */
107 static void bus_reset_queues(sd_bus *b) {
112 for (i = 0; i < b->rqueue_size; i++)
113 sd_bus_message_unref(b->rqueue[i]);
116 for (i = 0; i < b->wqueue_size; i++)
117 sd_bus_message_unref(b->wqueue[i]);
120 b->rqueue = b->wqueue = NULL;
121 b->rqueue_allocated = b->wqueue_allocated = 0;
122 b->rqueue_size = b->wqueue_size = 0;
/* Releases what a bus object owns: it is detached from its event
 * source, the kdbus pool mapping is unmapped, owned strings and the
 * exec argv are freed, the fds in b->fds are closed, and the reply,
 * filter, match, vtable and node bookkeeping structures are torn down.
 * Last, bus_kernel_flush_memfd() is called and the memfd cache mutex is
 * destroyed. */
125 static void bus_free(sd_bus *b) {
126 struct filter_callback *f;
131 sd_bus_detach_event(b);
136 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
139 free(b->unique_name);
140 free(b->auth_buffer);
145 free(b->cgroup_root);
148 strv_free(b->exec_argv);
150 close_many(b->fds, b->n_fds);
155 hashmap_free_free(b->reply_callbacks);
156 prioq_free(b->reply_callbacks_prioq);
158 while ((f = b->filter_callbacks)) {
159 LIST_REMOVE(callbacks, b->filter_callbacks, f);
163 bus_match_free(&b->match_callbacks);
165 hashmap_free_free(b->vtable_methods);
166 hashmap_free_free(b->vtable_properties);
/* bus_node_destroy() removes the node from b->nodes, so this loop ends
 * once the hashmap is empty. */
168 while ((n = hashmap_first(b->nodes)))
169 bus_node_destroy(b, n);
171 hashmap_free(b->nodes);
173 bus_kernel_flush_memfd(b);
175 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
/* Prepares a fresh bus object r with its defaults: initial reference
 * count, no descriptors (-1), message version 1, well-known-names and
 * unique-name credentials requested, fd passing accepted, the names
 * attach flag set and the creating PID recorded. The memfd cache mutex
 * is initialised and one write-queue slot is reserved up front.
 * Returns -EINVAL if ret is NULL. */
180 _public_ int sd_bus_new(sd_bus **ret) {
183 assert_return(ret, -EINVAL);
189 r->n_ref = REFCNT_INIT;
190 r->input_fd = r->output_fd = -1;
191 r->message_version = 1;
192 r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
193 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
194 r->attach_flags |= KDBUS_ATTACH_NAMES;
195 r->original_pid = getpid();
197 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
199 /* We guarantee that wqueue always has space for at least one
201 if (!GREEDY_REALLOC(r->wqueue, r->wqueue_allocated, 1)) {
/* Configures the address string the bus shall connect to.
 * Returns -EINVAL if bus or address is NULL, -EPERM if the bus has
 * already left state BUS_UNSET, and -ECHILD if bus_pid_changed()
 * reports true. */
210 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
213 assert_return(bus, -EINVAL);
214 assert_return(bus->state == BUS_UNSET, -EPERM);
215 assert_return(address, -EINVAL);
216 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Stores the descriptors the bus will use for input and output.
 * Returns -EINVAL if bus is NULL or either fd is negative, -EPERM if
 * the bus is no longer in state BUS_UNSET, and -ECHILD if
 * bus_pid_changed() reports true. */
228 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
229 assert_return(bus, -EINVAL);
230 assert_return(bus->state == BUS_UNSET, -EPERM);
231 assert_return(input_fd >= 0, -EINVAL);
232 assert_return(output_fd >= 0, -EINVAL);
233 assert_return(!bus_pid_changed(bus), -ECHILD);
235 bus->input_fd = input_fd;
236 bus->output_fd = output_fd;
/* Configures a program (path plus argument vector) for the bus
 * connection. Returns -EINVAL if bus or path is NULL or argv is empty,
 * -EPERM if the bus is no longer in state BUS_UNSET, and -ECHILD if
 * bus_pid_changed() reports true. Any exec_path/exec_argv stored
 * earlier is freed; presumably the new values replace them. */
240 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
243 assert_return(bus, -EINVAL);
244 assert_return(bus->state == BUS_UNSET, -EPERM);
245 assert_return(path, -EINVAL);
246 assert_return(!strv_isempty(argv), -EINVAL);
247 assert_return(!bus_pid_changed(bus), -ECHILD);
259 free(bus->exec_path);
260 strv_free(bus->exec_argv);
/* Sets the bus_client flag of the bus from b (any non-zero value
 * counts as true). Only permitted while the bus is in state BUS_UNSET
 * (-EPERM otherwise); -EINVAL for a NULL bus, -ECHILD if
 * bus_pid_changed() reports true. */
268 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
269 assert_return(bus, -EINVAL);
270 assert_return(bus->state == BUS_UNSET, -EPERM);
271 assert_return(!bus_pid_changed(bus), -ECHILD);
273 bus->bus_client = !!b;
/* Turns the KDBUS_HELLO_ACCEPT_FD bit in bus->hello_flags on or off
 * according to b. Only permitted while the bus is in state BUS_UNSET
 * (-EPERM otherwise); -EINVAL for a NULL bus, -ECHILD if
 * bus_pid_changed() reports true. */
277 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
278 assert_return(bus, -EINVAL);
279 assert_return(bus->state == BUS_UNSET, -EPERM);
280 assert_return(!bus_pid_changed(bus), -ECHILD);
282 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
/* Turns the KDBUS_ATTACH_TIMESTAMP bit in bus->attach_flags on or off
 * according to b. Only permitted while the bus is in state BUS_UNSET
 * (-EPERM otherwise); -EINVAL for a NULL bus, -ECHILD if
 * bus_pid_changed() reports true. */
286 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
287 assert_return(bus, -EINVAL);
288 assert_return(bus->state == BUS_UNSET, -EPERM);
289 assert_return(!bus_pid_changed(bus), -ECHILD);
291 SET_FLAG(bus->attach_flags, KDBUS_ATTACH_TIMESTAMP, b);
/* Selects which credentials shall be attached to incoming messages.
 * mask must not exceed _SD_BUS_CREDS_ALL (-EINVAL); the bus must still
 * be in state BUS_UNSET (-EPERM) and bus_pid_changed() must be false
 * (-ECHILD). The return value is that of
 * kdbus_translate_attach_flags(). */
295 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, uint64_t mask) {
296 assert_return(bus, -EINVAL);
297 assert_return(mask <= _SD_BUS_CREDS_ALL, -EINVAL);
298 assert_return(bus->state == BUS_UNSET, -EPERM);
299 assert_return(!bus_pid_changed(bus), -ECHILD);
301 /* The well knowns we need unconditionally, so that matches can work */
302 bus->creds_mask = mask | SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
/* NOTE(review): the translation result is stored back into creds_mask
 * itself, although elsewhere in this file creds_mask holds
 * SD_BUS_CREDS_* bits while KDBUS_ATTACH_* bits live in attach_flags.
 * Presumably the output should go to bus->attach_flags - confirm
 * against kdbus_translate_attach_flags(). */
304 return kdbus_translate_attach_flags(bus->creds_mask, &bus->creds_mask);
/* Sets the is_server flag from b and records server_id. A server_id
 * other than SD_ID128_NULL is only accepted together with a true b
 * (-EINVAL otherwise). Also returns -EINVAL for a NULL bus, -EPERM if
 * the bus is no longer in state BUS_UNSET, and -ECHILD if
 * bus_pid_changed() reports true. */
307 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
308 assert_return(bus, -EINVAL);
309 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
310 assert_return(bus->state == BUS_UNSET, -EPERM);
311 assert_return(!bus_pid_changed(bus), -ECHILD);
313 bus->is_server = !!b;
314 bus->server_id = server_id;
/* Sets the anonymous_auth flag of the bus from b (any non-zero value
 * counts as true). Only permitted while the bus is in state BUS_UNSET
 * (-EPERM otherwise); -EINVAL for a NULL bus, -ECHILD if
 * bus_pid_changed() reports true. */
318 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
319 assert_return(bus, -EINVAL);
320 assert_return(bus->state == BUS_UNSET, -EPERM);
321 assert_return(!bus_pid_changed(bus), -ECHILD);
323 bus->anonymous_auth = !!b;
/* Setter for the "trusted" property of the bus, as the name suggests.
 * Same preconditions as the other setters: -EINVAL for a NULL bus,
 * -EPERM if the bus is no longer in state BUS_UNSET, -ECHILD if
 * bus_pid_changed() reports true. */
327 _public_ int sd_bus_set_trusted(sd_bus *bus, int b) {
328 assert_return(bus, -EINVAL);
329 assert_return(bus->state == BUS_UNSET, -EPERM);
330 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Reply handler registered by bus_send_hello(). It checks the reply's
 * errno, reads one string from it, requires that string to be a valid
 * service name starting with ':', and stores a copy as
 * bus->unique_name. A bus that is still in BUS_HELLO is then moved to
 * BUS_RUNNING. */
336 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
341 assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
344 r = sd_bus_message_get_errno(reply);
350 r = sd_bus_message_read(reply, "s", &s);
354 if (!service_name_is_valid(s) || s[0] != ':')
357 bus->unique_name = strdup(s);
358 if (!bus->unique_name)
/* Leave the state untouched if the bus entered BUS_CLOSING meanwhile. */
361 if (bus->state == BUS_HELLO)
362 bus->state = BUS_RUNNING;
/* Builds a method call addressed to org.freedesktop.DBus at
 * /org/freedesktop/DBus and sends it asynchronously with
 * hello_callback() as reply handler; the serial of the call is kept in
 * bus->hello_serial. Connections that are not bus clients, or that are
 * kernel buses, are special-cased first. */
367 static int bus_send_hello(sd_bus *bus) {
368 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
373 if (!bus->bus_client || bus->is_kernel)
376 r = sd_bus_message_new_method_call(
378 "org.freedesktop.DBus",
379 "/org/freedesktop/DBus",
380 "org.freedesktop.DBus",
386 return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Picks the state following connection setup: a bus client that is not
 * a kernel bus goes to BUS_HELLO, everything else to BUS_RUNNING. */
389 int bus_start_running(sd_bus *bus) {
392 if (bus->bus_client && !bus->is_kernel) {
393 bus->state = BUS_HELLO;
397 bus->state = BUS_RUNNING;
/* Parses one key/value item of a bus address at *p. The text at *p is
 * compared against key; the value is then consumed up to the next ';',
 * ',' or end of string, and a character can be assembled from two hex
 * nibbles (x, y). The decoded value is accumulated in a buffer that is
 * grown on demand. */
401 static int parse_address_key(const char **p, const char *key, char **value) {
402 size_t l, n = 0, allocated = 0;
412 if (strncmp(*p, key, l) != 0)
425 while (*a != ';' && *a != ',' && *a != 0) {
443 c = (char) ((x << 4) | y);
/* Room for n + 2 bytes is requested before the buffer is extended. */
450 if (!GREEDY_REALLOC(r, allocated, n + 2))
/* Advances *p to the next ',' in the address string, or to the
 * terminating NUL if there is none. */
474 static void skip_address_key(const char **p) {
478 *p += strcspn(*p, ",");
/* Parses the keys of a "unix:" address ("guid", "path", "abstract") up
 * to the next ';' or the end of the string and fills b->sockaddr.un and
 * b->sockaddr_size from them. Having neither "path" nor "abstract", or
 * having both, is checked for explicitly. */
484 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
485 _cleanup_free_ char *path = NULL, *abstract = NULL;
494 while (**p != 0 && **p != ';') {
495 r = parse_address_key(p, "guid", guid);
501 r = parse_address_key(p, "path", &path);
507 r = parse_address_key(p, "abstract", &abstract);
516 if (!path && !abstract)
519 if (path && abstract)
524 if (l > sizeof(b->sockaddr.un.sun_path))
527 b->sockaddr.un.sun_family = AF_UNIX;
528 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
529 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
530 } else if (abstract) {
531 l = strlen(abstract);
/* One byte of sun_path is taken by the leading NUL written below. */
532 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
535 b->sockaddr.un.sun_family = AF_UNIX;
536 b->sockaddr.un.sun_path[0] = 0;
537 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
538 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the keys of a "tcp:" address ("guid", "host", "port",
 * "family") up to the next ';' or the end of the string, resolves
 * host/port with getaddrinfo() (stream sockets, AI_ADDRCONFIG) and
 * copies the first result into b->sockaddr / b->sockaddr_size. A
 * family of "ipv4" or "ipv6" restricts the lookup to AF_INET or
 * AF_INET6. -EADDRNOTAVAIL is one of the error results. */
544 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
545 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
547 struct addrinfo *result, hints = {
548 .ai_socktype = SOCK_STREAM,
549 .ai_flags = AI_ADDRCONFIG,
557 while (**p != 0 && **p != ';') {
558 r = parse_address_key(p, "guid", guid);
564 r = parse_address_key(p, "host", &host);
570 r = parse_address_key(p, "port", &port);
576 r = parse_address_key(p, "family", &family);
589 if (streq(family, "ipv4"))
590 hints.ai_family = AF_INET;
591 else if (streq(family, "ipv6"))
592 hints.ai_family = AF_INET6;
597 r = getaddrinfo(host, port, &hints, &result);
601 return -EADDRNOTAVAIL;
/* Only the first entry of the result list is used. */
603 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
604 b->sockaddr_size = result->ai_addrlen;
606 freeaddrinfo(result);
/* Parses the keys of a "unixexec:" address up to the next ';' or the
 * end of the string: "guid", "path" and numbered "argvN" items. The
 * index N is read with strtoul() and rejected when conversion fails,
 * when it is not followed by '=' or when it exceeds 256. The argument
 * array is grown (zero-filled) to hold index N, and a missing argv[0]
 * is filled in with a copy of path. */
611 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
613 unsigned n_argv = 0, j;
615 size_t allocated = 0;
623 while (**p != 0 && **p != ';') {
624 r = parse_address_key(p, "guid", guid);
630 r = parse_address_key(p, "path", &path);
636 if (startswith(*p, "argv")) {
/* Skip the 4-character "argv" prefix; strtoul() advances *p past the
 * digits. */
640 ul = strtoul(*p + 4, (char**) p, 10);
641 if (errno > 0 || **p != '=' || ul > 256) {
649 if (!GREEDY_REALLOC0(argv, allocated, ul + 2)) {
/* With a NULL key, parse_address_key() is handed the slot argv[ul] to
 * fill. */
657 r = parse_address_key(p, NULL, argv + ul);
672 /* Make sure there are no holes in the array, with the
673 * exception of argv[0] */
674 for (j = 1; j < n_argv; j++)
680 if (argv && argv[0] == NULL) {
681 argv[0] = strdup(path);
693 for (j = 0; j < n_argv; j++)
/* Parses the keys of a "kernel:" address ("guid" and "path") up to the
 * next ';' or the end of the string. */
701 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
702 _cleanup_free_ char *path = NULL;
710 while (**p != 0 && **p != ';') {
711 r = parse_address_key(p, "guid", guid);
717 r = parse_address_key(p, "path", &path);
/* Parses the keys of an "x-container-unix:" address ("guid" and
 * "machine") up to the next ';' or the end of the string. The machine
 * name has to pass filename_is_safe() and is stored in b->machine; the
 * socket address is set to the fixed AF_UNIX path
 * /var/run/dbus/system_bus_socket. */
736 static int parse_container_unix_address(sd_bus *b, const char **p, char **guid) {
737 _cleanup_free_ char *machine = NULL;
745 while (**p != 0 && **p != ';') {
746 r = parse_address_key(p, "guid", guid);
752 r = parse_address_key(p, "machine", &machine);
764 if (!filename_is_safe(machine))
768 b->machine = machine;
771 b->sockaddr.un.sun_family = AF_UNIX;
772 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
/* sizeof() of the literal includes its NUL terminator, hence the -1. */
773 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
/* Parses the keys of an "x-container-kernel:" address ("guid" and
 * "machine") up to the next ';' or the end of the string. The machine
 * name has to pass filename_is_safe() and is stored in b->machine;
 * b->kernel is set to a copy of the fixed path
 * /dev/kdbus/0-system/bus. */
778 static int parse_container_kernel_address(sd_bus *b, const char **p, char **guid) {
779 _cleanup_free_ char *machine = NULL;
787 while (**p != 0 && **p != ';') {
788 r = parse_address_key(p, "guid", guid);
794 r = parse_address_key(p, "machine", &machine);
806 if (!filename_is_safe(machine))
810 b->machine = machine;
814 b->kernel = strdup("/dev/kdbus/0-system/bus");
/* Discards what an earlier address parse stored in the bus object: the
 * socket address size is zeroed, the exec argv is freed and the server
 * id is reset to SD_ID128_NULL. */
821 static void bus_reset_parsed_address(sd_bus *b) {
825 b->sockaddr_size = 0;
826 strv_free(b->exec_argv);
830 b->server_id = SD_ID128_NULL;
/* Parses the next entry of b->address, starting at offset
 * b->address_index. The end of the string is checked for first; then
 * the previously parsed state is reset and the entry is dispatched on
 * its transport prefix ("unix:", "tcp:", "unixexec:", "kernel:",
 * "x-container-unix:", "x-container-kernel:") to the matching parser.
 * A guid found in the entry is converted into b->server_id, and
 * b->address_index is advanced to where parsing stopped. */
837 static int bus_parse_next_address(sd_bus *b) {
838 _cleanup_free_ char *guid = NULL;
846 if (b->address[b->address_index] == 0)
849 bus_reset_parsed_address(b);
851 a = b->address + b->address_index;
860 if (startswith(a, "unix:")) {
863 r = parse_unix_address(b, &a, &guid);
868 } else if (startswith(a, "tcp:")) {
871 r = parse_tcp_address(b, &a, &guid);
877 } else if (startswith(a, "unixexec:")) {
880 r = parse_exec_address(b, &a, &guid);
886 } else if (startswith(a, "kernel:")) {
889 r = parse_kernel_address(b, &a, &guid);
894 } else if (startswith(a, "x-container-unix:")) {
897 r = parse_container_unix_address(b, &a, &guid);
902 } else if (startswith(a, "x-container-kernel:")) {
905 r = parse_container_kernel_address(b, &a, &guid);
918 r = sd_id128_from_string(guid, &b->server_id);
/* Remember how far into the address string we got. */
923 b->address_index = a - b->address;
/* Tries to connect using the currently parsed address, choosing the
 * transport from what is set: exec, container kernel bus, container
 * socket, kernel bus or plain socket. I/O event sources are attached
 * via attach_io_events(); a failing attempt is remembered in
 * b->last_connect_error and bus_parse_next_address() moves on to the
 * next address. The final fallback result is the last recorded connect
 * error, or -ECONNREFUSED if none was recorded. */
927 static int bus_start_address(sd_bus *b) {
933 bool skipped = false;
938 r = bus_socket_exec(b);
939 else if (b->machine && b->kernel)
940 r = bus_container_connect_kernel(b);
941 else if (b->machine && b->sockaddr.sa.sa_family != AF_UNSPEC)
942 r = bus_container_connect_socket(b);
944 r = bus_kernel_connect(b);
945 else if (b->sockaddr.sa.sa_family != AF_UNSPEC)
946 r = bus_socket_connect(b);
952 r = attach_io_events(b);
/* Stored as a positive errno value; negated again when returned. */
957 b->last_connect_error = -r;
960 r = bus_parse_next_address(b);
964 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Drops the currently parsed address and restarts connection setup
 * through bus_start_address(), whose result is returned. */
968 int bus_next_address(sd_bus *b) {
971 bus_reset_parsed_address(b);
972 return bus_start_address(b);
/* Takes over the descriptors configured on the bus: both are switched
 * to non-blocking, close-on-exec mode, and the connection is then
 * handed to the kdbus transport if the input fd is a character device,
 * or to the socket transport otherwise. Requires valid input and
 * output fds. */
975 static int bus_start_fd(sd_bus *b) {
980 assert(b->input_fd >= 0);
981 assert(b->output_fd >= 0);
983 r = fd_nonblock(b->input_fd, true);
987 r = fd_cloexec(b->input_fd, true);
/* A shared descriptor has already been set up above. */
991 if (b->input_fd != b->output_fd) {
992 r = fd_nonblock(b->output_fd, true);
996 r = fd_cloexec(b->output_fd, true);
1001 if (fstat(b->input_fd, &st) < 0)
/* S_ISCHR() classifies an st_mode value, not a descriptor number, so
 * it has to be applied to the result of the fstat() above. */
1004 if (S_ISCHR(st.st_mode))
1005 return bus_kernel_take_fd(b);
1007 return bus_socket_take_fd(b);
/* Starts connection setup on a configured bus. The bus must be
 * non-NULL (-EINVAL), still in state BUS_UNSET (-EPERM), and
 * bus_pid_changed() must be false (-ECHILD). The state becomes
 * BUS_OPENING; being both a server and a bus client is checked for
 * explicitly. A pre-set input fd is started through bus_start_fd(),
 * otherwise an address, socket address, exec path, kernel path or
 * machine is started through bus_start_address(). The result is that
 * of bus_send_hello(). */
1010 _public_ int sd_bus_start(sd_bus *bus) {
1013 assert_return(bus, -EINVAL);
1014 assert_return(bus->state == BUS_UNSET, -EPERM);
1015 assert_return(!bus_pid_changed(bus), -ECHILD);
1017 bus->state = BUS_OPENING;
1019 if (bus->is_server && bus->bus_client)
1022 if (bus->input_fd >= 0)
1023 r = bus_start_fd(bus);
1024 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1025 r = bus_start_address(bus);
1032 return bus_send_hello(bus);
/* Opens a connection to the system bus. The address is taken from
 * $DBUS_SYSTEM_BUS_ADDRESS (read with secure_getenv()), or else from
 * one of two built-in defaults: the kdbus path followed by the
 * /run/dbus socket, or the socket alone. The bus is marked as a bus
 * client, caller capabilities and credentials are requested through
 * attach_flags, and the bus is started. Returns -EINVAL if ret is
 * NULL. */
1035 _public_ int sd_bus_open_system(sd_bus **ret) {
1040 assert_return(ret, -EINVAL);
1046 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1048 r = sd_bus_set_address(b, e);
1051 r = sd_bus_set_address(b, "kernel:path=/dev/kdbus/0-system/bus;unix:path=/run/dbus/system_bus_socket");
1053 r = sd_bus_set_address(b, "unix:path=/run/dbus/system_bus_socket");
1059 b->bus_client = true;
1061 /* Let's do per-method access control on the system bus. We
1062 * need the caller's UID and capability set for that. */
1064 b->attach_flags |= KDBUS_ATTACH_CAPS | KDBUS_ATTACH_CREDS;
1066 r = sd_bus_start(b);
/* Opens a connection to the user bus. The address is taken from
 * $DBUS_SESSION_BUS_ADDRESS, or else built from the escaped
 * $XDG_RUNTIME_DIR ("<dir>/bus" socket, optionally preceded by the
 * per-UID kdbus path), or from the per-UID kdbus path alone;
 * -ECONNREFUSED is one of the fallback results. Environment variables
 * are read with secure_getenv(). The bus is marked as a bus client and
 * started. Returns -EINVAL if ret is NULL. */
1078 _public_ int sd_bus_open_user(sd_bus **ret) {
1083 assert_return(ret, -EINVAL);
1089 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1091 r = sd_bus_set_address(b, e);
1095 e = secure_getenv("XDG_RUNTIME_DIR");
1097 _cleanup_free_ char *ee = NULL;
1099 ee = bus_address_escape(e);
1106 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus;unix:path=%s/bus", (unsigned long) getuid(), ee);
1108 b->address = strjoin("unix:path=", ee, "/bus", NULL);
1112 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus", (unsigned long) getuid());
1114 return -ECONNREFUSED;
1124 b->bus_client = true;
1126 /* We don't do any per-method access control on the user
1130 r = sd_bus_start(b);
/* Opens a bus connection to a remote host by composing a "unixexec:"
 * address that runs ssh with the arguments -xT, the escaped host name
 * and systemd-stdio-bridge. A new bus object is created, marked as a
 * bus client and started. Returns -EINVAL if host or ret is NULL. */
1142 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1143 _cleanup_free_ char *e = NULL;
1148 assert_return(host, -EINVAL);
1149 assert_return(ret, -EINVAL);
/* Escape the host so that it cannot break out of the address syntax. */
1151 e = bus_address_escape(host);
1155 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1159 r = sd_bus_new(&bus);
1166 bus->bus_client = true;
1168 r = sd_bus_start(bus);
/* Opens a bus connection into the named container. The machine name
 * must pass filename_is_safe(); it is escaped and placed into an
 * "x-container-unix:" address, optionally preceded by an
 * "x-container-kernel:" one. A new bus object is created, marked as a
 * bus client and started. Returns -EINVAL if machine or ret is NULL or
 * the name is not safe. */
1178 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1179 _cleanup_free_ char *e = NULL;
1184 assert_return(machine, -EINVAL);
1185 assert_return(ret, -EINVAL);
1186 assert_return(filename_is_safe(machine), -EINVAL);
1188 e = bus_address_escape(machine);
1193 p = strjoin("x-container-kernel:machine=", e, ";x-container-unix:machine=", e, NULL);
1195 p = strjoin("x-container-unix:machine=", e, NULL);
1200 r = sd_bus_new(&bus);
1207 bus->bus_client = true;
1209 r = sd_bus_start(bus);
/* Closes the bus connection: the state becomes BUS_CLOSED, the bus is
 * detached from its event source and all queued messages are dropped.
 * A bus that is already closed, or for which bus_pid_changed() reports
 * true, is special-cased first. Kernel buses are treated differently
 * from socket buses with respect to the fds, as explained at the
 * end. */
1219 _public_ void sd_bus_close(sd_bus *bus) {
1223 if (bus->state == BUS_CLOSED)
1225 if (bus_pid_changed(bus))
1228 bus->state = BUS_CLOSED;
1230 sd_bus_detach_event(bus);
1232 /* Drop all queued messages so that they drop references to
1233 * the bus object and the bus may be freed */
1234 bus_reset_queues(bus);
1236 if (!bus->is_kernel)
1239 /* We'll leave the fd open in case this is a kernel bus, since
1240 * there might still be memblocks around that reference this
1241 * bus, and they might need to invoke the * KDBUS_CMD_FREE
1242 * ioctl on the fd when they are freed. */
/* Moves the bus into state BUS_CLOSING. The states BUS_OPENING,
 * BUS_AUTHENTICATING, BUS_HELLO and BUS_RUNNING are the ones the guard
 * below lets through; presumably the function returns early for any
 * other state. */
1245 static void bus_enter_closing(sd_bus *bus) {
1248 if (bus->state != BUS_OPENING &&
1249 bus->state != BUS_AUTHENTICATING &&
1250 bus->state != BUS_HELLO &&
1251 bus->state != BUS_RUNNING)
1254 bus->state = BUS_CLOSING;
/* Takes an additional reference on the bus. Returns NULL if bus is
 * NULL. The counter must read at least 2 after the increment, i.e.
 * the caller already had to hold a reference. */
1257 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1258 assert_return(bus, NULL);
1260 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drops one reference on the bus. The branch below is taken once the
 * decremented counter is no longer positive; presumably that is where
 * the object is released. */
1265 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1270 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Reports whether the bus state counts as open according to
 * BUS_IS_OPEN(). Returns -EINVAL for a NULL bus and -ECHILD if
 * bus_pid_changed() reports true. */
1276 _public_ int sd_bus_is_open(sd_bus *bus) {
1278 assert_return(bus, -EINVAL);
1279 assert_return(!bus_pid_changed(bus), -ECHILD);
1281 return BUS_IS_OPEN(bus->state);
/* Tells whether values of the given D-Bus type can be sent on this
 * bus. For SD_BUS_TYPE_UNIX_FD the answer depends on the connection:
 * the local KDBUS_HELLO_ACCEPT_FD flag is examined, the bus is brought
 * to the running state, and bus->can_fds is returned. For every other
 * type the result is bus_type_is_valid(type). Returns -EINVAL for a
 * NULL bus, -ENOTCONN while the bus is still BUS_UNSET, and -ECHILD if
 * bus_pid_changed() reports true. */
1284 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1287 assert_return(bus, -EINVAL);
1288 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1289 assert_return(!bus_pid_changed(bus), -ECHILD);
1291 if (type == SD_BUS_TYPE_UNIX_FD) {
1292 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1295 r = bus_ensure_running(bus);
1299 return bus->can_fds;
1302 return bus_type_is_valid(type);
/* Copies the server id of the bus into *server_id after making sure
 * the bus is running (bus_ensure_running()). Returns -EINVAL if bus or
 * server_id is NULL and -ECHILD if bus_pid_changed() reports true. */
1305 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1308 assert_return(bus, -EINVAL);
1309 assert_return(server_id, -EINVAL);
1310 assert_return(!bus_pid_changed(bus), -ECHILD);
1312 r = bus_ensure_running(bus);
1316 *server_id = bus->server_id;
/* Seals message m for sending on bus b. b->serial can be raised to the
 * serial m already carries, BUS_DEFAULT_TIMEOUT can be substituted for
 * the timeout, and the message is sealed with the next serial of the
 * bus (b->serial is pre-incremented). Returns what bus_message_seal()
 * returns. */
1320 static int bus_seal_message(sd_bus *b, sd_bus_message *m, usec_t timeout) {
1325 /* If we copy the same message to multiple
1326 * destinations, avoid using the same serial
1328 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1333 timeout = BUS_DEFAULT_TIMEOUT;
1335 return bus_message_seal(m, ++b->serial, timeout);
/* Makes sure *m uses the message version and endianness the bus
 * expects. A value of 0 in b->message_version or b->message_endian
 * means "no requirement"; if both requirements are met the message is
 * left alone, otherwise it is passed to bus_message_remarshal(). */
1338 static int bus_remarshal_message(sd_bus *b, sd_bus_message **m) {
1341 /* Do packet version and endianness already match? */
1342 if ((b->message_version == 0 || b->message_version == (*m)->header->version) &&
1343 (b->message_endian == 0 || b->message_endian == (*m)->header->endian))
1346 /* No? Then remarshal! */
1347 return bus_message_remarshal(b, m);
/* Seals a locally generated (synthetic) message with the fixed serial
 * 0xFFFFFFFF and a timeout of 0. Returns what bus_message_seal()
 * returns. */
1350 int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
1354 /* The bus specification says the serial number cannot be 0,
1355 * hence let's fill something in for synthetic messages. Since
1356 * synthetic messages might have a fake sender and we don't
1357 * want to interfere with the real sender's serial numbers we
1358 * pick a fixed, artificial one. We use (uint32_t) -1 rather
1359 * than (uint64_t) -1 since dbus1 only had 32bit identifiers,
1360 * even though kdbus can do 64bit. */
1362 return bus_message_seal(m, 0xFFFFFFFFULL, 0);
/* Writes message to the bus through the kernel transport or the socket
 * transport. Only the socket variant takes idx, which (judging by its
 * callers) tracks how much of the message has been written so far. */
1365 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1370 return bus_kernel_write_message(bus, message);
1372 return bus_socket_write_message(bus, message, idx);
/* Flushes the write queue. While messages are queued, the head entry
 * is written with bus_write_message() using bus->windex as progress
 * marker. Once the head is fully written (always assumed on kernel
 * buses, otherwise when windex has reached the message size) its
 * reference is dropped and the remaining entries are shifted down by
 * one. May only be called in state BUS_RUNNING or BUS_HELLO. */
1375 static int dispatch_wqueue(sd_bus *bus) {
1379 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1381 while (bus->wqueue_size > 0) {
1383 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1387 /* Didn't do anything this time */
1389 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1390 /* Fully written. Let's drop the entry from
1393 * This isn't particularly optimized, but
1394 * well, this is supposed to be our worst-case
1395 * buffer only, and the socket buffer is
1396 * supposed to be our primary buffer, and if
1397 * it got full, then all bets are off
1400 sd_bus_message_unref(bus->wqueue[0]);
1401 bus->wqueue_size --;
1402 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Reads an incoming message through the kernel transport or the socket
 * transport and returns that transport's result. */
1412 static int bus_read_message(sd_bus *bus) {
1416 return bus_kernel_read_message(bus);
1418 return bus_socket_read_message(bus);
/* Makes sure the read queue can take one more message: the current
 * size is checked against the BUS_RQUEUE_MAX limit and the array is
 * grown to hold rqueue_size + 1 entries. */
1421 int bus_rqueue_make_room(sd_bus *bus) {
1424 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1427 if (!GREEDY_REALLOC(bus->rqueue, bus->rqueue_allocated, bus->rqueue_size + 1))
/* Hands out the next incoming message in *m. A message already sitting
 * in the read queue is taken from its head, with the remaining entries
 * shifted down by one; with an empty queue a new message is read via
 * bus_read_message(). May only be called in state BUS_RUNNING or
 * BUS_HELLO. */
1433 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1438 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1441 if (bus->rqueue_size > 0) {
1442 /* Dispatch a queued message */
1444 *m = bus->rqueue[0];
1445 bus->rqueue_size --;
1446 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1450 /* Try to read a new message */
1451 r = bus_read_message(bus);
/* Sends message _m on the bus without waiting for a reply. The message
 * is sealed and, where needed, remarshalled. With an empty write queue
 * on a bus in state BUS_RUNNING or BUS_HELLO a direct write is
 * attempted; a partially written message is parked in wqueue[0].
 * Otherwise the message is appended to the write queue, subject to the
 * BUS_WQUEUE_MAX limit. -EPIPE, -ENOTCONN and -ESHUTDOWN from the
 * write put the bus into the closing state. The message serial is
 * written to *serial. Returns -EINVAL if bus or the message is NULL,
 * -ENOTCONN if the bus is not open, and -ECHILD if bus_pid_changed()
 * reports true. */
1461 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *_m, uint64_t *serial) {
1462 _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1465 assert_return(bus, -EINVAL);
1466 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1467 assert_return(m, -EINVAL);
1468 assert_return(!bus_pid_changed(bus), -ECHILD);
1471 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1478 /* If the serial number isn't kept, then we know that no reply
1480 if (!serial && !m->sealed)
1481 m->header->flags |= BUS_MESSAGE_NO_REPLY_EXPECTED;
1483 r = bus_seal_message(bus, m, 0);
1487 /* Remarshal if we have to. This will possibly unref the
1488 * message and place a replacement in m */
1489 r = bus_remarshal_message(bus, &m);
1493 /* If this is a reply and no reply was requested, then let's
1494 * suppress this, if we can */
1495 if (m->dont_send && !serial)
1498 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1501 r = bus_write_message(bus, m, &idx);
1503 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1504 bus_enter_closing(bus);
1507 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1508 /* Wasn't fully written. So let's remember how
1509 * much was written. Note that the first entry
1510 * of the wqueue array is always allocated so
1511 * that we always can remember how much was
1513 bus->wqueue[0] = sd_bus_message_ref(m);
1514 bus->wqueue_size = 1;
1518 /* Just append it to the queue. */
1520 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1523 if (!GREEDY_REALLOC(bus->wqueue, bus->wqueue_allocated, bus->wqueue_size + 1))
1526 bus->wqueue[bus->wqueue_size ++] = sd_bus_message_ref(m);
1530 *serial = BUS_MESSAGE_SERIAL(m);
/* Sends m to the given destination. If the destination already set on
 * the message differs from the requested one (compared with
 * streq_ptr()), it is set through sd_bus_message_set_destination();
 * the message is then handed to sd_bus_send(). Returns -EINVAL if bus
 * or m is NULL, -ENOTCONN if the bus is not open, and -ECHILD if
 * bus_pid_changed() reports true. */
1535 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1538 assert_return(bus, -EINVAL);
1539 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1540 assert_return(m, -EINVAL);
1541 assert_return(!bus_pid_changed(bus), -ECHILD);
1543 if (!streq_ptr(m->destination, destination)) {
1548 r = sd_bus_message_set_destination(m, destination);
1553 return sd_bus_send(bus, m, serial);
/* Turns a relative timeout in microseconds into an absolute point on
 * CLOCK_MONOTONIC (now + usec). The value (uint64_t) -1 is
 * special-cased. */
1556 static usec_t calc_elapse(uint64_t usec) {
1557 if (usec == (uint64_t) -1)
1560 return now(CLOCK_MONOTONIC) + usec;
/* Ordering function for the reply-callback priority queue (see
 * prioq_ensure_allocated() in sd_bus_call_async()). Entries are
 * compared by their timeout field; the case where exactly one of the
 * two has a zero timeout is decided before the numeric comparison. */
1563 static int timeout_compare(const void *a, const void *b) {
1564 const struct reply_callback *x = a, *y = b;
1566 if (x->timeout != 0 && y->timeout == 0)
1569 if (x->timeout == 0 && y->timeout != 0)
1572 if (x->timeout < y->timeout)
1575 if (x->timeout > y->timeout)
/* Sends a method call without blocking and registers callback for its
 * reply. The message must be a method call that expects a reply
 * (-EINVAL otherwise). A reply_callback record holding the callback,
 * userdata, message serial and absolute timeout is stored in
 * bus->reply_callbacks under the serial and, when the timeout is
 * non-zero, in the timeout priority queue. The message then goes out
 * via sd_bus_send(). sd_bus_call_async_cancel() is invoked on this
 * function's cleanup paths - presumably when queueing or sending
 * fails. Returns -EINVAL if bus, the message or callback is NULL,
 * -ENOTCONN if the bus is not open, and -ECHILD if bus_pid_changed()
 * reports true. */
1581 _public_ int sd_bus_call_async(
1584 sd_bus_message_handler_t callback,
1589 _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1590 struct reply_callback *c;
1593 assert_return(bus, -EINVAL);
1594 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1595 assert_return(m, -EINVAL);
1596 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1597 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1598 assert_return(callback, -EINVAL);
1599 assert_return(!bus_pid_changed(bus), -ECHILD);
1601 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1605 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1609 r = bus_seal_message(bus, m, usec);
1613 r = bus_remarshal_message(bus, &m);
1617 c = new0(struct reply_callback, 1);
1621 c->callback = callback;
1622 c->userdata = userdata;
1623 c->serial = BUS_MESSAGE_SERIAL(m);
1624 c->timeout = calc_elapse(m->timeout);
/* The hashmap key points into the record itself (&c->serial). */
1626 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
/* Only callbacks with a deadline take part in timeout processing. */
1632 if (c->timeout != 0) {
1633 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1636 sd_bus_call_async_cancel(bus, c->serial);
1641 r = sd_bus_send(bus, m, serial);
1643 sd_bus_call_async_cancel(bus, c->serial);
/* Cancels the pending asynchronous call identified by serial: its
 * reply_callback record is removed from bus->reply_callbacks and, if
 * it carries a non-zero timeout, from the timeout priority queue.
 * Returns -EINVAL if bus is NULL or serial is 0, and -ECHILD if
 * bus_pid_changed() reports true. */
1650 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1651 struct reply_callback *c;
1653 assert_return(bus, -EINVAL);
1654 assert_return(serial != 0, -EINVAL);
1655 assert_return(!bus_pid_changed(bus), -ECHILD);
1657 c = hashmap_remove(bus->reply_callbacks, &serial);
1661 if (c->timeout != 0)
1662 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the connection until it reaches BUS_RUNNING by alternating
 * sd_bus_process() with sd_bus_wait() (unlimited timeout). The states
 * BUS_UNSET, BUS_CLOSED and BUS_CLOSING are special-cased up front, as
 * is a bus that is already running. */
1668 int bus_ensure_running(sd_bus *bus) {
1673 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1675 if (bus->state == BUS_RUNNING)
1679 r = sd_bus_process(bus, NULL);
1682 if (bus->state == BUS_RUNNING)
1687 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and waits synchronously for its reply. The
 * message must be a method call that expects a reply and error must
 * not already be set (-EINVAL otherwise). After the bus is running the
 * message is sealed, remarshalled where needed and sent. The read
 * queue is then scanned from index i - its size at the time of
 * sending - for a message whose reply_serial matches: a method return
 * or a method error (copied into error) ends the wait. A message
 * carrying our own serial and our own unique name as sender is
 * dropped as well. While nothing matches, more input is read,
 * bus_poll() waits for activity, and the write queue is flushed;
 * -EPIPE, -ENOTCONN and -ESHUTDOWN put the bus into the closing
 * state. Returns -EINVAL if bus or the message is NULL, -ENOTCONN if
 * the bus is not open, and -ECHILD if bus_pid_changed() reports
 * true. */
1693 _public_ int sd_bus_call(
1697 sd_bus_error *error,
1698 sd_bus_message **reply) {
1700 _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1706 assert_return(bus, -EINVAL);
1707 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1708 assert_return(m, -EINVAL);
1709 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1710 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1711 assert_return(!bus_error_is_dirty(error), -EINVAL);
1712 assert_return(!bus_pid_changed(bus), -ECHILD);
1714 r = bus_ensure_running(bus);
1718 i = bus->rqueue_size;
1720 r = bus_seal_message(bus, m, usec);
1724 r = bus_remarshal_message(bus, &m);
1728 r = sd_bus_send(bus, m, &serial);
1732 timeout = calc_elapse(m->timeout);
1737 while (i < bus->rqueue_size) {
1738 sd_bus_message *incoming = NULL;
1740 incoming = bus->rqueue[i];
1742 if (incoming->reply_serial == serial) {
1743 /* Found a match! */
1745 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1748 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1753 sd_bus_message_unref(incoming);
1756 } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
1757 r = sd_bus_error_copy(error, &incoming->error);
1761 sd_bus_message_unref(incoming);
1764 } else if (incoming->header->serial == serial &&
1767 streq(bus->unique_name, incoming->sender)) {
1769 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1772 /* Our own message? Somebody is trying
1773 * to send its own client a message,
1774 * let's not dead-lock, let's fail
1777 sd_bus_message_unref(incoming);
1781 /* Try to read more, right-away */
1785 r = bus_read_message(bus);
1787 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1788 bus_enter_closing(bus);
1798 n = now(CLOCK_MONOTONIC);
1804 left = (uint64_t) -1;
1806 r = bus_poll(bus, true, left);
1812 r = dispatch_wqueue(bus);
1814 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1815 bus_enter_closing(bus);
/* Returns the descriptor of the bus connection. This is only possible
 * when input and output use the same fd (-EPERM otherwise). Returns
 * -EINVAL for a NULL bus and -ECHILD if bus_pid_changed() reports
 * true. */
1822 _public_ int sd_bus_get_fd(sd_bus *bus) {
1824 assert_return(bus, -EINVAL);
1825 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1826 assert_return(!bus_pid_changed(bus), -ECHILD);
1828 return bus->input_fd;
/* Works out which I/O events the caller should wait for, depending on
 * the connection state. BUS_OPENING and BUS_AUTHENTICATING have their
 * own branches; the latter consults bus_socket_auth_needs_write(). In
 * BUS_RUNNING/BUS_HELLO the decision depends on whether the read queue
 * is empty and whether the write queue has pending entries. Returns
 * -EINVAL for a NULL bus, -ENOTCONN if the bus is neither open nor
 * closing, and -ECHILD if bus_pid_changed() reports true. */
1831 _public_ int sd_bus_get_events(sd_bus *bus) {
1834 assert_return(bus, -EINVAL);
1835 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1836 assert_return(!bus_pid_changed(bus), -ECHILD);
1838 if (bus->state == BUS_OPENING)
1840 else if (bus->state == BUS_AUTHENTICATING) {
1842 if (bus_socket_auth_needs_write(bus))
1847 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1848 if (bus->rqueue_size <= 0)
1850 if (bus->wqueue_size > 0)
/* Reports in *timeout_usec the timeout the caller should use when
 * waiting for bus activity. BUS_CLOSING and a non-empty read queue are
 * handled by dedicated branches. While authenticating the value is
 * bus->auth_timeout. In any state other than BUS_RUNNING/BUS_HELLO, or
 * when no reply callback is queued, it is (uint64_t) -1. Otherwise it
 * is the timeout of the entry at the head of the reply-callback
 * priority queue. Returns -EINVAL if bus or timeout_usec is NULL,
 * -ENOTCONN if the bus is neither open nor closing, and -ECHILD if
 * bus_pid_changed() reports true. */
1857 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1858 struct reply_callback *c;
1860 assert_return(bus, -EINVAL);
1861 assert_return(timeout_usec, -EINVAL);
1862 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1863 assert_return(!bus_pid_changed(bus), -ECHILD);
1865 if (bus->state == BUS_CLOSING) {
1870 if (bus->state == BUS_AUTHENTICATING) {
1871 *timeout_usec = bus->auth_timeout;
1875 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1876 *timeout_usec = (uint64_t) -1;
1880 if (bus->rqueue_size > 0) {
1885 c = prioq_peek(bus->reply_callbacks_prioq);
1887 *timeout_usec = (uint64_t) -1;
1891 *timeout_usec = c->timeout;
/* Handles the reply callback at the head of the timeout priority
 * queue. A synthetic SD_BUS_ERROR_NO_REPLY error message ("Method call
 * timed out") with sender org.freedesktop.DBus is built and sealed.
 * The callback record is removed from the priority queue and from the
 * reply_callbacks hashmap, the iteration counter is bumped, and the
 * callback is invoked with the synthetic message. Its result is passed
 * through bus_maybe_reply_error(). */
1895 static int process_timeout(sd_bus *bus) {
1896 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1897 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1898 struct reply_callback *c;
1904 c = prioq_peek(bus->reply_callbacks_prioq);
1908 n = now(CLOCK_MONOTONIC);
1912 r = bus_message_new_synthetic_error(
1915 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1920 m->sender = "org.freedesktop.DBus";
1922 r = bus_seal_synthetic_message(bus, m);
/* The entry peeked above must still be the head of the queue. */
1926 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1927 hashmap_remove(bus->reply_callbacks, &c->serial);
1930 bus->iteration_counter ++;
1932 r = c->callback(bus, m, c->userdata, &error_buffer);
1933 r = bus_maybe_reply_error(m, r, &error_buffer);
1936 bus->current = NULL;
/* Gatekeeper used while the bus is in BUS_HELLO: the message is
 * required to be a method return or method error whose reply_serial
 * equals bus->hello_serial. In any other state the check is
 * special-cased by the first branch. */
1941 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1945 if (bus->state != BUS_HELLO)
1948 /* Let's make sure the first message on the bus is the HELLO
1949 * reply. But note that we don't actually parse the message
1950 * here (we leave that to the usual handling), we just verify
1951 * we don't let any earlier msg through. */
1953 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1954 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1957 if (m->reply_serial != bus->hello_serial)
/* Dispatches a method return or method error to the callback that was
 * registered for its reply_serial. The callback record is removed from
 * the reply_callbacks hashmap and, if it has a non-zero timeout, from
 * the timeout priority queue. The message is rewound, the callback is
 * invoked, and its result is passed through
 * bus_maybe_reply_error(). */
1963 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1964 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1965 struct reply_callback *c;
1971 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1972 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1975 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1979 if (c->timeout != 0)
1980 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Rewind so the callback reads the message from the beginning. */
1982 r = sd_bus_message_rewind(m, true);
1986 r = c->callback(bus, m, c->userdata, &error_buffer);
1987 r = bus_maybe_reply_error(m, r, &error_buffer);
/* Runs the registered filter callbacks on message m. The list is
 * walked inside a loop that repeats for as long as
 * filter_callbacks_modified gets set during a pass. The
 * last_iteration stamp ensures each filter runs at most once per
 * iteration counter value. The message is rewound before each
 * callback, and the callback's result is passed through
 * bus_maybe_reply_error(). */
1993 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1994 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1995 struct filter_callback *l;
2002 bus->filter_callbacks_modified = false;
2004 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
/* The list changed under us; the current walk cannot be trusted. */
2006 if (bus->filter_callbacks_modified)
2009 /* Don't run this more than once per iteration */
2010 if (l->last_iteration == bus->iteration_counter)
2013 l->last_iteration = bus->iteration_counter;
2015 r = sd_bus_message_rewind(m, true);
2019 r = l->callback(bus, m, l->userdata, &error_buffer);
2020 r = bus_maybe_reply_error(m, r, &error_buffer);
2026 } while (bus->match_callbacks_modified ? bus->filter_callbacks_modified : bus->filter_callbacks_modified);
/* Runs the match callbacks on message m via bus_match_run(), repeating
 * for as long as match_callbacks_modified gets set during a run. */
2031 static int process_match(sd_bus *bus, sd_bus_message *m) {
2038 bus->match_callbacks_modified = false;
2040 r = bus_match_run(bus, &bus->match_callbacks, m);
2044 } while (bus->match_callbacks_modified);
/* Implements the org.freedesktop.DBus.Peer interface locally. Only
 * method calls on that interface are considered, and calls flagged
 * BUS_MESSAGE_NO_REPLY_EXPECTED are special-cased. "Ping" gets an
 * empty method return, "GetMachineId" a method return carrying the
 * machine id as a string, and any other member an
 * SD_BUS_ERROR_UNKNOWN_METHOD error. The reply is sent with
 * sd_bus_send(). */
2049 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
2050 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2056 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2059 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2062 if (m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED)
2065 if (streq_ptr(m->member, "Ping"))
2066 r = sd_bus_message_new_method_return(m, &reply);
2067 else if (streq_ptr(m->member, "GetMachineId")) {
2071 r = sd_id128_get_machine(&id);
2075 r = sd_bus_message_new_method_return(m, &reply);
2079 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2081 r = sd_bus_message_new_method_errorf(
2083 SD_BUS_ERROR_UNKNOWN_METHOD,
2084 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2090 r = sd_bus_send(bus, reply, NULL);
/* Push one incoming message through the dispatch stages, in this order:
 * process_hello(), process_reply(), process_filter(), process_match(),
 * process_builtin(), bus_process_object(). bus->iteration_counter is
 * bumped first (process_filter() compares against it), and bus->current
 * is reset to NULL at the end; sd_bus_process() refuses to run while
 * bus->current is set. */
2097 static int process_message(sd_bus *bus, sd_bus_message *m) {
2104 bus->iteration_counter++;
/* Both serials are cast to unsigned long to match the %lu conversions */
2106 log_debug("Got message type=%s sender=%s destination=%s object=%s interface=%s member=%s serial=%lu reply_serial=%lu error=%s",
2107 bus_message_type_to_string(m->header->type),
2108 strna(sd_bus_message_get_sender(m)),
2109 strna(sd_bus_message_get_destination(m)),
2110 strna(sd_bus_message_get_path(m)),
2111 strna(sd_bus_message_get_interface(m)),
2112 strna(sd_bus_message_get_member(m)),
2113 (unsigned long) m->header->serial,
2114 (unsigned long) m->reply_serial,
2115 strna(m->error.message));
2117 r = process_hello(bus, m);
2121 r = process_reply(bus, m);
2125 r = process_filter(bus, m);
2129 r = process_match(bus, m);
2133 r = process_builtin(bus, m);
2137 r = bus_process_object(bus, m);
2140 bus->current = NULL;
/* One processing step for a bus in BUS_RUNNING or BUS_HELLO state:
 * process_timeout(), then dispatch_wqueue(), then dispatch_rqueue() to
 * obtain a message in m, which is passed to process_message(). For a
 * method call that is logged as unprocessed, an
 * SD_BUS_ERROR_UNKNOWN_OBJECT error naming m->path is sent back. */
2144 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2145 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2149 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2151 r = process_timeout(bus);
2155 r = dispatch_wqueue(bus);
2159 r = dispatch_rqueue(bus, &m);
2165 r = process_message(bus, m);
2170 r = sd_bus_message_rewind(m, true);
/* Only method calls get the "unknown object" error reply */
2179 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2181 log_debug("Unprocessed message call sender=%s object=%s interface=%s member=%s",
2182 strna(sd_bus_message_get_sender(m)),
2183 strna(sd_bus_message_get_path(m)),
2184 strna(sd_bus_message_get_interface(m)),
2185 strna(sd_bus_message_get_member(m)));
2187 r = sd_bus_reply_method_errorf(
2189 SD_BUS_ERROR_UNKNOWN_OBJECT,
2190 "Unknown object '%s'.", m->path);
/* One processing step for a bus in BUS_CLOSING state. A pending entry of
 * bus->reply_callbacks is failed with a synthetic SD_BUS_ERROR_NO_REPLY
 * ("Connection terminated") error: the entry is unregistered and its
 * callback invoked with that message. The other path synthesizes a signal
 * on /org/freedesktop/DBus/Local with sender org.freedesktop.DBus.Local
 * and runs it through process_filter() and process_match(). */
2204 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2205 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2206 struct reply_callback *c;
2210 assert(bus->state == BUS_CLOSING);
/* Any one pending reply callback; which one does not matter */
2212 c = hashmap_first(bus->reply_callbacks);
2214 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2216 /* First, fail all outstanding method calls */
2217 r = bus_message_new_synthetic_error(
2220 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2225 r = bus_seal_synthetic_message(bus, m);
/* Unregister the entry (prioq only if it has a timeout) before calling it */
2229 if (c->timeout != 0)
2230 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2232 hashmap_remove(bus->reply_callbacks, &c->serial);
2235 bus->iteration_counter++;
2237 r = c->callback(bus, m, c->userdata, &error_buffer);
2238 r = bus_maybe_reply_error(m, r, &error_buffer);
2244 /* Then, synthesize a Disconnected message */
2245 r = sd_bus_message_new_signal(
2247 "/org/freedesktop/DBus/Local",
2248 "org.freedesktop.DBus.Local",
/* The sender is set directly to a string literal on the synthetic message */
2254 m->sender = "org.freedesktop.DBus.Local";
2256 r = bus_seal_synthetic_message(bus, m);
2263 bus->iteration_counter++;
2265 r = process_filter(bus, m);
2269 r = process_match(bus, m);
2281 bus->current = NULL;
/* Public entry point that drives the connection one step, dispatching on
 * bus->state. Fails with -EINVAL for a NULL bus, -ECHILD when the calling
 * process is not the one that created the bus, and -EBUSY when invoked
 * while a message is being dispatched (bus->current set). */
2285 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2286 BUS_DONT_DESTROY(bus);
2289 /* Returns 0 when we didn't do anything. This should cause the
2290 * caller to invoke sd_bus_wait() before returning the next
2291 * time. Returns > 0 when we did something, which possibly
2292 * means *ret is filled in with an unprocessed message. */
2294 assert_return(bus, -EINVAL);
2295 assert_return(!bus_pid_changed(bus), -ECHILD);
2297 /* We don't allow recursively invoking sd_bus_process(). */
2298 assert_return(!bus->current, -EBUSY);
2300 switch (bus->state) {
/* In each of the following states -ECONNRESET, -EPIPE and -ESHUTDOWN are
 * answered by calling bus_enter_closing() */
2309 r = bus_socket_process_opening(bus);
2310 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2311 bus_enter_closing(bus);
2319 case BUS_AUTHENTICATING:
2320 r = bus_socket_process_authenticating(bus);
2321 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2322 bus_enter_closing(bus);
2334 r = process_running(bus, ret);
2335 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2336 bus_enter_closing(bus);
2346 return process_closing(bus, ret);
2349 assert_not_reached("Unknown state");
/* Block in ppoll() on the bus file descriptors.
 *
 * The event mask comes from sd_bus_get_events(). The wait time m starts
 * out as (usec_t) -1, which is passed to ppoll() as a NULL timeout; it may
 * be derived from sd_bus_get_timeout() relative to CLOCK_MONOTONIC, and
 * timeout_usec is considered as well when it is not (uint64_t) -1.
 *
 * Returns 1 if ppoll() reported at least one ready descriptor, 0 otherwise
 * (for the return statement shown here). A bus that is not open fails with
 * -ENOTCONN. */
2352 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2353 struct pollfd p[2] = {};
2356 usec_t m = (usec_t) -1;
2360 if (bus->state == BUS_CLOSING)
2363 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2365 e = sd_bus_get_events(bus);
2370 /* The caller really needs some more data, he doesn't
2371 * care about what's already read, or any timeouts
2376 /* The caller wants to process if there's something to
2377 * process, but doesn't care otherwise */
2379 r = sd_bus_get_timeout(bus, &until);
/* Turn the absolute deadline into a relative wait, clamped at zero */
2384 nw = now(CLOCK_MONOTONIC);
2385 m = until > nw ? until - nw : 0;
2389 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2392 p[0].fd = bus->input_fd;
2393 if (bus->output_fd == bus->input_fd) {
/* With distinct descriptors, p[0] waits for POLLIN on the input fd and
 * p[1] for POLLOUT on the output fd */
2397 p[0].events = e & POLLIN;
2398 p[1].fd = bus->output_fd;
2399 p[1].events = e & POLLOUT;
2403 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2407 return r > 0 ? 1 : 0;
/* Public wait primitive: after the argument and state checks, and a check
 * for already queued incoming messages (rqueue_size > 0), this ends in
 * bus_poll(bus, false, timeout_usec). Fails with -EINVAL for a NULL bus,
 * -ECHILD after a fork and -ENOTCONN when the bus is not open. */
2410 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2412 assert_return(bus, -EINVAL);
2413 assert_return(!bus_pid_changed(bus), -ECHILD);
2415 if (bus->state == BUS_CLOSING)
2418 assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
2420 if (bus->rqueue_size > 0)
2423 return bus_poll(bus, false, timeout_usec);
/* Public flush primitive: makes sure the bus is running
 * (bus_ensure_running()), then works on the write queue with
 * dispatch_wqueue() and waits with bus_poll() without a caller-supplied
 * timeout; wqueue_size <= 0 is checked before and after dispatching.
 * Fails with -EINVAL for a NULL bus, -ECHILD after a fork and -ENOTCONN
 * when the bus is not open. */
2426 _public_ int sd_bus_flush(sd_bus *bus) {
2429 assert_return(bus, -EINVAL);
2430 assert_return(!bus_pid_changed(bus), -ECHILD);
2432 if (bus->state == BUS_CLOSING)
2435 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2437 r = bus_ensure_running(bus);
2441 if (bus->wqueue_size <= 0)
2445 r = dispatch_wqueue(bus);
/* NOTE(review): the errors that trigger bus_enter_closing() here are
 * -EPIPE, -ENOTCONN and -ESHUTDOWN, while sd_bus_process() reacts to
 * -ECONNRESET, -EPIPE and -ESHUTDOWN. Confirm the difference is intended. */
2447 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
2448 bus_enter_closing(bus);
2453 if (bus->wqueue_size <= 0)
/* (uint64_t) -1 means no caller-imposed timeout for bus_poll() */
2456 r = bus_poll(bus, false, (uint64_t) -1);
/* Register a filter callback together with its userdata. A zero-initialized
 * struct filter_callback is allocated and prepended to
 * bus->filter_callbacks, so the most recently added filter is first in the
 * list. Fails with -EINVAL for a NULL bus or callback and -ECHILD after a
 * fork. */
2462 _public_ int sd_bus_add_filter(sd_bus *bus,
2463 sd_bus_message_handler_t callback,
2466 struct filter_callback *f;
2468 assert_return(bus, -EINVAL);
2469 assert_return(callback, -EINVAL);
2470 assert_return(!bus_pid_changed(bus), -ECHILD);
2472 f = new0(struct filter_callback, 1);
2475 f->callback = callback;
2476 f->userdata = userdata;
/* Tell a process_filter() walk that may be in progress that the list changed */
2478 bus->filter_callbacks_modified = true;
2479 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
/* Unregister a filter. An entry matches when both its callback and its
 * userdata pointer are equal to the arguments; the matching entry is
 * unlinked from bus->filter_callbacks. Fails with -EINVAL for a NULL bus or
 * callback and -ECHILD after a fork. */
2483 _public_ int sd_bus_remove_filter(sd_bus *bus,
2484 sd_bus_message_handler_t callback,
2487 struct filter_callback *f;
2489 assert_return(bus, -EINVAL);
2490 assert_return(callback, -EINVAL);
2491 assert_return(!bus_pid_changed(bus), -ECHILD);
2493 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2494 if (f->callback == callback && f->userdata == userdata) {
/* Tell a process_filter() walk that may be in progress that the list changed */
2495 bus->filter_callbacks_modified = true;
2496 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
/* Install a match rule with its callback. The rule string is parsed into
 * components, which are added to the local match tree
 * (bus->match_callbacks). When the connection is a bus client, the rule is
 * also registered through bus_add_match_internal() under a fresh cookie,
 * and bus_remove_match_internal() is available to undo that registration.
 * The parsed components are freed again before returning. Fails with
 * -EINVAL for a NULL bus or match string and -ECHILD after a fork. */
2505 _public_ int sd_bus_add_match(sd_bus *bus,
2507 sd_bus_message_handler_t callback,
2510 struct bus_match_component *components = NULL;
2511 unsigned n_components = 0;
2512 uint64_t cookie = 0;
2515 assert_return(bus, -EINVAL);
2516 assert_return(match, -EINVAL);
2517 assert_return(!bus_pid_changed(bus), -ECHILD);
2519 r = bus_match_parse(match, &components, &n_components);
2523 if (bus->bus_client) {
/* Cookies are allocated by pre-incrementing bus->match_cookie; the cookie
 * stays 0 when the connection is not a bus client */
2524 cookie = ++bus->match_cookie;
2526 r = bus_add_match_internal(bus, match, components, n_components, cookie);
/* Tell a process_match() run that may be in progress that the tree changed */
2531 bus->match_callbacks_modified = true;
2532 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2534 if (bus->bus_client)
2535 bus_remove_match_internal(bus, match, cookie);
2539 bus_match_parse_free(components, n_components);
/* Remove a match rule. The rule string is parsed, the entry is removed
 * from the local match tree (which also yields the cookie it was stored
 * with), and for bus clients the registration is dropped via
 * bus_remove_match_internal(). The result of the local removal takes
 * precedence: r is returned if negative, otherwise q. Fails with -EINVAL
 * for a NULL bus or match string and -ECHILD after a fork. */
2543 _public_ int sd_bus_remove_match(sd_bus *bus,
2545 sd_bus_message_handler_t callback,
2548 struct bus_match_component *components = NULL;
2549 unsigned n_components = 0;
2551 uint64_t cookie = 0;
2553 assert_return(bus, -EINVAL);
2554 assert_return(match, -EINVAL);
2555 assert_return(!bus_pid_changed(bus), -ECHILD);
2557 r = bus_match_parse(match, &components, &n_components);
/* Tell a process_match() run that may be in progress that the tree changed */
2561 bus->match_callbacks_modified = true;
2562 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2564 if (bus->bus_client)
2565 q = bus_remove_match_internal(bus, match, cookie);
2567 bus_match_parse_free(components, n_components);
2569 return r < 0 ? r : q;
/* Returns true when the calling process is not the one recorded in
 * bus->original_pid. The public entry points use this to fail with
 * -ECHILD. */
2572 bool bus_pid_changed(sd_bus *bus) {
2575 /* We don't support people creating a bus connection and
2576 * keeping it around over a fork(). Let's complain. */
2578 return bus->original_pid != getpid();
/* I/O event handler installed by attach_io_events(); userdata is the
 * sd_bus. Runs sd_bus_process() without asking for the message back. */
2581 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2582 sd_bus *bus = userdata;
2587 r = sd_bus_process(bus, NULL);
/* Timer event handler installed by sd_bus_attach_event(); userdata is the
 * sd_bus. Runs sd_bus_process() without asking for the message back. */
2594 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2595 sd_bus *bus = userdata;
2600 r = sd_bus_process(bus, NULL);
/* Prepare handler set on the input I/O source by attach_io_events();
 * userdata is the sd_bus. It refreshes the I/O event masks from
 * sd_bus_get_events() and re-arms the time event source from
 * sd_bus_get_timeout(). */
2607 static int prepare_callback(sd_event_source *s, void *userdata) {
2608 sd_bus *bus = userdata;
2615 e = sd_bus_get_events(bus);
/* With distinct descriptors the input source gets the POLLIN part and the
 * output source the POLLOUT part; the full mask is passed to the input
 * source otherwise */
2619 if (bus->output_fd != bus->input_fd) {
2621 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2625 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2629 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2634 r = sd_bus_get_timeout(bus, &until);
/* The set_time() result goes into j, so r presumably still holds the
 * sd_bus_get_timeout() result when it decides whether the timer is
 * enabled (TODO confirm) */
2640 j = sd_event_source_set_time(bus->time_event_source, until);
2645 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
/* Exit handler registered with sd_event_add_exit() in
 * sd_bus_attach_event(); userdata is the sd_bus. */
2652 static int quit_callback(sd_event_source *event, void *userdata) {
2653 sd_bus *bus = userdata;
/* Make the I/O event sources of the bus refer to its current file
 * descriptors. A missing source is created with sd_event_add_io() (with
 * io_callback as handler and an event mask of 0) and given
 * bus->event_priority; sd_event_source_set_io_fd() points a source at the
 * current descriptor. Only the input source gets prepare_callback. A
 * separate output source is handled only when output_fd differs from
 * input_fd. */
2662 static int attach_io_events(sd_bus *bus) {
2667 if (bus->input_fd < 0)
2673 if (!bus->input_io_event_source) {
2674 r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2678 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2682 r = sd_event_source_set_priority(bus->input_io_event_source, bus->event_priority);
2684 r = sd_event_source_set_io_fd(bus->input_io_event_source, bus->input_fd);
2689 if (bus->output_fd != bus->input_fd) {
2690 assert(bus->output_fd >= 0);
2692 if (!bus->output_io_event_source) {
2693 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2697 r = sd_event_source_set_priority(bus->output_io_event_source, bus->event_priority);
2699 r = sd_event_source_set_io_fd(bus->output_io_event_source, bus->output_fd);
/* Disable and release the input and output I/O event sources, if present.
 * Each pointer is overwritten with the return value of
 * sd_event_source_unref(). */
2708 static void detach_io_events(sd_bus *bus) {
2711 if (bus->input_io_event_source) {
2712 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2713 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2716 if (bus->output_io_event_source) {
2717 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2718 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
/* Hook the bus up to an sd_event loop. bus->event is set either to a new
 * reference on the passed event or through sd_event_default(). A monotonic
 * timer source (time_callback) and an exit source (quit_callback) are then
 * created, the timer source gets the given priority, and the I/O sources
 * are attached with attach_io_events(). sd_bus_detach_event() is used to
 * roll back. Fails with -EINVAL for a NULL bus and -EBUSY if an event loop
 * is already attached. */
2722 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2725 assert_return(bus, -EINVAL);
2726 assert_return(!bus->event, -EBUSY);
/* With no event loop attached there must be no leftover event sources */
2728 assert(!bus->input_io_event_source);
2729 assert(!bus->output_io_event_source);
2730 assert(!bus->time_event_source);
2733 bus->event = sd_event_ref(event);
2735 r = sd_event_default(&bus->event);
/* Remembered so that attach_io_events() can apply it to the I/O sources */
2740 bus->event_priority = priority;
2742 r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2746 r = sd_event_source_set_priority(bus->time_event_source, priority);
2750 r = sd_event_add_exit(bus->event, quit_callback, bus, &bus->quit_event_source);
2754 r = attach_io_events(bus);
2761 sd_bus_detach_event(bus);
/* Undo sd_bus_attach_event(): releases the I/O sources via
 * detach_io_events(), disables and unrefs the timer and exit sources, and
 * drops the reference on the event loop. Fails with -EINVAL for a NULL
 * bus. */
2765 _public_ int sd_bus_detach_event(sd_bus *bus) {
2766 assert_return(bus, -EINVAL);
2771 detach_io_events(bus);
2773 if (bus->time_event_source) {
2774 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2775 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2778 if (bus->quit_event_source) {
2779 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2780 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
/* bus->event is overwritten with the return value of sd_event_unref() */
2784 bus->event = sd_event_unref(bus->event);
/* Public accessor returning an sd_event pointer for the bus. A NULL bus
 * yields NULL. */
2789 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2790 assert_return(bus, NULL);
/* Public accessor for bus->current, which process_message() and
 * process_closing() reset to NULL when they finish. A NULL bus yields
 * NULL. */
2795 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2796 assert_return(bus, NULL);
2798 return bus->current;
/* Shared helper behind sd_bus_default_system() and sd_bus_default_user().
 * bus_open is the opener function and default_bus points at the caller's
 * per-thread connection pointer. One return path reports only whether
 * *default_bus is set (as 0 or 1); another hands out a new reference to
 * the existing connection via *ret. The location of the pointer is
 * recorded on the bus object in default_bus_ptr. */
2801 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2806 assert(default_bus);
2809 return !!*default_bus;
2812 *ret = sd_bus_ref(*default_bus);
2820 b->default_bus_ptr = default_bus;
/* Per-thread default system bus: delegates to bus_default() with
 * sd_bus_open_system() as opener and a thread_local static pointer as
 * storage. */
2828 _public_ int sd_bus_default_system(sd_bus **ret) {
2829 static thread_local sd_bus *default_system_bus = NULL;
2831 return bus_default(sd_bus_open_system, &default_system_bus, ret);
/* Per-thread default user bus: delegates to bus_default() with
 * sd_bus_open_user() as opener and a thread_local static pointer as
 * storage. */
2834 _public_ int sd_bus_default_user(sd_bus **ret) {
2835 static thread_local sd_bus *default_user_bus = NULL;
2837 return bus_default(sd_bus_open_user, &default_user_bus, ret);
/* Report a thread ID for the bus through *tid. The return path shown here
 * forwards to sd_event_get_tid() on the attached event loop. Fails with
 * -EINVAL for a NULL bus or tid and -ECHILD after a fork. */
2840 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2841 assert_return(b, -EINVAL);
2842 assert_return(tid, -EINVAL);
2843 assert_return(!bus_pid_changed(b), -ECHILD);
2851 return sd_event_get_tid(b->event, tid);
/* Escape an arbitrary string so that it only contains characters usable
 * in a D-Bus object path label. Returns a newly allocated string; a NULL
 * argument yields NULL. */
2856 _public_ char *sd_bus_label_escape(const char *s) {
2860 assert_return(s, NULL);
2862 /* Escapes all chars that D-Bus' object path cannot deal
2863 * with. Can be reversed with sd_bus_label_unescape(). We special
2864 * case the empty string. */
/* Sized for the worst case of three output bytes per input byte, plus
 * the terminating NUL */
2869 r = new(char, strlen(s)*3 + 1);
2873 for (f = s, t = r; *f; f++) {
2875 /* Escape everything that is not a-zA-Z0-9. We also
2876 * escape 0-9 if it's the first character */
2878 if (!(*f >= 'A' && *f <= 'Z') &&
2879 !(*f >= 'a' && *f <= 'z') &&
2880 !(f > s && *f >= '0' && *f <= '9')) {
/* High nibble first, then low nibble. hexchar() is handed the unmasked
 * byte for the second digit, so it is presumably expected to look at the
 * low four bits only (TODO confirm) */
2882 *(t++) = hexchar(*f >> 4);
2883 *(t++) = hexchar(*f);
/* Decode a string containing two-digit hexadecimal escape codes. Returns
 * a newly allocated string; a NULL argument yields NULL. An escape whose
 * two following characters are not both hex digits is copied literally
 * instead of being rejected. */
2893 _public_ char *sd_bus_label_unescape(const char *f) {
2896 assert_return(f, NULL);
2898 /* Special case for the empty string */
/* The buffer is sized to the input length plus the terminating NUL */
2902 r = new(char, strlen(f) + 1);
2906 for (t = r; *f; f++) {
/* f[2] is only looked at when f[1] was a valid hex digit */
2911 if ((a = unhexchar(f[1])) < 0 ||
2912 (b = unhexchar(f[2])) < 0) {
2913 /* Invalid escape code, let's take it literal then */
2916 *(t++) = (char) ((a << 4) | b);
/* Build an sd_bus_creds object for the peer of a non-kernel connection from
 * the data stored on the bus: bus->ucred (pid, uid, gid) and bus->label.
 *
 * bus:  the connection; must be open and must not be a kernel bus.
 * mask: the credential fields the caller wants; only fields that are both
 *       requested and available get flagged in the result's mask.
 * ret:  receives the credentials object.
 *
 * Fails with -EINVAL for a NULL bus or ret, -ENOTSUP for a mask beyond
 * _SD_BUS_CREDS_ALL or for a kernel bus, -ENOTCONN when the bus is not
 * open, and -ECHILD after a fork. */
2928 _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **ret) {
2933 assert_return(bus, -EINVAL);
2934 assert_return(mask <= _SD_BUS_CREDS_ALL, -ENOTSUP);
2935 assert_return(ret, -EINVAL);
2936 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2937 assert_return(!bus_pid_changed(bus), -ECHILD);
2938 assert_return(!bus->is_kernel, -ENOTSUP);
/* Give up early only when there is nothing to report at all, i.e. neither
 * valid ucred data nor a label. A label without ucred data is still
 * served by the SELinux branch below. */
2940 if (!bus->ucred_valid && isempty(bus->label))
2943 c = bus_creds_new();
2947 if (bus->ucred_valid) {
/* pid is also kept in a local for the bus_creds_add_more() call below */
2948 pid = c->pid = bus->ucred.pid;
2949 c->uid = bus->ucred.uid;
2950 c->gid = bus->ucred.gid;
2952 c->mask |= (SD_BUS_CREDS_UID | SD_BUS_CREDS_PID | SD_BUS_CREDS_GID) & mask;
/* The label is copied only when the caller asked for the SELinux context */
2955 if (!isempty(bus->label) && (mask & SD_BUS_CREDS_SELINUX_CONTEXT)) {
2956 c->label = strdup(bus->label);
2958 sd_bus_creds_unref(c);
2962 c->mask |= SD_BUS_CREDS_SELINUX_CONTEXT;
/* Let bus_creds_add_more() work on the object for the requested mask */
2965 r = bus_creds_add_more(c, mask, pid, 0);
2973 _public_ int sd_bus_try_close(sd_bus *bus) {
2976 assert_return(bus, -EINVAL);
2977 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2978 assert_return(!bus_pid_changed(bus), -ECHILD);
2979 assert_return(bus->is_kernel, -ENOTSUP);
2981 if (bus->rqueue_size > 0)
2984 if (bus->wqueue_size > 0)
2987 r = bus_kernel_try_close(bus);