1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 #include "bus-container.h"
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
53 static void bus_close_fds(sd_bus *b) {
57 close_nointr_nofail(b->input_fd);
59 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60 close_nointr_nofail(b->output_fd);
62 b->input_fd = b->output_fd = -1;
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66 struct node_callback *c;
67 struct node_vtable *v;
68 struct node_enumerator *e;
76 bus_node_destroy(b, n->child);
78 while ((c = n->callbacks)) {
79 LIST_REMOVE(callbacks, n->callbacks, c);
83 while ((v = n->vtables)) {
84 LIST_REMOVE(vtables, n->vtables, v);
89 while ((e = n->enumerators)) {
90 LIST_REMOVE(enumerators, n->enumerators, e);
95 LIST_REMOVE(siblings, n->parent->child, n);
97 assert_se(hashmap_remove(b->nodes, n->path) == n);
102 static void bus_reset_queues(sd_bus *b) {
107 for (i = 0; i < b->rqueue_size; i++)
108 sd_bus_message_unref(b->rqueue[i]);
111 for (i = 0; i < b->wqueue_size; i++)
112 sd_bus_message_unref(b->wqueue[i]);
115 b->rqueue = b->wqueue = NULL;
116 b->rqueue_size = b->wqueue_size = 0;
119 static void bus_free(sd_bus *b) {
120 struct filter_callback *f;
125 sd_bus_detach_event(b);
130 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
133 free(b->unique_name);
134 free(b->auth_buffer);
140 strv_free(b->exec_argv);
142 close_many(b->fds, b->n_fds);
147 hashmap_free_free(b->reply_callbacks);
148 prioq_free(b->reply_callbacks_prioq);
150 while ((f = b->filter_callbacks)) {
151 LIST_REMOVE(callbacks, b->filter_callbacks, f);
155 bus_match_free(&b->match_callbacks);
157 hashmap_free_free(b->vtable_methods);
158 hashmap_free_free(b->vtable_properties);
160 while ((n = hashmap_first(b->nodes)))
161 bus_node_destroy(b, n);
163 hashmap_free(b->nodes);
165 bus_kernel_flush_memfd(b);
167 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
172 _public_ int sd_bus_new(sd_bus **ret) {
175 assert_return(ret, -EINVAL);
181 r->n_ref = REFCNT_INIT;
182 r->input_fd = r->output_fd = -1;
183 r->message_version = 1;
184 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
185 r->original_pid = getpid();
187 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
189 /* We guarantee that wqueue always has space for at least one
191 r->wqueue = new(sd_bus_message*, 1);
201 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
204 assert_return(bus, -EINVAL);
205 assert_return(bus->state == BUS_UNSET, -EPERM);
206 assert_return(address, -EINVAL);
207 assert_return(!bus_pid_changed(bus), -ECHILD);
219 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
220 assert_return(bus, -EINVAL);
221 assert_return(bus->state == BUS_UNSET, -EPERM);
222 assert_return(input_fd >= 0, -EINVAL);
223 assert_return(output_fd >= 0, -EINVAL);
224 assert_return(!bus_pid_changed(bus), -ECHILD);
226 bus->input_fd = input_fd;
227 bus->output_fd = output_fd;
231 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
234 assert_return(bus, -EINVAL);
235 assert_return(bus->state == BUS_UNSET, -EPERM);
236 assert_return(path, -EINVAL);
237 assert_return(!strv_isempty(argv), -EINVAL);
238 assert_return(!bus_pid_changed(bus), -ECHILD);
250 free(bus->exec_path);
251 strv_free(bus->exec_argv);
259 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
260 assert_return(bus, -EINVAL);
261 assert_return(bus->state == BUS_UNSET, -EPERM);
262 assert_return(!bus_pid_changed(bus), -ECHILD);
264 bus->bus_client = !!b;
268 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
269 assert_return(bus, -EINVAL);
270 assert_return(bus->state == BUS_UNSET, -EPERM);
271 assert_return(!bus_pid_changed(bus), -ECHILD);
273 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
277 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
278 assert_return(bus, -EINVAL);
279 assert_return(bus->state == BUS_UNSET, -EPERM);
280 assert_return(!bus_pid_changed(bus), -ECHILD);
282 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_TIMESTAMP, b);
286 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, int b) {
287 assert_return(bus, -EINVAL);
288 assert_return(bus->state == BUS_UNSET, -EPERM);
289 assert_return(!bus_pid_changed(bus), -ECHILD);
291 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CREDS, b);
295 _public_ int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
296 assert_return(bus, -EINVAL);
297 assert_return(bus->state == BUS_UNSET, -EPERM);
298 assert_return(!bus_pid_changed(bus), -ECHILD);
300 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
304 _public_ int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
305 assert_return(bus, -EINVAL);
306 assert_return(bus->state == BUS_UNSET, -EPERM);
307 assert_return(!bus_pid_changed(bus), -ECHILD);
309 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
313 _public_ int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
314 assert_return(bus, -EINVAL);
315 assert_return(bus->state == BUS_UNSET, -EPERM);
316 assert_return(!bus_pid_changed(bus), -ECHILD);
318 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
322 _public_ int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
323 assert_return(bus, -EINVAL);
324 assert_return(bus->state == BUS_UNSET, -EPERM);
325 assert_return(!bus_pid_changed(bus), -ECHILD);
327 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
331 _public_ int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
332 assert_return(bus, -EINVAL);
333 assert_return(bus->state == BUS_UNSET, -EPERM);
334 assert_return(!bus_pid_changed(bus), -ECHILD);
336 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
340 _public_ int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
341 assert_return(bus, -EINVAL);
342 assert_return(bus->state == BUS_UNSET, -EPERM);
343 assert_return(!bus_pid_changed(bus), -ECHILD);
345 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
349 _public_ int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
350 assert_return(bus, -EINVAL);
351 assert_return(bus->state == BUS_UNSET, -EPERM);
352 assert_return(!bus_pid_changed(bus), -ECHILD);
354 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
358 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
359 assert_return(bus, -EINVAL);
360 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
361 assert_return(bus->state == BUS_UNSET, -EPERM);
362 assert_return(!bus_pid_changed(bus), -ECHILD);
364 bus->is_server = !!b;
365 bus->server_id = server_id;
369 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
370 assert_return(bus, -EINVAL);
371 assert_return(bus->state == BUS_UNSET, -EPERM);
372 assert_return(!bus_pid_changed(bus), -ECHILD);
374 bus->anonymous_auth = !!b;
378 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
383 assert(bus->state == BUS_HELLO);
386 r = sd_bus_message_get_errno(reply);
392 r = sd_bus_message_read(reply, "s", &s);
396 if (!service_name_is_valid(s) || s[0] != ':')
399 bus->unique_name = strdup(s);
400 if (!bus->unique_name)
403 bus->state = BUS_RUNNING;
408 static int bus_send_hello(sd_bus *bus) {
409 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
414 if (!bus->bus_client || bus->is_kernel)
417 r = sd_bus_message_new_method_call(
419 "org.freedesktop.DBus",
421 "org.freedesktop.DBus",
427 return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
430 int bus_start_running(sd_bus *bus) {
433 if (bus->bus_client && !bus->is_kernel) {
434 bus->state = BUS_HELLO;
438 bus->state = BUS_RUNNING;
442 static int parse_address_key(const char **p, const char *key, char **value) {
453 if (strncmp(*p, key, l) != 0)
466 while (*a != ';' && *a != ',' && *a != 0) {
484 c = (char) ((x << 4) | y);
491 t = realloc(r, n + 2);
519 static void skip_address_key(const char **p) {
523 *p += strcspn(*p, ",");
529 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
530 _cleanup_free_ char *path = NULL, *abstract = NULL;
539 while (**p != 0 && **p != ';') {
540 r = parse_address_key(p, "guid", guid);
546 r = parse_address_key(p, "path", &path);
552 r = parse_address_key(p, "abstract", &abstract);
561 if (!path && !abstract)
564 if (path && abstract)
569 if (l > sizeof(b->sockaddr.un.sun_path))
572 b->sockaddr.un.sun_family = AF_UNIX;
573 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
574 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
575 } else if (abstract) {
576 l = strlen(abstract);
577 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
580 b->sockaddr.un.sun_family = AF_UNIX;
581 b->sockaddr.un.sun_path[0] = 0;
582 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
583 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
589 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
590 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
592 struct addrinfo *result, hints = {
593 .ai_socktype = SOCK_STREAM,
594 .ai_flags = AI_ADDRCONFIG,
602 while (**p != 0 && **p != ';') {
603 r = parse_address_key(p, "guid", guid);
609 r = parse_address_key(p, "host", &host);
615 r = parse_address_key(p, "port", &port);
621 r = parse_address_key(p, "family", &family);
634 if (streq(family, "ipv4"))
635 hints.ai_family = AF_INET;
636 else if (streq(family, "ipv6"))
637 hints.ai_family = AF_INET6;
642 r = getaddrinfo(host, port, &hints, &result);
646 return -EADDRNOTAVAIL;
648 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
649 b->sockaddr_size = result->ai_addrlen;
651 freeaddrinfo(result);
656 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
658 unsigned n_argv = 0, j;
667 while (**p != 0 && **p != ';') {
668 r = parse_address_key(p, "guid", guid);
674 r = parse_address_key(p, "path", &path);
680 if (startswith(*p, "argv")) {
684 ul = strtoul(*p + 4, (char**) p, 10);
685 if (errno > 0 || **p != '=' || ul > 256) {
695 x = realloc(argv, sizeof(char*) * (ul + 2));
701 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
707 r = parse_address_key(p, NULL, argv + ul);
722 /* Make sure there are no holes in the array, with the
723 * exception of argv[0] */
724 for (j = 1; j < n_argv; j++)
730 if (argv && argv[0] == NULL) {
731 argv[0] = strdup(path);
743 for (j = 0; j < n_argv; j++)
751 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
752 _cleanup_free_ char *path = NULL;
760 while (**p != 0 && **p != ';') {
761 r = parse_address_key(p, "guid", guid);
767 r = parse_address_key(p, "path", &path);
786 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
787 _cleanup_free_ char *machine = NULL;
795 while (**p != 0 && **p != ';') {
796 r = parse_address_key(p, "guid", guid);
802 r = parse_address_key(p, "machine", &machine);
815 b->machine = machine;
818 b->sockaddr.un.sun_family = AF_UNIX;
819 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
820 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
825 static void bus_reset_parsed_address(sd_bus *b) {
829 b->sockaddr_size = 0;
830 strv_free(b->exec_argv);
834 b->server_id = SD_ID128_NULL;
841 static int bus_parse_next_address(sd_bus *b) {
842 _cleanup_free_ char *guid = NULL;
850 if (b->address[b->address_index] == 0)
853 bus_reset_parsed_address(b);
855 a = b->address + b->address_index;
864 if (startswith(a, "unix:")) {
867 r = parse_unix_address(b, &a, &guid);
872 } else if (startswith(a, "tcp:")) {
875 r = parse_tcp_address(b, &a, &guid);
881 } else if (startswith(a, "unixexec:")) {
884 r = parse_exec_address(b, &a, &guid);
890 } else if (startswith(a, "kernel:")) {
893 r = parse_kernel_address(b, &a, &guid);
898 } else if (startswith(a, "x-container:")) {
901 r = parse_container_address(b, &a, &guid);
914 r = sd_id128_from_string(guid, &b->server_id);
919 b->address_index = a - b->address;
923 static int bus_start_address(sd_bus *b) {
933 r = bus_socket_exec(b);
937 b->last_connect_error = -r;
938 } else if (b->kernel) {
940 r = bus_kernel_connect(b);
944 b->last_connect_error = -r;
946 } else if (b->machine) {
948 r = bus_container_connect(b);
952 b->last_connect_error = -r;
954 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
956 r = bus_socket_connect(b);
960 b->last_connect_error = -r;
963 r = bus_parse_next_address(b);
967 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
971 int bus_next_address(sd_bus *b) {
974 bus_reset_parsed_address(b);
975 return bus_start_address(b);
978 static int bus_start_fd(sd_bus *b) {
983 assert(b->input_fd >= 0);
984 assert(b->output_fd >= 0);
986 r = fd_nonblock(b->input_fd, true);
990 r = fd_cloexec(b->input_fd, true);
994 if (b->input_fd != b->output_fd) {
995 r = fd_nonblock(b->output_fd, true);
999 r = fd_cloexec(b->output_fd, true);
1004 if (fstat(b->input_fd, &st) < 0)
1007 if (S_ISCHR(b->input_fd))
1008 return bus_kernel_take_fd(b);
1010 return bus_socket_take_fd(b);
1013 _public_ int sd_bus_start(sd_bus *bus) {
1016 assert_return(bus, -EINVAL);
1017 assert_return(bus->state == BUS_UNSET, -EPERM);
1018 assert_return(!bus_pid_changed(bus), -ECHILD);
1020 bus->state = BUS_OPENING;
1022 if (bus->is_server && bus->bus_client)
1025 if (bus->input_fd >= 0)
1026 r = bus_start_fd(bus);
1027 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1028 r = bus_start_address(bus);
1035 return bus_send_hello(bus);
1038 _public_ int sd_bus_open_system(sd_bus **ret) {
1043 assert_return(ret, -EINVAL);
1049 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1051 r = sd_bus_set_address(b, e);
1055 b->sockaddr.un.sun_family = AF_UNIX;
1056 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1057 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1060 b->bus_client = true;
1062 r = sd_bus_start(b);
1074 _public_ int sd_bus_open_user(sd_bus **ret) {
1080 assert_return(ret, -EINVAL);
1086 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1088 r = sd_bus_set_address(b, e);
1092 e = secure_getenv("XDG_RUNTIME_DIR");
1099 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1104 b->sockaddr.un.sun_family = AF_UNIX;
1105 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1106 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1109 b->bus_client = true;
1111 r = sd_bus_start(b);
1123 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1124 _cleanup_free_ char *e = NULL;
1129 assert_return(host, -EINVAL);
1130 assert_return(ret, -EINVAL);
1132 e = bus_address_escape(host);
1136 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1140 r = sd_bus_new(&bus);
1147 bus->bus_client = true;
1149 r = sd_bus_start(bus);
1159 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1160 _cleanup_free_ char *e = NULL;
1165 assert_return(machine, -EINVAL);
1166 assert_return(ret, -EINVAL);
1168 e = bus_address_escape(machine);
1172 p = strjoin("x-container:machine=", e, NULL);
1176 r = sd_bus_new(&bus);
1183 bus->bus_client = true;
1185 r = sd_bus_start(bus);
1195 _public_ void sd_bus_close(sd_bus *bus) {
1199 if (bus->state == BUS_CLOSED)
1201 if (bus_pid_changed(bus))
1204 bus->state = BUS_CLOSED;
1206 sd_bus_detach_event(bus);
1208 /* Drop all queued messages so that they drop references to
1209 * the bus object and the bus may be freed */
1210 bus_reset_queues(bus);
1212 if (!bus->is_kernel)
1215 /* We'll leave the fd open in case this is a kernel bus, since
1216 * there might still be memblocks around that reference this
1217 * bus, and they might need to invoke the
1218 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1222 static void bus_enter_closing(sd_bus *bus) {
1225 if (bus->state != BUS_OPENING &&
1226 bus->state != BUS_AUTHENTICATING &&
1227 bus->state != BUS_HELLO &&
1228 bus->state != BUS_RUNNING)
1231 bus->state = BUS_CLOSING;
1234 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1235 assert_return(bus, NULL);
1237 assert_se(REFCNT_INC(bus->n_ref) >= 2);
1242 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1243 assert_return(bus, NULL);
1245 if (REFCNT_DEC(bus->n_ref) <= 0)
1251 _public_ int sd_bus_is_open(sd_bus *bus) {
1253 assert_return(bus, -EINVAL);
1254 assert_return(!bus_pid_changed(bus), -ECHILD);
1256 return BUS_IS_OPEN(bus->state);
1259 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1262 assert_return(bus, -EINVAL);
1263 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1264 assert_return(!bus_pid_changed(bus), -ECHILD);
1266 if (type == SD_BUS_TYPE_UNIX_FD) {
1267 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1270 r = bus_ensure_running(bus);
1274 return bus->can_fds;
1277 return bus_type_is_valid(type);
1280 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1283 assert_return(bus, -EINVAL);
1284 assert_return(server_id, -EINVAL);
1285 assert_return(!bus_pid_changed(bus), -ECHILD);
1287 r = bus_ensure_running(bus);
1291 *server_id = bus->server_id;
1295 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1298 if (m->header->version > b->message_version)
1302 /* If we copy the same message to multiple
1303 * destinations, avoid using the same serial
1305 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1309 return bus_message_seal(m, ++b->serial);
1312 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1319 r = bus_kernel_write_message(bus, message);
1321 r = bus_socket_write_message(bus, message, idx);
1326 static int dispatch_wqueue(sd_bus *bus) {
1330 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1332 while (bus->wqueue_size > 0) {
1334 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1338 /* Didn't do anything this time */
1340 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1341 /* Fully written. Let's drop the entry from
1344 * This isn't particularly optimized, but
1345 * well, this is supposed to be our worst-case
1346 * buffer only, and the socket buffer is
1347 * supposed to be our primary buffer, and if
1348 * it got full, then all bets are off
1351 sd_bus_message_unref(bus->wqueue[0]);
1352 bus->wqueue_size --;
1353 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1363 static int bus_read_message(sd_bus *bus, sd_bus_message **m) {
1370 r = bus_kernel_read_message(bus, m);
1372 r = bus_socket_read_message(bus, m);
1377 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1378 sd_bus_message *z = NULL;
1383 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1385 if (bus->rqueue_size > 0) {
1386 /* Dispatch a queued message */
1388 *m = bus->rqueue[0];
1389 bus->rqueue_size --;
1390 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1394 /* Try to read a new message */
1396 r = bus_read_message(bus, &z);
1409 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1412 assert_return(bus, -EINVAL);
1413 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1414 assert_return(m, -EINVAL);
1415 assert_return(!bus_pid_changed(bus), -ECHILD);
1418 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1425 /* If the serial number isn't kept, then we know that no reply
1427 if (!serial && !m->sealed)
1428 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1430 r = bus_seal_message(bus, m);
1434 /* If this is a reply and no reply was requested, then let's
1435 * suppress this, if we can */
1436 if (m->dont_send && !serial)
1439 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1442 r = bus_write_message(bus, m, &idx);
1445 else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1446 /* Wasn't fully written. So let's remember how
1447 * much was written. Note that the first entry
1448 * of the wqueue array is always allocated so
1449 * that we always can remember how much was
1451 bus->wqueue[0] = sd_bus_message_ref(m);
1452 bus->wqueue_size = 1;
1458 /* Just append it to the queue. */
1460 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1463 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1468 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1472 *serial = BUS_MESSAGE_SERIAL(m);
1477 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1480 assert_return(bus, -EINVAL);
1481 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1482 assert_return(m, -EINVAL);
1483 assert_return(!bus_pid_changed(bus), -ECHILD);
1485 if (!streq_ptr(m->destination, destination)) {
1490 r = sd_bus_message_set_destination(m, destination);
1495 return sd_bus_send(bus, m, serial);
1498 static usec_t calc_elapse(uint64_t usec) {
1499 if (usec == (uint64_t) -1)
1503 usec = BUS_DEFAULT_TIMEOUT;
1505 return now(CLOCK_MONOTONIC) + usec;
1508 static int timeout_compare(const void *a, const void *b) {
1509 const struct reply_callback *x = a, *y = b;
1511 if (x->timeout != 0 && y->timeout == 0)
1514 if (x->timeout == 0 && y->timeout != 0)
1517 if (x->timeout < y->timeout)
1520 if (x->timeout > y->timeout)
1526 _public_ int sd_bus_call_async(
1529 sd_bus_message_handler_t callback,
1534 struct reply_callback *c;
1537 assert_return(bus, -EINVAL);
1538 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1539 assert_return(m, -EINVAL);
1540 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1541 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1542 assert_return(callback, -EINVAL);
1543 assert_return(!bus_pid_changed(bus), -ECHILD);
1545 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1549 if (usec != (uint64_t) -1) {
1550 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1555 r = bus_seal_message(bus, m);
1559 c = new0(struct reply_callback, 1);
1563 c->callback = callback;
1564 c->userdata = userdata;
1565 c->serial = BUS_MESSAGE_SERIAL(m);
1566 c->timeout = calc_elapse(usec);
1568 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1574 if (c->timeout != 0) {
1575 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1578 sd_bus_call_async_cancel(bus, c->serial);
1583 r = sd_bus_send(bus, m, serial);
1585 sd_bus_call_async_cancel(bus, c->serial);
1592 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1593 struct reply_callback *c;
1595 assert_return(bus, -EINVAL);
1596 assert_return(serial != 0, -EINVAL);
1597 assert_return(!bus_pid_changed(bus), -ECHILD);
1599 c = hashmap_remove(bus->reply_callbacks, &serial);
1603 if (c->timeout != 0)
1604 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1610 int bus_ensure_running(sd_bus *bus) {
1615 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1617 if (bus->state == BUS_RUNNING)
1621 r = sd_bus_process(bus, NULL);
1624 if (bus->state == BUS_RUNNING)
1629 r = sd_bus_wait(bus, (uint64_t) -1);
1635 _public_ int sd_bus_call(
1639 sd_bus_error *error,
1640 sd_bus_message **reply) {
1647 assert_return(bus, -EINVAL);
1648 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1649 assert_return(m, -EINVAL);
1650 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1651 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1652 assert_return(!bus_error_is_dirty(error), -EINVAL);
1653 assert_return(!bus_pid_changed(bus), -ECHILD);
1655 r = bus_ensure_running(bus);
1659 r = sd_bus_send(bus, m, &serial);
1663 timeout = calc_elapse(usec);
1667 sd_bus_message *incoming = NULL;
1672 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1675 /* Make sure there's room for queuing this
1676 * locally, before we read the message */
1678 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1686 r = bus_read_message(bus, &incoming);
1692 if (incoming->reply_serial == serial) {
1693 /* Found a match! */
1695 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1700 sd_bus_message_unref(incoming);
1705 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1708 r = sd_bus_error_copy(error, &incoming->error);
1710 sd_bus_message_unref(incoming);
1714 k = sd_bus_error_get_errno(&incoming->error);
1715 sd_bus_message_unref(incoming);
1719 sd_bus_message_unref(incoming);
1722 } else if (incoming->header->serial == serial &&
1725 streq(bus->unique_name, incoming->sender)) {
1727 /* Our own message? Somebody is trying
1728 * to send its own client a message,
1729 * let's not dead-lock, let's fail
1732 sd_bus_message_unref(incoming);
1736 /* There's already guaranteed to be room for
1737 * this, so need to resize things here */
1738 bus->rqueue[bus->rqueue_size ++] = incoming;
1741 /* Try to read more, right-away */
1750 n = now(CLOCK_MONOTONIC);
1756 left = (uint64_t) -1;
1758 r = bus_poll(bus, true, left);
1762 r = dispatch_wqueue(bus);
1768 _public_ int sd_bus_get_fd(sd_bus *bus) {
1770 assert_return(bus, -EINVAL);
1771 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1772 assert_return(!bus_pid_changed(bus), -ECHILD);
1774 return bus->input_fd;
1777 _public_ int sd_bus_get_events(sd_bus *bus) {
1780 assert_return(bus, -EINVAL);
1781 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1782 assert_return(!bus_pid_changed(bus), -ECHILD);
1784 if (bus->state == BUS_OPENING)
1786 else if (bus->state == BUS_AUTHENTICATING) {
1788 if (bus_socket_auth_needs_write(bus))
1793 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1794 if (bus->rqueue_size <= 0)
1796 if (bus->wqueue_size > 0)
1803 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1804 struct reply_callback *c;
1806 assert_return(bus, -EINVAL);
1807 assert_return(timeout_usec, -EINVAL);
1808 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1809 assert_return(!bus_pid_changed(bus), -ECHILD);
1811 if (bus->state == BUS_CLOSING) {
1816 if (bus->state == BUS_AUTHENTICATING) {
1817 *timeout_usec = bus->auth_timeout;
1821 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1822 *timeout_usec = (uint64_t) -1;
1826 if (bus->rqueue_size > 0) {
1831 c = prioq_peek(bus->reply_callbacks_prioq);
1833 *timeout_usec = (uint64_t) -1;
1837 *timeout_usec = c->timeout;
1841 static int process_timeout(sd_bus *bus) {
1842 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1843 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1844 struct reply_callback *c;
1850 c = prioq_peek(bus->reply_callbacks_prioq);
1854 n = now(CLOCK_MONOTONIC);
1858 r = bus_message_new_synthetic_error(
1861 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1866 r = bus_seal_message(bus, m);
1870 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1871 hashmap_remove(bus->reply_callbacks, &c->serial);
1874 bus->iteration_counter ++;
1876 r = c->callback(bus, m, c->userdata, &error_buffer);
1877 r = bus_maybe_reply_error(m, r, &error_buffer);
1880 bus->current = NULL;
1885 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1889 if (bus->state != BUS_HELLO)
1892 /* Let's make sure the first message on the bus is the HELLO
1893 * reply. But note that we don't actually parse the message
1894 * here (we leave that to the usual handling), we just verify
1895 * we don't let any earlier msg through. */
1897 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1898 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1901 if (m->reply_serial != bus->hello_serial)
1907 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1908 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1909 struct reply_callback *c;
1915 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1916 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1919 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1923 if (c->timeout != 0)
1924 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1926 r = sd_bus_message_rewind(m, true);
1930 r = c->callback(bus, m, c->userdata, &error_buffer);
1931 r = bus_maybe_reply_error(m, r, &error_buffer);
1937 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1938 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1939 struct filter_callback *l;
1946 bus->filter_callbacks_modified = false;
1948 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1950 if (bus->filter_callbacks_modified)
1953 /* Don't run this more than once per iteration */
1954 if (l->last_iteration == bus->iteration_counter)
1957 l->last_iteration = bus->iteration_counter;
1959 r = sd_bus_message_rewind(m, true);
1963 r = l->callback(bus, m, l->userdata, &error_buffer);
1964 r = bus_maybe_reply_error(m, r, &error_buffer);
1970 } while (bus->filter_callbacks_modified);
1975 static int process_match(sd_bus *bus, sd_bus_message *m) {
1982 bus->match_callbacks_modified = false;
1984 r = bus_match_run(bus, &bus->match_callbacks, m);
1988 } while (bus->match_callbacks_modified);
1993 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1994 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2000 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2003 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2006 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2009 if (streq_ptr(m->member, "Ping"))
2010 r = sd_bus_message_new_method_return(m, &reply);
2011 else if (streq_ptr(m->member, "GetMachineId")) {
2015 r = sd_id128_get_machine(&id);
2019 r = sd_bus_message_new_method_return(m, &reply);
2023 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2025 r = sd_bus_message_new_method_errorf(
2027 SD_BUS_ERROR_UNKNOWN_METHOD,
2028 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2034 r = sd_bus_send(bus, reply, NULL);
2041 static int process_message(sd_bus *bus, sd_bus_message *m) {
2048 bus->iteration_counter++;
2050 log_debug("Got message sender=%s object=%s interface=%s member=%s",
2051 strna(sd_bus_message_get_sender(m)),
2052 strna(sd_bus_message_get_path(m)),
2053 strna(sd_bus_message_get_interface(m)),
2054 strna(sd_bus_message_get_member(m)));
2056 r = process_hello(bus, m);
2060 r = process_reply(bus, m);
2064 r = process_filter(bus, m);
2068 r = process_match(bus, m);
2072 r = process_builtin(bus, m);
2076 r = bus_process_object(bus, m);
2079 bus->current = NULL;
2083 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2084 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2088 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2090 r = process_timeout(bus);
2094 r = dispatch_wqueue(bus);
2098 r = dispatch_rqueue(bus, &m);
2104 r = process_message(bus, m);
2109 r = sd_bus_message_rewind(m, true);
2118 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2120 r = sd_bus_reply_method_errorf(
2122 SD_BUS_ERROR_UNKNOWN_OBJECT,
2123 "Unknown object '%s'.", m->path);
2137 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2138 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2139 struct reply_callback *c;
2143 assert(bus->state == BUS_CLOSING);
2145 c = hashmap_first(bus->reply_callbacks);
2147 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2149 /* First, fail all outstanding method calls */
2150 r = bus_message_new_synthetic_error(
2153 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2158 r = bus_seal_message(bus, m);
2162 if (c->timeout != 0)
2163 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2165 hashmap_remove(bus->reply_callbacks, &c->serial);
2168 bus->iteration_counter++;
2170 r = c->callback(bus, m, c->userdata, &error_buffer);
2171 r = bus_maybe_reply_error(m, r, &error_buffer);
2177 /* Then, synthesize a Disconnected message */
2178 r = sd_bus_message_new_signal(
2180 "/org/freedesktop/DBus/Local",
2181 "org.freedesktop.DBus.Local",
2187 r = bus_seal_message(bus, m);
2194 bus->iteration_counter++;
2196 r = process_filter(bus, m);
2200 r = process_match(bus, m);
2212 bus->current = NULL;
2216 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2217 BUS_DONT_DESTROY(bus);
2220 /* Returns 0 when we didn't do anything. This should cause the
2221 * caller to invoke sd_bus_wait() before returning the next
2222 * time. Returns > 0 when we did something, which possibly
2223 * means *ret is filled in with an unprocessed message. */
2225 assert_return(bus, -EINVAL);
2226 assert_return(!bus_pid_changed(bus), -ECHILD);
2228 /* We don't allow recursively invoking sd_bus_process(). */
2229 assert_return(!bus->current, -EBUSY);
2231 switch (bus->state) {
2238 r = bus_socket_process_opening(bus);
2239 if (r == -ECONNRESET || r == -EPIPE) {
2240 bus_enter_closing(bus);
2248 case BUS_AUTHENTICATING:
2249 r = bus_socket_process_authenticating(bus);
2250 if (r == -ECONNRESET || r == -EPIPE) {
2251 bus_enter_closing(bus);
2263 r = process_running(bus, ret);
2264 if (r == -ECONNRESET || r == -EPIPE) {
2265 bus_enter_closing(bus);
2275 return process_closing(bus, ret);
2278 assert_not_reached("Unknown state");
2281 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2282 struct pollfd p[2] = {};
2285 usec_t m = (usec_t) -1;
2289 if (bus->state == BUS_CLOSING)
2292 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2294 e = sd_bus_get_events(bus);
2299 /* The caller really needs some more data, he doesn't
2300 * care about what's already read, or any timeouts
2305 /* The caller wants to process if there's something to
2306 * process, but doesn't care otherwise */
2308 r = sd_bus_get_timeout(bus, &until);
2313 nw = now(CLOCK_MONOTONIC);
2314 m = until > nw ? until - nw : 0;
2318 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2321 p[0].fd = bus->input_fd;
2322 if (bus->output_fd == bus->input_fd) {
2326 p[0].events = e & POLLIN;
2327 p[1].fd = bus->output_fd;
2328 p[1].events = e & POLLOUT;
2332 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2336 return r > 0 ? 1 : 0;
2339 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2341 assert_return(bus, -EINVAL);
2342 assert_return(!bus_pid_changed(bus), -ECHILD);
2344 if (bus->state == BUS_CLOSING)
2347 assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
2349 if (bus->rqueue_size > 0)
2352 return bus_poll(bus, false, timeout_usec);
2355 _public_ int sd_bus_flush(sd_bus *bus) {
2358 assert_return(bus, -EINVAL);
2359 assert_return(!bus_pid_changed(bus), -ECHILD);
2361 if (bus->state == BUS_CLOSING)
2364 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2366 r = bus_ensure_running(bus);
2370 if (bus->wqueue_size <= 0)
2374 r = dispatch_wqueue(bus);
2378 if (bus->wqueue_size <= 0)
2381 r = bus_poll(bus, false, (uint64_t) -1);
2387 _public_ int sd_bus_add_filter(sd_bus *bus,
2388 sd_bus_message_handler_t callback,
2391 struct filter_callback *f;
2393 assert_return(bus, -EINVAL);
2394 assert_return(callback, -EINVAL);
2395 assert_return(!bus_pid_changed(bus), -ECHILD);
2397 f = new0(struct filter_callback, 1);
2400 f->callback = callback;
2401 f->userdata = userdata;
2403 bus->filter_callbacks_modified = true;
2404 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2408 _public_ int sd_bus_remove_filter(sd_bus *bus,
2409 sd_bus_message_handler_t callback,
2412 struct filter_callback *f;
2414 assert_return(bus, -EINVAL);
2415 assert_return(callback, -EINVAL);
2416 assert_return(!bus_pid_changed(bus), -ECHILD);
2418 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2419 if (f->callback == callback && f->userdata == userdata) {
2420 bus->filter_callbacks_modified = true;
2421 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2430 _public_ int sd_bus_add_match(sd_bus *bus,
2432 sd_bus_message_handler_t callback,
2435 struct bus_match_component *components = NULL;
2436 unsigned n_components = 0;
2437 uint64_t cookie = 0;
2440 assert_return(bus, -EINVAL);
2441 assert_return(match, -EINVAL);
2442 assert_return(!bus_pid_changed(bus), -ECHILD);
2444 r = bus_match_parse(match, &components, &n_components);
2448 if (bus->bus_client) {
2449 cookie = ++bus->match_cookie;
2451 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2456 bus->match_callbacks_modified = true;
2457 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2459 if (bus->bus_client)
2460 bus_remove_match_internal(bus, match, cookie);
2464 bus_match_parse_free(components, n_components);
2468 _public_ int sd_bus_remove_match(sd_bus *bus,
2470 sd_bus_message_handler_t callback,
2473 struct bus_match_component *components = NULL;
2474 unsigned n_components = 0;
2476 uint64_t cookie = 0;
2478 assert_return(bus, -EINVAL);
2479 assert_return(match, -EINVAL);
2480 assert_return(!bus_pid_changed(bus), -ECHILD);
2482 r = bus_match_parse(match, &components, &n_components);
2486 bus->match_callbacks_modified = true;
2487 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2489 if (bus->bus_client)
2490 q = bus_remove_match_internal(bus, match, cookie);
2492 bus_match_parse_free(components, n_components);
2494 return r < 0 ? r : q;
2497 bool bus_pid_changed(sd_bus *bus) {
2500 /* We don't support people creating a bus connection and
2501 * keeping it around over a fork(). Let's complain. */
2503 return bus->original_pid != getpid();
2506 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2507 sd_bus *bus = userdata;
2512 r = sd_bus_process(bus, NULL);
2519 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2520 sd_bus *bus = userdata;
2525 r = sd_bus_process(bus, NULL);
2532 static int prepare_callback(sd_event_source *s, void *userdata) {
2533 sd_bus *bus = userdata;
2540 e = sd_bus_get_events(bus);
2544 if (bus->output_fd != bus->input_fd) {
2546 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2550 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2554 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2559 r = sd_bus_get_timeout(bus, &until);
2565 j = sd_event_source_set_time(bus->time_event_source, until);
2570 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2577 static int quit_callback(sd_event_source *event, void *userdata) {
2578 sd_bus *bus = userdata;
2587 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2590 assert_return(bus, -EINVAL);
2591 assert_return(!bus->event, -EBUSY);
2593 assert(!bus->input_io_event_source);
2594 assert(!bus->output_io_event_source);
2595 assert(!bus->time_event_source);
2598 bus->event = sd_event_ref(event);
2600 r = sd_event_default(&bus->event);
2605 r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2609 r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2613 if (bus->output_fd != bus->input_fd) {
2614 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2618 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2623 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2627 r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2631 r = sd_event_source_set_priority(bus->time_event_source, priority);
2635 r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2642 sd_bus_detach_event(bus);
2646 _public_ int sd_bus_detach_event(sd_bus *bus) {
2647 assert_return(bus, -EINVAL);
2648 assert_return(bus->event, -ENXIO);
2650 if (bus->input_io_event_source) {
2651 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2652 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2655 if (bus->output_io_event_source) {
2656 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2657 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2660 if (bus->time_event_source) {
2661 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2662 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2665 if (bus->quit_event_source) {
2666 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2667 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2671 bus->event = sd_event_unref(bus->event);
2676 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2677 assert_return(bus, NULL);
2682 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2683 assert_return(bus, NULL);
2685 return bus->current;
2688 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2693 assert(default_bus);
2696 return !!*default_bus;
2699 *ret = sd_bus_ref(*default_bus);
2707 b->default_bus_ptr = default_bus;
2715 _public_ int sd_bus_default_system(sd_bus **ret) {
2716 static __thread sd_bus *default_system_bus = NULL;
2718 return bus_default(sd_bus_open_system, &default_system_bus, ret);
2721 _public_ int sd_bus_default_user(sd_bus **ret) {
2722 static __thread sd_bus *default_user_bus = NULL;
2724 return bus_default(sd_bus_open_user, &default_user_bus, ret);
2727 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2728 assert_return(b, -EINVAL);
2729 assert_return(tid, -EINVAL);
2730 assert_return(!bus_pid_changed(b), -ECHILD);
2738 return sd_event_get_tid(b->event, tid);
2743 _public_ char *sd_bus_label_escape(const char *s) {
2747 assert_return(s, NULL);
2749 /* Escapes all chars that D-Bus' object path cannot deal
2750 * with. Can be reversed with bus_path_unescape(). We special
2751 * case the empty string. */
2756 r = new(char, strlen(s)*3 + 1);
2760 for (f = s, t = r; *f; f++) {
2762 /* Escape everything that is not a-zA-Z0-9. We also
2763 * escape 0-9 if it's the first character */
2765 if (!(*f >= 'A' && *f <= 'Z') &&
2766 !(*f >= 'a' && *f <= 'z') &&
2767 !(f > s && *f >= '0' && *f <= '9')) {
2769 *(t++) = hexchar(*f >> 4);
2770 *(t++) = hexchar(*f);
2780 _public_ char *sd_bus_label_unescape(const char *f) {
2783 assert_return(f, NULL);
2785 /* Special case for the empty string */
2789 r = new(char, strlen(f) + 1);
2793 for (t = r; *f; f++) {
2798 if ((a = unhexchar(f[1])) < 0 ||
2799 (b = unhexchar(f[2])) < 0) {
2800 /* Invalid escape code, let's take it literal then */
2803 *(t++) = (char) ((a << 4) | b);