1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 #include "bus-container.h"
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Closes the transport file descriptors of the bus. The output fd is only
 * closed on its own when it is valid and distinct from the input fd, so a
 * descriptor shared by both directions is not closed twice. Afterwards both
 * fields are reset to -1. */
53 static void bus_close_fds(sd_bus *b) {
57 close_nointr_nofail(b->input_fd);
59 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60 close_nointr_nofail(b->output_fd);
62 b->input_fd = b->output_fd = -1;
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66 struct node_callback *c;
67 struct node_vtable *v;
68 struct node_enumerator *e;
76 bus_node_destroy(b, n->child);
78 while ((c = n->callbacks)) {
79 LIST_REMOVE(callbacks, n->callbacks, c);
83 while ((v = n->vtables)) {
84 LIST_REMOVE(vtables, n->vtables, v);
89 while ((e = n->enumerators)) {
90 LIST_REMOVE(enumerators, n->enumerators, e);
95 LIST_REMOVE(siblings, n->parent->child, n);
97 assert_se(hashmap_remove(b->nodes, n->path) == n);
/* Empties the read and the write queue of the bus: the reference held on
 * every queued message is dropped, then both queue pointers are set to NULL
 * and both sizes to 0. */
102 static void bus_reset_queues(sd_bus *b) {
107 for (i = 0; i < b->rqueue_size; i++)
108 sd_bus_message_unref(b->rqueue[i]);
111 for (i = 0; i < b->wqueue_size; i++)
112 sd_bus_message_unref(b->wqueue[i]);
115 b->rqueue = b->wqueue = NULL;
116 b->rqueue_size = b->wqueue_size = 0;
119 static void bus_free(sd_bus *b) {
120 struct filter_callback *f;
125 sd_bus_detach_event(b);
130 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
133 free(b->unique_name);
134 free(b->auth_buffer);
140 strv_free(b->exec_argv);
142 close_many(b->fds, b->n_fds);
147 hashmap_free_free(b->reply_callbacks);
148 prioq_free(b->reply_callbacks_prioq);
150 while ((f = b->filter_callbacks)) {
151 LIST_REMOVE(callbacks, b->filter_callbacks, f);
155 bus_match_free(&b->match_callbacks);
157 hashmap_free_free(b->vtable_methods);
158 hashmap_free_free(b->vtable_properties);
160 while ((n = hashmap_first(b->nodes)))
161 bus_node_destroy(b, n);
163 hashmap_free(b->nodes);
165 bus_kernel_flush_memfd(b);
167 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
172 _public_ int sd_bus_new(sd_bus **ret) {
175 assert_return(ret, -EINVAL);
181 r->n_ref = REFCNT_INIT;
182 r->input_fd = r->output_fd = -1;
183 r->message_version = 1;
184 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
185 r->attach_flags |= KDBUS_ATTACH_NAMES;
186 r->original_pid = getpid();
188 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
190 /* We guarantee that wqueue always has space for at least one entry */
192 r->wqueue = new(sd_bus_message*, 1);
202 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
205 assert_return(bus, -EINVAL);
206 assert_return(bus->state == BUS_UNSET, -EPERM);
207 assert_return(address, -EINVAL);
208 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Configures the file descriptors the bus connection shall use for reading
 * (input_fd) and writing (output_fd).
 *
 * Preconditions, each checked with assert_return() and the listed error:
 *   - bus is non-NULL                         (-EINVAL)
 *   - the bus is still in state BUS_UNSET     (-EPERM)
 *   - input_fd and output_fd are both >= 0    (-EINVAL)
 *   - bus_pid_changed() does not report true  (-ECHILD) */
220 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
221 assert_return(bus, -EINVAL);
222 assert_return(bus->state == BUS_UNSET, -EPERM);
223 assert_return(input_fd >= 0, -EINVAL);
224 assert_return(output_fd >= 0, -EINVAL);
225 assert_return(!bus_pid_changed(bus), -ECHILD);
227 bus->input_fd = input_fd;
228 bus->output_fd = output_fd;
232 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
235 assert_return(bus, -EINVAL);
236 assert_return(bus->state == BUS_UNSET, -EPERM);
237 assert_return(path, -EINVAL);
238 assert_return(!strv_isempty(argv), -EINVAL);
239 assert_return(!bus_pid_changed(bus), -ECHILD);
251 free(bus->exec_path);
252 strv_free(bus->exec_argv);
/* Records whether this connection acts as a bus client; any non-zero b is
 * normalised to true. bus_send_hello() and bus_start_running() consult this
 * flag. Only permitted while the bus is still BUS_UNSET (-EPERM otherwise);
 * -EINVAL for a NULL bus, -ECHILD if bus_pid_changed() reports true. */
260 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
261 assert_return(bus, -EINVAL);
262 assert_return(bus->state == BUS_UNSET, -EPERM);
263 assert_return(!bus_pid_changed(bus), -ECHILD);
265 bus->bus_client = !!b;
/* Sets (b != 0) or clears (b == 0) the KDBUS_HELLO_ACCEPT_FD bit in
 * bus->hello_flags; sd_bus_can_send() checks this bit for
 * SD_BUS_TYPE_UNIX_FD. Only permitted while the bus is still BUS_UNSET
 * (-EPERM otherwise); -EINVAL for a NULL bus, -ECHILD if bus_pid_changed()
 * reports true. */
269 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
270 assert_return(bus, -EINVAL);
271 assert_return(bus->state == BUS_UNSET, -EPERM);
272 assert_return(!bus_pid_changed(bus), -ECHILD);
274 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
/* Sets (b != 0) or clears (b == 0) the KDBUS_ATTACH_TIMESTAMP bit in
 * bus->attach_flags. Only permitted while the bus is still BUS_UNSET
 * (-EPERM otherwise); -EINVAL for a NULL bus, -ECHILD if bus_pid_changed()
 * reports true. */
278 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
279 assert_return(bus, -EINVAL);
280 assert_return(bus->state == BUS_UNSET, -EPERM);
281 assert_return(!bus_pid_changed(bus), -ECHILD);
283 SET_FLAG(bus->attach_flags, KDBUS_ATTACH_TIMESTAMP, b);
/* Translates the credential mask into bus->creds_mask via
 * kdbus_translate_attach_flags() and returns that function's result.
 *
 * mask must not exceed _SD_BUS_CREDS_MAX (-EINVAL) and the bus must still be
 * in state BUS_UNSET (-EPERM); -EINVAL for a NULL bus, -ECHILD if
 * bus_pid_changed() reports true. */
287 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, uint64_t mask) {
288 assert_return(bus, -EINVAL);
289 assert_return(mask <= _SD_BUS_CREDS_MAX, -EINVAL);
290 assert_return(bus->state == BUS_UNSET, -EPERM);
291 assert_return(!bus_pid_changed(bus), -ECHILD);
293 return kdbus_translate_attach_flags(mask, &bus->creds_mask);
/* Switches server mode on (b != 0) or off (b == 0) and stores server_id on
 * the bus object.
 *
 * A server_id other than SD_ID128_NULL is only accepted together with a true
 * b (-EINVAL otherwise). The bus must still be in state BUS_UNSET (-EPERM);
 * -EINVAL for a NULL bus, -ECHILD if bus_pid_changed() reports true. */
296 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
297 assert_return(bus, -EINVAL);
298 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
299 assert_return(bus->state == BUS_UNSET, -EPERM);
300 assert_return(!bus_pid_changed(bus), -ECHILD);
302 bus->is_server = !!b;
303 bus->server_id = server_id;
/* Stores the anonymous-authentication setting on the bus object; any
 * non-zero b is normalised to true. Only permitted while the bus is still
 * BUS_UNSET (-EPERM otherwise); -EINVAL for a NULL bus, -ECHILD if
 * bus_pid_changed() reports true. */
307 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
308 assert_return(bus, -EINVAL);
309 assert_return(bus->state == BUS_UNSET, -EPERM);
310 assert_return(!bus_pid_changed(bus), -ECHILD);
312 bus->anonymous_auth = !!b;
316 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
321 assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
324 r = sd_bus_message_get_errno(reply);
330 r = sd_bus_message_read(reply, "s", &s);
334 if (!service_name_is_valid(s) || s[0] != ':')
337 bus->unique_name = strdup(s);
338 if (!bus->unique_name)
341 if (bus->state == BUS_HELLO)
342 bus->state = BUS_RUNNING;
347 static int bus_send_hello(sd_bus *bus) {
348 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
353 if (!bus->bus_client || bus->is_kernel)
356 r = sd_bus_message_new_method_call(
358 "org.freedesktop.DBus",
360 "org.freedesktop.DBus",
366 return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Picks the state the bus enters once the transport is set up. A bus client
 * on a non-kernel transport goes to BUS_HELLO (bus_send_hello() only sends a
 * Hello call for exactly this combination); the BUS_RUNNING assignment
 * covers the remaining cases.
 * NOTE(review): the lines that end the first branch are not visible in this
 * excerpt -- presumably it returns before BUS_RUNNING is assigned. */
369 int bus_start_running(sd_bus *bus) {
372 if (bus->bus_client && !bus->is_kernel) {
373 bus->state = BUS_HELLO;
377 bus->state = BUS_RUNNING;
381 static int parse_address_key(const char **p, const char *key, char **value) {
392 if (strncmp(*p, key, l) != 0)
405 while (*a != ';' && *a != ',' && *a != 0) {
423 c = (char) ((x << 4) | y);
430 t = realloc(r, n + 2);
458 static void skip_address_key(const char **p) {
462 *p += strcspn(*p, ",");
468 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
469 _cleanup_free_ char *path = NULL, *abstract = NULL;
478 while (**p != 0 && **p != ';') {
479 r = parse_address_key(p, "guid", guid);
485 r = parse_address_key(p, "path", &path);
491 r = parse_address_key(p, "abstract", &abstract);
500 if (!path && !abstract)
503 if (path && abstract)
508 if (l > sizeof(b->sockaddr.un.sun_path))
511 b->sockaddr.un.sun_family = AF_UNIX;
512 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
513 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
514 } else if (abstract) {
515 l = strlen(abstract);
516 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
519 b->sockaddr.un.sun_family = AF_UNIX;
520 b->sockaddr.un.sun_path[0] = 0;
521 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
522 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
528 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
529 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
531 struct addrinfo *result, hints = {
532 .ai_socktype = SOCK_STREAM,
533 .ai_flags = AI_ADDRCONFIG,
541 while (**p != 0 && **p != ';') {
542 r = parse_address_key(p, "guid", guid);
548 r = parse_address_key(p, "host", &host);
554 r = parse_address_key(p, "port", &port);
560 r = parse_address_key(p, "family", &family);
573 if (streq(family, "ipv4"))
574 hints.ai_family = AF_INET;
575 else if (streq(family, "ipv6"))
576 hints.ai_family = AF_INET6;
581 r = getaddrinfo(host, port, &hints, &result);
585 return -EADDRNOTAVAIL;
587 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
588 b->sockaddr_size = result->ai_addrlen;
590 freeaddrinfo(result);
595 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
597 unsigned n_argv = 0, j;
606 while (**p != 0 && **p != ';') {
607 r = parse_address_key(p, "guid", guid);
613 r = parse_address_key(p, "path", &path);
619 if (startswith(*p, "argv")) {
623 ul = strtoul(*p + 4, (char**) p, 10);
624 if (errno > 0 || **p != '=' || ul > 256) {
634 x = realloc(argv, sizeof(char*) * (ul + 2));
640 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
646 r = parse_address_key(p, NULL, argv + ul);
661 /* Make sure there are no holes in the array, with the
662 * exception of argv[0] */
663 for (j = 1; j < n_argv; j++)
669 if (argv && argv[0] == NULL) {
670 argv[0] = strdup(path);
682 for (j = 0; j < n_argv; j++)
690 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
691 _cleanup_free_ char *path = NULL;
699 while (**p != 0 && **p != ';') {
700 r = parse_address_key(p, "guid", guid);
706 r = parse_address_key(p, "path", &path);
725 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
726 _cleanup_free_ char *machine = NULL;
734 while (**p != 0 && **p != ';') {
735 r = parse_address_key(p, "guid", guid);
741 r = parse_address_key(p, "machine", &machine);
754 b->machine = machine;
757 b->sockaddr.un.sun_family = AF_UNIX;
758 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
759 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
764 static void bus_reset_parsed_address(sd_bus *b) {
768 b->sockaddr_size = 0;
769 strv_free(b->exec_argv);
773 b->server_id = SD_ID128_NULL;
780 static int bus_parse_next_address(sd_bus *b) {
781 _cleanup_free_ char *guid = NULL;
789 if (b->address[b->address_index] == 0)
792 bus_reset_parsed_address(b);
794 a = b->address + b->address_index;
803 if (startswith(a, "unix:")) {
806 r = parse_unix_address(b, &a, &guid);
811 } else if (startswith(a, "tcp:")) {
814 r = parse_tcp_address(b, &a, &guid);
820 } else if (startswith(a, "unixexec:")) {
823 r = parse_exec_address(b, &a, &guid);
829 } else if (startswith(a, "kernel:")) {
832 r = parse_kernel_address(b, &a, &guid);
837 } else if (startswith(a, "x-container:")) {
840 r = parse_container_address(b, &a, &guid);
853 r = sd_id128_from_string(guid, &b->server_id);
858 b->address_index = a - b->address;
862 static int bus_start_address(sd_bus *b) {
872 r = bus_socket_exec(b);
876 b->last_connect_error = -r;
877 } else if (b->kernel) {
879 r = bus_kernel_connect(b);
883 b->last_connect_error = -r;
885 } else if (b->machine) {
887 r = bus_container_connect(b);
891 b->last_connect_error = -r;
893 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
895 r = bus_socket_connect(b);
899 b->last_connect_error = -r;
902 r = bus_parse_next_address(b);
906 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
910 int bus_next_address(sd_bus *b) {
913 bus_reset_parsed_address(b);
914 return bus_start_address(b);
/* Prepares caller-supplied file descriptors (see sd_bus_set_fd()) for use as
 * the bus transport. fd_nonblock(..., true) and fd_cloexec(..., true) are
 * applied to the input fd and, if it is a different descriptor, to the
 * output fd as well. The input fd is then inspected with fstat() to choose
 * between the kernel transport and the socket transport.
 *
 * Returns the result of bus_kernel_take_fd() or bus_socket_take_fd(). */
917 static int bus_start_fd(sd_bus *b) {
922 assert(b->input_fd >= 0);
923 assert(b->output_fd >= 0);
925 r = fd_nonblock(b->input_fd, true);
929 r = fd_cloexec(b->input_fd, true);
933 if (b->input_fd != b->output_fd) {
934 r = fd_nonblock(b->output_fd, true);
938 r = fd_cloexec(b->output_fd, true);
/* The transport is chosen by the file type of the input fd. S_ISCHR() has to
 * be applied to the mode that fstat() stored in st, not to the descriptor
 * number: a character device is handed to the kernel transport, anything
 * else to the socket transport. */
943 if (fstat(b->input_fd, &st) < 0)
946 if (S_ISCHR(st.st_mode))
947 return bus_kernel_take_fd(b);
949 return bus_socket_take_fd(b);
952 _public_ int sd_bus_start(sd_bus *bus) {
955 assert_return(bus, -EINVAL);
956 assert_return(bus->state == BUS_UNSET, -EPERM);
957 assert_return(!bus_pid_changed(bus), -ECHILD);
959 bus->state = BUS_OPENING;
961 if (bus->is_server && bus->bus_client)
964 if (bus->input_fd >= 0)
965 r = bus_start_fd(bus);
966 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
967 r = bus_start_address(bus);
974 return bus_send_hello(bus);
977 _public_ int sd_bus_open_system(sd_bus **ret) {
982 assert_return(ret, -EINVAL);
988 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
990 r = sd_bus_set_address(b, e);
992 r = sd_bus_set_address(b, "kernel:path=/dev/kdbus/0-system/bus;unix:path=/run/dbus/system_bus_socket");
997 b->bus_client = true;
1011 _public_ int sd_bus_open_user(sd_bus **ret) {
1016 assert_return(ret, -EINVAL);
1022 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1024 r = sd_bus_set_address(b, e);
1028 e = secure_getenv("XDG_RUNTIME_DIR");
1030 _cleanup_free_ char *ee = NULL;
1032 ee = bus_address_escape(e);
1038 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus;unix:path=%s/bus", (unsigned long) getuid(), ee);
1040 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus", (unsigned long) getuid());
1048 b->bus_client = true;
1050 r = sd_bus_start(b);
1062 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1063 _cleanup_free_ char *e = NULL;
1068 assert_return(host, -EINVAL);
1069 assert_return(ret, -EINVAL);
1071 e = bus_address_escape(host);
1075 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1079 r = sd_bus_new(&bus);
1086 bus->bus_client = true;
1088 r = sd_bus_start(bus);
1098 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1099 _cleanup_free_ char *e = NULL;
1104 assert_return(machine, -EINVAL);
1105 assert_return(ret, -EINVAL);
1107 e = bus_address_escape(machine);
1111 p = strjoin("x-container:machine=", e, NULL);
1115 r = sd_bus_new(&bus);
1122 bus->bus_client = true;
1124 r = sd_bus_start(bus);
/* Shuts the connection down: the state becomes BUS_CLOSED, the bus is
 * detached from its event source via sd_bus_detach_event() and all queued
 * messages are dropped. The checks at the top look at whether the bus is
 * already BUS_CLOSED and whether bus_pid_changed() reports true (their
 * branch bodies are not visible in this excerpt). */
1134 _public_ void sd_bus_close(sd_bus *bus) {
1138 if (bus->state == BUS_CLOSED)
1140 if (bus_pid_changed(bus))
1143 bus->state = BUS_CLOSED;
1145 sd_bus_detach_event(bus);
1147 /* Drop all queued messages so that they drop references to
1148 * the bus object and the bus may be freed */
1149 bus_reset_queues(bus);
1151 if (!bus->is_kernel)
1154 /* We'll leave the fd open in case this is a kernel bus, since
1155 * there might still be memblocks around that reference this
1156 * bus, and they might need to invoke the KDBUS_CMD_FREE
1157 * ioctl on the fd when they are freed. */
/* Moves the bus into BUS_CLOSING. The condition in front tests whether the
 * bus is in none of the states BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO
 * and BUS_RUNNING.
 * NOTE(review): the body of that check is not visible in this excerpt --
 * presumably the function returns early, leaving any other state alone. */
1160 static void bus_enter_closing(sd_bus *bus) {
1163 if (bus->state != BUS_OPENING &&
1164 bus->state != BUS_AUTHENTICATING &&
1165 bus->state != BUS_HELLO &&
1166 bus->state != BUS_RUNNING)
1169 bus->state = BUS_CLOSING;
/* Takes an additional reference on the bus object. The counter is asserted
 * to be at least 2 after the increment, i.e. the caller must already have
 * held a reference. A NULL bus is rejected by returning NULL. */
1172 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1173 assert_return(bus, NULL);
1175 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drops one reference on the bus object. A NULL bus is rejected by returning
 * NULL.
 * NOTE(review): the action taken when the decremented counter is <= 0 is not
 * visible in this excerpt -- presumably the bus is freed (see bus_free()). */
1180 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1181 assert_return(bus, NULL);
1183 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Returns the value of BUS_IS_OPEN() for the current bus state. -EINVAL for
 * a NULL bus, -ECHILD if bus_pid_changed() reports true. */
1189 _public_ int sd_bus_is_open(sd_bus *bus) {
1191 assert_return(bus, -EINVAL);
1192 assert_return(!bus_pid_changed(bus), -ECHILD);
1194 return BUS_IS_OPEN(bus->state);
1197 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1200 assert_return(bus, -EINVAL);
1201 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1202 assert_return(!bus_pid_changed(bus), -ECHILD);
1204 if (type == SD_BUS_TYPE_UNIX_FD) {
1205 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1208 r = bus_ensure_running(bus);
1212 return bus->can_fds;
1215 return bus_type_is_valid(type);
/* Copies the server id stored on the bus object into *server_id.
 * bus_ensure_running() is called first, so the connection setup is driven
 * forward before the id is read. -EINVAL for a NULL bus or a NULL server_id,
 * -ECHILD if bus_pid_changed() reports true. */
1218 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1221 assert_return(bus, -EINVAL);
1222 assert_return(server_id, -EINVAL);
1223 assert_return(!bus_pid_changed(bus), -ECHILD);
1225 r = bus_ensure_running(bus);
1229 *server_id = bus->server_id;
1233 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1237 if (m->header->version > b->message_version)
1241 /* If we copy the same message to multiple
1242 * destinations, avoid using the same serial number. */
1244 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1248 return bus_message_seal(m, ++b->serial);
/* Seals a locally generated ("synthetic") message with the fixed serial
 * 0xFFFFFFFF instead of consuming one of the bus' own serial numbers. The
 * message header version is compared against b->message_version first. */
1251 int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
1255 if (m->header->version > b->message_version)
1258 /* The bus specification says the serial number cannot be 0,
1259 * hence let's fill something in for synthetic messages. Since
1260 * synthetic messages might have a fake sender and we don't
1261 * want to interfere with the real sender's serial numbers we
1262 * pick a fixed, artificial one. We use (uint32_t) -1 rather
1263 * than (uint64_t) -1 since dbus1 only had 32bit identifiers,
1264 * even though kdbus can do 64bit. */
1266 return bus_message_seal(m, 0xFFFFFFFFULL);
1269 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1276 return bus_kernel_write_message(bus, message);
1278 return bus_socket_write_message(bus, message, idx);
1283 static int dispatch_wqueue(sd_bus *bus) {
1287 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1289 while (bus->wqueue_size > 0) {
1291 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1295 /* Didn't do anything this time */
1297 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1298 /* Fully written. Let's drop the entry from
1301 * This isn't particularly optimized, but
1302 * well, this is supposed to be our worst-case
1303 * buffer only, and the socket buffer is
1304 * supposed to be our primary buffer, and if
1305 * it got full, then all bets are off. */
1308 sd_bus_message_unref(bus->wqueue[0]);
1309 bus->wqueue_size --;
1310 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1320 static int bus_read_message(sd_bus *bus) {
1324 return bus_kernel_read_message(bus);
1326 return bus_socket_read_message(bus);
/* Makes sure the read queue array can take one more message than it
 * currently holds. The required capacity is rqueue_size + 1: it is compared
 * against the capacity already allocated and against the BUS_RQUEUE_MAX
 * limit, then the array is reallocated to exactly that many entries and
 * rqueue_allocated is updated. */
1329 int bus_rqueue_make_room(sd_bus *bus) {
1333 x = bus->rqueue_size + 1;
1335 if (bus->rqueue_allocated >= x)
1338 if (x > BUS_RQUEUE_MAX)
1341 q = realloc(bus->rqueue, x * sizeof(sd_bus_message*));
1346 bus->rqueue_allocated = x;
1351 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1356 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1359 if (bus->rqueue_size > 0) {
1360 /* Dispatch a queued message */
1362 *m = bus->rqueue[0];
1363 bus->rqueue_size --;
1364 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1368 /* Try to read a new message */
1369 r = bus_read_message(bus);
1379 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1382 assert_return(bus, -EINVAL);
1383 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1384 assert_return(m, -EINVAL);
1385 assert_return(!bus_pid_changed(bus), -ECHILD);
1388 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1395 /* If the serial number isn't kept, then we know that no reply is expected */
1397 if (!serial && !m->sealed)
1398 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1400 r = bus_seal_message(bus, m);
1404 /* If this is a reply and no reply was requested, then let's
1405 * suppress this, if we can */
1406 if (m->dont_send && !serial)
1409 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1412 r = bus_write_message(bus, m, &idx);
1414 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1415 bus_enter_closing(bus);
1418 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1419 /* Wasn't fully written. So let's remember how
1420 * much was written. Note that the first entry
1421 * of the wqueue array is always allocated so
1422 * that we always can remember how much was written. */
1424 bus->wqueue[0] = sd_bus_message_ref(m);
1425 bus->wqueue_size = 1;
1431 /* Just append it to the queue. */
1433 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1436 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1441 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1445 *serial = BUS_MESSAGE_SERIAL(m);
/* Sends message m to an explicitly given destination. If the destination
 * already stored in the message differs from the requested one,
 * sd_bus_message_set_destination() is called; the message is then passed on
 * to sd_bus_send(), whose result is returned.
 *
 * -EINVAL for a NULL bus or message, -ENOTCONN if the bus is not open,
 * -ECHILD if bus_pid_changed() reports true. */
1450 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1453 assert_return(bus, -EINVAL);
1454 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1455 assert_return(m, -EINVAL);
1456 assert_return(!bus_pid_changed(bus), -ECHILD);
1458 if (!streq_ptr(m->destination, destination)) {
1463 r = sd_bus_message_set_destination(m, destination);
1468 return sd_bus_send(bus, m, serial);
/* Turns a relative timeout in microseconds into an absolute deadline on
 * CLOCK_MONOTONIC (current time plus usec). (uint64_t) -1 is special-cased
 * and BUS_DEFAULT_TIMEOUT may be substituted for the caller's value.
 * NOTE(review): the condition guarding the default and the value produced
 * for (uint64_t) -1 are on lines not visible in this excerpt; callers in
 * this file treat a result of 0 as "no timeout". */
1471 static usec_t calc_elapse(uint64_t usec) {
1472 if (usec == (uint64_t) -1)
1476 usec = BUS_DEFAULT_TIMEOUT;
1478 return now(CLOCK_MONOTONIC) + usec;
/* Comparison callback for the reply-callback priority queue (handed to
 * prioq_ensure_allocated() in sd_bus_call_async()). Both arguments are
 * struct reply_callback objects and are compared by their timeout field: a
 * zero timeout is first distinguished from a non-zero one, then the two
 * values are compared numerically. The values returned for each case are on
 * lines not visible in this excerpt. */
1481 static int timeout_compare(const void *a, const void *b) {
1482 const struct reply_callback *x = a, *y = b;
1484 if (x->timeout != 0 && y->timeout == 0)
1487 if (x->timeout == 0 && y->timeout != 0)
1490 if (x->timeout < y->timeout)
1493 if (x->timeout > y->timeout)
1499 _public_ int sd_bus_call_async(
1502 sd_bus_message_handler_t callback,
1507 struct reply_callback *c;
1510 assert_return(bus, -EINVAL);
1511 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1512 assert_return(m, -EINVAL);
1513 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1514 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1515 assert_return(callback, -EINVAL);
1516 assert_return(!bus_pid_changed(bus), -ECHILD);
1518 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1522 if (usec != (uint64_t) -1) {
1523 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1528 r = bus_seal_message(bus, m);
1532 c = new0(struct reply_callback, 1);
1536 c->callback = callback;
1537 c->userdata = userdata;
1538 c->serial = BUS_MESSAGE_SERIAL(m);
1539 c->timeout = calc_elapse(usec);
1541 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1547 if (c->timeout != 0) {
1548 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1551 sd_bus_call_async_cancel(bus, c->serial);
1556 r = sd_bus_send(bus, m, serial);
1558 sd_bus_call_async_cancel(bus, c->serial);
/* Cancels a pending asynchronous method call identified by its serial: the
 * matching reply callback is removed from the reply_callbacks hashmap and,
 * if it carries a timeout, from the timeout priority queue as well.
 *
 * -EINVAL for a NULL bus or a serial of 0, -ECHILD if bus_pid_changed()
 * reports true. */
1565 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1566 struct reply_callback *c;
1568 assert_return(bus, -EINVAL);
1569 assert_return(serial != 0, -EINVAL);
1570 assert_return(!bus_pid_changed(bus), -ECHILD);
1572 c = hashmap_remove(bus->reply_callbacks, &serial);
1576 if (c->timeout != 0)
1577 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1583 int bus_ensure_running(sd_bus *bus) {
1588 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1590 if (bus->state == BUS_RUNNING)
1594 r = sd_bus_process(bus, NULL);
1597 if (bus->state == BUS_RUNNING)
1602 r = sd_bus_wait(bus, (uint64_t) -1);
1608 _public_ int sd_bus_call(
1612 sd_bus_error *error,
1613 sd_bus_message **reply) {
1620 assert_return(bus, -EINVAL);
1621 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1622 assert_return(m, -EINVAL);
1623 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1624 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1625 assert_return(!bus_error_is_dirty(error), -EINVAL);
1626 assert_return(!bus_pid_changed(bus), -ECHILD);
1628 r = bus_ensure_running(bus);
1632 i = bus->rqueue_size;
1634 r = sd_bus_send(bus, m, &serial);
1638 timeout = calc_elapse(usec);
1643 while (i < bus->rqueue_size) {
1644 sd_bus_message *incoming = NULL;
1646 incoming = bus->rqueue[i];
1648 if (incoming->reply_serial == serial) {
1649 /* Found a match! */
1651 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1654 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1659 sd_bus_message_unref(incoming);
1662 } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
1663 r = sd_bus_error_copy(error, &incoming->error);
1667 sd_bus_message_unref(incoming);
1670 } else if (incoming->header->serial == serial &&
1673 streq(bus->unique_name, incoming->sender)) {
1675 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1678 /* Our own message? Somebody is trying
1679 * to send its own client a message,
1680 * let's not dead-lock, let's fail. */
1683 sd_bus_message_unref(incoming);
1687 /* Try to read more, right-away */
1691 r = bus_read_message(bus);
1693 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1694 bus_enter_closing(bus);
1704 n = now(CLOCK_MONOTONIC);
1710 left = (uint64_t) -1;
1712 r = bus_poll(bus, true, left);
1716 r = dispatch_wqueue(bus);
1718 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1719 bus_enter_closing(bus);
/* Returns the file descriptor of the connection. This is only possible when
 * input and output share one descriptor; with two distinct fds the call
 * fails with -EPERM. -EINVAL for a NULL bus, -ECHILD if bus_pid_changed()
 * reports true. */
1726 _public_ int sd_bus_get_fd(sd_bus *bus) {
1728 assert_return(bus, -EINVAL);
1729 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1730 assert_return(!bus_pid_changed(bus), -ECHILD);
1732 return bus->input_fd;
1735 _public_ int sd_bus_get_events(sd_bus *bus) {
1738 assert_return(bus, -EINVAL);
1739 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1740 assert_return(!bus_pid_changed(bus), -ECHILD);
1742 if (bus->state == BUS_OPENING)
1744 else if (bus->state == BUS_AUTHENTICATING) {
1746 if (bus_socket_auth_needs_write(bus))
1751 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1752 if (bus->rqueue_size <= 0)
1754 if (bus->wqueue_size > 0)
1761 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1762 struct reply_callback *c;
1764 assert_return(bus, -EINVAL);
1765 assert_return(timeout_usec, -EINVAL);
1766 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1767 assert_return(!bus_pid_changed(bus), -ECHILD);
1769 if (bus->state == BUS_CLOSING) {
1774 if (bus->state == BUS_AUTHENTICATING) {
1775 *timeout_usec = bus->auth_timeout;
1779 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1780 *timeout_usec = (uint64_t) -1;
1784 if (bus->rqueue_size > 0) {
1789 c = prioq_peek(bus->reply_callbacks_prioq);
1791 *timeout_usec = (uint64_t) -1;
1795 *timeout_usec = c->timeout;
1799 static int process_timeout(sd_bus *bus) {
1800 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1801 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1802 struct reply_callback *c;
1808 c = prioq_peek(bus->reply_callbacks_prioq);
1812 n = now(CLOCK_MONOTONIC);
1816 r = bus_message_new_synthetic_error(
1819 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1824 m->sender = "org.freedesktop.DBus";
1826 r = bus_seal_synthetic_message(bus, m);
1830 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1831 hashmap_remove(bus->reply_callbacks, &c->serial);
1834 bus->iteration_counter ++;
1836 r = c->callback(bus, m, c->userdata, &error_buffer);
1837 r = bus_maybe_reply_error(m, r, &error_buffer);
1840 bus->current = NULL;
/* Message-processing stage that guards the BUS_HELLO phase. It checks the
 * bus state, then that the message is a method return or method error, and
 * finally that its reply serial equals the serial recorded for our Hello
 * call (bus->hello_serial). The results returned for each outcome are on
 * lines not visible in this excerpt. */
1845 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1849 if (bus->state != BUS_HELLO)
1852 /* Let's make sure the first message on the bus is the HELLO
1853 * reply. But note that we don't actually parse the message
1854 * here (we leave that to the usual handling), we just verify
1855 * we don't let any earlier msg through. */
1857 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1858 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1861 if (m->reply_serial != bus->hello_serial)
1867 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1868 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1869 struct reply_callback *c;
1875 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1876 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1879 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1883 if (c->timeout != 0)
1884 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1886 r = sd_bus_message_rewind(m, true);
1890 r = c->callback(bus, m, c->userdata, &error_buffer);
1891 r = bus_maybe_reply_error(m, r, &error_buffer);
1897 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1898 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1899 struct filter_callback *l;
1906 bus->filter_callbacks_modified = false;
1908 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1910 if (bus->filter_callbacks_modified)
1913 /* Don't run this more than once per iteration */
1914 if (l->last_iteration == bus->iteration_counter)
1917 l->last_iteration = bus->iteration_counter;
1919 r = sd_bus_message_rewind(m, true);
1923 r = l->callback(bus, m, l->userdata, &error_buffer);
1924 r = bus_maybe_reply_error(m, r, &error_buffer);
1930 } while (bus->filter_callbacks_modified);
/* Runs the registered match callbacks against message m through
 * bus_match_run(). The match_callbacks_modified flag is cleared before each
 * pass and the pass is repeated for as long as the flag is found set again
 * afterwards. */
1935 static int process_match(sd_bus *bus, sd_bus_message *m) {
1942 bus->match_callbacks_modified = false;
1944 r = bus_match_run(bus, &bus->match_callbacks, m);
1948 } while (bus->match_callbacks_modified);
1953 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1954 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1960 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1963 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1966 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1969 if (streq_ptr(m->member, "Ping"))
1970 r = sd_bus_message_new_method_return(m, &reply);
1971 else if (streq_ptr(m->member, "GetMachineId")) {
1975 r = sd_id128_get_machine(&id);
1979 r = sd_bus_message_new_method_return(m, &reply);
1983 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1985 r = sd_bus_message_new_method_errorf(
1987 SD_BUS_ERROR_UNKNOWN_METHOD,
1988 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1994 r = sd_bus_send(bus, reply, NULL);
2001 static int process_message(sd_bus *bus, sd_bus_message *m) {
2008 bus->iteration_counter++;
2010 log_debug("Got message sender=%s object=%s interface=%s member=%s",
2011 strna(sd_bus_message_get_sender(m)),
2012 strna(sd_bus_message_get_path(m)),
2013 strna(sd_bus_message_get_interface(m)),
2014 strna(sd_bus_message_get_member(m)));
2016 r = process_hello(bus, m);
2020 r = process_reply(bus, m);
2024 r = process_filter(bus, m);
2028 r = process_match(bus, m);
2032 r = process_builtin(bus, m);
2036 r = bus_process_object(bus, m);
2039 bus->current = NULL;
2043 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2044 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2048 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2050 r = process_timeout(bus);
2054 r = dispatch_wqueue(bus);
2058 r = dispatch_rqueue(bus, &m);
2064 r = process_message(bus, m);
2069 r = sd_bus_message_rewind(m, true);
2078 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2080 r = sd_bus_reply_method_errorf(
2082 SD_BUS_ERROR_UNKNOWN_OBJECT,
2083 "Unknown object '%s'.", m->path);
/* Processing step for a connection in BUS_CLOSING state (asserted below).
 * Following the inline comments it has two phases: first, outstanding
 * method calls are failed by handing their reply callback a synthetic
 * SD_BUS_ERROR_NO_REPLY error; then a signal attributed to
 * org.freedesktop.DBus.Local is synthesized and passed to process_filter()
 * and process_match().
 * NOTE(review): the branch structure separating the two phases, the return
 * values and the use of ret are not visible in this excerpt. */
2097 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2098 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2099 struct reply_callback *c;
2103 assert(bus->state == BUS_CLOSING);
/* Pick one pending reply callback from the hashmap. */
2105 c = hashmap_first(bus->reply_callbacks);
2107 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2109 /* First, fail all outstanding method calls */
2110 r = bus_message_new_synthetic_error(
2113 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2118 r = bus_seal_synthetic_message(bus, m);
/* The callback is unregistered before it is invoked. The priority queue
 * entry is only removed when c->timeout is non-zero. */
2122 if (c->timeout != 0)
2123 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2125 hashmap_remove(bus->reply_callbacks, &c->serial);
2128 bus->iteration_counter++;
/* Deliver the synthetic error to the callback; its result r and the error
 * buffer are then handed to bus_maybe_reply_error(). */
2130 r = c->callback(bus, m, c->userdata, &error_buffer);
2131 r = bus_maybe_reply_error(m, r, &error_buffer);
2137 /* Then, synthesize a Disconnected message */
2138 r = sd_bus_message_new_signal(
2140 "/org/freedesktop/DBus/Local",
2141 "org.freedesktop.DBus.Local",
/* The sender field is pointed at a string literal, not at a heap copy. */
2147 m->sender = "org.freedesktop.DBus.Local";
2149 r = bus_seal_synthetic_message(bus, m);
2156 bus->iteration_counter++;
2158 r = process_filter(bus, m);
2162 r = process_match(bus, m);
/* Clear the current-message pointer exposed by sd_bus_get_current(). */
2174 bus->current = NULL;
/* Public entry point that performs one step of work for the connection,
 * selected by bus->state. The meaning of the return value is described in
 * the comment inside the function.
 *
 * Preconditions checked with assert_return(): bus is non-NULL (-EINVAL),
 * bus_pid_changed() is false (-ECHILD), and bus->current is unset (-EBUSY).
 *
 * After bus_socket_process_opening(), bus_socket_process_authenticating()
 * and process_running(), a result of -ECONNRESET, -EPIPE or -ESHUTDOWN
 * leads to bus_enter_closing(). One path delegates to process_closing().
 * NOTE(review): several case labels and the statements following
 * bus_enter_closing() are not visible in this excerpt. */
2178 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
/* NOTE(review): BUS_DONT_DESTROY presumably keeps the bus object alive for
 * the duration of this call -- confirm against the macro definition. */
2179 BUS_DONT_DESTROY(bus);
2182 /* Returns 0 when we didn't do anything. This should cause the
2183 * caller to invoke sd_bus_wait() before returning the next
2184 * time. Returns > 0 when we did something, which possibly
2185 * means *ret is filled in with an unprocessed message. */
2187 assert_return(bus, -EINVAL);
2188 assert_return(!bus_pid_changed(bus), -ECHILD);
2190 /* We don't allow recursively invoking sd_bus_process(). */
2191 assert_return(!bus->current, -EBUSY);
2193 switch (bus->state) {
2200 r = bus_socket_process_opening(bus);
2201 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2202 bus_enter_closing(bus);
2210 case BUS_AUTHENTICATING:
2211 r = bus_socket_process_authenticating(bus);
2212 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2213 bus_enter_closing(bus);
2225 r = process_running(bus, ret);
2226 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2227 bus_enter_closing(bus);
2237 return process_closing(bus, ret);
/* Every known state is expected to be handled by the switch above. */
2240 assert_not_reached("Unknown state");
/* Waits with ppoll() on the file descriptor(s) of the connection.
 *
 * need_more:    NOTE(review): its use is not visible in this excerpt; the
 *               comments below suggest it selects between waiting only for
 *               new data and waiting for anything that can be processed.
 * timeout_usec: limit supplied by the caller; (uint64_t) -1 means none.
 *
 * Returns 1 if ppoll() reported at least one ready descriptor and 0 if it
 * reported none. NOTE(review): the handling of a negative r and of the
 * BUS_CLOSING case is not visible here. */
2243 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2244 struct pollfd p[2] = {};
/* (usec_t) -1 is the marker for no timeout, see the ppoll() call below. */
2247 usec_t m = (usec_t) -1;
2251 if (bus->state == BUS_CLOSING)
2254 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2256 e = sd_bus_get_events(bus);
2261 /* The caller really needs some more data, he doesn't
2262 * care about what's already read, or any timeouts
2267 /* The caller wants to process if there's something to
2268 * process, but doesn't care otherwise */
2270 r = sd_bus_get_timeout(bus, &until);
/* m becomes the time left until the deadline reported by the bus, clamped
 * to 0 when that deadline has already passed. */
2275 nw = now(CLOCK_MONOTONIC);
2276 m = until > nw ? until - nw : 0;
/* True when the caller gave a finite timeout and it is either the only
 * limit or a tighter one than m. */
2280 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
/* With separate input and output descriptors, p[0] watches the input fd
 * for POLLIN and p[1] watches the output fd for POLLOUT.
 * NOTE(review): the body of the shared-descriptor branch is not visible. */
2283 p[0].fd = bus->input_fd;
2284 if (bus->output_fd == bus->input_fd) {
2288 p[0].events = e & POLLIN;
2289 p[1].fd = bus->output_fd;
2290 p[1].events = e & POLLOUT;
/* Without a limit ppoll() gets a NULL timeout and can block indefinitely;
 * otherwise m is converted with timespec_store(). */
2294 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
/* Collapse the ready count into 1 or 0. */
2298 return r > 0 ? 1 : 0;
/* Public wrapper that ends in bus_poll(bus, false, timeout_usec).
 * assert_return() checks reject a NULL bus (-EINVAL), a changed PID
 * (-ECHILD) and a connection that is not open (-ENOTCONN). The BUS_CLOSING
 * state and a non-empty read queue are special-cased before polling.
 * NOTE(review): what those two branches return is not visible in this
 * excerpt. */
2301 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2303 assert_return(bus, -EINVAL);
2304 assert_return(!bus_pid_changed(bus), -ECHILD);
2306 if (bus->state == BUS_CLOSING)
2309 assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
/* Messages are already queued for reading, so this is checked before any
 * polling is attempted. */
2311 if (bus->rqueue_size > 0)
2314 return bus_poll(bus, false, timeout_usec);
/* Public call that pushes out the pending write queue. Visible steps:
 * bus_ensure_running(), then dispatch_wqueue(); bus->wqueue_size is checked
 * before and after dispatching, and bus_poll() is called with no timeout
 * ((uint64_t) -1) when messages remain queued.
 * assert_return() checks reject a NULL bus (-EINVAL), a changed PID
 * (-ECHILD) and a connection that is not open (-ENOTCONN).
 * NOTE(review): the loop construct and the return statements are not
 * visible in this excerpt. */
2317 _public_ int sd_bus_flush(sd_bus *bus) {
2320 assert_return(bus, -EINVAL);
2321 assert_return(!bus_pid_changed(bus), -ECHILD);
2323 if (bus->state == BUS_CLOSING)
2326 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2328 r = bus_ensure_running(bus);
2332 if (bus->wqueue_size <= 0)
2336 r = dispatch_wqueue(bus);
/* NOTE(review): this set tests -ENOTCONN and omits -ECONNRESET, whereas
 * sd_bus_process() tests -ECONNRESET, -EPIPE and -ESHUTDOWN; confirm that
 * the difference is intended. */
2338 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
2339 bus_enter_closing(bus);
2344 if (bus->wqueue_size <= 0)
2347 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter: a struct filter_callback is allocated with new0(),
 * filled with callback and userdata, and prepended to
 * bus->filter_callbacks. assert_return() checks reject a NULL bus or
 * callback (-EINVAL) and a changed PID (-ECHILD).
 * NOTE(review): the allocation-failure path and the return value are not
 * visible in this excerpt. */
2353 _public_ int sd_bus_add_filter(sd_bus *bus,
2354 sd_bus_message_handler_t callback,
2357 struct filter_callback *f;
2359 assert_return(bus, -EINVAL);
2360 assert_return(callback, -EINVAL);
2361 assert_return(!bus_pid_changed(bus), -ECHILD);
2363 f = new0(struct filter_callback, 1);
2366 f->callback = callback;
2367 f->userdata = userdata;
/* Record that the list changed. NOTE(review): presumably read by code that
 * is iterating the filters -- confirm in process_filter(). */
2369 bus->filter_callbacks_modified = true;
2370 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
/* Unregisters a filter: bus->filter_callbacks is searched for an entry
 * whose callback and userdata both match, and that entry is removed from
 * the list. assert_return() checks reject a NULL bus or callback (-EINVAL)
 * and a changed PID (-ECHILD).
 * NOTE(review): freeing of the entry and the return values are not visible
 * in this excerpt. */
2374 _public_ int sd_bus_remove_filter(sd_bus *bus,
2375 sd_bus_message_handler_t callback,
2378 struct filter_callback *f;
2380 assert_return(bus, -EINVAL);
2381 assert_return(callback, -EINVAL);
2382 assert_return(!bus_pid_changed(bus), -ECHILD);
2384 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
/* Both the function pointer and the userdata pointer must be equal. */
2385 if (f->callback == callback && f->userdata == userdata) {
2386 bus->filter_callbacks_modified = true;
2387 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
/* Installs a match rule. The match string is parsed into components with
 * bus_match_parse(); when bus->bus_client is set a fresh cookie is taken
 * from bus->match_cookie and the rule is also registered through
 * bus_add_match_internal(); the callback is then stored locally with
 * bus_match_add(). The parsed components are released with
 * bus_match_parse_free() at the end.
 * assert_return() checks reject a NULL bus or match (-EINVAL) and a changed
 * PID (-ECHILD).
 * NOTE(review): the error checks and the return value are not visible in
 * this excerpt. */
2396 _public_ int sd_bus_add_match(sd_bus *bus,
2398 sd_bus_message_handler_t callback,
2401 struct bus_match_component *components = NULL;
2402 unsigned n_components = 0;
/* The cookie stays 0 unless bus->bus_client is set. */
2403 uint64_t cookie = 0;
2406 assert_return(bus, -EINVAL);
2407 assert_return(match, -EINVAL);
2408 assert_return(!bus_pid_changed(bus), -ECHILD);
2410 r = bus_match_parse(match, &components, &n_components);
2414 if (bus->bus_client) {
2415 cookie = ++bus->match_cookie;
2417 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2422 bus->match_callbacks_modified = true;
2423 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
/* NOTE(review): presumably the rollback for a failed bus_match_add(); the
 * guarding condition is not visible in this excerpt. */
2425 if (bus->bus_client)
2426 bus_remove_match_internal(bus, match, cookie);
2430 bus_match_parse_free(components, n_components);
/* Removes a match rule. The match string is parsed with bus_match_parse(),
 * the local entry is dropped with bus_match_remove() (which reports the
 * cookie of the entry), and when bus->bus_client is set the rule is also
 * removed through bus_remove_match_internal() using that cookie.
 * Returns r from bus_match_remove() when it is negative, otherwise q.
 * assert_return() checks reject a NULL bus or match (-EINVAL) and a changed
 * PID (-ECHILD).
 * NOTE(review): the declaration and initial value of q and the check after
 * bus_match_parse() are not visible in this excerpt. */
2434 _public_ int sd_bus_remove_match(sd_bus *bus,
2436 sd_bus_message_handler_t callback,
2439 struct bus_match_component *components = NULL;
2440 unsigned n_components = 0;
2442 uint64_t cookie = 0;
2444 assert_return(bus, -EINVAL);
2445 assert_return(match, -EINVAL);
2446 assert_return(!bus_pid_changed(bus), -ECHILD);
2448 r = bus_match_parse(match, &components, &n_components);
2452 bus->match_callbacks_modified = true;
2453 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2455 if (bus->bus_client)
2456 q = bus_remove_match_internal(bus, match, cookie);
2458 bus_match_parse_free(components, n_components);
/* A failure of the local removal takes precedence over q. */
2460 return r < 0 ? r : q;
/* Returns true when the PID of the calling process differs from the PID
 * stored in bus->original_pid. The public entry points in this file use it
 * to fail with -ECHILD. */
2463 bool bus_pid_changed(sd_bus *bus) {
2466 /* We don't support people creating a bus connection and
2467 * keeping it around over a fork(). Let's complain. */
2469 return bus->original_pid != getpid();
/* I/O event handler registered with sd_event_add_io() in
 * sd_bus_attach_event(). userdata is the sd_bus; the handler runs one
 * sd_bus_process() step and passes NULL for the message output pointer.
 * NOTE(review): the handling of r and the return value are not visible in
 * this excerpt. */
2472 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2473 sd_bus *bus = userdata;
2478 r = sd_bus_process(bus, NULL);
/* Timer event handler registered with sd_event_add_monotonic() in
 * sd_bus_attach_event(). userdata is the sd_bus; the handler runs one
 * sd_bus_process() step and passes NULL for the message output pointer.
 * NOTE(review): the handling of r and the return value are not visible in
 * this excerpt. */
2485 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2486 sd_bus *bus = userdata;
2491 r = sd_bus_process(bus, NULL);
/* Prepare handler installed on the input I/O source by
 * sd_bus_attach_event(). It copies the current wishes of the bus into the
 * event sources: the poll event mask from sd_bus_get_events() and the
 * deadline from sd_bus_get_timeout().
 * NOTE(review): the error checks and the return value are not visible in
 * this excerpt. */
2498 static int prepare_callback(sd_event_source *s, void *userdata) {
2499 sd_bus *bus = userdata;
2506 e = sd_bus_get_events(bus);
/* Separate descriptors: the input source gets only POLLIN and the output
 * source only POLLOUT. A shared descriptor gets the whole mask e. */
2510 if (bus->output_fd != bus->input_fd) {
2512 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2516 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2520 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2525 r = sd_bus_get_timeout(bus, &until);
/* The result of set_time() goes into j, apparently so that r keeps the
 * sd_bus_get_timeout() result that is tested below. */
2531 j = sd_event_source_set_time(bus->time_event_source, until);
/* The timer source is enabled only when r is positive. */
2536 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
/* Quit handler registered with sd_event_add_quit() in
 * sd_bus_attach_event(); userdata is the sd_bus.
 * NOTE(review): the body of this handler is not visible in this excerpt. */
2543 static int quit_callback(sd_event_source *event, void *userdata) {
2544 sd_bus *bus = userdata;
/* Hooks the bus up to an sd_event loop. Four event sources are created and
 * stored in the bus: an I/O source for the input fd, an I/O source for the
 * output fd (only when it differs from the input fd), a monotonic timer
 * source and a quit source. The I/O and timer sources get the given
 * priority.
 * Fails with -EINVAL for a NULL bus and with -EBUSY when an event loop is
 * already attached.
 * NOTE(review): the error checks after each call and the return value are
 * not visible in this excerpt. */
2553 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2556 assert_return(bus, -EINVAL);
2557 assert_return(!bus->event, -EBUSY);
/* NOTE(review): bus->quit_event_source is not asserted here although it is
 * created below -- confirm whether that is intentional. */
2559 assert(!bus->input_io_event_source);
2560 assert(!bus->output_io_event_source);
2561 assert(!bus->time_event_source);
/* Either a reference to the given loop is taken or sd_event_default()
 * supplies one. NOTE(review): presumably selected by whether event is
 * NULL; the condition is not visible in this excerpt. */
2564 bus->event = sd_event_ref(event);
2566 r = sd_event_default(&bus->event);
/* The I/O sources start with an empty event mask (0); prepare_callback()
 * fills in the real mask. */
2571 r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2575 r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2579 if (bus->output_fd != bus->input_fd) {
2580 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2584 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2589 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2593 r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2597 r = sd_event_source_set_priority(bus->time_event_source, priority);
2601 r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
/* NOTE(review): presumably the failure path, undoing whatever was set up
 * so far; the label or condition leading here is not visible. */
2608 sd_bus_detach_event(bus);
/* Detaches the bus from its sd_event loop. Each of the four event sources
 * that is set (input I/O, output I/O, timer, quit) is switched to
 * SD_EVENT_OFF and unreferenced, with the result of
 * sd_event_source_unref() stored back into the field; finally the
 * reference to the loop itself is dropped the same way.
 * Fails with -EINVAL for a NULL bus and with -ENXIO when no loop is
 * attached.
 * NOTE(review): the return value on success is not visible in this
 * excerpt. */
2612 _public_ int sd_bus_detach_event(sd_bus *bus) {
2613 assert_return(bus, -EINVAL);
2614 assert_return(bus->event, -ENXIO);
2616 if (bus->input_io_event_source) {
2617 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2618 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2621 if (bus->output_io_event_source) {
2622 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2623 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2626 if (bus->time_event_source) {
2627 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2628 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2631 if (bus->quit_event_source) {
2632 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2633 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2637 bus->event = sd_event_unref(bus->event);
/* Public getter returning an sd_event pointer for the bus; a NULL bus is
 * rejected by assert_return() with a NULL result.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably it hands out bus->event -- confirm. */
2642 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2643 assert_return(bus, NULL);
/* Public getter returning bus->current, the pointer that the processing
 * functions in this file reset to NULL when they finish. A NULL bus is
 * rejected by assert_return() with a NULL result. No reference is taken by
 * the visible code. */
2648 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2649 assert_return(bus, NULL);
2651 return bus->current;
/* Shared helper behind sd_bus_default_system() and sd_bus_default_user().
 *
 * bus_open:    function used to open a new connection.
 * default_bus: address of the cached bus pointer owned by the caller.
 * ret:         where a bus reference is handed out.
 *
 * Visible behaviour: one path returns 1 or 0 depending on whether a cached
 * bus exists; another hands out a new reference to the cached bus through
 * ret; and a bus b records default_bus in b->default_bus_ptr.
 * NOTE(review): the conditions selecting these paths, the call to bus_open
 * and the remaining return values are not visible in this excerpt. */
2654 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2659 assert(default_bus);
/* Collapses the cached pointer into 1 (set) or 0 (unset). */
2662 return !!*default_bus;
2665 *ret = sd_bus_ref(*default_bus);
2673 b->default_bus_ptr = default_bus;
/* Public entry point that calls bus_default() with sd_bus_open_system() as
 * the opener. The cached bus pointer is a function-local static declared
 * __thread, so every thread has its own slot. */
2681 _public_ int sd_bus_default_system(sd_bus **ret) {
2682 static __thread sd_bus *default_system_bus = NULL;
2684 return bus_default(sd_bus_open_system, &default_system_bus, ret);
/* Public entry point that calls bus_default() with sd_bus_open_user() as
 * the opener. The cached bus pointer is a function-local static declared
 * __thread, so every thread has its own slot. */
2687 _public_ int sd_bus_default_user(sd_bus **ret) {
2688 static __thread sd_bus *default_user_bus = NULL;
2690 return bus_default(sd_bus_open_user, &default_user_bus, ret);
/* Public query for a thread id associated with the bus, written to *tid.
 * The visible path delegates to sd_event_get_tid() on the attached event
 * loop. assert_return() checks reject a NULL b or tid (-EINVAL) and a
 * changed PID (-ECHILD).
 * NOTE(review): lines between the checks and the delegation are not
 * visible in this excerpt, so other paths may exist. */
2693 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2694 assert_return(b, -EINVAL);
2695 assert_return(tid, -EINVAL);
2696 assert_return(!bus_pid_changed(b), -ECHILD);
2704 return sd_event_get_tid(b->event, tid);
/* Returns a newly allocated, escaped copy of s. ASCII letters are kept, and
 * so are digits unless they are the first character; every other byte is
 * replaced by an escape that contains its two hex digits, high nibble
 * first. A NULL s is rejected by assert_return() with a NULL result.
 * NOTE(review): the escape prefix character, the empty-string special
 * case, the terminator and the return statement are not visible in this
 * excerpt. */
2709 _public_ char *sd_bus_label_escape(const char *s) {
2713 assert_return(s, NULL);
2715 /* Escapes all chars that D-Bus' object path cannot deal
2716 * with. Can be reversed with sd_bus_label_unescape(). We special
2717 * case the empty string. */
/* Worst case sizing: three output bytes per input byte plus one more. */
2722 r = new(char, strlen(s)*3 + 1);
2726 for (f = s, t = r; *f; f++) {
2728 /* Escape everything that is not a-zA-Z0-9. We also
2729 * escape 0-9 if it's the first character */
2731 if (!(*f >= 'A' && *f <= 'Z') &&
2732 !(*f >= 'a' && *f <= 'z') &&
2733 !(f > s && *f >= '0' && *f <= '9')) {
/* High nibble first, then the low one. NOTE(review): the second call
 * passes the whole byte, so hexchar() presumably looks only at the low
 * four bits -- confirm against its definition. */
2735 *(t++) = hexchar(*f >> 4);
2736 *(t++) = hexchar(*f);
/* Returns a newly allocated, unescaped copy of f. An escape is decoded
 * from the two characters that follow the current position, which must
 * both be hex digits; an invalid escape is copied literally. A NULL f is
 * rejected by assert_return() with a NULL result.
 * NOTE(review): the test that detects the escape prefix, the empty-string
 * special case, the terminator and the return statement are not visible in
 * this excerpt. */
2746 _public_ char *sd_bus_label_unescape(const char *f) {
2749 assert_return(f, NULL);
2751 /* Special case for the empty string */
/* The buffer is sized for the input length plus one more byte. */
2755 r = new(char, strlen(f) + 1);
2759 for (t = r; *f; f++) {
/* The second operand of || is only evaluated when the first is false, so
 * f[2] is only examined when f[1] decoded as a hex digit. */
2764 if ((a = unhexchar(f[1])) < 0 ||
2765 (b = unhexchar(f[2])) < 0) {
2766 /* Invalid escape code, let's take it literal then */
/* Recombine the two decoded nibbles into one byte. */
2769 *(t++) = (char) ((a << 4) | b);
2781 _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **ret) {
2786 assert_return(bus, -EINVAL);
2787 assert_return(mask <= _SD_BUS_CREDS_MAX, -ENOTSUP);
2788 assert_return(ret, -EINVAL);
2789 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2790 assert_return(!bus_pid_changed(bus), -ECHILD);
2791 assert_return(!bus->is_kernel, -ENOTSUP);
2793 if (!bus->ucred_valid && !isempty(bus->label))
2796 c = bus_creds_new();
2800 if (bus->ucred_valid) {
2801 pid = c->pid = bus->ucred.pid;
2802 c->uid = bus->ucred.uid;
2803 c->gid = bus->ucred.gid;
2805 c->mask |= ((SD_BUS_CREDS_UID | SD_BUS_CREDS_PID | SD_BUS_CREDS_GID) & mask) & bus->creds_mask;
2808 if (!isempty(bus->label) && (mask & SD_BUS_CREDS_SELINUX_CONTEXT)) {
2809 c->label = strdup(bus->label);
2811 sd_bus_creds_unref(c);
2815 c->mask |= SD_BUS_CREDS_SELINUX_CONTEXT | bus->creds_mask;
2818 r = bus_creds_add_more(c, mask, pid, 0);