1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 #include "bus-container.h"
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
53 static void bus_close_fds(sd_bus *b) {
57 close_nointr_nofail(b->input_fd);
59 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60 close_nointr_nofail(b->output_fd);
62 b->input_fd = b->output_fd = -1;
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66 struct node_callback *c;
67 struct node_vtable *v;
68 struct node_enumerator *e;
76 bus_node_destroy(b, n->child);
78 while ((c = n->callbacks)) {
79 LIST_REMOVE(callbacks, n->callbacks, c);
83 while ((v = n->vtables)) {
84 LIST_REMOVE(vtables, n->vtables, v);
89 while ((e = n->enumerators)) {
90 LIST_REMOVE(enumerators, n->enumerators, e);
95 LIST_REMOVE(siblings, n->parent->child, n);
97 assert_se(hashmap_remove(b->nodes, n->path) == n);
102 static void bus_free(sd_bus *b) {
103 struct filter_callback *f;
109 sd_bus_detach_event(b);
114 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
117 free(b->unique_name);
118 free(b->auth_buffer);
124 strv_free(b->exec_argv);
126 close_many(b->fds, b->n_fds);
129 for (i = 0; i < b->rqueue_size; i++)
130 sd_bus_message_unref(b->rqueue[i]);
133 for (i = 0; i < b->wqueue_size; i++)
134 sd_bus_message_unref(b->wqueue[i]);
137 hashmap_free_free(b->reply_callbacks);
138 prioq_free(b->reply_callbacks_prioq);
140 while ((f = b->filter_callbacks)) {
141 LIST_REMOVE(callbacks, b->filter_callbacks, f);
145 bus_match_free(&b->match_callbacks);
147 hashmap_free_free(b->vtable_methods);
148 hashmap_free_free(b->vtable_properties);
150 while ((n = hashmap_first(b->nodes)))
151 bus_node_destroy(b, n);
153 hashmap_free(b->nodes);
155 bus_kernel_flush_memfd(b);
157 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
162 _public_ int sd_bus_new(sd_bus **ret) {
165 assert_return(ret, -EINVAL);
171 r->n_ref = REFCNT_INIT;
172 r->input_fd = r->output_fd = -1;
173 r->message_version = 1;
174 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
175 r->original_pid = getpid();
177 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
179 /* We guarantee that wqueue always has space for at least one
181 r->wqueue = new(sd_bus_message*, 1);
191 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
194 assert_return(bus, -EINVAL);
195 assert_return(bus->state == BUS_UNSET, -EPERM);
196 assert_return(address, -EINVAL);
197 assert_return(!bus_pid_changed(bus), -ECHILD);
209 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
210 assert_return(bus, -EINVAL);
211 assert_return(bus->state == BUS_UNSET, -EPERM);
212 assert_return(input_fd >= 0, -EINVAL);
213 assert_return(output_fd >= 0, -EINVAL);
214 assert_return(!bus_pid_changed(bus), -ECHILD);
216 bus->input_fd = input_fd;
217 bus->output_fd = output_fd;
221 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
224 assert_return(bus, -EINVAL);
225 assert_return(bus->state == BUS_UNSET, -EPERM);
226 assert_return(path, -EINVAL);
227 assert_return(!strv_isempty(argv), -EINVAL);
228 assert_return(!bus_pid_changed(bus), -ECHILD);
240 free(bus->exec_path);
241 strv_free(bus->exec_argv);
249 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
250 assert_return(bus, -EINVAL);
251 assert_return(bus->state == BUS_UNSET, -EPERM);
252 assert_return(!bus_pid_changed(bus), -ECHILD);
254 bus->bus_client = !!b;
258 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
259 assert_return(bus, -EINVAL);
260 assert_return(bus->state == BUS_UNSET, -EPERM);
261 assert_return(!bus_pid_changed(bus), -ECHILD);
263 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
267 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
268 assert_return(bus, -EINVAL);
269 assert_return(bus->state == BUS_UNSET, -EPERM);
270 assert_return(!bus_pid_changed(bus), -ECHILD);
272 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_TIMESTAMP, b);
276 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, int b) {
277 assert_return(bus, -EINVAL);
278 assert_return(bus->state == BUS_UNSET, -EPERM);
279 assert_return(!bus_pid_changed(bus), -ECHILD);
281 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CREDS, b);
285 _public_ int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
286 assert_return(bus, -EINVAL);
287 assert_return(bus->state == BUS_UNSET, -EPERM);
288 assert_return(!bus_pid_changed(bus), -ECHILD);
290 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
294 _public_ int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
295 assert_return(bus, -EINVAL);
296 assert_return(bus->state == BUS_UNSET, -EPERM);
297 assert_return(!bus_pid_changed(bus), -ECHILD);
299 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
303 _public_ int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
304 assert_return(bus, -EINVAL);
305 assert_return(bus->state == BUS_UNSET, -EPERM);
306 assert_return(!bus_pid_changed(bus), -ECHILD);
308 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
312 _public_ int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
313 assert_return(bus, -EINVAL);
314 assert_return(bus->state == BUS_UNSET, -EPERM);
315 assert_return(!bus_pid_changed(bus), -ECHILD);
317 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
321 _public_ int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
322 assert_return(bus, -EINVAL);
323 assert_return(bus->state == BUS_UNSET, -EPERM);
324 assert_return(!bus_pid_changed(bus), -ECHILD);
326 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
330 _public_ int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
331 assert_return(bus, -EINVAL);
332 assert_return(bus->state == BUS_UNSET, -EPERM);
333 assert_return(!bus_pid_changed(bus), -ECHILD);
335 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
339 _public_ int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
340 assert_return(bus, -EINVAL);
341 assert_return(bus->state == BUS_UNSET, -EPERM);
342 assert_return(!bus_pid_changed(bus), -ECHILD);
344 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
348 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
349 assert_return(bus, -EINVAL);
350 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
351 assert_return(bus->state == BUS_UNSET, -EPERM);
352 assert_return(!bus_pid_changed(bus), -ECHILD);
354 bus->is_server = !!b;
355 bus->server_id = server_id;
359 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
360 assert_return(bus, -EINVAL);
361 assert_return(bus->state == BUS_UNSET, -EPERM);
362 assert_return(!bus_pid_changed(bus), -ECHILD);
364 bus->anonymous_auth = !!b;
368 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
373 assert(bus->state == BUS_HELLO);
376 r = sd_bus_message_get_errno(reply);
382 r = sd_bus_message_read(reply, "s", &s);
386 if (!service_name_is_valid(s) || s[0] != ':')
389 bus->unique_name = strdup(s);
390 if (!bus->unique_name)
393 bus->state = BUS_RUNNING;
398 static int bus_send_hello(sd_bus *bus) {
399 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
404 if (!bus->bus_client || bus->is_kernel)
407 r = sd_bus_message_new_method_call(
409 "org.freedesktop.DBus",
411 "org.freedesktop.DBus",
417 return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
420 int bus_start_running(sd_bus *bus) {
423 if (bus->bus_client && !bus->is_kernel) {
424 bus->state = BUS_HELLO;
428 bus->state = BUS_RUNNING;
432 static int parse_address_key(const char **p, const char *key, char **value) {
443 if (strncmp(*p, key, l) != 0)
456 while (*a != ';' && *a != ',' && *a != 0) {
474 c = (char) ((x << 4) | y);
481 t = realloc(r, n + 2);
509 static void skip_address_key(const char **p) {
513 *p += strcspn(*p, ",");
519 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
520 _cleanup_free_ char *path = NULL, *abstract = NULL;
529 while (**p != 0 && **p != ';') {
530 r = parse_address_key(p, "guid", guid);
536 r = parse_address_key(p, "path", &path);
542 r = parse_address_key(p, "abstract", &abstract);
551 if (!path && !abstract)
554 if (path && abstract)
559 if (l > sizeof(b->sockaddr.un.sun_path))
562 b->sockaddr.un.sun_family = AF_UNIX;
563 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
564 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
565 } else if (abstract) {
566 l = strlen(abstract);
567 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
570 b->sockaddr.un.sun_family = AF_UNIX;
571 b->sockaddr.un.sun_path[0] = 0;
572 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
573 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
579 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
580 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
582 struct addrinfo *result, hints = {
583 .ai_socktype = SOCK_STREAM,
584 .ai_flags = AI_ADDRCONFIG,
592 while (**p != 0 && **p != ';') {
593 r = parse_address_key(p, "guid", guid);
599 r = parse_address_key(p, "host", &host);
605 r = parse_address_key(p, "port", &port);
611 r = parse_address_key(p, "family", &family);
624 if (streq(family, "ipv4"))
625 hints.ai_family = AF_INET;
626 else if (streq(family, "ipv6"))
627 hints.ai_family = AF_INET6;
632 r = getaddrinfo(host, port, &hints, &result);
636 return -EADDRNOTAVAIL;
638 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
639 b->sockaddr_size = result->ai_addrlen;
641 freeaddrinfo(result);
646 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
648 unsigned n_argv = 0, j;
657 while (**p != 0 && **p != ';') {
658 r = parse_address_key(p, "guid", guid);
664 r = parse_address_key(p, "path", &path);
670 if (startswith(*p, "argv")) {
674 ul = strtoul(*p + 4, (char**) p, 10);
675 if (errno > 0 || **p != '=' || ul > 256) {
685 x = realloc(argv, sizeof(char*) * (ul + 2));
691 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
697 r = parse_address_key(p, NULL, argv + ul);
712 /* Make sure there are no holes in the array, with the
713 * exception of argv[0] */
714 for (j = 1; j < n_argv; j++)
720 if (argv && argv[0] == NULL) {
721 argv[0] = strdup(path);
733 for (j = 0; j < n_argv; j++)
741 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
742 _cleanup_free_ char *path = NULL;
750 while (**p != 0 && **p != ';') {
751 r = parse_address_key(p, "guid", guid);
757 r = parse_address_key(p, "path", &path);
776 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
777 _cleanup_free_ char *machine = NULL;
785 while (**p != 0 && **p != ';') {
786 r = parse_address_key(p, "guid", guid);
792 r = parse_address_key(p, "machine", &machine);
805 b->machine = machine;
808 b->sockaddr.un.sun_family = AF_UNIX;
809 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
810 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
815 static void bus_reset_parsed_address(sd_bus *b) {
819 b->sockaddr_size = 0;
820 strv_free(b->exec_argv);
824 b->server_id = SD_ID128_NULL;
831 static int bus_parse_next_address(sd_bus *b) {
832 _cleanup_free_ char *guid = NULL;
840 if (b->address[b->address_index] == 0)
843 bus_reset_parsed_address(b);
845 a = b->address + b->address_index;
854 if (startswith(a, "unix:")) {
857 r = parse_unix_address(b, &a, &guid);
862 } else if (startswith(a, "tcp:")) {
865 r = parse_tcp_address(b, &a, &guid);
871 } else if (startswith(a, "unixexec:")) {
874 r = parse_exec_address(b, &a, &guid);
880 } else if (startswith(a, "kernel:")) {
883 r = parse_kernel_address(b, &a, &guid);
888 } else if (startswith(a, "x-container:")) {
891 r = parse_container_address(b, &a, &guid);
904 r = sd_id128_from_string(guid, &b->server_id);
909 b->address_index = a - b->address;
913 static int bus_start_address(sd_bus *b) {
923 r = bus_socket_exec(b);
927 b->last_connect_error = -r;
928 } else if (b->kernel) {
930 r = bus_kernel_connect(b);
934 b->last_connect_error = -r;
936 } else if (b->machine) {
938 r = bus_container_connect(b);
942 b->last_connect_error = -r;
944 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
946 r = bus_socket_connect(b);
950 b->last_connect_error = -r;
953 r = bus_parse_next_address(b);
957 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
961 int bus_next_address(sd_bus *b) {
964 bus_reset_parsed_address(b);
965 return bus_start_address(b);
968 static int bus_start_fd(sd_bus *b) {
973 assert(b->input_fd >= 0);
974 assert(b->output_fd >= 0);
976 r = fd_nonblock(b->input_fd, true);
980 r = fd_cloexec(b->input_fd, true);
984 if (b->input_fd != b->output_fd) {
985 r = fd_nonblock(b->output_fd, true);
989 r = fd_cloexec(b->output_fd, true);
994 if (fstat(b->input_fd, &st) < 0)
997 if (S_ISCHR(b->input_fd))
998 return bus_kernel_take_fd(b);
1000 return bus_socket_take_fd(b);
1003 _public_ int sd_bus_start(sd_bus *bus) {
1006 assert_return(bus, -EINVAL);
1007 assert_return(bus->state == BUS_UNSET, -EPERM);
1008 assert_return(!bus_pid_changed(bus), -ECHILD);
1010 bus->state = BUS_OPENING;
1012 if (bus->is_server && bus->bus_client)
1015 if (bus->input_fd >= 0)
1016 r = bus_start_fd(bus);
1017 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1018 r = bus_start_address(bus);
1025 return bus_send_hello(bus);
1028 _public_ int sd_bus_open_system(sd_bus **ret) {
1033 assert_return(ret, -EINVAL);
1039 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1041 r = sd_bus_set_address(b, e);
1045 b->sockaddr.un.sun_family = AF_UNIX;
1046 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1047 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1050 b->bus_client = true;
1052 r = sd_bus_start(b);
1064 _public_ int sd_bus_open_user(sd_bus **ret) {
1070 assert_return(ret, -EINVAL);
1076 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1078 r = sd_bus_set_address(b, e);
1082 e = secure_getenv("XDG_RUNTIME_DIR");
1089 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1094 b->sockaddr.un.sun_family = AF_UNIX;
1095 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1096 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1099 b->bus_client = true;
1101 r = sd_bus_start(b);
1113 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1114 _cleanup_free_ char *e = NULL;
1119 assert_return(host, -EINVAL);
1120 assert_return(ret, -EINVAL);
1122 e = bus_address_escape(host);
1126 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1130 r = sd_bus_new(&bus);
1137 bus->bus_client = true;
1139 r = sd_bus_start(bus);
1149 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1150 _cleanup_free_ char *e = NULL;
1155 assert_return(machine, -EINVAL);
1156 assert_return(ret, -EINVAL);
1158 e = bus_address_escape(machine);
1162 p = strjoin("x-container:machine=", e, NULL);
1166 r = sd_bus_new(&bus);
1173 bus->bus_client = true;
1175 r = sd_bus_start(bus);
1185 _public_ void sd_bus_close(sd_bus *bus) {
1188 if (bus->state == BUS_CLOSED)
1190 if (bus_pid_changed(bus))
1193 bus->state = BUS_CLOSED;
1195 sd_bus_detach_event(bus);
1197 if (!bus->is_kernel)
1200 /* We'll leave the fd open in case this is a kernel bus, since
1201 * there might still be memblocks around that reference this
1202 * bus, and they might need to invoke the
1203 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1207 static void bus_enter_closing(sd_bus *bus) {
1210 if (bus->state != BUS_OPENING &&
1211 bus->state != BUS_AUTHENTICATING &&
1212 bus->state != BUS_HELLO &&
1213 bus->state != BUS_RUNNING)
1216 bus->state = BUS_CLOSING;
1219 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1220 assert_return(bus, NULL);
1222 assert_se(REFCNT_INC(bus->n_ref) >= 2);
1227 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1228 assert_return(bus, NULL);
1230 if (REFCNT_DEC(bus->n_ref) <= 0)
1236 _public_ int sd_bus_is_open(sd_bus *bus) {
1238 assert_return(bus, -EINVAL);
1239 assert_return(!bus_pid_changed(bus), -ECHILD);
1241 return BUS_IS_OPEN(bus->state);
1244 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1247 assert_return(bus, -EINVAL);
1248 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1249 assert_return(!bus_pid_changed(bus), -ECHILD);
1251 if (type == SD_BUS_TYPE_UNIX_FD) {
1252 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1255 r = bus_ensure_running(bus);
1259 return bus->can_fds;
1262 return bus_type_is_valid(type);
1265 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1268 assert_return(bus, -EINVAL);
1269 assert_return(server_id, -EINVAL);
1270 assert_return(!bus_pid_changed(bus), -ECHILD);
1272 r = bus_ensure_running(bus);
1276 *server_id = bus->server_id;
1280 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1283 if (m->header->version > b->message_version)
1287 /* If we copy the same message to multiple
1288 * destinations, avoid using the same serial
1290 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1294 return bus_message_seal(m, ++b->serial);
1297 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1304 r = bus_kernel_write_message(bus, message);
1306 r = bus_socket_write_message(bus, message, idx);
1311 static int dispatch_wqueue(sd_bus *bus) {
1315 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1317 while (bus->wqueue_size > 0) {
1319 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1323 /* Didn't do anything this time */
1325 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1326 /* Fully written. Let's drop the entry from
1329 * This isn't particularly optimized, but
1330 * well, this is supposed to be our worst-case
1331 * buffer only, and the socket buffer is
1332 * supposed to be our primary buffer, and if
1333 * it got full, then all bets are off
1336 sd_bus_message_unref(bus->wqueue[0]);
1337 bus->wqueue_size --;
1338 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1348 static int bus_read_message(sd_bus *bus, sd_bus_message **m) {
1355 r = bus_kernel_read_message(bus, m);
1357 r = bus_socket_read_message(bus, m);
1362 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1363 sd_bus_message *z = NULL;
1368 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1370 if (bus->rqueue_size > 0) {
1371 /* Dispatch a queued message */
1373 *m = bus->rqueue[0];
1374 bus->rqueue_size --;
1375 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1379 /* Try to read a new message */
1381 r = bus_read_message(bus, &z);
1394 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1397 assert_return(bus, -EINVAL);
1398 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1399 assert_return(m, -EINVAL);
1400 assert_return(!bus_pid_changed(bus), -ECHILD);
1403 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1410 /* If the serial number isn't kept, then we know that no reply
1412 if (!serial && !m->sealed)
1413 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1415 r = bus_seal_message(bus, m);
1419 /* If this is a reply and no reply was requested, then let's
1420 * suppress this, if we can */
1421 if (m->dont_send && !serial)
1424 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1427 r = bus_write_message(bus, m, &idx);
1430 else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1431 /* Wasn't fully written. So let's remember how
1432 * much was written. Note that the first entry
1433 * of the wqueue array is always allocated so
1434 * that we always can remember how much was
1436 bus->wqueue[0] = sd_bus_message_ref(m);
1437 bus->wqueue_size = 1;
1443 /* Just append it to the queue. */
1445 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1448 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1453 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1457 *serial = BUS_MESSAGE_SERIAL(m);
1462 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1465 assert_return(bus, -EINVAL);
1466 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1467 assert_return(m, -EINVAL);
1468 assert_return(!bus_pid_changed(bus), -ECHILD);
1470 if (!streq_ptr(m->destination, destination)) {
1475 r = sd_bus_message_set_destination(m, destination);
1480 return sd_bus_send(bus, m, serial);
1483 static usec_t calc_elapse(uint64_t usec) {
1484 if (usec == (uint64_t) -1)
1488 usec = BUS_DEFAULT_TIMEOUT;
1490 return now(CLOCK_MONOTONIC) + usec;
1493 static int timeout_compare(const void *a, const void *b) {
1494 const struct reply_callback *x = a, *y = b;
1496 if (x->timeout != 0 && y->timeout == 0)
1499 if (x->timeout == 0 && y->timeout != 0)
1502 if (x->timeout < y->timeout)
1505 if (x->timeout > y->timeout)
1511 _public_ int sd_bus_call_async(
1514 sd_bus_message_handler_t callback,
1519 struct reply_callback *c;
1522 assert_return(bus, -EINVAL);
1523 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1524 assert_return(m, -EINVAL);
1525 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1526 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1527 assert_return(callback, -EINVAL);
1528 assert_return(!bus_pid_changed(bus), -ECHILD);
1530 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1534 if (usec != (uint64_t) -1) {
1535 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1540 r = bus_seal_message(bus, m);
1544 c = new0(struct reply_callback, 1);
1548 c->callback = callback;
1549 c->userdata = userdata;
1550 c->serial = BUS_MESSAGE_SERIAL(m);
1551 c->timeout = calc_elapse(usec);
1553 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1559 if (c->timeout != 0) {
1560 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1563 sd_bus_call_async_cancel(bus, c->serial);
1568 r = sd_bus_send(bus, m, serial);
1570 sd_bus_call_async_cancel(bus, c->serial);
1577 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1578 struct reply_callback *c;
1580 assert_return(bus, -EINVAL);
1581 assert_return(serial != 0, -EINVAL);
1582 assert_return(!bus_pid_changed(bus), -ECHILD);
1584 c = hashmap_remove(bus->reply_callbacks, &serial);
1588 if (c->timeout != 0)
1589 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1595 int bus_ensure_running(sd_bus *bus) {
1600 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1602 if (bus->state == BUS_RUNNING)
1606 r = sd_bus_process(bus, NULL);
1609 if (bus->state == BUS_RUNNING)
1614 r = sd_bus_wait(bus, (uint64_t) -1);
1620 _public_ int sd_bus_call(
1624 sd_bus_error *error,
1625 sd_bus_message **reply) {
1632 assert_return(bus, -EINVAL);
1633 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1634 assert_return(m, -EINVAL);
1635 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1636 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1637 assert_return(!bus_error_is_dirty(error), -EINVAL);
1638 assert_return(!bus_pid_changed(bus), -ECHILD);
1640 r = bus_ensure_running(bus);
1644 r = sd_bus_send(bus, m, &serial);
1648 timeout = calc_elapse(usec);
1652 sd_bus_message *incoming = NULL;
1657 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1660 /* Make sure there's room for queuing this
1661 * locally, before we read the message */
1663 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1671 r = bus_read_message(bus, &incoming);
1677 if (incoming->reply_serial == serial) {
1678 /* Found a match! */
1680 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1685 sd_bus_message_unref(incoming);
1690 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1693 r = sd_bus_error_copy(error, &incoming->error);
1695 sd_bus_message_unref(incoming);
1699 k = sd_bus_error_get_errno(&incoming->error);
1700 sd_bus_message_unref(incoming);
1704 sd_bus_message_unref(incoming);
1707 } else if (incoming->header->serial == serial &&
1710 streq(bus->unique_name, incoming->sender)) {
1712 /* Our own message? Somebody is trying
1713 * to send its own client a message,
1714 * let's not dead-lock, let's fail
1717 sd_bus_message_unref(incoming);
1721 /* There's already guaranteed to be room for
1722 * this, so need to resize things here */
1723 bus->rqueue[bus->rqueue_size ++] = incoming;
1726 /* Try to read more, right-away */
1735 n = now(CLOCK_MONOTONIC);
1741 left = (uint64_t) -1;
1743 r = bus_poll(bus, true, left);
1747 r = dispatch_wqueue(bus);
1753 _public_ int sd_bus_get_fd(sd_bus *bus) {
1755 assert_return(bus, -EINVAL);
1756 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1757 assert_return(!bus_pid_changed(bus), -ECHILD);
1759 return bus->input_fd;
1762 _public_ int sd_bus_get_events(sd_bus *bus) {
1765 assert_return(bus, -EINVAL);
1766 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1767 assert_return(!bus_pid_changed(bus), -ECHILD);
1769 if (bus->state == BUS_OPENING)
1771 else if (bus->state == BUS_AUTHENTICATING) {
1773 if (bus_socket_auth_needs_write(bus))
1778 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1779 if (bus->rqueue_size <= 0)
1781 if (bus->wqueue_size > 0)
1788 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1789 struct reply_callback *c;
1791 assert_return(bus, -EINVAL);
1792 assert_return(timeout_usec, -EINVAL);
1793 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1794 assert_return(!bus_pid_changed(bus), -ECHILD);
1796 if (bus->state == BUS_CLOSING) {
1801 if (bus->state == BUS_AUTHENTICATING) {
1802 *timeout_usec = bus->auth_timeout;
1806 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1807 *timeout_usec = (uint64_t) -1;
1811 if (bus->rqueue_size > 0) {
1816 c = prioq_peek(bus->reply_callbacks_prioq);
1818 *timeout_usec = (uint64_t) -1;
1822 *timeout_usec = c->timeout;
1826 static int process_timeout(sd_bus *bus) {
1827 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1828 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1829 struct reply_callback *c;
1835 c = prioq_peek(bus->reply_callbacks_prioq);
1839 n = now(CLOCK_MONOTONIC);
1843 r = bus_message_new_synthetic_error(
1846 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1851 r = bus_seal_message(bus, m);
1855 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1856 hashmap_remove(bus->reply_callbacks, &c->serial);
1859 bus->iteration_counter ++;
1861 r = c->callback(bus, m, c->userdata, &error_buffer);
1862 r = bus_maybe_reply_error(m, r, &error_buffer);
1865 bus->current = NULL;
1870 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1874 if (bus->state != BUS_HELLO)
1877 /* Let's make sure the first message on the bus is the HELLO
1878 * reply. But note that we don't actually parse the message
1879 * here (we leave that to the usual handling), we just verify
1880 * we don't let any earlier msg through. */
1882 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1883 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1886 if (m->reply_serial != bus->hello_serial)
1892 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1893 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1894 struct reply_callback *c;
1900 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1901 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1904 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1908 if (c->timeout != 0)
1909 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1911 r = sd_bus_message_rewind(m, true);
1915 r = c->callback(bus, m, c->userdata, &error_buffer);
1916 r = bus_maybe_reply_error(m, r, &error_buffer);
1922 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1923 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1924 struct filter_callback *l;
1931 bus->filter_callbacks_modified = false;
1933 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1935 if (bus->filter_callbacks_modified)
1938 /* Don't run this more than once per iteration */
1939 if (l->last_iteration == bus->iteration_counter)
1942 l->last_iteration = bus->iteration_counter;
1944 r = sd_bus_message_rewind(m, true);
1948 r = l->callback(bus, m, &error_buffer, l->userdata);
1949 r = bus_maybe_reply_error(m, r, &error_buffer);
1955 } while (bus->filter_callbacks_modified);
1960 static int process_match(sd_bus *bus, sd_bus_message *m) {
1967 bus->match_callbacks_modified = false;
1969 r = bus_match_run(bus, &bus->match_callbacks, m);
1973 } while (bus->match_callbacks_modified);
1978 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1979 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1985 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1988 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1991 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1994 if (streq_ptr(m->member, "Ping"))
1995 r = sd_bus_message_new_method_return(m, &reply);
1996 else if (streq_ptr(m->member, "GetMachineId")) {
2000 r = sd_id128_get_machine(&id);
2004 r = sd_bus_message_new_method_return(m, &reply);
2008 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2010 r = sd_bus_message_new_method_errorf(
2012 SD_BUS_ERROR_UNKNOWN_METHOD,
2013 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2019 r = sd_bus_send(bus, reply, NULL);
2026 static int process_message(sd_bus *bus, sd_bus_message *m) {
2033 bus->iteration_counter++;
2035 log_debug("Got message sender=%s object=%s interface=%s member=%s",
2036 strna(sd_bus_message_get_sender(m)),
2037 strna(sd_bus_message_get_path(m)),
2038 strna(sd_bus_message_get_interface(m)),
2039 strna(sd_bus_message_get_member(m)));
2041 r = process_hello(bus, m);
2045 r = process_reply(bus, m);
2049 r = process_filter(bus, m);
2053 r = process_match(bus, m);
2057 r = process_builtin(bus, m);
2061 r = bus_process_object(bus, m);
2064 bus->current = NULL;
2068 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2069 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2073 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2075 r = process_timeout(bus);
2079 r = dispatch_wqueue(bus);
2083 r = dispatch_rqueue(bus, &m);
2089 r = process_message(bus, m);
2094 r = sd_bus_message_rewind(m, true);
2103 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2105 r = sd_bus_reply_method_errorf(
2107 SD_BUS_ERROR_UNKNOWN_OBJECT,
2108 "Unknown object '%s'.", m->path);
2122 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2123 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2124 struct reply_callback *c;
2128 assert(bus->state == BUS_CLOSING);
2130 c = hashmap_first(bus->reply_callbacks);
2132 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2134 /* First, fail all outstanding method calls */
2135 r = bus_message_new_synthetic_error(
2138 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2143 r = bus_seal_message(bus, m);
2147 if (c->timeout != 0)
2148 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2150 hashmap_remove(bus->reply_callbacks, &c->serial);
2153 bus->iteration_counter++;
2155 r = c->callback(bus, m, c->userdata, &error_buffer);
2156 r = bus_maybe_reply_error(m, r, &error_buffer);
2162 /* Then, synthesize a Disconnected message */
2163 r = sd_bus_message_new_signal(
2165 "/org/freedesktop/DBus/Local",
2166 "org.freedesktop.DBus.Local",
2172 r = bus_seal_message(bus, m);
2179 bus->iteration_counter++;
2181 r = process_filter(bus, m);
2185 r = process_match(bus, m);
2197 bus->current = NULL;
2201 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2202 BUS_DONT_DESTROY(bus);
2205 /* Returns 0 when we didn't do anything. This should cause the
2206 * caller to invoke sd_bus_wait() before returning the next
2207 * time. Returns > 0 when we did something, which possibly
2208 * means *ret is filled in with an unprocessed message. */
2210 assert_return(bus, -EINVAL);
2211 assert_return(!bus_pid_changed(bus), -ECHILD);
2213 /* We don't allow recursively invoking sd_bus_process(). */
2214 assert_return(!bus->current, -EBUSY);
2216 switch (bus->state) {
2223 r = bus_socket_process_opening(bus);
2224 if (r == -ECONNRESET || r == -EPIPE) {
2225 bus_enter_closing(bus);
2233 case BUS_AUTHENTICATING:
2234 r = bus_socket_process_authenticating(bus);
2235 if (r == -ECONNRESET || r == -EPIPE) {
2236 bus_enter_closing(bus);
2248 r = process_running(bus, ret);
2249 if (r == -ECONNRESET || r == -EPIPE) {
2250 bus_enter_closing(bus);
2260 return process_closing(bus, ret);
2263 assert_not_reached("Unknown state");
2266 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2267 struct pollfd p[2] = {};
2270 usec_t m = (usec_t) -1;
2274 if (bus->state == BUS_CLOSING)
2277 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2279 e = sd_bus_get_events(bus);
2284 /* The caller really needs some more data, he doesn't
2285 * care about what's already read, or any timeouts
2290 /* The caller wants to process if there's something to
2291 * process, but doesn't care otherwise */
2293 r = sd_bus_get_timeout(bus, &until);
2298 nw = now(CLOCK_MONOTONIC);
2299 m = until > nw ? until - nw : 0;
2303 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2306 p[0].fd = bus->input_fd;
2307 if (bus->output_fd == bus->input_fd) {
2311 p[0].events = e & POLLIN;
2312 p[1].fd = bus->output_fd;
2313 p[1].events = e & POLLOUT;
2317 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2321 return r > 0 ? 1 : 0;
2324 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2326 assert_return(bus, -EINVAL);
2327 assert_return(!bus_pid_changed(bus), -ECHILD);
2329 if (bus->state == BUS_CLOSING)
2332 assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
2334 if (bus->rqueue_size > 0)
2337 return bus_poll(bus, false, timeout_usec);
2340 _public_ int sd_bus_flush(sd_bus *bus) {
2343 assert_return(bus, -EINVAL);
2344 assert_return(!bus_pid_changed(bus), -ECHILD);
2346 if (bus->state == BUS_CLOSING)
2349 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2351 r = bus_ensure_running(bus);
2355 if (bus->wqueue_size <= 0)
2359 r = dispatch_wqueue(bus);
2363 if (bus->wqueue_size <= 0)
2366 r = bus_poll(bus, false, (uint64_t) -1);
2372 _public_ int sd_bus_add_filter(sd_bus *bus,
2373 sd_bus_message_handler_t callback,
2376 struct filter_callback *f;
2378 assert_return(bus, -EINVAL);
2379 assert_return(callback, -EINVAL);
2380 assert_return(!bus_pid_changed(bus), -ECHILD);
2382 f = new0(struct filter_callback, 1);
2385 f->callback = callback;
2386 f->userdata = userdata;
2388 bus->filter_callbacks_modified = true;
2389 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2393 _public_ int sd_bus_remove_filter(sd_bus *bus,
2394 sd_bus_message_handler_t callback,
2397 struct filter_callback *f;
2399 assert_return(bus, -EINVAL);
2400 assert_return(callback, -EINVAL);
2401 assert_return(!bus_pid_changed(bus), -ECHILD);
2403 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2404 if (f->callback == callback && f->userdata == userdata) {
2405 bus->filter_callbacks_modified = true;
2406 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2415 _public_ int sd_bus_add_match(sd_bus *bus,
2417 sd_bus_message_handler_t callback,
2420 struct bus_match_component *components = NULL;
2421 unsigned n_components = 0;
2422 uint64_t cookie = 0;
2425 assert_return(bus, -EINVAL);
2426 assert_return(match, -EINVAL);
2427 assert_return(!bus_pid_changed(bus), -ECHILD);
2429 r = bus_match_parse(match, &components, &n_components);
2433 if (bus->bus_client) {
2434 cookie = ++bus->match_cookie;
2436 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2441 bus->match_callbacks_modified = true;
2442 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2444 if (bus->bus_client)
2445 bus_remove_match_internal(bus, match, cookie);
2449 bus_match_parse_free(components, n_components);
2453 _public_ int sd_bus_remove_match(sd_bus *bus,
2455 sd_bus_message_handler_t callback,
2458 struct bus_match_component *components = NULL;
2459 unsigned n_components = 0;
2461 uint64_t cookie = 0;
2463 assert_return(bus, -EINVAL);
2464 assert_return(match, -EINVAL);
2465 assert_return(!bus_pid_changed(bus), -ECHILD);
2467 r = bus_match_parse(match, &components, &n_components);
2471 bus->match_callbacks_modified = true;
2472 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2474 if (bus->bus_client)
2475 q = bus_remove_match_internal(bus, match, cookie);
2477 bus_match_parse_free(components, n_components);
2479 return r < 0 ? r : q;
2482 bool bus_pid_changed(sd_bus *bus) {
2485 /* We don't support people creating a bus connection and
2486 * keeping it around over a fork(). Let's complain. */
2488 return bus->original_pid != getpid();
2491 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2492 sd_bus *bus = userdata;
2497 r = sd_bus_process(bus, NULL);
2504 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2505 sd_bus *bus = userdata;
2510 r = sd_bus_process(bus, NULL);
2517 static int prepare_callback(sd_event_source *s, void *userdata) {
2518 sd_bus *bus = userdata;
2525 e = sd_bus_get_events(bus);
2529 if (bus->output_fd != bus->input_fd) {
2531 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2535 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2539 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2544 r = sd_bus_get_timeout(bus, &until);
2550 j = sd_event_source_set_time(bus->time_event_source, until);
2555 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2562 static int quit_callback(sd_event_source *event, void *userdata) {
2563 sd_bus *bus = userdata;
2572 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2575 assert_return(bus, -EINVAL);
2576 assert_return(!bus->event, -EBUSY);
2578 assert(!bus->input_io_event_source);
2579 assert(!bus->output_io_event_source);
2580 assert(!bus->time_event_source);
2583 bus->event = sd_event_ref(event);
2585 r = sd_event_default(&bus->event);
2590 r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2594 r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2598 if (bus->output_fd != bus->input_fd) {
2599 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2603 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2608 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2612 r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2616 r = sd_event_source_set_priority(bus->time_event_source, priority);
2620 r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2627 sd_bus_detach_event(bus);
2631 _public_ int sd_bus_detach_event(sd_bus *bus) {
2632 assert_return(bus, -EINVAL);
2633 assert_return(bus->event, -ENXIO);
2635 if (bus->input_io_event_source) {
2636 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2637 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2640 if (bus->output_io_event_source) {
2641 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2642 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2645 if (bus->time_event_source) {
2646 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2647 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2650 if (bus->quit_event_source) {
2651 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2652 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2656 bus->event = sd_event_unref(bus->event);
2661 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2662 assert_return(bus, NULL);
2664 return bus->current;
2667 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2672 assert(default_bus);
2675 return !!*default_bus;
2678 *ret = sd_bus_ref(*default_bus);
2686 b->default_bus_ptr = default_bus;
2694 _public_ int sd_bus_default_system(sd_bus **ret) {
2695 static __thread sd_bus *default_system_bus = NULL;
2697 return bus_default(sd_bus_open_system, &default_system_bus, ret);
2700 _public_ int sd_bus_default_user(sd_bus **ret) {
2701 static __thread sd_bus *default_user_bus = NULL;
2703 return bus_default(sd_bus_open_user, &default_user_bus, ret);
2706 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2707 assert_return(b, -EINVAL);
2708 assert_return(tid, -EINVAL);
2709 assert_return(!bus_pid_changed(b), -ECHILD);
2717 return sd_event_get_tid(b->event, tid);
2722 _public_ char *sd_bus_label_escape(const char *s) {
2726 assert_return(s, NULL);
2728 /* Escapes all chars that D-Bus' object path cannot deal
2729 * with. Can be reversed with bus_path_unescape(). We special
2730 * case the empty string. */
2735 r = new(char, strlen(s)*3 + 1);
2739 for (f = s, t = r; *f; f++) {
2741 /* Escape everything that is not a-zA-Z0-9. We also
2742 * escape 0-9 if it's the first character */
2744 if (!(*f >= 'A' && *f <= 'Z') &&
2745 !(*f >= 'a' && *f <= 'z') &&
2746 !(f > s && *f >= '0' && *f <= '9')) {
2748 *(t++) = hexchar(*f >> 4);
2749 *(t++) = hexchar(*f);
2759 _public_ char *sd_bus_label_unescape(const char *f) {
2762 assert_return(f, NULL);
2764 /* Special case for the empty string */
2768 r = new(char, strlen(f) + 1);
2772 for (t = r; *f; f++) {
2777 if ((a = unhexchar(f[1])) < 0 ||
2778 (b = unhexchar(f[2])) < 0) {
2779 /* Invalid escape code, let's take it literal then */
2782 *(t++) = (char) ((a << 4) | b);