1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 #include "bus-container.h"
/* Forward declaration: bus_poll() is defined further down in this file
 * but is already called from sd_bus_call(). */
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Close the bus's file descriptors. The output fd is closed separately
 * only when it is valid and differs from the input fd, so a descriptor
 * shared by both directions is never closed twice. Both fields are
 * reset to -1 afterwards. */
53 static void bus_close_fds(sd_bus *b) {
57 close_nointr_nofail(b->input_fd);
59 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60 close_nointr_nofail(b->output_fd);
62 b->input_fd = b->output_fd = -1;
/* Tear down object-tree node n of bus b.
 *
 * Recurses into n->child, unlinks every entry from the node's callbacks,
 * vtables and enumerators lists, detaches n from its parent's child list
 * and finally removes it from b->nodes, asserting that the map held
 * exactly this node under n->path. */
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66 struct node_callback *c;
67 struct node_vtable *v;
68 struct node_enumerator *e;
76 bus_node_destroy(b, n->child);
78 while ((c = n->callbacks)) {
79 LIST_REMOVE(callbacks, n->callbacks, c);
83 while ((v = n->vtables)) {
84 LIST_REMOVE(vtables, n->vtables, v);
89 while ((e = n->enumerators)) {
90 LIST_REMOVE(enumerators, n->enumerators, e);
95 LIST_REMOVE(siblings, n->parent->child, n);
97 assert_se(hashmap_remove(b->nodes, n->path) == n);
/* Drop the message references held by the read queue and the write
 * queue, then put both queues back into the empty state (NULL array,
 * size 0). */
102 static void bus_reset_queues(sd_bus *b) {
107 for (i = 0; i < b->rqueue_size; i++)
108 sd_bus_message_unref(b->rqueue[i]);
111 for (i = 0; i < b->wqueue_size; i++)
112 sd_bus_message_unref(b->wqueue[i]);
115 b->rqueue = b->wqueue = NULL;
116 b->rqueue_size = b->wqueue_size = 0;
/* Release everything owned by bus object b.
 *
 * The visible steps: detach from the event loop, unmap the kdbus pool
 * buffer, free the unique name, auth buffer and exec argv, close any
 * queued fds, free the reply-callback hashmap and priority queue, unlink
 * all filter callbacks, free the match tree and the vtable hashmaps,
 * destroy every object-tree node, flush the memfd cache and destroy its
 * mutex. */
119 static void bus_free(sd_bus *b) {
120 struct filter_callback *f;
125 sd_bus_detach_event(b);
130 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
133 free(b->unique_name);
134 free(b->auth_buffer);
140 strv_free(b->exec_argv);
142 close_many(b->fds, b->n_fds);
147 hashmap_free_free(b->reply_callbacks);
148 prioq_free(b->reply_callbacks_prioq);
150 while ((f = b->filter_callbacks)) {
151 LIST_REMOVE(callbacks, b->filter_callbacks, f);
155 bus_match_free(&b->match_callbacks);
157 hashmap_free_free(b->vtable_methods);
158 hashmap_free_free(b->vtable_properties);
/* bus_node_destroy() removes the node from b->nodes itself, so this
 * loop ends once the map is empty. */
160 while ((n = hashmap_first(b->nodes)))
161 bus_node_destroy(b, n);
163 hashmap_free(b->nodes);
165 bus_kernel_flush_memfd(b);
167 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
/* Public constructor for a bus object.
 *
 * Returns -EINVAL when ret is NULL. The new object starts with reference
 * count REFCNT_INIT, no file descriptors (both -1), message version 1,
 * the KDBUS_HELLO_ACCEPT_FD and KDBUS_HELLO_ATTACH_NAMES hello flags,
 * the pid of the creating process, an initialised memfd cache mutex and
 * a write queue with room for one message. */
172 _public_ int sd_bus_new(sd_bus **ret) {
175 assert_return(ret, -EINVAL);
181 r->n_ref = REFCNT_INIT;
182 r->input_fd = r->output_fd = -1;
183 r->message_version = 1;
184 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD|KDBUS_HELLO_ATTACH_NAMES;
185 r->original_pid = getpid();
187 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
189 /* We guarantee that wqueue always has space for at least one
191 r->wqueue = new(sd_bus_message*, 1);
/* Configure the address string the bus will connect to.
 *
 * Preconditions checked here: bus and address must be non-NULL
 * (-EINVAL), the bus must still be in BUS_UNSET (-EPERM), and
 * bus_pid_changed() must be false (-ECHILD). */
201 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
204 assert_return(bus, -EINVAL);
205 assert_return(bus->state == BUS_UNSET, -EPERM);
206 assert_return(address, -EINVAL);
207 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Hand an already-open descriptor pair to the bus.
 *
 * Both descriptors must be >= 0 (-EINVAL) and the bus must still be in
 * BUS_UNSET (-EPERM); -ECHILD is returned when bus_pid_changed() is
 * true. The descriptors are stored as-is in input_fd and output_fd. */
219 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
220 assert_return(bus, -EINVAL);
221 assert_return(bus->state == BUS_UNSET, -EPERM);
222 assert_return(input_fd >= 0, -EINVAL);
223 assert_return(output_fd >= 0, -EINVAL);
224 assert_return(!bus_pid_changed(bus), -ECHILD);
226 bus->input_fd = input_fd;
227 bus->output_fd = output_fd;
/* Configure a program to execute as the bus transport.
 *
 * path must be non-NULL and argv must be a non-empty string vector
 * (-EINVAL); the bus must still be in BUS_UNSET (-EPERM) and
 * bus_pid_changed() must be false (-ECHILD). Any previously stored
 * exec_path and exec_argv are freed before being replaced. */
231 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
234 assert_return(bus, -EINVAL);
235 assert_return(bus->state == BUS_UNSET, -EPERM);
236 assert_return(path, -EINVAL);
237 assert_return(!strv_isempty(argv), -EINVAL);
238 assert_return(!bus_pid_changed(bus), -ECHILD);
250 free(bus->exec_path);
251 strv_free(bus->exec_argv);
/* Set or clear the bus_client flag; b is normalised to 0/1.
 * Only allowed while the bus is in BUS_UNSET (-EPERM otherwise).
 * bus_send_hello() and bus_start_running() consult this flag. */
259 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
260 assert_return(bus, -EINVAL);
261 assert_return(bus->state == BUS_UNSET, -EPERM);
262 assert_return(!bus_pid_changed(bus), -ECHILD);
264 bus->bus_client = !!b;
/* Update the KDBUS_HELLO_ACCEPT_FD bit in hello_flags according to b.
 * Only allowed while the bus is in BUS_UNSET (-EPERM otherwise).
 * sd_bus_can_send() later checks this bit for SD_BUS_TYPE_UNIX_FD. */
268 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
269 assert_return(bus, -EINVAL);
270 assert_return(bus->state == BUS_UNSET, -EPERM);
271 assert_return(!bus_pid_changed(bus), -ECHILD);
273 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
/* Update the KDBUS_HELLO_ATTACH_TIMESTAMP bit in hello_flags according
 * to b. Only allowed while the bus is in BUS_UNSET (-EPERM otherwise). */
277 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
278 assert_return(bus, -EINVAL);
279 assert_return(bus->state == BUS_UNSET, -EPERM);
280 assert_return(!bus_pid_changed(bus), -ECHILD);
282 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_TIMESTAMP, b);
/* Translate a mask of SD_BUS_CREDS_xxx bits into the matching
 * KDBUS_HELLO_ATTACH_xxx bits of hello_flags.
 *
 * Each attach flag is driven by one group of credential bits: the value
 * passed to SET_FLAG() is 1 when any bit of the group is present in mask
 * and 0 otherwise. The mask itself is remembered in bus->creds_mask.
 *
 * Errors: -EINVAL for a NULL bus or a mask above _SD_BUS_CREDS_MAX,
 * -EPERM unless the bus is in BUS_UNSET, -ECHILD when bus_pid_changed()
 * is true. */
286 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, uint64_t mask) {
287 assert_return(bus, -EINVAL);
288 assert_return(mask <= _SD_BUS_CREDS_MAX, -EINVAL);
289 assert_return(bus->state == BUS_UNSET, -EPERM);
290 assert_return(!bus_pid_changed(bus), -ECHILD);
292 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CREDS,
293 !!(mask & (SD_BUS_CREDS_UID|SD_BUS_CREDS_GID|SD_BUS_CREDS_PID|SD_BUS_CREDS_PID_STARTTIME|SD_BUS_CREDS_TID)));
295 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM,
296 !!(mask & (SD_BUS_CREDS_COMM|SD_BUS_CREDS_TID_COMM)));
298 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE,
299 !!(mask & SD_BUS_CREDS_EXE));
301 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE,
302 !!(mask & SD_BUS_CREDS_CMDLINE));
304 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP,
305 !!(mask & (SD_BUS_CREDS_CGROUP|SD_BUS_CREDS_UNIT|SD_BUS_CREDS_USER_UNIT|SD_BUS_CREDS_SLICE|SD_BUS_CREDS_SESSION|SD_BUS_CREDS_OWNER_UID)));
307 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS,
308 !!(mask & (SD_BUS_CREDS_EFFECTIVE_CAPS|SD_BUS_CREDS_PERMITTED_CAPS|SD_BUS_CREDS_INHERITABLE_CAPS|SD_BUS_CREDS_BOUNDING_CAPS)));
310 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL,
311 !!(mask & SD_BUS_CREDS_SELINUX_CONTEXT));
313 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT,
314 !!(mask & (SD_BUS_CREDS_AUDIT_SESSION_ID|SD_BUS_CREDS_AUDIT_LOGIN_UID)));
316 bus->creds_mask = mask;
/* Switch the bus into (b != 0) or out of (b == 0) server mode and store
 * the server id.
 *
 * A non-null server_id is only accepted together with b != 0; otherwise
 * -EINVAL is returned. Only allowed while the bus is in BUS_UNSET
 * (-EPERM otherwise). */
321 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
322 assert_return(bus, -EINVAL);
323 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
324 assert_return(bus->state == BUS_UNSET, -EPERM);
325 assert_return(!bus_pid_changed(bus), -ECHILD);
327 bus->is_server = !!b;
328 bus->server_id = server_id;
/* Set or clear the anonymous_auth flag; b is normalised to 0/1.
 * Only allowed while the bus is in BUS_UNSET (-EPERM otherwise). */
332 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
333 assert_return(bus, -EINVAL);
334 assert_return(bus->state == BUS_UNSET, -EPERM);
335 assert_return(!bus_pid_changed(bus), -ECHILD);
337 bus->anonymous_auth = !!b;
/* Reply handler for the Hello() call issued by bus_send_hello().
 *
 * Expects the bus to be in BUS_HELLO or BUS_CLOSING. It queries the
 * reply's errno, reads a single string from the reply, requires it to be
 * a valid service name starting with ':' and stores a copy of it as
 * bus->unique_name. A bus that is still in BUS_HELLO is then moved to
 * BUS_RUNNING. */
341 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
346 assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
349 r = sd_bus_message_get_errno(reply);
355 r = sd_bus_message_read(reply, "s", &s);
359 if (!service_name_is_valid(s) || s[0] != ':')
362 bus->unique_name = strdup(s);
363 if (!bus->unique_name)
366 if (bus->state == BUS_HELLO)
367 bus->state = BUS_RUNNING;
/* Queue the initial method call addressed to "org.freedesktop.DBus".
 *
 * The check on bus_client / is_kernel singles out buses that are not
 * bus clients or that use the kernel transport. Otherwise a method call
 * is built and sent asynchronously with hello_callback() as reply
 * handler; its serial is recorded in bus->hello_serial so that
 * process_hello() can recognise the reply. */
372 static int bus_send_hello(sd_bus *bus) {
373 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
378 if (!bus->bus_client || bus->is_kernel)
381 r = sd_bus_message_new_method_call(
383 "org.freedesktop.DBus",
385 "org.freedesktop.DBus",
391 return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Pick the state to enter once the connection is set up: a bus client
 * on a non-kernel transport goes to BUS_HELLO (the hello reply is still
 * outstanding, see hello_callback()); BUS_RUNNING is the other state
 * assigned here. */
394 int bus_start_running(sd_bus *bus) {
397 if (bus->bus_client && !bus->is_kernel) {
398 bus->state = BUS_HELLO;
402 bus->state = BUS_RUNNING;
/* Parse one key=value item of a bus address at *p.
 *
 * The text at *p is compared against key over l characters; the value
 * is then scanned up to the next ';', ',' or end of string. A byte is
 * assembled from two 4-bit halves x and y, and the result buffer r is
 * grown with realloc() as characters are appended.
 * NOTE(review): x and y are presumably the decoded hex digits of a
 * percent-escape; callers pass key == NULL for the argvN keys (see
 * parse_exec_address()) -- confirm how the NULL case is handled. */
406 static int parse_address_key(const char **p, const char *key, char **value) {
417 if (strncmp(*p, key, l) != 0)
430 while (*a != ';' && *a != ',' && *a != 0) {
448 c = (char) ((x << 4) | y);
455 t = realloc(r, n + 2);
/* Advance *p to the next ',' in the string, or to its terminating NUL
 * when there is none.
 * NOTE(review): strcspn() is given only ",", so a ';' is not treated as
 * a stop character on this line -- confirm this is intended. */
483 static void skip_address_key(const char **p) {
487 *p += strcspn(*p, ",");
/* Parse the body of a "unix:" address into b->sockaddr.
 *
 * Items are consumed until the end of the string or a ';'. Recognised
 * keys are "guid", "path" and "abstract"; the address must supply
 * exactly one of "path" and "abstract". The chosen name is
 * length-checked against sun_path and copied in; for an abstract socket
 * sun_path[0] is set to 0 and the name follows from sun_path+1.
 * sockaddr_size covers the header plus the name (plus the leading 0
 * byte in the abstract case). */
493 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
494 _cleanup_free_ char *path = NULL, *abstract = NULL;
503 while (**p != 0 && **p != ';') {
504 r = parse_address_key(p, "guid", guid);
510 r = parse_address_key(p, "path", &path);
516 r = parse_address_key(p, "abstract", &abstract);
525 if (!path && !abstract)
528 if (path && abstract)
/* A path of exactly sizeof(sun_path) bytes passes this check; strncpy()
 * then fills the field without a terminating NUL. */
533 if (l > sizeof(b->sockaddr.un.sun_path))
536 b->sockaddr.un.sun_family = AF_UNIX;
537 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
538 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
539 } else if (abstract) {
540 l = strlen(abstract);
541 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
544 b->sockaddr.un.sun_family = AF_UNIX;
545 b->sockaddr.un.sun_path[0] = 0;
546 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
547 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parse the body of a "tcp:" address and resolve it into b->sockaddr.
 *
 * Recognised keys are "guid", "host", "port" and "family". A family of
 * "ipv4" or "ipv6" restricts the lookup to AF_INET or AF_INET6. The
 * host/port pair is resolved with getaddrinfo() (stream sockets,
 * AI_ADDRCONFIG); -EADDRNOTAVAIL is one of the error results. The first
 * returned address is copied into b->sockaddr and the result list is
 * freed. */
553 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
554 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
556 struct addrinfo *result, hints = {
557 .ai_socktype = SOCK_STREAM,
558 .ai_flags = AI_ADDRCONFIG,
566 while (**p != 0 && **p != ';') {
567 r = parse_address_key(p, "guid", guid);
573 r = parse_address_key(p, "host", &host);
579 r = parse_address_key(p, "port", &port);
585 r = parse_address_key(p, "family", &family);
598 if (streq(family, "ipv4"))
599 hints.ai_family = AF_INET;
600 else if (streq(family, "ipv6"))
601 hints.ai_family = AF_INET6;
606 r = getaddrinfo(host, port, &hints, &result);
610 return -EADDRNOTAVAIL;
612 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
613 b->sockaddr_size = result->ai_addrlen;
615 freeaddrinfo(result);
/* Parse the body of a "unixexec:" address.
 *
 * Recognised keys are "guid", "path" and "argvN", where N is a decimal
 * index parsed with strtoul(). An index is rejected when errno is set,
 * when it is not followed by '=' or when it exceeds 256. The argv array
 * is grown to hold index N plus a terminator, new slots are zeroed, and
 * the value is parsed straight into slot N. Afterwards slots 1..n_argv-1
 * are checked for holes, and an unset argv[0] defaults to a copy of
 * path. */
620 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
622 unsigned n_argv = 0, j;
631 while (**p != 0 && **p != ';') {
632 r = parse_address_key(p, "guid", guid);
638 r = parse_address_key(p, "path", &path);
644 if (startswith(*p, "argv")) {
/* The errno test below is only meaningful if errno was cleared before
 * strtoul() -- NOTE(review): confirm that reset exists. */
648 ul = strtoul(*p + 4, (char**) p, 10);
649 if (errno > 0 || **p != '=' || ul > 256) {
659 x = realloc(argv, sizeof(char*) * (ul + 2));
665 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
671 r = parse_address_key(p, NULL, argv + ul);
686 /* Make sure there are no holes in the array, with the
687 * exception of argv[0] */
688 for (j = 1; j < n_argv; j++)
694 if (argv && argv[0] == NULL) {
695 argv[0] = strdup(path);
707 for (j = 0; j < n_argv; j++)
/* Parse the body of a "kernel:" address. Items are consumed until the
 * end of the string or a ';'; the recognised keys are "guid" and
 * "path". */
715 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
716 _cleanup_free_ char *path = NULL;
724 while (**p != 0 && **p != ';') {
725 r = parse_address_key(p, "guid", guid);
731 r = parse_address_key(p, "path", &path);
/* Parse the body of an "x-container:" address.
 *
 * Recognised keys are "guid" and "machine". The parsed machine name is
 * stored in b->machine, and b->sockaddr is pointed at the AF_UNIX socket
 * "/var/run/dbus/system_bus_socket" (sockaddr_size excludes the
 * trailing NUL).
 * NOTE(review): machine is declared _cleanup_free_, so ownership must be
 * given up (e.g. by resetting the local to NULL) after the assignment
 * to b->machine -- confirm. */
750 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
751 _cleanup_free_ char *machine = NULL;
759 while (**p != 0 && **p != ';') {
760 r = parse_address_key(p, "guid", guid);
766 r = parse_address_key(p, "machine", &machine);
779 b->machine = machine;
782 b->sockaddr.un.sun_family = AF_UNIX;
783 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
784 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
/* Forget the result of the previous address parse so the next entry
 * starts clean: the socket address size is zeroed, the exec argv vector
 * is freed and the server id is reset to SD_ID128_NULL. */
789 static void bus_reset_parsed_address(sd_bus *b) {
793 b->sockaddr_size = 0;
794 strv_free(b->exec_argv);
798 b->server_id = SD_ID128_NULL;
/* Parse the next entry of b->address, starting at b->address_index.
 *
 * Reaching the terminating NUL means no entries are left. Otherwise the
 * previously parsed state is reset and the entry is dispatched on its
 * transport prefix: "unix:", "tcp:", "unixexec:", "kernel:" or
 * "x-container:". A parsed guid is converted into b->server_id, and
 * address_index is advanced to where parsing stopped. */
805 static int bus_parse_next_address(sd_bus *b) {
806 _cleanup_free_ char *guid = NULL;
814 if (b->address[b->address_index] == 0)
817 bus_reset_parsed_address(b);
819 a = b->address + b->address_index;
828 if (startswith(a, "unix:")) {
831 r = parse_unix_address(b, &a, &guid);
836 } else if (startswith(a, "tcp:")) {
839 r = parse_tcp_address(b, &a, &guid);
845 } else if (startswith(a, "unixexec:")) {
848 r = parse_exec_address(b, &a, &guid);
854 } else if (startswith(a, "kernel:")) {
857 r = parse_kernel_address(b, &a, &guid);
862 } else if (startswith(a, "x-container:")) {
865 r = parse_container_address(b, &a, &guid);
878 r = sd_id128_from_string(guid, &b->server_id);
883 b->address_index = a - b->address;
/* Try to connect using the currently parsed address, moving on to the
 * next address entry as needed.
 *
 * The transport is chosen from what the parse produced: an exec
 * transport, a kernel bus (b->kernel), a container (b->machine) or a
 * plain socket (address family other than AF_UNSPEC). The negated
 * result of an attempt is remembered in b->last_connect_error. When the
 * function gives up it returns that remembered error, or -ECONNREFUSED
 * if none was recorded. */
887 static int bus_start_address(sd_bus *b) {
897 r = bus_socket_exec(b);
901 b->last_connect_error = -r;
902 } else if (b->kernel) {
904 r = bus_kernel_connect(b);
908 b->last_connect_error = -r;
910 } else if (b->machine) {
912 r = bus_container_connect(b);
916 b->last_connect_error = -r;
918 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
920 r = bus_socket_connect(b);
924 b->last_connect_error = -r;
927 r = bus_parse_next_address(b);
931 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Discard the currently parsed address and retry the connection via
 * bus_start_address(), whose result is returned. */
935 int bus_next_address(sd_bus *b) {
938 bus_reset_parsed_address(b);
939 return bus_start_address(b);
/* Prepare a caller-supplied descriptor pair (see sd_bus_set_fd()) for
 * use by the bus.
 *
 * Both descriptors must be valid. fd_nonblock() and fd_cloexec() are
 * applied to the input fd and, when it is a different descriptor, to
 * the output fd as well. The input fd is then fstat()ed to choose the
 * transport: a character device is handed to bus_kernel_take_fd(),
 * anything else to bus_socket_take_fd(). The result of the chosen
 * function is returned. */
942 static int bus_start_fd(sd_bus *b) {
947 assert(b->input_fd >= 0);
948 assert(b->output_fd >= 0);
950 r = fd_nonblock(b->input_fd, true);
954 r = fd_cloexec(b->input_fd, true);
958 if (b->input_fd != b->output_fd) {
959 r = fd_nonblock(b->output_fd, true);
963 r = fd_cloexec(b->output_fd, true);
/* S_ISCHR() classifies a file mode, so it has to be applied to
 * st.st_mode as filled in by this fstat(), never to the descriptor
 * number itself. */
968 if (fstat(b->input_fd, &st) < 0)
971 if (S_ISCHR(st.st_mode))
972 return bus_kernel_take_fd(b);
974 return bus_socket_take_fd(b);
/* Start a configured bus connection.
 *
 * Only valid in BUS_UNSET (-EPERM otherwise); the state is moved to
 * BUS_OPENING. The combination is_server + bus_client is checked
 * explicitly. A preset input fd takes precedence and is started via
 * bus_start_fd(); otherwise any configured address source (address
 * string, socket address, exec path, kernel path or machine) is started
 * via bus_start_address(). Finally the hello message is sent with
 * bus_send_hello(), whose result is returned. */
977 _public_ int sd_bus_start(sd_bus *bus) {
980 assert_return(bus, -EINVAL);
981 assert_return(bus->state == BUS_UNSET, -EPERM);
982 assert_return(!bus_pid_changed(bus), -ECHILD);
984 bus->state = BUS_OPENING;
986 if (bus->is_server && bus->bus_client)
989 if (bus->input_fd >= 0)
990 r = bus_start_fd(bus);
991 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
992 r = bus_start_address(bus);
999 return bus_send_hello(bus);
/* Open a connection to the system bus.
 *
 * The address is taken from the DBUS_SYSTEM_BUS_ADDRESS environment
 * variable (read with secure_getenv()); the AF_UNIX socket
 * "/run/dbus/system_bus_socket" is the built-in alternative. The bus is
 * marked as a bus client and started with sd_bus_start().
 * Returns -EINVAL when ret is NULL. */
1002 _public_ int sd_bus_open_system(sd_bus **ret) {
1007 assert_return(ret, -EINVAL);
1013 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1015 r = sd_bus_set_address(b, e);
1019 b->sockaddr.un.sun_family = AF_UNIX;
1020 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1021 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1024 b->bus_client = true;
1026 r = sd_bus_start(b);
/* Open a connection to the user (session) bus.
 *
 * The address is taken from DBUS_SESSION_BUS_ADDRESS; the alternative
 * is the AF_UNIX socket "<XDG_RUNTIME_DIR>/bus". Both variables are read
 * with secure_getenv(). The runtime directory plus the 4-byte "/bus"
 * suffix must fit into sun_path; the suffix is appended without a
 * terminating NUL and sockaddr_size covers exactly l + 4 path bytes.
 * The bus is marked as a bus client and started with sd_bus_start().
 * Returns -EINVAL when ret is NULL. */
1038 _public_ int sd_bus_open_user(sd_bus **ret) {
1044 assert_return(ret, -EINVAL);
1050 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1052 r = sd_bus_set_address(b, e);
1056 e = secure_getenv("XDG_RUNTIME_DIR");
1063 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1068 b->sockaddr.un.sun_family = AF_UNIX;
1069 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1070 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1073 b->bus_client = true;
1075 r = sd_bus_start(b);
/* Open a connection to the system bus of a remote host over ssh.
 *
 * host is escaped with bus_address_escape() and embedded into a
 * "unixexec:" address that runs "ssh -xT <host> systemd-stdio-bridge".
 * A new bus is created, marked as a bus client and started.
 * Returns -EINVAL when host or ret is NULL. */
1087 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1088 _cleanup_free_ char *e = NULL;
1093 assert_return(host, -EINVAL);
1094 assert_return(ret, -EINVAL);
1096 e = bus_address_escape(host);
1100 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1104 r = sd_bus_new(&bus);
1111 bus->bus_client = true;
1113 r = sd_bus_start(bus);
/* Open a connection to the system bus inside a container.
 *
 * machine is escaped with bus_address_escape() and embedded into an
 * "x-container:machine=<name>" address. A new bus is created, marked as
 * a bus client and started.
 * Returns -EINVAL when machine or ret is NULL. */
1123 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1124 _cleanup_free_ char *e = NULL;
1129 assert_return(machine, -EINVAL);
1130 assert_return(ret, -EINVAL);
1132 e = bus_address_escape(machine);
1136 p = strjoin("x-container:machine=", e, NULL);
1140 r = sd_bus_new(&bus);
1147 bus->bus_client = true;
1149 r = sd_bus_start(bus);
/* Close the bus connection.
 *
 * The BUS_CLOSED state and a changed pid are both checked up front.
 * The state is set to BUS_CLOSED, the bus is detached from its event
 * loop and all queued messages are dropped so that they release their
 * references to the bus. The final step is conditional on the bus not
 * being a kernel bus; the existing comment explains that a kernel bus
 * fd is left open for outstanding memblocks. */
1159 _public_ void sd_bus_close(sd_bus *bus) {
1163 if (bus->state == BUS_CLOSED)
1165 if (bus_pid_changed(bus))
1168 bus->state = BUS_CLOSED;
1170 sd_bus_detach_event(bus);
1172 /* Drop all queued messages so that they drop references to
1173 * the bus object and the bus may be freed */
1174 bus_reset_queues(bus);
1176 if (!bus->is_kernel)
1179 /* We'll leave the fd open in case this is a kernel bus, since
1180 * there might still be memblocks around that reference this
1181 * bus, and they might need to invoke the
1182 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
/* Move the bus into BUS_CLOSING. The guard singles out buses that are
 * not in one of the live states BUS_OPENING, BUS_AUTHENTICATING,
 * BUS_HELLO or BUS_RUNNING. */
1186 static void bus_enter_closing(sd_bus *bus) {
1189 if (bus->state != BUS_OPENING &&
1190 bus->state != BUS_AUTHENTICATING &&
1191 bus->state != BUS_HELLO &&
1192 bus->state != BUS_RUNNING)
1195 bus->state = BUS_CLOSING;
/* Take an additional reference on bus. Returns NULL for a NULL bus.
 * The assertion requires the incremented count to be at least 2, i.e.
 * the caller must already have held a reference. */
1198 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1199 assert_return(bus, NULL);
1201 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drop one reference on bus. Returns NULL for a NULL bus. The branch
 * below is taken when the decremented count reaches zero or less.
 * NOTE(review): presumably this is where bus_free() is invoked --
 * confirm. */
1206 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1207 assert_return(bus, NULL);
1209 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Report whether the bus state satisfies BUS_IS_OPEN().
 * Returns -EINVAL for a NULL bus and -ECHILD when bus_pid_changed() is
 * true. */
1215 _public_ int sd_bus_is_open(sd_bus *bus) {
1217 assert_return(bus, -EINVAL);
1218 assert_return(!bus_pid_changed(bus), -ECHILD);
1220 return BUS_IS_OPEN(bus->state);
/* Report whether values of the given type can be sent on this bus.
 *
 * For SD_BUS_TYPE_UNIX_FD the answer depends on the connection: the
 * KDBUS_HELLO_ACCEPT_FD hello flag is checked, the bus is driven to the
 * running state with bus_ensure_running() and bus->can_fds is returned.
 * For every other type the result is bus_type_is_valid(type).
 *
 * Errors: -EINVAL for a NULL bus, -ENOTCONN while the bus is in
 * BUS_UNSET, -ECHILD when bus_pid_changed() is true. */
1223 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1226 assert_return(bus, -EINVAL);
1227 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1228 assert_return(!bus_pid_changed(bus), -ECHILD);
1230 if (type == SD_BUS_TYPE_UNIX_FD) {
1231 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1234 r = bus_ensure_running(bus);
1238 return bus->can_fds;
1241 return bus_type_is_valid(type);
/* Copy the bus's server id into *server_id after making sure the bus is
 * running (bus_ensure_running()).
 * Returns -EINVAL when bus or server_id is NULL and -ECHILD when
 * bus_pid_changed() is true. */
1244 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1247 assert_return(bus, -EINVAL);
1248 assert_return(server_id, -EINVAL);
1249 assert_return(!bus_pid_changed(bus), -ECHILD);
1251 r = bus_ensure_running(bus);
1255 *server_id = bus->server_id;
/* Seal message m for sending on bus b.
 *
 * The message's header version is compared against b->message_version.
 * b->serial is raised to at least the message's own serial, so that the
 * next freshly assigned serial (++b->serial, passed to
 * bus_message_seal()) does not collide with it. */
1259 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1262 if (m->header->version > b->message_version)
1266 /* If we copy the same message to multiple
1267 * destinations, avoid using the same serial
1269 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1273 return bus_message_seal(m, ++b->serial);
/* Write a message through the transport in use: bus_kernel_write_message()
 * or bus_socket_write_message(). Only the socket variant receives idx,
 * the running count of bytes already written. */
1276 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1283 r = bus_kernel_write_message(bus, message);
1285 r = bus_socket_write_message(bus, message, idx);
/* Flush as much of the write queue as the transport accepts.
 *
 * Requires the bus to be in BUS_RUNNING or BUS_HELLO. The head of the
 * queue is written with bus->windex tracking partial progress. An entry
 * counts as fully written on a kernel bus, or once windex has reached
 * the message size; it is then unreferenced and the remaining entries
 * are shifted down by one with memmove(). */
1290 static int dispatch_wqueue(sd_bus *bus) {
1294 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1296 while (bus->wqueue_size > 0) {
1298 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1302 /* Didn't do anything this time */
1304 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1305 /* Fully written. Let's drop the entry from
1308 * This isn't particularly optimized, but
1309 * well, this is supposed to be our worst-case
1310 * buffer only, and the socket buffer is
1311 * supposed to be our primary buffer, and if
1312 * it got full, then all bets are off
1315 sd_bus_message_unref(bus->wqueue[0]);
1316 bus->wqueue_size --;
1317 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Read one message through the transport in use:
 * bus_kernel_read_message() or bus_socket_read_message(). */
1327 static int bus_read_message(sd_bus *bus, sd_bus_message **m) {
1334 r = bus_kernel_read_message(bus, m);
1336 r = bus_socket_read_message(bus, m);
/* Fetch the next incoming message into *m.
 *
 * Requires the bus to be in BUS_RUNNING or BUS_HELLO. A message already
 * waiting in the read queue is handed out first (the queue is shifted
 * down by one with memmove()); only when the queue is empty is a new
 * message read from the transport. */
1341 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1342 sd_bus_message *z = NULL;
1347 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1349 if (bus->rqueue_size > 0) {
1350 /* Dispatch a queued message */
1352 *m = bus->rqueue[0];
1353 bus->rqueue_size --;
1354 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1358 /* Try to read a new message */
1360 r = bus_read_message(bus, &z);
/* Send message m on the bus, optionally returning its serial.
 *
 * Preconditions: bus and m non-NULL (-EINVAL), bus open (-ENOTCONN),
 * bus_pid_changed() false (-ECHILD). When the caller passes no serial
 * pointer and the message is not yet sealed, the NO_REPLY_EXPECTED
 * header flag is set. The message is then sealed.
 *
 * If the bus is in BUS_RUNNING or BUS_HELLO and the write queue is
 * empty, an immediate write is attempted; on a non-kernel bus a
 * partially written message is parked in wqueue[0]. Otherwise the
 * message is appended to the write queue, which is capped at
 * BUS_WQUEUE_MAX entries and grown by one slot with realloc(). On
 * success *serial receives the message serial. */
1373 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1376 assert_return(bus, -EINVAL);
1377 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1378 assert_return(m, -EINVAL);
1379 assert_return(!bus_pid_changed(bus), -ECHILD);
1382 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1389 /* If the serial number isn't kept, then we know that no reply
1391 if (!serial && !m->sealed)
1392 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1394 r = bus_seal_message(bus, m);
1398 /* If this is a reply and no reply was requested, then let's
1399 * suppress this, if we can */
1400 if (m->dont_send && !serial)
1403 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1406 r = bus_write_message(bus, m, &idx);
1409 else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1410 /* Wasn't fully written. So let's remember how
1411 * much was written. Note that the first entry
1412 * of the wqueue array is always allocated so
1413 * that we always can remember how much was
1415 bus->wqueue[0] = sd_bus_message_ref(m);
1416 bus->wqueue_size = 1;
1422 /* Just append it to the queue. */
1424 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1427 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1432 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1436 *serial = BUS_MESSAGE_SERIAL(m);
/* Send message m to an explicit destination.
 *
 * When the message's current destination differs from the requested
 * one, sd_bus_message_set_destination() is called first; the message is
 * then passed on to sd_bus_send(). The same preconditions as for
 * sd_bus_send() are checked here. */
1441 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1444 assert_return(bus, -EINVAL);
1445 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1446 assert_return(m, -EINVAL);
1447 assert_return(!bus_pid_changed(bus), -ECHILD);
1449 if (!streq_ptr(m->destination, destination)) {
1454 r = sd_bus_message_set_destination(m, destination);
1459 return sd_bus_send(bus, m, serial);
/* Turn a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC deadline.
 *
 * (uint64_t) -1 is treated as a special value, and BUS_DEFAULT_TIMEOUT
 * can be substituted for usec. Callers in this file treat a result of 0
 * as "no timeout" (see the c->timeout != 0 checks).
 * NOTE(review): presumably -1 maps to 0 and usec == 0 selects the
 * default -- confirm. */
1462 static usec_t calc_elapse(uint64_t usec) {
1463 if (usec == (uint64_t) -1)
1467 usec = BUS_DEFAULT_TIMEOUT;
1469 return now(CLOCK_MONOTONIC) + usec;
/* Ordering function for the reply-callback priority queue (installed in
 * sd_bus_call_async()). Entries are first distinguished by whether they
 * carry a timeout at all (timeout != 0 versus timeout == 0) and then
 * compared by timeout value. */
1472 static int timeout_compare(const void *a, const void *b) {
1473 const struct reply_callback *x = a, *y = b;
1475 if (x->timeout != 0 && y->timeout == 0)
1478 if (x->timeout == 0 && y->timeout != 0)
1481 if (x->timeout < y->timeout)
1484 if (x->timeout > y->timeout)
/* Send method call m and register callback to receive its reply.
 *
 * Preconditions: bus, m and callback non-NULL (-EINVAL), bus open
 * (-ENOTCONN), m is a method call without the NO_REPLY_EXPECTED flag
 * (-EINVAL), bus_pid_changed() false (-ECHILD).
 *
 * The reply-callback hashmap is allocated on demand, and so is the
 * timeout priority queue unless usec is (uint64_t) -1. The message is
 * sealed so that its serial is known, a reply_callback record keyed by
 * that serial is stored in the hashmap and, when it has a timeout, in
 * the priority queue. The message is then sent; the registration is
 * undone with sd_bus_call_async_cancel() on the failure paths. */
1490 _public_ int sd_bus_call_async(
1493 sd_bus_message_handler_t callback,
1498 struct reply_callback *c;
1501 assert_return(bus, -EINVAL);
1502 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1503 assert_return(m, -EINVAL);
1504 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1505 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1506 assert_return(callback, -EINVAL);
1507 assert_return(!bus_pid_changed(bus), -ECHILD);
1509 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1513 if (usec != (uint64_t) -1) {
1514 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1519 r = bus_seal_message(bus, m);
1523 c = new0(struct reply_callback, 1);
1527 c->callback = callback;
1528 c->userdata = userdata;
1529 c->serial = BUS_MESSAGE_SERIAL(m);
1530 c->timeout = calc_elapse(usec);
1532 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1538 if (c->timeout != 0) {
1539 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1542 sd_bus_call_async_cancel(bus, c->serial);
1547 r = sd_bus_send(bus, m, serial);
1549 sd_bus_call_async_cancel(bus, c->serial);
/* Cancel a pending asynchronous call identified by its serial.
 *
 * The reply_callback record is removed from the hashmap and, if it
 * carries a timeout, from the priority queue as well.
 * Returns -EINVAL for a NULL bus or a serial of 0 and -ECHILD when
 * bus_pid_changed() is true. */
1556 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1557 struct reply_callback *c;
1559 assert_return(bus, -EINVAL);
1560 assert_return(serial != 0, -EINVAL);
1561 assert_return(!bus_pid_changed(bus), -ECHILD);
1563 c = hashmap_remove(bus->reply_callbacks, &serial);
1567 if (c->timeout != 0)
1568 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drive the bus until it has reached BUS_RUNNING.
 *
 * The states BUS_UNSET, BUS_CLOSED and BUS_CLOSING are checked up
 * front, as is an already running bus. Otherwise the bus is processed
 * with sd_bus_process() and, while the state is still not BUS_RUNNING,
 * waited on without timeout via sd_bus_wait(). */
1574 int bus_ensure_running(sd_bus *bus) {
1579 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1581 if (bus->state == BUS_RUNNING)
1585 r = sd_bus_process(bus, NULL);
1588 if (bus->state == BUS_RUNNING)
1593 r = sd_bus_wait(bus, (uint64_t) -1);
/* Send method call m and block until its reply arrives.
 *
 * Preconditions: bus and m non-NULL (-EINVAL), bus open (-ENOTCONN),
 * m is a method call without the NO_REPLY_EXPECTED flag (-EINVAL),
 * error not already set (-EINVAL), bus_pid_changed() false (-ECHILD).
 *
 * After bus_ensure_running() the message is sent and an absolute
 * deadline is computed with calc_elapse(). The loop then reads incoming
 * messages directly from the transport. Before each read the read queue
 * is grown by one slot (bounded by BUS_RQUEUE_MAX) so that an unrelated
 * message can always be queued for later dispatch. A message whose
 * reply_serial matches is the answer: a method return or a method error
 * (the error is copied into the caller's error and converted to an
 * errno). A message carrying our own serial and our own unique name as
 * sender is treated as a self-call. Between reads the function polls
 * with bus_poll() and flushes the write queue with dispatch_wqueue(). */
1599 _public_ int sd_bus_call(
1603 sd_bus_error *error,
1604 sd_bus_message **reply) {
1611 assert_return(bus, -EINVAL);
1612 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1613 assert_return(m, -EINVAL);
1614 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1615 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1616 assert_return(!bus_error_is_dirty(error), -EINVAL);
1617 assert_return(!bus_pid_changed(bus), -ECHILD);
1619 r = bus_ensure_running(bus);
1623 r = sd_bus_send(bus, m, &serial);
1627 timeout = calc_elapse(usec);
1631 sd_bus_message *incoming = NULL;
1636 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1639 /* Make sure there's room for queuing this
1640 * locally, before we read the message */
1642 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1650 r = bus_read_message(bus, &incoming);
1656 if (incoming->reply_serial == serial) {
1657 /* Found a match! */
1659 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1664 sd_bus_message_unref(incoming);
1669 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1672 r = sd_bus_error_copy(error, &incoming->error);
1674 sd_bus_message_unref(incoming);
1678 k = sd_bus_error_get_errno(&incoming->error);
1679 sd_bus_message_unref(incoming);
1683 sd_bus_message_unref(incoming);
1686 } else if (incoming->header->serial == serial &&
1689 streq(bus->unique_name, incoming->sender)) {
1691 /* Our own message? Somebody is trying
1692 * to send its own client a message,
1693 * let's not dead-lock, let's fail
1696 sd_bus_message_unref(incoming);
1700 /* There's already guaranteed to be room for
1701 * this, so need to resize things here */
1702 bus->rqueue[bus->rqueue_size ++] = incoming;
1705 /* Try to read more, right-away */
1714 n = now(CLOCK_MONOTONIC);
1720 left = (uint64_t) -1;
1722 r = bus_poll(bus, true, left);
1726 r = dispatch_wqueue(bus);
/* Return the bus's file descriptor. Only answerable when input and
 * output share one descriptor; otherwise -EPERM is returned.
 * -EINVAL for a NULL bus, -ECHILD when bus_pid_changed() is true. */
1732 _public_ int sd_bus_get_fd(sd_bus *bus) {
1734 assert_return(bus, -EINVAL);
1735 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1736 assert_return(!bus_pid_changed(bus), -ECHILD);
1738 return bus->input_fd;
/* Compute the I/O events the caller should wait for on the bus fd.
 *
 * The bus must be open or in BUS_CLOSING (-ENOTCONN otherwise). The
 * result depends on the state: BUS_OPENING has a fixed answer; during
 * BUS_AUTHENTICATING it depends on bus_socket_auth_needs_write(); in
 * BUS_RUNNING / BUS_HELLO it depends on whether the read queue is empty
 * and whether the write queue still holds data. */
1741 _public_ int sd_bus_get_events(sd_bus *bus) {
1744 assert_return(bus, -EINVAL);
1745 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1746 assert_return(!bus_pid_changed(bus), -ECHILD);
1748 if (bus->state == BUS_OPENING)
1750 else if (bus->state == BUS_AUTHENTICATING) {
1752 if (bus_socket_auth_needs_write(bus))
1757 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1758 if (bus->rqueue_size <= 0)
1760 if (bus->wqueue_size > 0)
/* Report in *timeout_usec when the caller should next call
 * sd_bus_process() even without I/O.
 *
 * The bus must be open or in BUS_CLOSING (-ENOTCONN otherwise).
 * BUS_CLOSING and a non-empty read queue are handled as special cases.
 * During authentication the auth timeout is reported. In any state
 * other than BUS_RUNNING / BUS_HELLO, and when no reply callback with a
 * timeout is pending, (uint64_t) -1 is reported. Otherwise the result is
 * the timeout of the entry at the head of the reply-callback priority
 * queue. */
1767 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1768 struct reply_callback *c;
1770 assert_return(bus, -EINVAL);
1771 assert_return(timeout_usec, -EINVAL);
1772 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1773 assert_return(!bus_pid_changed(bus), -ECHILD);
1775 if (bus->state == BUS_CLOSING) {
1780 if (bus->state == BUS_AUTHENTICATING) {
1781 *timeout_usec = bus->auth_timeout;
1785 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1786 *timeout_usec = (uint64_t) -1;
1790 if (bus->rqueue_size > 0) {
1795 c = prioq_peek(bus->reply_callbacks_prioq);
1797 *timeout_usec = (uint64_t) -1;
1801 *timeout_usec = c->timeout;
/* Expire the reply callback with the earliest deadline, if it is due.
 *
 * The head of the reply-callback priority queue is inspected against
 * the current CLOCK_MONOTONIC time. For an expired entry a synthetic
 * "Method call timed out" error message (SD_BUS_ERROR_NO_REPLY) is
 * built and sealed, the entry is removed from both the priority queue
 * and the hashmap, the iteration counter is bumped and the user
 * callback is invoked with the synthetic message. Its result goes
 * through bus_maybe_reply_error(), and bus->current is cleared
 * afterwards. */
1805 static int process_timeout(sd_bus *bus) {
1806 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1807 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1808 struct reply_callback *c;
1814 c = prioq_peek(bus->reply_callbacks_prioq);
1818 n = now(CLOCK_MONOTONIC);
1822 r = bus_message_new_synthetic_error(
1825 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1830 r = bus_seal_message(bus, m);
1834 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1835 hashmap_remove(bus->reply_callbacks, &c->serial);
1838 bus->iteration_counter ++;
1840 r = c->callback(bus, m, c->userdata, &error_buffer);
1841 r = bus_maybe_reply_error(m, r, &error_buffer);
1844 bus->current = NULL;
/* While the bus is in BUS_HELLO, verify that the incoming message is
 * the reply to our hello call: it must be a method return or method
 * error whose reply_serial equals bus->hello_serial. The message itself
 * is not parsed here; that is left to the regular reply handling. */
1849 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1853 if (bus->state != BUS_HELLO)
1856 /* Let's make sure the first message on the bus is the HELLO
1857 * reply. But note that we don't actually parse the message
1858 * here (we leave that to the usual handling), we just verify
1859 * we don't let any earlier msg through. */
1861 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1862 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1865 if (m->reply_serial != bus->hello_serial)
/* Deliver a method return or method error to the callback registered
 * for it by sd_bus_call_async().
 *
 * The callback record is looked up by the message's reply_serial and
 * removed from the hashmap (and from the timeout priority queue when it
 * has a timeout). The message is rewound, the user callback is invoked
 * and its result is passed through bus_maybe_reply_error(). */
1871 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1872 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1873 struct reply_callback *c;
1879 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1880 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1883 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1887 if (c->timeout != 0)
1888 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1890 r = sd_bus_message_rewind(m, true);
1894 r = c->callback(bus, m, c->userdata, &error_buffer);
1895 r = bus_maybe_reply_error(m, r, &error_buffer);
/* Run message m through the registered filter callbacks.
 *
 * The walk over bus->filter_callbacks is repeated for as long as a
 * callback modifies the list (filter_callbacks_modified). The
 * last_iteration stamp ensures that each filter sees a given message at
 * most once even when the walk restarts. The message is rewound before
 * every callback, and the callback result is passed through
 * bus_maybe_reply_error(). */
1901 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1902 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1903 struct filter_callback *l;
1910 bus->filter_callbacks_modified = false;
1912 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1914 if (bus->filter_callbacks_modified)
1917 /* Don't run this more than once per iteration */
1918 if (l->last_iteration == bus->iteration_counter)
1921 l->last_iteration = bus->iteration_counter;
1923 r = sd_bus_message_rewind(m, true);
1927 r = l->callback(bus, m, l->userdata, &error_buffer);
1928 r = bus_maybe_reply_error(m, r, &error_buffer);
1934 } while (bus->filter_callbacks_modified);
/* Run message m through the match tree with bus_match_run(), repeating
 * the run for as long as a callback modifies the match set
 * (match_callbacks_modified). */
1939 static int process_match(sd_bus *bus, sd_bus_message *m) {
1946 bus->match_callbacks_modified = false;
1948 r = bus_match_run(bus, &bus->match_callbacks, m);
1952 } while (bus->match_callbacks_modified);
/* Answer method calls on the built-in "org.freedesktop.DBus.Peer"
 * interface.
 *
 * Only method calls on that interface are considered, and the
 * NO_REPLY_EXPECTED flag is checked before a reply is built. "Ping"
 * gets an empty method return; "GetMachineId" gets a method return
 * carrying the machine id as a string; any other member gets an
 * SD_BUS_ERROR_UNKNOWN_METHOD error reply. The reply is sent with
 * sd_bus_send(). */
1957 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1958 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1964 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1967 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1970 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1973 if (streq_ptr(m->member, "Ping"))
1974 r = sd_bus_message_new_method_return(m, &reply);
1975 else if (streq_ptr(m->member, "GetMachineId")) {
1979 r = sd_id128_get_machine(&id);
1983 r = sd_bus_message_new_method_return(m, &reply);
1987 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1989 r = sd_bus_message_new_method_errorf(
1991 SD_BUS_ERROR_UNKNOWN_METHOD,
1992 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1998 r = sd_bus_send(bus, reply, NULL);
/* Dispatch one incoming message through the handler chain.
 *
 * The iteration counter is bumped and the message's sender, path,
 * interface and member are logged at debug level. The handlers are
 * then tried in this order: hello verification, reply callbacks,
 * filters, matches, built-in Peer methods and finally the object tree.
 * bus->current is cleared before returning. */
2005 static int process_message(sd_bus *bus, sd_bus_message *m) {
2012 bus->iteration_counter++;
2014 log_debug("Got message sender=%s object=%s interface=%s member=%s",
2015 strna(sd_bus_message_get_sender(m)),
2016 strna(sd_bus_message_get_path(m)),
2017 strna(sd_bus_message_get_interface(m)),
2018 strna(sd_bus_message_get_member(m)));
2020 r = process_hello(bus, m);
2024 r = process_reply(bus, m);
2028 r = process_filter(bus, m);
2032 r = process_match(bus, m);
2036 r = process_builtin(bus, m);
2040 r = bus_process_object(bus, m);
2043 bus->current = NULL;
/* One processing step for a bus in BUS_RUNNING or BUS_HELLO.
 *
 * Work is attempted in this order: expire a timed-out reply callback,
 * flush the write queue, fetch an incoming message and dispatch it with
 * process_message(). A message that is still around afterwards is
 * rewound; if it is a method call, an SD_BUS_ERROR_UNKNOWN_OBJECT error
 * naming its path is sent back. */
2047 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2048 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2052 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2054 r = process_timeout(bus);
2058 r = dispatch_wqueue(bus);
2062 r = dispatch_rqueue(bus, &m);
2068 r = process_message(bus, m);
2073 r = sd_bus_message_rewind(m, true);
2082 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2084 r = sd_bus_reply_method_errorf(
2086 SD_BUS_ERROR_UNKNOWN_OBJECT,
2087 "Unknown object '%s'.", m->path);
/* One processing step for a bus in BUS_CLOSING.
 *
 * As long as reply callbacks are outstanding, one is failed per call: a
 * synthetic "Connection terminated" error (SD_BUS_ERROR_NO_REPLY) is
 * built and sealed, the callback record is removed from the priority
 * queue (if it has a timeout) and from the hashmap, and the user
 * callback is invoked with the synthetic message.
 *
 * Afterwards a signal from "/org/freedesktop/DBus/Local" on interface
 * "org.freedesktop.DBus.Local" is synthesized, sealed and run through
 * the filter and match callbacks. bus->current is cleared before
 * returning. */
2101 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2102 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2103 struct reply_callback *c;
2107 assert(bus->state == BUS_CLOSING);
2109 c = hashmap_first(bus->reply_callbacks);
2111 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2113 /* First, fail all outstanding method calls */
2114 r = bus_message_new_synthetic_error(
2117 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2122 r = bus_seal_message(bus, m);
2126 if (c->timeout != 0)
2127 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2129 hashmap_remove(bus->reply_callbacks, &c->serial);
2132 bus->iteration_counter++;
2134 r = c->callback(bus, m, c->userdata, &error_buffer);
2135 r = bus_maybe_reply_error(m, r, &error_buffer);
2141 /* Then, synthesize a Disconnected message */
2142 r = sd_bus_message_new_signal(
2144 "/org/freedesktop/DBus/Local",
2145 "org.freedesktop.DBus.Local",
2151 r = bus_seal_message(bus, m);
2158 bus->iteration_counter++;
2160 r = process_filter(bus, m);
2164 r = process_match(bus, m);
2176 bus->current = NULL;
/* Public entry point that performs one step of work on the connection,
 * selected by bus->state.
 * Returns -EINVAL for a NULL bus, -ECHILD if bus_pid_changed() reports a
 * different process, and -EBUSY when called while bus->current is set. */
2180 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2181 BUS_DONT_DESTROY(bus);
2184 /* Returns 0 when we didn't do anything. This should cause the
2185 * caller to invoke sd_bus_wait() before returning the next
2186 * time. Returns > 0 when we did something, which possibly
2187 * means *ret is filled in with an unprocessed message. */
2189 assert_return(bus, -EINVAL);
2190 assert_return(!bus_pid_changed(bus), -ECHILD);
2192 /* We don't allow recursively invoking sd_bus_process(). */
2193 assert_return(!bus->current, -EBUSY);
2195 switch (bus->state) {
/* In each of the three handlers below, -ECONNRESET and -EPIPE lead to
 * bus_enter_closing() being called on the connection. */
2202 r = bus_socket_process_opening(bus);
2203 if (r == -ECONNRESET || r == -EPIPE) {
2204 bus_enter_closing(bus);
2212 case BUS_AUTHENTICATING:
2213 r = bus_socket_process_authenticating(bus);
2214 if (r == -ECONNRESET || r == -EPIPE) {
2215 bus_enter_closing(bus);
2227 r = process_running(bus, ret);
2228 if (r == -ECONNRESET || r == -EPIPE) {
2229 bus_enter_closing(bus);
2239 return process_closing(bus, ret);
/* Every valid state is handled by the switch above. */
2242 assert_not_reached("Unknown state");
/* Waits with ppoll() on the bus file descriptors.
 * need_more:    selects between the two cases commented in the body.
 * timeout_usec: upper bound for the wait; (uint64_t) -1 means the caller
 *               imposes no bound.
 * Fails with -ENOTCONN unless BUS_IS_OPEN(bus->state). */
2245 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2246 struct pollfd p[2] = {};
/* (usec_t) -1 stands for "no timeout": ppoll() then gets a NULL
 * timespec. */
2249 usec_t m = (usec_t) -1;
2253 if (bus->state == BUS_CLOSING)
2256 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2258 e = sd_bus_get_events(bus);
2263 /* The caller really needs some more data, he doesn't
2264 * care about what's already read, or any timeouts
2269 /* The caller wants to process if there's something to
2270 * process, but doesn't care otherwise */
2272 r = sd_bus_get_timeout(bus, &until);
/* Turn the absolute deadline into a relative wait on CLOCK_MONOTONIC,
 * clamped to 0 if the deadline has already passed. */
2277 nw = now(CLOCK_MONOTONIC);
2278 m = until > nw ? until - nw : 0;
/* True when the caller's timeout is finite and either no deadline is
 * set or the caller's timeout is the shorter of the two. */
2282 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2285 p[0].fd = bus->input_fd;
2286 if (bus->output_fd == bus->input_fd) {
/* Distinct descriptors: input is polled with the POLLIN part of the
 * mask, output with the POLLOUT part. */
2290 p[0].events = e & POLLIN;
2291 p[1].fd = bus->output_fd;
2292 p[1].events = e & POLLOUT;
2296 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
/* 1 if ppoll() reported ready descriptors, 0 otherwise. */
2300 return r > 0 ? 1 : 0;
/* Public wrapper around bus_poll(bus, false, timeout_usec).
 * Returns -EINVAL for a NULL bus, -ECHILD if bus_pid_changed() reports a
 * different process, and -ENOTCONN when the connection is not open.
 * The BUS_CLOSING state and a non-empty read queue are checked before
 * polling. */
2303 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2305 assert_return(bus, -EINVAL);
2306 assert_return(!bus_pid_changed(bus), -ECHILD);
2308 if (bus->state == BUS_CLOSING)
2311 assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
/* NOTE(review): presumably returns early because queued messages can be
 * processed without waiting; confirm. */
2313 if (bus->rqueue_size > 0)
2316 return bus_poll(bus, false, timeout_usec);
/* Pushes out the pending write queue: bus_ensure_running() first, then
 * dispatch_wqueue() and bus_poll(), with bus->wqueue_size checked in
 * between.
 * Returns -EINVAL for a NULL bus, -ECHILD if bus_pid_changed() reports a
 * different process, and -ENOTCONN when the connection is not open. */
2319 _public_ int sd_bus_flush(sd_bus *bus) {
2322 assert_return(bus, -EINVAL);
2323 assert_return(!bus_pid_changed(bus), -ECHILD);
2325 if (bus->state == BUS_CLOSING)
2328 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2330 r = bus_ensure_running(bus);
/* Check for an empty write queue before dispatching. */
2334 if (bus->wqueue_size <= 0)
2338 r = dispatch_wqueue(bus);
2342 if (bus->wqueue_size <= 0)
/* (uint64_t) -1: no caller-imposed bound on the wait. */
2345 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers callback, together with userdata, as a message filter on bus.
 * The new entry is zero-allocated and prepended to bus->filter_callbacks.
 * Returns -EINVAL for a NULL bus or callback and -ECHILD if
 * bus_pid_changed() reports a different process. */
2351 _public_ int sd_bus_add_filter(sd_bus *bus,
2352 sd_bus_message_handler_t callback,
2355 struct filter_callback *f;
2357 assert_return(bus, -EINVAL);
2358 assert_return(callback, -EINVAL);
2359 assert_return(!bus_pid_changed(bus), -ECHILD);
2361 f = new0(struct filter_callback, 1);
2364 f->callback = callback;
2365 f->userdata = userdata;
/* Flag the list as changed before modifying it.
 * NOTE(review): presumably consumed by the filter dispatch loop; confirm. */
2367 bus->filter_callbacks_modified = true;
2368 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
/* Unregisters a message filter previously added with sd_bus_add_filter().
 * An entry is selected when both its callback and its userdata are equal
 * to the arguments.
 * Returns -EINVAL for a NULL bus or callback and -ECHILD if
 * bus_pid_changed() reports a different process. */
2372 _public_ int sd_bus_remove_filter(sd_bus *bus,
2373 sd_bus_message_handler_t callback,
2376 struct filter_callback *f;
2378 assert_return(bus, -EINVAL);
2379 assert_return(callback, -EINVAL);
2380 assert_return(!bus_pid_changed(bus), -ECHILD);
2382 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2383 if (f->callback == callback && f->userdata == userdata) {
/* Flag the list as changed before unlinking the entry. */
2384 bus->filter_callbacks_modified = true;
2385 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
/* Installs a match rule together with its callback.
 * The rule string is parsed into components; when bus->bus_client is set
 * the rule is also registered through bus_add_match_internal() under a
 * fresh cookie. The rule is then added to bus->match_callbacks.
 * Returns -EINVAL for a NULL bus or match and -ECHILD if
 * bus_pid_changed() reports a different process. */
2394 _public_ int sd_bus_add_match(sd_bus *bus,
2396 sd_bus_message_handler_t callback,
2399 struct bus_match_component *components = NULL;
2400 unsigned n_components = 0;
2401 uint64_t cookie = 0;
2404 assert_return(bus, -EINVAL);
2405 assert_return(match, -EINVAL);
2406 assert_return(!bus_pid_changed(bus), -ECHILD);
2408 r = bus_match_parse(match, &components, &n_components);
2412 if (bus->bus_client) {
/* Cookies come from a per-bus counter that is incremented before use. */
2413 cookie = ++bus->match_cookie;
2415 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2420 bus->match_callbacks_modified = true;
2421 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
/* NOTE(review): presumably the failure path of bus_match_add(), undoing
 * the registration made with the same cookie above; confirm. */
2423 if (bus->bus_client)
2424 bus_remove_match_internal(bus, match, cookie);
/* Release the parsed components. */
2428 bus_match_parse_free(components, n_components);
/* Removes a match rule previously installed with sd_bus_add_match().
 * The rule is parsed, removed from bus->match_callbacks and, when
 * bus->bus_client is set, also removed via bus_remove_match_internal().
 * Returns -EINVAL for a NULL bus or match and -ECHILD if
 * bus_pid_changed() reports a different process. */
2432 _public_ int sd_bus_remove_match(sd_bus *bus,
2434 sd_bus_message_handler_t callback,
2437 struct bus_match_component *components = NULL;
2438 unsigned n_components = 0;
2440 uint64_t cookie = 0;
2442 assert_return(bus, -EINVAL);
2443 assert_return(match, -EINVAL);
2444 assert_return(!bus_pid_changed(bus), -ECHILD);
2446 r = bus_match_parse(match, &components, &n_components);
2450 bus->match_callbacks_modified = true;
/* NOTE(review): &cookie is presumably filled in with the cookie that was
 * stored for this rule; it is passed to the internal removal below. */
2451 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2453 if (bus->bus_client)
2454 q = bus_remove_match_internal(bus, match, cookie);
2456 bus_match_parse_free(components, n_components);
/* A negative r takes precedence over q. */
2458 return r < 0 ? r : q;
/* Returns true when the calling process is not the one recorded in
 * bus->original_pid. */
2461 bool bus_pid_changed(sd_bus *bus) {
2464 /* We don't support people creating a bus connection and
2465 * keeping it around over a fork(). Let's complain. */
2467 return bus->original_pid != getpid();
/* I/O event handler that sd_bus_attach_event() passes to sd_event_add_io();
 * userdata is the bus. Runs one sd_bus_process() step without asking for
 * the message back (ret is NULL). */
2470 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2471 sd_bus *bus = userdata;
2476 r = sd_bus_process(bus, NULL);
/* Timer event handler that sd_bus_attach_event() passes to
 * sd_event_add_monotonic(); userdata is the bus. Runs one
 * sd_bus_process() step without asking for the message back
 * (ret is NULL). */
2483 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2484 sd_bus *bus = userdata;
2489 r = sd_bus_process(bus, NULL);
/* Prepare handler that sd_bus_attach_event() sets on the input I/O source;
 * userdata is the bus. Refreshes the I/O event masks from
 * sd_bus_get_events() and the timer from sd_bus_get_timeout(). */
2496 static int prepare_callback(sd_event_source *s, void *userdata) {
2497 sd_bus *bus = userdata;
2504 e = sd_bus_get_events(bus);
2508 if (bus->output_fd != bus->input_fd) {
/* Separate descriptors: the input source watches the POLLIN part of the
 * mask, the output source the POLLOUT part. */
2510 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2514 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
/* NOTE(review): presumably the else branch for a shared descriptor, where
 * the input source carries the whole mask; confirm. */
2518 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2523 r = sd_bus_get_timeout(bus, &until);
/* The result of setting the time is kept in j, separate from r. */
2529 j = sd_event_source_set_time(bus->time_event_source, until);
/* The timer source is enabled only when r > 0. */
2534 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
/* Quit handler that sd_bus_attach_event() passes to sd_event_add_quit();
 * userdata is the bus. */
2541 static int quit_callback(sd_event_source *event, void *userdata) {
2542 sd_bus *bus = userdata;
/* Hooks bus up to an sd_event loop.
 * Creates an I/O source for the input fd (and a second one for the output
 * fd when it differs), a monotonic timer source and a quit source; the
 * I/O and timer sources are given the requested priority.
 * Returns -EINVAL for a NULL bus and -EBUSY if an event loop is already
 * attached. */
2551 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2554 assert_return(bus, -EINVAL);
2555 assert_return(!bus->event, -EBUSY);
2557 assert(!bus->input_io_event_source);
2558 assert(!bus->output_io_event_source);
2559 assert(!bus->time_event_source);
/* Either take a reference on the supplied loop or obtain one from
 * sd_event_default(). NOTE(review): presumably selected by whether event
 * is non-NULL; confirm. */
2562 bus->event = sd_event_ref(event);
2564 r = sd_event_default(&bus->event);
/* The I/O sources start with an empty event mask (0). */
2569 r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2573 r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2577 if (bus->output_fd != bus->input_fd) {
2578 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2582 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
/* prepare_callback() is attached to the input source only. */
2587 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2591 r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2595 r = sd_event_source_set_priority(bus->time_event_source, priority);
2599 r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
/* NOTE(review): presumably the error path, tearing down whatever was set
 * up so far; confirm. */
2606 sd_bus_detach_event(bus);
/* Undoes sd_bus_attach_event(): every event source that exists is disabled
 * (SD_EVENT_OFF) and unreferenced, then the event loop reference itself is
 * dropped. Each pointer is overwritten with the return value of its unref
 * call.
 * Returns -EINVAL for a NULL bus and -ENXIO when no event loop is
 * attached. */
2610 _public_ int sd_bus_detach_event(sd_bus *bus) {
2611 assert_return(bus, -EINVAL);
2612 assert_return(bus->event, -ENXIO);
2614 if (bus->input_io_event_source) {
2615 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2616 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2619 if (bus->output_io_event_source) {
2620 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2621 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2624 if (bus->time_event_source) {
2625 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2626 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2629 if (bus->quit_event_source) {
2630 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2631 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2635 bus->event = sd_event_unref(bus->event);
/* Accessor for the event loop of bus; yields NULL when bus is NULL.
 * NOTE(review): presumably returns bus->event; confirm. */
2640 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2641 assert_return(bus, NULL);
/* Returns bus->current, or NULL when bus is NULL. */
2646 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2647 assert_return(bus, NULL);
2649 return bus->current;
/* Shared implementation of sd_bus_default_system() and
 * sd_bus_default_user().
 * bus_open:    function that opens a connection. NOTE(review): presumably
 *              called when the slot is empty; confirm.
 * default_bus: slot holding the default connection; must not be NULL.
 * ret:         receives a reference to the connection. */
2652 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2657 assert(default_bus);
/* Reports 1 if the slot holds a connection, 0 if not.
 * NOTE(review): presumably the path taken when ret is NULL; confirm. */
2660 return !!*default_bus;
/* Hand out a new reference to the connection in the slot. */
2663 *ret = sd_bus_ref(*default_bus);
/* The bus records which slot points at it. */
2671 b->default_bus_ptr = default_bus;
/* Returns the default system bus connection through bus_default(), using
 * sd_bus_open_system() as the opener. The slot is static __thread, i.e.
 * there is one per thread. */
2679 _public_ int sd_bus_default_system(sd_bus **ret) {
2680 static __thread sd_bus *default_system_bus = NULL;
2682 return bus_default(sd_bus_open_system, &default_system_bus, ret);
/* Returns the default user bus connection through bus_default(), using
 * sd_bus_open_user() as the opener. The slot is static __thread, i.e.
 * there is one per thread. */
2685 _public_ int sd_bus_default_user(sd_bus **ret) {
2686 static __thread sd_bus *default_user_bus = NULL;
2688 return bus_default(sd_bus_open_user, &default_user_bus, ret);
/* Stores a thread ID for the bus in *tid.
 * Returns -EINVAL for a NULL b or tid and -ECHILD if bus_pid_changed()
 * reports a different process. */
2691 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2692 assert_return(b, -EINVAL);
2693 assert_return(tid, -EINVAL);
2694 assert_return(!bus_pid_changed(b), -ECHILD);
/* Delegate to the attached event loop.
 * NOTE(review): presumably guarded by a check that b->event is set;
 * confirm. */
2702 return sd_event_get_tid(b->event, tid);
/* Returns a newly allocated copy of s in which every character that is
 * not a letter, or a digit past the first position, is replaced by an
 * escape sequence containing its two hex digits.
 * Returns NULL when s is NULL. */
2707 _public_ char *sd_bus_label_escape(const char *s) {
2711 assert_return(s, NULL);
2713 /* Escapes all chars that D-Bus' object path cannot deal
2714 * with. Can be reversed with sd_bus_label_unescape(). We special
2715 * case the empty string. */
/* Sized for up to three output bytes per input byte, plus the
 * terminating NUL. */
2720 r = new(char, strlen(s)*3 + 1);
2724 for (f = s, t = r; *f; f++) {
2726 /* Escape everything that is not a-zA-Z0-9. We also
2727 * escape 0-9 if it's the first character */
2729 if (!(*f >= 'A' && *f <= 'Z') &&
2730 !(*f >= 'a' && *f <= 'z') &&
2731 !(f > s && *f >= '0' && *f <= '9')) {
/* High nibble first, then the low nibble. */
2733 *(t++) = hexchar(*f >> 4);
2734 *(t++) = hexchar(*f);
/* Returns a newly allocated string in which escape sequences in f are
 * turned back into the bytes they encode; the counterpart of
 * sd_bus_label_escape().
 * Returns NULL when f is NULL. */
2744 _public_ char *sd_bus_label_unescape(const char *f) {
2747 assert_return(f, NULL);
2749 /* Special case for the empty string */
/* The buffer is sized to the input length plus the terminating NUL. */
2753 r = new(char, strlen(f) + 1);
2757 for (t = r; *f; f++) {
/* Both characters after the current one must be hex digits to form
 * a byte. */
2762 if ((a = unhexchar(f[1])) < 0 ||
2763 (b = unhexchar(f[2])) < 0) {
2764 /* Invalid escape code, let's take it literal then */
/* Combine the high nibble a and the low nibble b into one byte. */
2767 *(t++) = (char) ((a << 4) | b);
2779 _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **ret) {
2784 assert_return(bus, -EINVAL);
2785 assert_return(mask <= _SD_BUS_CREDS_MAX, -ENOTSUP);
2786 assert_return(ret, -EINVAL);
2787 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2788 assert_return(!bus_pid_changed(bus), -ECHILD);
2789 assert_return(!bus->is_kernel, -ENOTSUP);
2791 if (!bus->ucred_valid && !isempty(bus->label))
2794 c = bus_creds_new();
2798 if (bus->ucred_valid) {
2799 pid = c->pid = bus->ucred.pid;
2800 c->uid = bus->ucred.uid;
2801 c->gid = bus->ucred.gid;
2803 c->mask |= ((SD_BUS_CREDS_UID | SD_BUS_CREDS_PID | SD_BUS_CREDS_GID) & mask) & bus->creds_mask;
2806 if (!isempty(bus->label) && (mask & SD_BUS_CREDS_SELINUX_CONTEXT)) {
2807 c->label = strdup(bus->label);
2809 sd_bus_creds_unref(c);
2813 c->mask |= SD_BUS_CREDS_SELINUX_CONTEXT | bus->creds_mask;
2816 r = bus_creds_add_more(c, mask, pid, 0);