1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 #include "bus-container.h"
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
53 static void bus_close_fds(sd_bus *b) {
57 close_nointr_nofail(b->input_fd);
59 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60 close_nointr_nofail(b->output_fd);
62 b->input_fd = b->output_fd = -1;
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66 struct node_callback *c;
67 struct node_vtable *v;
68 struct node_enumerator *e;
76 bus_node_destroy(b, n->child);
78 while ((c = n->callbacks)) {
79 LIST_REMOVE(callbacks, n->callbacks, c);
83 while ((v = n->vtables)) {
84 LIST_REMOVE(vtables, n->vtables, v);
89 while ((e = n->enumerators)) {
90 LIST_REMOVE(enumerators, n->enumerators, e);
95 LIST_REMOVE(siblings, n->parent->child, n);
97 assert_se(hashmap_remove(b->nodes, n->path) == n);
102 static void bus_reset_queues(sd_bus *b) {
107 for (i = 0; i < b->rqueue_size; i++)
108 sd_bus_message_unref(b->rqueue[i]);
111 for (i = 0; i < b->wqueue_size; i++)
112 sd_bus_message_unref(b->wqueue[i]);
115 b->rqueue = b->wqueue = NULL;
116 b->rqueue_size = b->wqueue_size = 0;
119 static void bus_free(sd_bus *b) {
120 struct filter_callback *f;
125 sd_bus_detach_event(b);
130 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
133 free(b->unique_name);
134 free(b->auth_buffer);
140 strv_free(b->exec_argv);
142 close_many(b->fds, b->n_fds);
147 hashmap_free_free(b->reply_callbacks);
148 prioq_free(b->reply_callbacks_prioq);
150 while ((f = b->filter_callbacks)) {
151 LIST_REMOVE(callbacks, b->filter_callbacks, f);
155 bus_match_free(&b->match_callbacks);
157 hashmap_free_free(b->vtable_methods);
158 hashmap_free_free(b->vtable_properties);
160 while ((n = hashmap_first(b->nodes)))
161 bus_node_destroy(b, n);
163 hashmap_free(b->nodes);
165 bus_kernel_flush_memfd(b);
167 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
172 _public_ int sd_bus_new(sd_bus **ret) {
175 assert_return(ret, -EINVAL);
181 r->n_ref = REFCNT_INIT;
182 r->input_fd = r->output_fd = -1;
183 r->message_version = 1;
184 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD|KDBUS_HELLO_ATTACH_NAMES;
185 r->original_pid = getpid();
187 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
189 /* We guarantee that wqueue always has space for at least one
191 r->wqueue = new(sd_bus_message*, 1);
201 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
204 assert_return(bus, -EINVAL);
205 assert_return(bus->state == BUS_UNSET, -EPERM);
206 assert_return(address, -EINVAL);
207 assert_return(!bus_pid_changed(bus), -ECHILD);
219 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
220 assert_return(bus, -EINVAL);
221 assert_return(bus->state == BUS_UNSET, -EPERM);
222 assert_return(input_fd >= 0, -EINVAL);
223 assert_return(output_fd >= 0, -EINVAL);
224 assert_return(!bus_pid_changed(bus), -ECHILD);
226 bus->input_fd = input_fd;
227 bus->output_fd = output_fd;
231 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
234 assert_return(bus, -EINVAL);
235 assert_return(bus->state == BUS_UNSET, -EPERM);
236 assert_return(path, -EINVAL);
237 assert_return(!strv_isempty(argv), -EINVAL);
238 assert_return(!bus_pid_changed(bus), -ECHILD);
250 free(bus->exec_path);
251 strv_free(bus->exec_argv);
259 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
260 assert_return(bus, -EINVAL);
261 assert_return(bus->state == BUS_UNSET, -EPERM);
262 assert_return(!bus_pid_changed(bus), -ECHILD);
264 bus->bus_client = !!b;
268 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
269 assert_return(bus, -EINVAL);
270 assert_return(bus->state == BUS_UNSET, -EPERM);
271 assert_return(!bus_pid_changed(bus), -ECHILD);
273 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
277 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
278 assert_return(bus, -EINVAL);
279 assert_return(bus->state == BUS_UNSET, -EPERM);
280 assert_return(!bus_pid_changed(bus), -ECHILD);
282 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_TIMESTAMP, b);
286 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, int b) {
287 assert_return(bus, -EINVAL);
288 assert_return(bus->state == BUS_UNSET, -EPERM);
289 assert_return(!bus_pid_changed(bus), -ECHILD);
291 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CREDS, b);
295 _public_ int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
296 assert_return(bus, -EINVAL);
297 assert_return(bus->state == BUS_UNSET, -EPERM);
298 assert_return(!bus_pid_changed(bus), -ECHILD);
300 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
304 _public_ int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
305 assert_return(bus, -EINVAL);
306 assert_return(bus->state == BUS_UNSET, -EPERM);
307 assert_return(!bus_pid_changed(bus), -ECHILD);
309 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
313 _public_ int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
314 assert_return(bus, -EINVAL);
315 assert_return(bus->state == BUS_UNSET, -EPERM);
316 assert_return(!bus_pid_changed(bus), -ECHILD);
318 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
322 _public_ int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
323 assert_return(bus, -EINVAL);
324 assert_return(bus->state == BUS_UNSET, -EPERM);
325 assert_return(!bus_pid_changed(bus), -ECHILD);
327 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
331 _public_ int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
332 assert_return(bus, -EINVAL);
333 assert_return(bus->state == BUS_UNSET, -EPERM);
334 assert_return(!bus_pid_changed(bus), -ECHILD);
336 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
340 _public_ int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
341 assert_return(bus, -EINVAL);
342 assert_return(bus->state == BUS_UNSET, -EPERM);
343 assert_return(!bus_pid_changed(bus), -ECHILD);
345 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
349 _public_ int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
350 assert_return(bus, -EINVAL);
351 assert_return(bus->state == BUS_UNSET, -EPERM);
352 assert_return(!bus_pid_changed(bus), -ECHILD);
354 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
358 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
359 assert_return(bus, -EINVAL);
360 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
361 assert_return(bus->state == BUS_UNSET, -EPERM);
362 assert_return(!bus_pid_changed(bus), -ECHILD);
364 bus->is_server = !!b;
365 bus->server_id = server_id;
369 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
370 assert_return(bus, -EINVAL);
371 assert_return(bus->state == BUS_UNSET, -EPERM);
372 assert_return(!bus_pid_changed(bus), -ECHILD);
374 bus->anonymous_auth = !!b;
378 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
383 assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
386 r = sd_bus_message_get_errno(reply);
392 r = sd_bus_message_read(reply, "s", &s);
396 if (!service_name_is_valid(s) || s[0] != ':')
399 bus->unique_name = strdup(s);
400 if (!bus->unique_name)
403 if (bus->state == BUS_HELLO)
404 bus->state = BUS_RUNNING;
409 static int bus_send_hello(sd_bus *bus) {
410 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
415 if (!bus->bus_client || bus->is_kernel)
418 r = sd_bus_message_new_method_call(
420 "org.freedesktop.DBus",
422 "org.freedesktop.DBus",
428 return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
431 int bus_start_running(sd_bus *bus) {
434 if (bus->bus_client && !bus->is_kernel) {
435 bus->state = BUS_HELLO;
439 bus->state = BUS_RUNNING;
443 static int parse_address_key(const char **p, const char *key, char **value) {
454 if (strncmp(*p, key, l) != 0)
467 while (*a != ';' && *a != ',' && *a != 0) {
485 c = (char) ((x << 4) | y);
492 t = realloc(r, n + 2);
520 static void skip_address_key(const char **p) {
524 *p += strcspn(*p, ",");
530 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
531 _cleanup_free_ char *path = NULL, *abstract = NULL;
540 while (**p != 0 && **p != ';') {
541 r = parse_address_key(p, "guid", guid);
547 r = parse_address_key(p, "path", &path);
553 r = parse_address_key(p, "abstract", &abstract);
562 if (!path && !abstract)
565 if (path && abstract)
570 if (l > sizeof(b->sockaddr.un.sun_path))
573 b->sockaddr.un.sun_family = AF_UNIX;
574 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
575 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
576 } else if (abstract) {
577 l = strlen(abstract);
578 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
581 b->sockaddr.un.sun_family = AF_UNIX;
582 b->sockaddr.un.sun_path[0] = 0;
583 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
584 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
590 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
591 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
593 struct addrinfo *result, hints = {
594 .ai_socktype = SOCK_STREAM,
595 .ai_flags = AI_ADDRCONFIG,
603 while (**p != 0 && **p != ';') {
604 r = parse_address_key(p, "guid", guid);
610 r = parse_address_key(p, "host", &host);
616 r = parse_address_key(p, "port", &port);
622 r = parse_address_key(p, "family", &family);
635 if (streq(family, "ipv4"))
636 hints.ai_family = AF_INET;
637 else if (streq(family, "ipv6"))
638 hints.ai_family = AF_INET6;
643 r = getaddrinfo(host, port, &hints, &result);
647 return -EADDRNOTAVAIL;
649 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
650 b->sockaddr_size = result->ai_addrlen;
652 freeaddrinfo(result);
657 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
659 unsigned n_argv = 0, j;
668 while (**p != 0 && **p != ';') {
669 r = parse_address_key(p, "guid", guid);
675 r = parse_address_key(p, "path", &path);
681 if (startswith(*p, "argv")) {
685 ul = strtoul(*p + 4, (char**) p, 10);
686 if (errno > 0 || **p != '=' || ul > 256) {
696 x = realloc(argv, sizeof(char*) * (ul + 2));
702 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
708 r = parse_address_key(p, NULL, argv + ul);
723 /* Make sure there are no holes in the array, with the
724 * exception of argv[0] */
725 for (j = 1; j < n_argv; j++)
731 if (argv && argv[0] == NULL) {
732 argv[0] = strdup(path);
744 for (j = 0; j < n_argv; j++)
752 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
753 _cleanup_free_ char *path = NULL;
761 while (**p != 0 && **p != ';') {
762 r = parse_address_key(p, "guid", guid);
768 r = parse_address_key(p, "path", &path);
787 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
788 _cleanup_free_ char *machine = NULL;
796 while (**p != 0 && **p != ';') {
797 r = parse_address_key(p, "guid", guid);
803 r = parse_address_key(p, "machine", &machine);
816 b->machine = machine;
819 b->sockaddr.un.sun_family = AF_UNIX;
820 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
821 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
826 static void bus_reset_parsed_address(sd_bus *b) {
830 b->sockaddr_size = 0;
831 strv_free(b->exec_argv);
835 b->server_id = SD_ID128_NULL;
842 static int bus_parse_next_address(sd_bus *b) {
843 _cleanup_free_ char *guid = NULL;
851 if (b->address[b->address_index] == 0)
854 bus_reset_parsed_address(b);
856 a = b->address + b->address_index;
865 if (startswith(a, "unix:")) {
868 r = parse_unix_address(b, &a, &guid);
873 } else if (startswith(a, "tcp:")) {
876 r = parse_tcp_address(b, &a, &guid);
882 } else if (startswith(a, "unixexec:")) {
885 r = parse_exec_address(b, &a, &guid);
891 } else if (startswith(a, "kernel:")) {
894 r = parse_kernel_address(b, &a, &guid);
899 } else if (startswith(a, "x-container:")) {
902 r = parse_container_address(b, &a, &guid);
915 r = sd_id128_from_string(guid, &b->server_id);
920 b->address_index = a - b->address;
924 static int bus_start_address(sd_bus *b) {
934 r = bus_socket_exec(b);
938 b->last_connect_error = -r;
939 } else if (b->kernel) {
941 r = bus_kernel_connect(b);
945 b->last_connect_error = -r;
947 } else if (b->machine) {
949 r = bus_container_connect(b);
953 b->last_connect_error = -r;
955 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
957 r = bus_socket_connect(b);
961 b->last_connect_error = -r;
964 r = bus_parse_next_address(b);
968 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
972 int bus_next_address(sd_bus *b) {
975 bus_reset_parsed_address(b);
976 return bus_start_address(b);
979 static int bus_start_fd(sd_bus *b) {
984 assert(b->input_fd >= 0);
985 assert(b->output_fd >= 0);
987 r = fd_nonblock(b->input_fd, true);
991 r = fd_cloexec(b->input_fd, true);
995 if (b->input_fd != b->output_fd) {
996 r = fd_nonblock(b->output_fd, true);
1000 r = fd_cloexec(b->output_fd, true);
1005 if (fstat(b->input_fd, &st) < 0)
1008 if (S_ISCHR(b->input_fd))
1009 return bus_kernel_take_fd(b);
1011 return bus_socket_take_fd(b);
1014 _public_ int sd_bus_start(sd_bus *bus) {
1017 assert_return(bus, -EINVAL);
1018 assert_return(bus->state == BUS_UNSET, -EPERM);
1019 assert_return(!bus_pid_changed(bus), -ECHILD);
1021 bus->state = BUS_OPENING;
1023 if (bus->is_server && bus->bus_client)
1026 if (bus->input_fd >= 0)
1027 r = bus_start_fd(bus);
1028 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1029 r = bus_start_address(bus);
1036 return bus_send_hello(bus);
1039 _public_ int sd_bus_open_system(sd_bus **ret) {
1044 assert_return(ret, -EINVAL);
1050 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1052 r = sd_bus_set_address(b, e);
1056 b->sockaddr.un.sun_family = AF_UNIX;
1057 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1058 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1061 b->bus_client = true;
1063 r = sd_bus_start(b);
1075 _public_ int sd_bus_open_user(sd_bus **ret) {
1081 assert_return(ret, -EINVAL);
1087 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1089 r = sd_bus_set_address(b, e);
1093 e = secure_getenv("XDG_RUNTIME_DIR");
1100 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1105 b->sockaddr.un.sun_family = AF_UNIX;
1106 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1107 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1110 b->bus_client = true;
1112 r = sd_bus_start(b);
1124 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1125 _cleanup_free_ char *e = NULL;
1130 assert_return(host, -EINVAL);
1131 assert_return(ret, -EINVAL);
1133 e = bus_address_escape(host);
1137 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1141 r = sd_bus_new(&bus);
1148 bus->bus_client = true;
1150 r = sd_bus_start(bus);
1160 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1161 _cleanup_free_ char *e = NULL;
1166 assert_return(machine, -EINVAL);
1167 assert_return(ret, -EINVAL);
1169 e = bus_address_escape(machine);
1173 p = strjoin("x-container:machine=", e, NULL);
1177 r = sd_bus_new(&bus);
1184 bus->bus_client = true;
1186 r = sd_bus_start(bus);
1196 _public_ void sd_bus_close(sd_bus *bus) {
1200 if (bus->state == BUS_CLOSED)
1202 if (bus_pid_changed(bus))
1205 bus->state = BUS_CLOSED;
1207 sd_bus_detach_event(bus);
1209 /* Drop all queued messages so that they drop references to
1210 * the bus object and the bus may be freed */
1211 bus_reset_queues(bus);
1213 if (!bus->is_kernel)
1216 /* We'll leave the fd open in case this is a kernel bus, since
1217 * there might still be memblocks around that reference this
1218 * bus, and they might need to invoke the
1219 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1223 static void bus_enter_closing(sd_bus *bus) {
1226 if (bus->state != BUS_OPENING &&
1227 bus->state != BUS_AUTHENTICATING &&
1228 bus->state != BUS_HELLO &&
1229 bus->state != BUS_RUNNING)
1232 bus->state = BUS_CLOSING;
1235 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1236 assert_return(bus, NULL);
1238 assert_se(REFCNT_INC(bus->n_ref) >= 2);
1243 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1244 assert_return(bus, NULL);
1246 if (REFCNT_DEC(bus->n_ref) <= 0)
1252 _public_ int sd_bus_is_open(sd_bus *bus) {
1254 assert_return(bus, -EINVAL);
1255 assert_return(!bus_pid_changed(bus), -ECHILD);
1257 return BUS_IS_OPEN(bus->state);
1260 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1263 assert_return(bus, -EINVAL);
1264 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1265 assert_return(!bus_pid_changed(bus), -ECHILD);
1267 if (type == SD_BUS_TYPE_UNIX_FD) {
1268 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1271 r = bus_ensure_running(bus);
1275 return bus->can_fds;
1278 return bus_type_is_valid(type);
1281 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1284 assert_return(bus, -EINVAL);
1285 assert_return(server_id, -EINVAL);
1286 assert_return(!bus_pid_changed(bus), -ECHILD);
1288 r = bus_ensure_running(bus);
1292 *server_id = bus->server_id;
1296 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1299 if (m->header->version > b->message_version)
1303 /* If we copy the same message to multiple
1304 * destinations, avoid using the same serial
1306 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1310 return bus_message_seal(m, ++b->serial);
1313 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1320 r = bus_kernel_write_message(bus, message);
1322 r = bus_socket_write_message(bus, message, idx);
1327 static int dispatch_wqueue(sd_bus *bus) {
1331 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1333 while (bus->wqueue_size > 0) {
1335 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1339 /* Didn't do anything this time */
1341 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1342 /* Fully written. Let's drop the entry from
1345 * This isn't particularly optimized, but
1346 * well, this is supposed to be our worst-case
1347 * buffer only, and the socket buffer is
1348 * supposed to be our primary buffer, and if
1349 * it got full, then all bets are off
1352 sd_bus_message_unref(bus->wqueue[0]);
1353 bus->wqueue_size --;
1354 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1364 static int bus_read_message(sd_bus *bus, sd_bus_message **m) {
1371 r = bus_kernel_read_message(bus, m);
1373 r = bus_socket_read_message(bus, m);
1378 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1379 sd_bus_message *z = NULL;
1384 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1386 if (bus->rqueue_size > 0) {
1387 /* Dispatch a queued message */
1389 *m = bus->rqueue[0];
1390 bus->rqueue_size --;
1391 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1395 /* Try to read a new message */
1397 r = bus_read_message(bus, &z);
1410 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1413 assert_return(bus, -EINVAL);
1414 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1415 assert_return(m, -EINVAL);
1416 assert_return(!bus_pid_changed(bus), -ECHILD);
1419 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1426 /* If the serial number isn't kept, then we know that no reply
1428 if (!serial && !m->sealed)
1429 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1431 r = bus_seal_message(bus, m);
1435 /* If this is a reply and no reply was requested, then let's
1436 * suppress this, if we can */
1437 if (m->dont_send && !serial)
1440 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1443 r = bus_write_message(bus, m, &idx);
1446 else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1447 /* Wasn't fully written. So let's remember how
1448 * much was written. Note that the first entry
1449 * of the wqueue array is always allocated so
1450 * that we always can remember how much was
1452 bus->wqueue[0] = sd_bus_message_ref(m);
1453 bus->wqueue_size = 1;
1459 /* Just append it to the queue. */
1461 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1464 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1469 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1473 *serial = BUS_MESSAGE_SERIAL(m);
1478 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1481 assert_return(bus, -EINVAL);
1482 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1483 assert_return(m, -EINVAL);
1484 assert_return(!bus_pid_changed(bus), -ECHILD);
1486 if (!streq_ptr(m->destination, destination)) {
1491 r = sd_bus_message_set_destination(m, destination);
1496 return sd_bus_send(bus, m, serial);
1499 static usec_t calc_elapse(uint64_t usec) {
1500 if (usec == (uint64_t) -1)
1504 usec = BUS_DEFAULT_TIMEOUT;
1506 return now(CLOCK_MONOTONIC) + usec;
1509 static int timeout_compare(const void *a, const void *b) {
1510 const struct reply_callback *x = a, *y = b;
1512 if (x->timeout != 0 && y->timeout == 0)
1515 if (x->timeout == 0 && y->timeout != 0)
1518 if (x->timeout < y->timeout)
1521 if (x->timeout > y->timeout)
1527 _public_ int sd_bus_call_async(
1530 sd_bus_message_handler_t callback,
1535 struct reply_callback *c;
1538 assert_return(bus, -EINVAL);
1539 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1540 assert_return(m, -EINVAL);
1541 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1542 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1543 assert_return(callback, -EINVAL);
1544 assert_return(!bus_pid_changed(bus), -ECHILD);
1546 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1550 if (usec != (uint64_t) -1) {
1551 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1556 r = bus_seal_message(bus, m);
1560 c = new0(struct reply_callback, 1);
1564 c->callback = callback;
1565 c->userdata = userdata;
1566 c->serial = BUS_MESSAGE_SERIAL(m);
1567 c->timeout = calc_elapse(usec);
1569 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1575 if (c->timeout != 0) {
1576 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1579 sd_bus_call_async_cancel(bus, c->serial);
1584 r = sd_bus_send(bus, m, serial);
1586 sd_bus_call_async_cancel(bus, c->serial);
1593 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1594 struct reply_callback *c;
1596 assert_return(bus, -EINVAL);
1597 assert_return(serial != 0, -EINVAL);
1598 assert_return(!bus_pid_changed(bus), -ECHILD);
1600 c = hashmap_remove(bus->reply_callbacks, &serial);
1604 if (c->timeout != 0)
1605 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1611 int bus_ensure_running(sd_bus *bus) {
1616 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1618 if (bus->state == BUS_RUNNING)
1622 r = sd_bus_process(bus, NULL);
1625 if (bus->state == BUS_RUNNING)
1630 r = sd_bus_wait(bus, (uint64_t) -1);
1636 _public_ int sd_bus_call(
1640 sd_bus_error *error,
1641 sd_bus_message **reply) {
1648 assert_return(bus, -EINVAL);
1649 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1650 assert_return(m, -EINVAL);
1651 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1652 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1653 assert_return(!bus_error_is_dirty(error), -EINVAL);
1654 assert_return(!bus_pid_changed(bus), -ECHILD);
1656 r = bus_ensure_running(bus);
1660 r = sd_bus_send(bus, m, &serial);
1664 timeout = calc_elapse(usec);
1668 sd_bus_message *incoming = NULL;
1673 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1676 /* Make sure there's room for queuing this
1677 * locally, before we read the message */
1679 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1687 r = bus_read_message(bus, &incoming);
1693 if (incoming->reply_serial == serial) {
1694 /* Found a match! */
1696 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1701 sd_bus_message_unref(incoming);
1706 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1709 r = sd_bus_error_copy(error, &incoming->error);
1711 sd_bus_message_unref(incoming);
1715 k = sd_bus_error_get_errno(&incoming->error);
1716 sd_bus_message_unref(incoming);
1720 sd_bus_message_unref(incoming);
1723 } else if (incoming->header->serial == serial &&
1726 streq(bus->unique_name, incoming->sender)) {
1728 /* Our own message? Somebody is trying
1729 * to send its own client a message,
1730 * let's not dead-lock, let's fail
1733 sd_bus_message_unref(incoming);
1737 /* There's already guaranteed to be room for
1738 * this, so need to resize things here */
1739 bus->rqueue[bus->rqueue_size ++] = incoming;
1742 /* Try to read more, right-away */
1751 n = now(CLOCK_MONOTONIC);
1757 left = (uint64_t) -1;
1759 r = bus_poll(bus, true, left);
1763 r = dispatch_wqueue(bus);
1769 _public_ int sd_bus_get_fd(sd_bus *bus) {
1771 assert_return(bus, -EINVAL);
1772 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1773 assert_return(!bus_pid_changed(bus), -ECHILD);
1775 return bus->input_fd;
1778 _public_ int sd_bus_get_events(sd_bus *bus) {
1781 assert_return(bus, -EINVAL);
1782 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1783 assert_return(!bus_pid_changed(bus), -ECHILD);
1785 if (bus->state == BUS_OPENING)
1787 else if (bus->state == BUS_AUTHENTICATING) {
1789 if (bus_socket_auth_needs_write(bus))
1794 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1795 if (bus->rqueue_size <= 0)
1797 if (bus->wqueue_size > 0)
1804 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1805 struct reply_callback *c;
1807 assert_return(bus, -EINVAL);
1808 assert_return(timeout_usec, -EINVAL);
1809 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1810 assert_return(!bus_pid_changed(bus), -ECHILD);
1812 if (bus->state == BUS_CLOSING) {
1817 if (bus->state == BUS_AUTHENTICATING) {
1818 *timeout_usec = bus->auth_timeout;
1822 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1823 *timeout_usec = (uint64_t) -1;
1827 if (bus->rqueue_size > 0) {
1832 c = prioq_peek(bus->reply_callbacks_prioq);
1834 *timeout_usec = (uint64_t) -1;
1838 *timeout_usec = c->timeout;
1842 static int process_timeout(sd_bus *bus) {
1843 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1844 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1845 struct reply_callback *c;
1851 c = prioq_peek(bus->reply_callbacks_prioq);
1855 n = now(CLOCK_MONOTONIC);
1859 r = bus_message_new_synthetic_error(
1862 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1867 r = bus_seal_message(bus, m);
1871 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1872 hashmap_remove(bus->reply_callbacks, &c->serial);
1875 bus->iteration_counter ++;
1877 r = c->callback(bus, m, c->userdata, &error_buffer);
1878 r = bus_maybe_reply_error(m, r, &error_buffer);
1881 bus->current = NULL;
1886 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1890 if (bus->state != BUS_HELLO)
1893 /* Let's make sure the first message on the bus is the HELLO
1894 * reply. But note that we don't actually parse the message
1895 * here (we leave that to the usual handling), we just verify
1896 * we don't let any earlier msg through. */
1898 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1899 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1902 if (m->reply_serial != bus->hello_serial)
1908 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1909 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1910 struct reply_callback *c;
1916 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1917 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1920 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1924 if (c->timeout != 0)
1925 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1927 r = sd_bus_message_rewind(m, true);
1931 r = c->callback(bus, m, c->userdata, &error_buffer);
1932 r = bus_maybe_reply_error(m, r, &error_buffer);
1938 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1939 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1940 struct filter_callback *l;
1947 bus->filter_callbacks_modified = false;
1949 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1951 if (bus->filter_callbacks_modified)
1954 /* Don't run this more than once per iteration */
1955 if (l->last_iteration == bus->iteration_counter)
1958 l->last_iteration = bus->iteration_counter;
1960 r = sd_bus_message_rewind(m, true);
1964 r = l->callback(bus, m, l->userdata, &error_buffer);
1965 r = bus_maybe_reply_error(m, r, &error_buffer);
1971 } while (bus->filter_callbacks_modified);
1976 static int process_match(sd_bus *bus, sd_bus_message *m) {
1983 bus->match_callbacks_modified = false;
1985 r = bus_match_run(bus, &bus->match_callbacks, m);
1989 } while (bus->match_callbacks_modified);
1994 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1995 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2001 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2004 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2007 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2010 if (streq_ptr(m->member, "Ping"))
2011 r = sd_bus_message_new_method_return(m, &reply);
2012 else if (streq_ptr(m->member, "GetMachineId")) {
2016 r = sd_id128_get_machine(&id);
2020 r = sd_bus_message_new_method_return(m, &reply);
2024 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2026 r = sd_bus_message_new_method_errorf(
2028 SD_BUS_ERROR_UNKNOWN_METHOD,
2029 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2035 r = sd_bus_send(bus, reply, NULL);
2042 static int process_message(sd_bus *bus, sd_bus_message *m) {
2049 bus->iteration_counter++;
2051 log_debug("Got message sender=%s object=%s interface=%s member=%s",
2052 strna(sd_bus_message_get_sender(m)),
2053 strna(sd_bus_message_get_path(m)),
2054 strna(sd_bus_message_get_interface(m)),
2055 strna(sd_bus_message_get_member(m)));
2057 r = process_hello(bus, m);
2061 r = process_reply(bus, m);
2065 r = process_filter(bus, m);
2069 r = process_match(bus, m);
2073 r = process_builtin(bus, m);
2077 r = bus_process_object(bus, m);
2080 bus->current = NULL;
2084 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2085 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2089 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2091 r = process_timeout(bus);
2095 r = dispatch_wqueue(bus);
2099 r = dispatch_rqueue(bus, &m);
2105 r = process_message(bus, m);
2110 r = sd_bus_message_rewind(m, true);
2119 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2121 r = sd_bus_reply_method_errorf(
2123 SD_BUS_ERROR_UNKNOWN_OBJECT,
2124 "Unknown object '%s'.", m->path);
2138 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2139 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2140 struct reply_callback *c;
2144 assert(bus->state == BUS_CLOSING);
2146 c = hashmap_first(bus->reply_callbacks);
2148 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2150 /* First, fail all outstanding method calls */
2151 r = bus_message_new_synthetic_error(
2154 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2159 r = bus_seal_message(bus, m);
2163 if (c->timeout != 0)
2164 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2166 hashmap_remove(bus->reply_callbacks, &c->serial);
2169 bus->iteration_counter++;
2171 r = c->callback(bus, m, c->userdata, &error_buffer);
2172 r = bus_maybe_reply_error(m, r, &error_buffer);
2178 /* Then, synthesize a Disconnected message */
2179 r = sd_bus_message_new_signal(
2181 "/org/freedesktop/DBus/Local",
2182 "org.freedesktop.DBus.Local",
2188 r = bus_seal_message(bus, m);
2195 bus->iteration_counter++;
2197 r = process_filter(bus, m);
2201 r = process_match(bus, m);
2213 bus->current = NULL;
2217 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2218 BUS_DONT_DESTROY(bus);
2221 /* Returns 0 when we didn't do anything. This should cause the
2222 * caller to invoke sd_bus_wait() before returning the next
2223 * time. Returns > 0 when we did something, which possibly
2224 * means *ret is filled in with an unprocessed message. */
2226 assert_return(bus, -EINVAL);
2227 assert_return(!bus_pid_changed(bus), -ECHILD);
2229 /* We don't allow recursively invoking sd_bus_process(). */
2230 assert_return(!bus->current, -EBUSY);
2232 switch (bus->state) {
2239 r = bus_socket_process_opening(bus);
2240 if (r == -ECONNRESET || r == -EPIPE) {
2241 bus_enter_closing(bus);
2249 case BUS_AUTHENTICATING:
2250 r = bus_socket_process_authenticating(bus);
2251 if (r == -ECONNRESET || r == -EPIPE) {
2252 bus_enter_closing(bus);
2264 r = process_running(bus, ret);
2265 if (r == -ECONNRESET || r == -EPIPE) {
2266 bus_enter_closing(bus);
2276 return process_closing(bus, ret);
2279 assert_not_reached("Unknown state");
2282 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2283 struct pollfd p[2] = {};
2286 usec_t m = (usec_t) -1;
2290 if (bus->state == BUS_CLOSING)
2293 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2295 e = sd_bus_get_events(bus);
2300 /* The caller really needs some more data, he doesn't
2301 * care about what's already read, or any timeouts
2306 /* The caller wants to process if there's something to
2307 * process, but doesn't care otherwise */
2309 r = sd_bus_get_timeout(bus, &until);
2314 nw = now(CLOCK_MONOTONIC);
2315 m = until > nw ? until - nw : 0;
2319 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2322 p[0].fd = bus->input_fd;
2323 if (bus->output_fd == bus->input_fd) {
2327 p[0].events = e & POLLIN;
2328 p[1].fd = bus->output_fd;
2329 p[1].events = e & POLLOUT;
2333 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2337 return r > 0 ? 1 : 0;
2340 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2342 assert_return(bus, -EINVAL);
2343 assert_return(!bus_pid_changed(bus), -ECHILD);
2345 if (bus->state == BUS_CLOSING)
2348 assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
2350 if (bus->rqueue_size > 0)
2353 return bus_poll(bus, false, timeout_usec);
2356 _public_ int sd_bus_flush(sd_bus *bus) {
2359 assert_return(bus, -EINVAL);
2360 assert_return(!bus_pid_changed(bus), -ECHILD);
2362 if (bus->state == BUS_CLOSING)
2365 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2367 r = bus_ensure_running(bus);
2371 if (bus->wqueue_size <= 0)
2375 r = dispatch_wqueue(bus);
2379 if (bus->wqueue_size <= 0)
2382 r = bus_poll(bus, false, (uint64_t) -1);
2388 _public_ int sd_bus_add_filter(sd_bus *bus,
2389 sd_bus_message_handler_t callback,
2392 struct filter_callback *f;
2394 assert_return(bus, -EINVAL);
2395 assert_return(callback, -EINVAL);
2396 assert_return(!bus_pid_changed(bus), -ECHILD);
2398 f = new0(struct filter_callback, 1);
2401 f->callback = callback;
2402 f->userdata = userdata;
2404 bus->filter_callbacks_modified = true;
2405 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2409 _public_ int sd_bus_remove_filter(sd_bus *bus,
2410 sd_bus_message_handler_t callback,
2413 struct filter_callback *f;
2415 assert_return(bus, -EINVAL);
2416 assert_return(callback, -EINVAL);
2417 assert_return(!bus_pid_changed(bus), -ECHILD);
2419 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2420 if (f->callback == callback && f->userdata == userdata) {
2421 bus->filter_callbacks_modified = true;
2422 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2431 _public_ int sd_bus_add_match(sd_bus *bus,
2433 sd_bus_message_handler_t callback,
2436 struct bus_match_component *components = NULL;
2437 unsigned n_components = 0;
2438 uint64_t cookie = 0;
2441 assert_return(bus, -EINVAL);
2442 assert_return(match, -EINVAL);
2443 assert_return(!bus_pid_changed(bus), -ECHILD);
2445 r = bus_match_parse(match, &components, &n_components);
2449 if (bus->bus_client) {
2450 cookie = ++bus->match_cookie;
2452 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2457 bus->match_callbacks_modified = true;
2458 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2460 if (bus->bus_client)
2461 bus_remove_match_internal(bus, match, cookie);
2465 bus_match_parse_free(components, n_components);
2469 _public_ int sd_bus_remove_match(sd_bus *bus,
2471 sd_bus_message_handler_t callback,
2474 struct bus_match_component *components = NULL;
2475 unsigned n_components = 0;
2477 uint64_t cookie = 0;
2479 assert_return(bus, -EINVAL);
2480 assert_return(match, -EINVAL);
2481 assert_return(!bus_pid_changed(bus), -ECHILD);
2483 r = bus_match_parse(match, &components, &n_components);
2487 bus->match_callbacks_modified = true;
2488 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2490 if (bus->bus_client)
2491 q = bus_remove_match_internal(bus, match, cookie);
2493 bus_match_parse_free(components, n_components);
2495 return r < 0 ? r : q;
2498 bool bus_pid_changed(sd_bus *bus) {
2501 /* We don't support people creating a bus connection and
2502 * keeping it around over a fork(). Let's complain. */
2504 return bus->original_pid != getpid();
2507 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2508 sd_bus *bus = userdata;
2513 r = sd_bus_process(bus, NULL);
2520 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2521 sd_bus *bus = userdata;
2526 r = sd_bus_process(bus, NULL);
2533 static int prepare_callback(sd_event_source *s, void *userdata) {
2534 sd_bus *bus = userdata;
2541 e = sd_bus_get_events(bus);
2545 if (bus->output_fd != bus->input_fd) {
2547 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2551 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2555 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2560 r = sd_bus_get_timeout(bus, &until);
2566 j = sd_event_source_set_time(bus->time_event_source, until);
2571 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2578 static int quit_callback(sd_event_source *event, void *userdata) {
2579 sd_bus *bus = userdata;
2588 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2591 assert_return(bus, -EINVAL);
2592 assert_return(!bus->event, -EBUSY);
2594 assert(!bus->input_io_event_source);
2595 assert(!bus->output_io_event_source);
2596 assert(!bus->time_event_source);
2599 bus->event = sd_event_ref(event);
2601 r = sd_event_default(&bus->event);
2606 r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2610 r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2614 if (bus->output_fd != bus->input_fd) {
2615 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2619 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2624 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2628 r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2632 r = sd_event_source_set_priority(bus->time_event_source, priority);
2636 r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2643 sd_bus_detach_event(bus);
2647 _public_ int sd_bus_detach_event(sd_bus *bus) {
2648 assert_return(bus, -EINVAL);
2649 assert_return(bus->event, -ENXIO);
2651 if (bus->input_io_event_source) {
2652 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2653 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2656 if (bus->output_io_event_source) {
2657 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2658 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2661 if (bus->time_event_source) {
2662 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2663 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2666 if (bus->quit_event_source) {
2667 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2668 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2672 bus->event = sd_event_unref(bus->event);
2677 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2678 assert_return(bus, NULL);
2683 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2684 assert_return(bus, NULL);
2686 return bus->current;
2689 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2694 assert(default_bus);
2697 return !!*default_bus;
2700 *ret = sd_bus_ref(*default_bus);
2708 b->default_bus_ptr = default_bus;
2716 _public_ int sd_bus_default_system(sd_bus **ret) {
2717 static __thread sd_bus *default_system_bus = NULL;
2719 return bus_default(sd_bus_open_system, &default_system_bus, ret);
2722 _public_ int sd_bus_default_user(sd_bus **ret) {
2723 static __thread sd_bus *default_user_bus = NULL;
2725 return bus_default(sd_bus_open_user, &default_user_bus, ret);
2728 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2729 assert_return(b, -EINVAL);
2730 assert_return(tid, -EINVAL);
2731 assert_return(!bus_pid_changed(b), -ECHILD);
2739 return sd_event_get_tid(b->event, tid);
2744 _public_ char *sd_bus_label_escape(const char *s) {
2748 assert_return(s, NULL);
2750 /* Escapes all chars that D-Bus' object path cannot deal
2751 * with. Can be reversed with bus_path_unescape(). We special
2752 * case the empty string. */
2757 r = new(char, strlen(s)*3 + 1);
2761 for (f = s, t = r; *f; f++) {
2763 /* Escape everything that is not a-zA-Z0-9. We also
2764 * escape 0-9 if it's the first character */
2766 if (!(*f >= 'A' && *f <= 'Z') &&
2767 !(*f >= 'a' && *f <= 'z') &&
2768 !(f > s && *f >= '0' && *f <= '9')) {
2770 *(t++) = hexchar(*f >> 4);
2771 *(t++) = hexchar(*f);
2781 _public_ char *sd_bus_label_unescape(const char *f) {
2784 assert_return(f, NULL);
2786 /* Special case for the empty string */
2790 r = new(char, strlen(f) + 1);
2794 for (t = r; *f; f++) {
2799 if ((a = unhexchar(f[1])) < 0 ||
2800 (b = unhexchar(f[2])) < 0) {
2801 /* Invalid escape code, let's take it literal then */
2804 *(t++) = (char) ((a << 4) | b);