1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 #include "bus-container.h"
50 #include "bus-protocol.h"
/* Forward declaration of the file-local poll helper; sd_bus_call() further
 * down invokes it while waiting for a reply. */
52 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Closes the transport file descriptors of the bus and marks both as
 * unset (-1). */
54 static void bus_close_fds(sd_bus *b) {
58 close_nointr_nofail(b->input_fd);
/* Input and output may share one descriptor; close the output side
 * separately only when it is a different fd. */
60 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
61 close_nointr_nofail(b->output_fd);
63 b->input_fd = b->output_fd = -1;
/* Tears down object tree node n: recurses into n->child, unlinks every
 * callback, vtable and enumerator attached to the node, removes the node
 * from its parent's child list and finally drops it from the b->nodes
 * hashmap (keyed by n->path). */
66 static void bus_node_destroy(sd_bus *b, struct node *n) {
67 struct node_callback *c;
68 struct node_vtable *v;
69 struct node_enumerator *e;
77 bus_node_destroy(b, n->child);
79 while ((c = n->callbacks)) {
80 LIST_REMOVE(callbacks, n->callbacks, c);
84 while ((v = n->vtables)) {
85 LIST_REMOVE(vtables, n->vtables, v);
90 while ((e = n->enumerators)) {
91 LIST_REMOVE(enumerators, n->enumerators, e);
96 LIST_REMOVE(siblings, n->parent->child, n);
/* The node must have been registered under its own path */
98 assert_se(hashmap_remove(b->nodes, n->path) == n);
/* Drops the reference held on every message in the read queue and in the
 * write queue, then resets both queues to the empty state (NULL pointer,
 * size 0). */
103 static void bus_reset_queues(sd_bus *b) {
108 for (i = 0; i < b->rqueue_size; i++)
109 sd_bus_message_unref(b->rqueue[i]);
112 for (i = 0; i < b->wqueue_size; i++)
113 sd_bus_message_unref(b->wqueue[i]);
116 b->rqueue = b->wqueue = NULL;
117 b->rqueue_size = b->wqueue_size = 0;
/* Releases the resources owned by bus object b: detaches it from the event
 * loop, unmaps the kdbus buffer, frees strings and the exec argv, closes the
 * stored fds, frees the reply/filter/match/vtable tables, destroys the whole
 * object tree and finally flushes the memfd cache and destroys its mutex. */
120 static void bus_free(sd_bus *b) {
121 struct filter_callback *f;
126 sd_bus_detach_event(b);
131 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
134 free(b->unique_name);
135 free(b->auth_buffer);
141 strv_free(b->exec_argv);
143 close_many(b->fds, b->n_fds);
148 hashmap_free_free(b->reply_callbacks);
149 prioq_free(b->reply_callbacks_prioq);
151 while ((f = b->filter_callbacks)) {
152 LIST_REMOVE(callbacks, b->filter_callbacks, f);
156 bus_match_free(&b->match_callbacks);
158 hashmap_free_free(b->vtable_methods);
159 hashmap_free_free(b->vtable_properties);
/* bus_node_destroy() removes the node from b->nodes, so this loop
 * terminates once the map is empty. */
161 while ((n = hashmap_first(b->nodes)))
162 bus_node_destroy(b, n);
164 hashmap_free(b->nodes);
166 bus_kernel_flush_memfd(b);
168 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
/* Creates a new, unconnected bus object. Returns -EINVAL when ret is NULL.
 * The object r starts with: initial reference count, no fds (-1), message
 * version 1, well-known/unique name credentials requested, fd passing
 * enabled in the hello flags, name attachment requested, the creating
 * process' PID recorded, an initialized memfd cache mutex and a write queue
 * with room for one message. */
173 _public_ int sd_bus_new(sd_bus **ret) {
176 assert_return(ret, -EINVAL);
182 r->n_ref = REFCNT_INIT;
183 r->input_fd = r->output_fd = -1;
184 r->message_version = 1;
185 r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
186 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
187 r->attach_flags |= KDBUS_ATTACH_NAMES;
/* Remembered so that use after fork() can be detected; presumably this is
 * what bus_pid_changed() compares against -- confirm. */
188 r->original_pid = getpid();
190 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
192 /* We guarantee that wqueue always has space for at least one
194 r->wqueue = new(sd_bus_message*, 1);
/* Configures the address string of a bus that has not been started yet.
 * Fails with -EINVAL for a NULL bus or address, -EPERM when the bus is no
 * longer in BUS_UNSET state and -ECHILD when bus_pid_changed() reports a
 * PID change. */
204 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
207 assert_return(bus, -EINVAL);
208 assert_return(bus->state == BUS_UNSET, -EPERM);
209 assert_return(address, -EINVAL);
210 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Hands a pair of already-open file descriptors to an unstarted bus.
 * Both descriptors must be >= 0 (-EINVAL otherwise); the bus must still be
 * in BUS_UNSET state (-EPERM) and must not have changed PID (-ECHILD). */
222 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
223 assert_return(bus, -EINVAL);
224 assert_return(bus->state == BUS_UNSET, -EPERM);
225 assert_return(input_fd >= 0, -EINVAL);
226 assert_return(output_fd >= 0, -EINVAL);
227 assert_return(!bus_pid_changed(bus), -ECHILD);
229 bus->input_fd = input_fd;
230 bus->output_fd = output_fd;
/* Configures an unstarted bus to talk to a spawned program. path must be
 * non-NULL and argv must be a non-empty string vector (-EINVAL otherwise);
 * -EPERM when the bus is not in BUS_UNSET state, -ECHILD after a PID change.
 * Any previously configured exec_path/exec_argv are freed. */
234 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
237 assert_return(bus, -EINVAL);
238 assert_return(bus->state == BUS_UNSET, -EPERM);
239 assert_return(path, -EINVAL);
240 assert_return(!strv_isempty(argv), -EINVAL);
241 assert_return(!bus_pid_changed(bus), -ECHILD);
253 free(bus->exec_path);
254 strv_free(bus->exec_argv);
/* Sets whether this connection acts as a bus client. b is normalized to a
 * boolean. Only allowed while the bus is in BUS_UNSET state (-EPERM);
 * -EINVAL for a NULL bus, -ECHILD after a PID change. */
262 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
263 assert_return(bus, -EINVAL);
264 assert_return(bus->state == BUS_UNSET, -EPERM);
265 assert_return(!bus_pid_changed(bus), -ECHILD);
267 bus->bus_client = !!b;
/* Turns the KDBUS_HELLO_ACCEPT_FD bit in bus->hello_flags on or off
 * according to b. Only allowed while the bus is in BUS_UNSET state (-EPERM);
 * -EINVAL for a NULL bus, -ECHILD after a PID change. */
271 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
272 assert_return(bus, -EINVAL);
273 assert_return(bus->state == BUS_UNSET, -EPERM);
274 assert_return(!bus_pid_changed(bus), -ECHILD);
276 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
/* Turns the KDBUS_ATTACH_TIMESTAMP bit in bus->attach_flags on or off
 * according to b. Only allowed while the bus is in BUS_UNSET state (-EPERM);
 * -EINVAL for a NULL bus, -ECHILD after a PID change. */
280 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
281 assert_return(bus, -EINVAL);
282 assert_return(bus->state == BUS_UNSET, -EPERM);
283 assert_return(!bus_pid_changed(bus), -ECHILD);
285 SET_FLAG(bus->attach_flags, KDBUS_ATTACH_TIMESTAMP, b);
/* Selects which credential fields should be attached to incoming messages.
 * mask must not exceed _SD_BUS_CREDS_ALL (-EINVAL); the bus must be in
 * BUS_UNSET state (-EPERM) and must not have changed PID (-ECHILD). The
 * well-known-names and unique-name bits are always added to the mask. */
289 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, uint64_t mask) {
290 assert_return(bus, -EINVAL);
291 assert_return(mask <= _SD_BUS_CREDS_ALL, -EINVAL);
292 assert_return(bus->state == BUS_UNSET, -EPERM);
293 assert_return(!bus_pid_changed(bus), -ECHILD);
295 /* The well knowns we need unconditionally, so that matches can work */
296 bus->creds_mask = mask | SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
/* NOTE(review): creds_mask is passed both as the input and as the output of
 * the translation. If the helper emits KDBUS_ATTACH_* bits, the output
 * should probably be &bus->attach_flags, otherwise the SD_BUS_CREDS_* mask
 * stored just above is overwritten -- confirm against the helper. */
298 return kdbus_translate_attach_flags(bus->creds_mask, &bus->creds_mask);
/* Sets whether this end of the connection acts as server, and which server
 * ID it uses. A non-null server_id is only accepted together with a true b
 * (-EINVAL otherwise). Only allowed while the bus is in BUS_UNSET state
 * (-EPERM); -ECHILD after a PID change. */
301 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
302 assert_return(bus, -EINVAL);
303 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
304 assert_return(bus->state == BUS_UNSET, -EPERM);
305 assert_return(!bus_pid_changed(bus), -ECHILD);
307 bus->is_server = !!b;
308 bus->server_id = server_id;
/* Sets the anonymous_auth flag of the bus; b is normalized to a boolean.
 * Only allowed while the bus is in BUS_UNSET state (-EPERM); -EINVAL for a
 * NULL bus, -ECHILD after a PID change. */
312 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
313 assert_return(bus, -EINVAL);
314 assert_return(bus->state == BUS_UNSET, -EPERM);
315 assert_return(!bus_pid_changed(bus), -ECHILD);
317 bus->anonymous_auth = !!b;
/* Configuration setter guarded like its siblings: -EINVAL for a NULL bus,
 * -EPERM unless the bus is in BUS_UNSET state, -ECHILD after a PID change.
 * Presumably records !!b as the "trusted" flag of the bus -- TODO confirm. */
321 _public_ int sd_bus_set_trusted(sd_bus *bus, int b) {
322 assert_return(bus, -EINVAL);
323 assert_return(bus->state == BUS_UNSET, -EPERM);
324 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Reply handler for the Hello call issued by bus_send_hello(). Reads the
 * assigned name (a string) from the reply, requires it to be a valid service
 * name starting with ':', stores a copy in bus->unique_name and moves a bus
 * that is still in BUS_HELLO state on to BUS_RUNNING. */
330 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
/* The reply may also arrive while the connection is already shutting down */
335 assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
338 r = sd_bus_message_get_errno(reply);
344 r = sd_bus_message_read(reply, "s", &s);
348 if (!service_name_is_valid(s) || s[0] != ':')
351 bus->unique_name = strdup(s);
352 if (!bus->unique_name)
355 if (bus->state == BUS_HELLO)
356 bus->state = BUS_RUNNING;
/* Sends the initial method call to "org.freedesktop.DBus" asynchronously,
 * with hello_callback() as reply handler; the serial of the call is kept in
 * bus->hello_serial. Connections that are not bus clients, or that use the
 * kernel transport, are special-cased by the first check. */
361 static int bus_send_hello(sd_bus *bus) {
362 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
367 if (!bus->bus_client || bus->is_kernel)
370 r = sd_bus_message_new_method_call(
372 "org.freedesktop.DBus",
374 "org.freedesktop.DBus",
380 return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Picks the state entered once the transport is ready: bus clients on a
 * non-kernel transport go to BUS_HELLO, everything else goes to BUS_RUNNING. */
383 int bus_start_running(sd_bus *bus) {
386 if (bus->bus_client && !bus->is_kernel) {
387 bus->state = BUS_HELLO;
391 bus->state = BUS_RUNNING;
/* Parses one "key=value" element of a bus address at *p. The element is
 * only considered when it starts with key. The value extends up to the next
 * ';', ',' or the end of the string; two hex digits x and y are combined
 * into one byte ((x << 4) | y), presumably to decode escaped characters --
 * confirm. The output buffer r is grown with realloc() as characters are
 * appended. */
395 static int parse_address_key(const char **p, const char *key, char **value) {
406 if (strncmp(*p, key, l) != 0)
419 while (*a != ';' && *a != ',' && *a != 0) {
437 c = (char) ((x << 4) | y);
444 t = realloc(r, n + 2);
/* Advances *p to the next ',' in the address string, or to the terminating
 * NUL when there is none (strcspn() semantics). */
472 static void skip_address_key(const char **p) {
476 *p += strcspn(*p, ",");
/* Parses the parameters of a unix socket address ("guid", "path",
 * "abstract") up to the end of the string or the next ';' and fills in
 * b->sockaddr.un / b->sockaddr_size. The checks below cover both the
 * "neither path nor abstract" and the "both given" case. Abstract names are
 * stored after a leading NUL byte in sun_path. */
482 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
483 _cleanup_free_ char *path = NULL, *abstract = NULL;
492 while (**p != 0 && **p != ';') {
493 r = parse_address_key(p, "guid", guid);
499 r = parse_address_key(p, "path", &path);
505 r = parse_address_key(p, "abstract", &abstract);
514 if (!path && !abstract)
517 if (path && abstract)
/* A path may fill sun_path completely; the address length below is computed
 * from l, so no trailing NUL is counted. */
522 if (l > sizeof(b->sockaddr.un.sun_path))
525 b->sockaddr.un.sun_family = AF_UNIX;
526 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
527 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
528 } else if (abstract) {
529 l = strlen(abstract);
/* One byte of sun_path is taken by the leading NUL of abstract names */
530 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
533 b->sockaddr.un.sun_family = AF_UNIX;
534 b->sockaddr.un.sun_path[0] = 0;
535 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
536 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the parameters of a TCP address ("guid", "host", "port", "family")
 * up to the end of the string or the next ';'. A family of "ipv4"/"ipv6"
 * restricts resolution to AF_INET/AF_INET6. The host/port pair is resolved
 * with getaddrinfo() (stream sockets, AI_ADDRCONFIG) and the first result is
 * copied into b->sockaddr. -EADDRNOTAVAIL is one of the resolution failure
 * codes. */
542 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
543 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
545 struct addrinfo *result, hints = {
546 .ai_socktype = SOCK_STREAM,
547 .ai_flags = AI_ADDRCONFIG,
555 while (**p != 0 && **p != ';') {
556 r = parse_address_key(p, "guid", guid);
562 r = parse_address_key(p, "host", &host);
568 r = parse_address_key(p, "port", &port);
574 r = parse_address_key(p, "family", &family);
587 if (streq(family, "ipv4"))
588 hints.ai_family = AF_INET;
589 else if (streq(family, "ipv6"))
590 hints.ai_family = AF_INET6;
595 r = getaddrinfo(host, port, &hints, &result);
599 return -EADDRNOTAVAIL;
/* Only the first entry of the result list is used */
601 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
602 b->sockaddr_size = result->ai_addrlen;
604 freeaddrinfo(result);
/* Parses the parameters of an exec address: "guid", "path" and numbered
 * "argvN" keys. The index N is read with strtoul() and must be followed by
 * '=' and be at most 256. The argv array is grown on demand and newly added
 * slots are zeroed. After parsing, slots 1..n_argv-1 are checked for holes,
 * and a missing argv[0] defaults to a copy of path. */
609 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
611 unsigned n_argv = 0, j;
620 while (**p != 0 && **p != ';') {
621 r = parse_address_key(p, "guid", guid);
627 r = parse_address_key(p, "path", &path);
633 if (startswith(*p, "argv")) {
/* strtoul() advances *p past the digits; the cast drops the const qualifier
 * only for the end-pointer out-parameter. */
637 ul = strtoul(*p + 4, (char**) p, 10);
638 if (errno > 0 || **p != '=' || ul > 256) {
/* ul + 2: room for index ul itself plus the terminating NULL entry */
648 x = realloc(argv, sizeof(char*) * (ul + 2));
654 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
/* A NULL key makes parse_address_key() take the value at the current
 * position (we already consumed "argvN") -- presumably; confirm. */
660 r = parse_address_key(p, NULL, argv + ul);
675 /* Make sure there are no holes in the array, with the
676 * exception of argv[0] */
677 for (j = 1; j < n_argv; j++)
683 if (argv && argv[0] == NULL) {
684 argv[0] = strdup(path);
696 for (j = 0; j < n_argv; j++)
/* Parses the parameters of a kernel bus address ("guid" and "path") up to
 * the end of the string or the next ';'. */
704 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
705 _cleanup_free_ char *path = NULL;
713 while (**p != 0 && **p != ';') {
714 r = parse_address_key(p, "guid", guid);
720 r = parse_address_key(p, "path", &path);
/* Parses the parameters of a container address ("guid" and "machine") up to
 * the end of the string or the next ';'. The machine name has to pass
 * filename_is_safe(); it is stored in b->machine, and the socket address is
 * set to the fixed system bus socket path used inside the container. */
739 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
740 _cleanup_free_ char *machine = NULL;
748 while (**p != 0 && **p != ';') {
749 r = parse_address_key(p, "guid", guid);
755 r = parse_address_key(p, "machine", &machine);
767 if (!filename_is_safe(machine))
771 b->machine = machine;
774 b->sockaddr.un.sun_family = AF_UNIX;
775 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
/* sizeof(literal) - 1: address length without the trailing NUL */
776 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
/* Forgets the result of the previous address parse: the socket address
 * size is reset to 0, the exec argv is freed and the server ID is cleared. */
781 static void bus_reset_parsed_address(sd_bus *b) {
785 b->sockaddr_size = 0;
786 strv_free(b->exec_argv);
790 b->server_id = SD_ID128_NULL;
/* Parses the next entry of the address list b->address, starting at offset
 * b->address_index. An offset pointing at the terminating NUL means there
 * is nothing left to parse. The entry is dispatched on its transport prefix
 * ("unix:", "tcp:", "unixexec:", "kernel:", "x-container:"); a guid
 * parameter, if any, is converted into b->server_id. On success
 * b->address_index is advanced to where parsing stopped. */
797 static int bus_parse_next_address(sd_bus *b) {
798 _cleanup_free_ char *guid = NULL;
806 if (b->address[b->address_index] == 0)
809 bus_reset_parsed_address(b);
811 a = b->address + b->address_index;
820 if (startswith(a, "unix:")) {
823 r = parse_unix_address(b, &a, &guid);
828 } else if (startswith(a, "tcp:")) {
831 r = parse_tcp_address(b, &a, &guid);
837 } else if (startswith(a, "unixexec:")) {
840 r = parse_exec_address(b, &a, &guid);
846 } else if (startswith(a, "kernel:")) {
849 r = parse_kernel_address(b, &a, &guid);
854 } else if (startswith(a, "x-container:")) {
857 r = parse_container_address(b, &a, &guid);
870 r = sd_id128_from_string(guid, &b->server_id);
875 b->address_index = a - b->address;
/* Tries to connect using the currently parsed address, choosing the
 * transport from what the parse filled in: spawned program
 * (bus_socket_exec), kernel bus, container, or plain socket. A failed
 * attempt is remembered as a positive errno in b->last_connect_error, and
 * the next entry of the address list is parsed. When no address works, the
 * last recorded error is returned, or -ECONNREFUSED if none was recorded. */
879 static int bus_start_address(sd_bus *b) {
889 r = bus_socket_exec(b);
893 b->last_connect_error = -r;
894 } else if (b->kernel) {
896 r = bus_kernel_connect(b);
900 b->last_connect_error = -r;
902 } else if (b->machine) {
904 r = bus_container_connect(b);
908 b->last_connect_error = -r;
910 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
912 r = bus_socket_connect(b);
916 b->last_connect_error = -r;
919 r = bus_parse_next_address(b);
923 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Discards the currently parsed address and restarts the connection
 * attempt via bus_start_address(). */
927 int bus_next_address(sd_bus *b) {
930 bus_reset_parsed_address(b);
931 return bus_start_address(b);
934 static int bus_start_fd(sd_bus *b) {
939 assert(b->input_fd >= 0);
940 assert(b->output_fd >= 0);
942 r = fd_nonblock(b->input_fd, true);
946 r = fd_cloexec(b->input_fd, true);
950 if (b->input_fd != b->output_fd) {
951 r = fd_nonblock(b->output_fd, true);
955 r = fd_cloexec(b->output_fd, true);
960 if (fstat(b->input_fd, &st) < 0)
963 if (S_ISCHR(b->input_fd))
964 return bus_kernel_take_fd(b);
966 return bus_socket_take_fd(b);
/* Starts a configured bus connection. The bus must be in BUS_UNSET state
 * (-EPERM otherwise) and is moved to BUS_OPENING. Being both a server and a
 * bus client is special-cased. The connection is set up from the supplied
 * fds if there are any, otherwise from the configured address/transport
 * parameters; finally the Hello handshake is triggered via bus_send_hello(). */
969 _public_ int sd_bus_start(sd_bus *bus) {
972 assert_return(bus, -EINVAL);
973 assert_return(bus->state == BUS_UNSET, -EPERM);
974 assert_return(!bus_pid_changed(bus), -ECHILD);
976 bus->state = BUS_OPENING;
978 if (bus->is_server && bus->bus_client)
/* Explicitly passed fds take precedence over any address configuration */
981 if (bus->input_fd >= 0)
982 r = bus_start_fd(bus);
983 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
984 r = bus_start_address(bus);
991 return bus_send_hello(bus);
/* Opens a connection to the system bus. The address is taken from the
 * DBUS_SYSTEM_BUS_ADDRESS environment variable (read with secure_getenv())
 * when available; otherwise one of the built-in defaults below is used. The
 * connection is marked as bus client, asks for capability and credential
 * attachment, and is then started. */
994 _public_ int sd_bus_open_system(sd_bus **ret) {
999 assert_return(ret, -EINVAL);
1005 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1007 r = sd_bus_set_address(b, e);
/* Two alternative defaults: kernel bus with socket fallback, or socket only */
1010 r = sd_bus_set_address(b, "kernel:path=/dev/kdbus/0-system/bus;unix:path=/run/dbus/system_bus_socket");
1012 r = sd_bus_set_address(b, "unix:path=/run/dbus/system_bus_socket");
1018 b->bus_client = true;
1020 /* Let's do per-method access control on the system bus. We
1021 * need the caller's UID and capability set for that. */
1023 b->attach_flags |= KDBUS_ATTACH_CAPS | KDBUS_ATTACH_CREDS;
1025 r = sd_bus_start(b);
/* Opens a connection to the user (session) bus. The address comes from
 * DBUS_SESSION_BUS_ADDRESS when available; otherwise it is derived from
 * XDG_RUNTIME_DIR (escaped with bus_address_escape()) and/or the per-UID
 * kernel bus path. -ECONNREFUSED is returned on one of the paths where no
 * address can be derived. The connection is marked as bus client and
 * started.
 *
 * NOTE(review): the asprintf() calls below ignore their return value;
 * confirm that b->address is checked for NULL afterwards. */
1037 _public_ int sd_bus_open_user(sd_bus **ret) {
1042 assert_return(ret, -EINVAL);
1048 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1050 r = sd_bus_set_address(b, e);
1054 e = secure_getenv("XDG_RUNTIME_DIR");
1056 _cleanup_free_ char *ee = NULL;
1058 ee = bus_address_escape(e);
1065 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus;unix:path=%s/bus", (unsigned long) getuid(), ee);
1067 b->address = strjoin("unix:path=", ee, "/bus", NULL);
1071 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus", (unsigned long) getuid());
1073 return -ECONNREFUSED;
1083 b->bus_client = true;
1085 /* We don't do any per-method access control on the user
1089 r = sd_bus_start(b);
/* Opens a connection to the system bus of a remote host by spawning ssh.
 * The escaped host name is embedded into an exec address of the form
 * "unixexec:path=ssh,argv1=-xT,argv2=<host>,argv3=systemd-stdio-bridge".
 * Returns -EINVAL when host or ret is NULL. */
1101 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1102 _cleanup_free_ char *e = NULL;
1107 assert_return(host, -EINVAL);
1108 assert_return(ret, -EINVAL);
/* Escaping keeps characters of the host name from being read as address
 * syntax */
1110 e = bus_address_escape(host);
1114 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1118 r = sd_bus_new(&bus);
1125 bus->bus_client = true;
1127 r = sd_bus_start(bus);
/* Opens a connection to the system bus of a local container. machine must
 * be non-NULL and pass filename_is_safe() (-EINVAL otherwise). The escaped
 * machine name is embedded into one of two address forms: kernel bus with
 * x-container fallback, or x-container only. */
1137 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1138 _cleanup_free_ char *e = NULL;
1143 assert_return(machine, -EINVAL);
1144 assert_return(ret, -EINVAL);
1145 assert_return(filename_is_safe(machine), -EINVAL);
1147 e = bus_address_escape(machine);
1152 p = strjoin("kernel:path=/dev/kdbus/ns/machine-", e, "/0-system/bus;x-container:machine=", e, NULL);
1154 p = strjoin("x-container:machine=", e, NULL);
1159 r = sd_bus_new(&bus);
1166 bus->bus_client = true;
1168 r = sd_bus_start(bus);
/* Closes the bus connection: marks the bus BUS_CLOSED, detaches it from
 * the event loop and drops all queued messages. An already closed bus and a
 * bus whose PID changed are special-cased by the first two checks. */
1178 _public_ void sd_bus_close(sd_bus *bus) {
1182 if (bus->state == BUS_CLOSED)
1184 if (bus_pid_changed(bus))
1187 bus->state = BUS_CLOSED;
1189 sd_bus_detach_event(bus);
1191 /* Drop all queued messages so that they drop references to
1192 * the bus object and the bus may be freed */
1193 bus_reset_queues(bus);
1195 if (!bus->is_kernel)
1198 /* We'll leave the fd open in case this is a kernel bus, since
1199 * there might still be memblocks around that reference this
1200 * bus, and they might need to invoke the KDBUS_CMD_FREE
1201 * ioctl on the fd when they are freed. */
/* Switches the bus to BUS_CLOSING. The check singles out the case where
 * the bus is in none of the states BUS_OPENING, BUS_AUTHENTICATING,
 * BUS_HELLO or BUS_RUNNING. */
1204 static void bus_enter_closing(sd_bus *bus) {
1207 if (bus->state != BUS_OPENING &&
1208 bus->state != BUS_AUTHENTICATING &&
1209 bus->state != BUS_HELLO &&
1210 bus->state != BUS_RUNNING)
1213 bus->state = BUS_CLOSING;
/* Takes an additional reference on the bus object. Returns NULL when bus
 * is NULL. */
1216 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1217 assert_return(bus, NULL);
/* The caller must already hold a reference, so the new count is at least 2 */
1219 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drops one reference on the bus object; the branch below handles the case
 * where the count reaches zero (presumably by freeing the object -- confirm). */
1224 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1229 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Reports whether the bus is in one of the states BUS_IS_OPEN() accepts.
 * Returns -EINVAL for a NULL bus and -ECHILD after a PID change. */
1235 _public_ int sd_bus_is_open(sd_bus *bus) {
1237 assert_return(bus, -EINVAL);
1238 assert_return(!bus_pid_changed(bus), -ECHILD);
1240 return BUS_IS_OPEN(bus->state);
/* Tells whether values of the given type can be sent over this connection.
 * For SD_BUS_TYPE_UNIX_FD the answer depends on fd passing being enabled in
 * the hello flags and on what was negotiated (bus->can_fds), which requires
 * the connection to be running. For any other type the result is whether
 * the type character is valid. -ENOTCONN for a bus in BUS_UNSET state. */
1243 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1246 assert_return(bus, -EINVAL);
1247 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1248 assert_return(!bus_pid_changed(bus), -ECHILD);
1250 if (type == SD_BUS_TYPE_UNIX_FD) {
1251 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
/* can_fds is only meaningful once connection setup has finished */
1254 r = bus_ensure_running(bus);
1258 return bus->can_fds;
1261 return bus_type_is_valid(type);
/* Copies the server ID of the connection into *server_id after making sure
 * the connection is running. Returns -EINVAL for NULL arguments and -ECHILD
 * after a PID change. */
1264 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1267 assert_return(bus, -EINVAL);
1268 assert_return(server_id, -EINVAL);
1269 assert_return(!bus_pid_changed(bus), -ECHILD);
1271 r = bus_ensure_running(bus);
1275 *server_id = bus->server_id;
/* Seals message m for sending on bus b using the next serial number of the
 * bus (++b->serial). Messages with a header version newer than
 * b->message_version are special-cased by the first check. For a message
 * that already carries a serial, b->serial is first raised to at least that
 * value. */
1279 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1283 if (m->header->version > b->message_version)
1287 /* If we copy the same message to multiple
1288 * destinations, avoid using the same serial
1290 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1294 return bus_message_seal(m, ++b->serial);
/* Seals a locally synthesized message with the fixed serial 0xFFFFFFFF
 * instead of consuming a serial number of the bus. Messages with a header
 * version newer than b->message_version are special-cased by the first
 * check. */
1297 int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
1301 if (m->header->version > b->message_version)
1304 /* The bus specification says the serial number cannot be 0,
1305 * hence let's fill something in for synthetic messages. Since
1306 * synthetic messages might have a fake sender and we don't
1307 * want to interfere with the real sender's serial numbers we
1308 * pick a fixed, artificial one. We use (uint32_t) -1 rather
1309 * than (uint64_t) -1 since dbus1 only had 32bit identifiers,
1310 * even though kdbus can do 64bit. */
1312 return bus_message_seal(m, 0xFFFFFFFFULL);
/* Writes a message using the transport of the bus: either the kernel
 * writer or the socket writer. Only the socket writer takes idx, which is
 * used as a byte offset into the message elsewhere in this file. */
1315 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1320 return bus_kernel_write_message(bus, message);
1322 return bus_socket_write_message(bus, message, idx);
/* Flushes the write queue: repeatedly writes the head message, continuing
 * at offset bus->windex. Once the head message is completely written (always
 * the case on the kernel transport, otherwise when windex has reached the
 * message size) its reference is dropped and the remaining entries are
 * shifted down by one. Only valid in BUS_RUNNING or BUS_HELLO state. */
1325 static int dispatch_wqueue(sd_bus *bus) {
1329 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1331 while (bus->wqueue_size > 0) {
1333 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1337 /* Didn't do anything this time */
1339 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1340 /* Fully written. Let's drop the entry from
1343 * This isn't particularly optimized, but
1344 * well, this is supposed to be our worst-case
1345 * buffer only, and the socket buffer is
1346 * supposed to be our primary buffer, and if
1347 * it got full, then all bets are off
1350 sd_bus_message_unref(bus->wqueue[0]);
1351 bus->wqueue_size --;
1352 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Reads a message using the transport of the bus: either the kernel reader
 * or the socket reader. */
1362 static int bus_read_message(sd_bus *bus) {
1366 return bus_kernel_read_message(bus);
1368 return bus_socket_read_message(bus);
/* Makes sure the read queue can hold one more message than it currently
 * contains. Nothing needs to be done when enough entries are already
 * allocated; growth beyond BUS_RQUEUE_MAX entries is special-cased;
 * otherwise the array is reallocated to exactly the needed number of
 * entries. */
1371 int bus_rqueue_make_room(sd_bus *bus) {
1375 x = bus->rqueue_size + 1;
1377 if (bus->rqueue_allocated >= x)
1380 if (x > BUS_RQUEUE_MAX)
/* The result goes into a temporary (q), so bus->rqueue is not overwritten
 * by the realloc() return value directly */
1383 q = realloc(bus->rqueue, x * sizeof(sd_bus_message*));
1388 bus->rqueue_allocated = x;
/* Fetches the next incoming message. A message already waiting in the read
 * queue is handed out first (the head is removed and the rest shifted down);
 * with an empty queue a read from the transport is attempted. Only valid in
 * BUS_RUNNING or BUS_HELLO state. */
1393 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1398 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1401 if (bus->rqueue_size > 0) {
1402 /* Dispatch a queued message */
1404 *m = bus->rqueue[0];
1405 bus->rqueue_size --;
1406 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1410 /* Try to read a new message */
1411 r = bus_read_message(bus);
/* Queues message m for sending on the bus and optionally reports its
 * serial number through *serial.
 *
 * - The bus must be open (-ENOTCONN) and must not have changed PID (-ECHILD).
 * - An unsealed message sent without a serial pointer is flagged
 *   BUS_MESSAGE_NO_REPLY_EXPECTED before it is sealed.
 * - With an empty write queue in BUS_RUNNING/BUS_HELLO state an immediate
 *   write is attempted; -EPIPE, -ENOTCONN and -ESHUTDOWN move the bus into
 *   the closing state. A partially written message is kept as the first
 *   write queue entry.
 * - Otherwise the message is appended to the write queue, which is limited
 *   by BUS_WQUEUE_MAX. */
1421 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1424 assert_return(bus, -EINVAL);
1425 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1426 assert_return(m, -EINVAL);
1427 assert_return(!bus_pid_changed(bus), -ECHILD);
1430 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1437 /* If the serial number isn't kept, then we know that no reply
1439 if (!serial && !m->sealed)
1440 m->header->flags |= BUS_MESSAGE_NO_REPLY_EXPECTED;
1442 r = bus_seal_message(bus, m);
1446 /* If this is a reply and no reply was requested, then let's
1447 * suppress this, if we can */
1448 if (m->dont_send && !serial)
1451 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1454 r = bus_write_message(bus, m, &idx);
1456 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1457 bus_enter_closing(bus);
1460 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1461 /* Wasn't fully written. So let's remember how
1462 * much was written. Note that the first entry
1463 * of the wqueue array is always allocated so
1464 * that we always can remember how much was
1466 bus->wqueue[0] = sd_bus_message_ref(m);
1467 bus->wqueue_size = 1;
1473 /* Just append it to the queue. */
1475 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1478 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1483 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1487 *serial = BUS_MESSAGE_SERIAL(m);
/* Sends message m to the given destination: when the destination already
 * set on the message differs, it is replaced first; the message is then
 * passed to sd_bus_send(). Same precondition errors as sd_bus_send(). */
1492 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1495 assert_return(bus, -EINVAL);
1496 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1497 assert_return(m, -EINVAL);
1498 assert_return(!bus_pid_changed(bus), -ECHILD);
/* streq_ptr() also copes with either side being NULL -- presumably; confirm */
1500 if (!streq_ptr(m->destination, destination)) {
1505 r = sd_bus_message_set_destination(m, destination);
1510 return sd_bus_send(bus, m, serial);
/* Converts a relative timeout in microseconds into an absolute deadline on
 * CLOCK_MONOTONIC. (uint64_t) -1 is special-cased (callers treat a resulting
 * deadline of 0 as "no timeout"), and BUS_DEFAULT_TIMEOUT is substituted in
 * one case -- presumably for usec == 0; confirm. */
1513 static usec_t calc_elapse(uint64_t usec) {
1514 if (usec == (uint64_t) -1)
1518 usec = BUS_DEFAULT_TIMEOUT;
1520 return now(CLOCK_MONOTONIC) + usec;
/* Ordering function for the reply callback priority queue. Entries are
 * first distinguished by whether they carry a timeout at all (timeout != 0)
 * and then compared by their timeout values. */
1523 static int timeout_compare(const void *a, const void *b) {
1524 const struct reply_callback *x = a, *y = b;
1526 if (x->timeout != 0 && y->timeout == 0)
1529 if (x->timeout == 0 && y->timeout != 0)
1532 if (x->timeout < y->timeout)
1535 if (x->timeout > y->timeout)
/* Sends a method call and registers callback to be invoked with the reply.
 *
 * m must be a method call that expects a reply (-EINVAL otherwise), callback
 * must be non-NULL, the bus must be open (-ENOTCONN) and must not have
 * changed PID (-ECHILD). The message is sealed, and a reply_callback record
 * is stored in bus->reply_callbacks under the serial of the message. Records
 * with a deadline (timeout != 0) are also put into the timeout priority
 * queue. If queueing the timeout or sending the message fails, the
 * registration is cancelled again. */
1541 _public_ int sd_bus_call_async(
1544 sd_bus_message_handler_t callback,
1549 struct reply_callback *c;
1552 assert_return(bus, -EINVAL);
1553 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1554 assert_return(m, -EINVAL);
1555 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1556 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1557 assert_return(callback, -EINVAL);
1558 assert_return(!bus_pid_changed(bus), -ECHILD);
1560 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
/* The priority queue is only needed when the call can time out */
1564 if (usec != (uint64_t) -1) {
1565 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1570 r = bus_seal_message(bus, m);
1574 c = new0(struct reply_callback, 1);
1578 c->callback = callback;
1579 c->userdata = userdata;
1580 c->serial = BUS_MESSAGE_SERIAL(m);
1581 c->timeout = calc_elapse(usec);
/* The key points into the record itself, so it lives as long as the entry */
1583 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1589 if (c->timeout != 0) {
1590 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1593 sd_bus_call_async_cancel(bus, c->serial);
1598 r = sd_bus_send(bus, m, serial);
1600 sd_bus_call_async_cancel(bus, c->serial);
/* Cancels a pending asynchronous call: removes the reply callback
 * registered under serial and, when it carries a deadline, also takes it
 * out of the timeout priority queue. serial must not be 0 (-EINVAL). */
1607 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1608 struct reply_callback *c;
1610 assert_return(bus, -EINVAL);
1611 assert_return(serial != 0, -EINVAL);
1612 assert_return(!bus_pid_changed(bus), -ECHILD);
1614 c = hashmap_remove(bus->reply_callbacks, &serial);
1618 if (c->timeout != 0)
1619 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives connection setup until the bus is in BUS_RUNNING state, by
 * alternating sd_bus_process() and an unbounded sd_bus_wait(). A bus in
 * BUS_UNSET, BUS_CLOSED or BUS_CLOSING state is special-cased up front; a
 * bus that is already running needs no work. */
1625 int bus_ensure_running(sd_bus *bus) {
1630 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1632 if (bus->state == BUS_RUNNING)
1636 r = sd_bus_process(bus, NULL);
1639 if (bus->state == BUS_RUNNING)
/* (uint64_t) -1: wait without timeout */
1644 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and waits synchronously for its reply.
 *
 * m must be a method call that expects a reply, error must not already be
 * set (-EINVAL otherwise), the bus must be open (-ENOTCONN) and must not
 * have changed PID (-ECHILD). After the message is sent, the read queue is
 * scanned from the position it had before the send (i) for a message whose
 * reply_serial matches; method returns and method errors are handled
 * separately, and a method error is copied into *error. A queued message
 * that carries our own serial and our own unique name as sender is treated
 * as a call to ourselves. While nothing matches, more messages are read,
 * the function polls with the remaining timeout and flushes the write
 * queue. -EPIPE, -ENOTCONN and -ESHUTDOWN from reading or writing move the
 * bus into the closing state. */
1650 _public_ int sd_bus_call(
1654 sd_bus_error *error,
1655 sd_bus_message **reply) {
1662 assert_return(bus, -EINVAL);
1663 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1664 assert_return(m, -EINVAL);
1665 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1666 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1667 assert_return(!bus_error_is_dirty(error), -EINVAL);
1668 assert_return(!bus_pid_changed(bus), -ECHILD);
1670 r = bus_ensure_running(bus);
/* Messages queued before our call cannot be its reply; start scanning here */
1674 i = bus->rqueue_size;
1676 r = sd_bus_send(bus, m, &serial);
1680 timeout = calc_elapse(usec);
1685 while (i < bus->rqueue_size) {
1686 sd_bus_message *incoming = NULL;
1688 incoming = bus->rqueue[i];
1690 if (incoming->reply_serial == serial) {
1691 /* Found a match! */
1693 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1696 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1701 sd_bus_message_unref(incoming);
1704 } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
1705 r = sd_bus_error_copy(error, &incoming->error);
1709 sd_bus_message_unref(incoming);
1712 } else if (incoming->header->serial == serial &&
1715 streq(bus->unique_name, incoming->sender)) {
1717 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1720 /* Our own message? Somebody is trying
1721 * to send its own client a message,
1722 * let's not dead-lock, let's fail
1725 sd_bus_message_unref(incoming);
1729 /* Try to read more, right-away */
1733 r = bus_read_message(bus);
1735 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1736 bus_enter_closing(bus);
1746 n = now(CLOCK_MONOTONIC);
1752 left = (uint64_t) -1;
1754 r = bus_poll(bus, true, left);
1760 r = dispatch_wqueue(bus);
1762 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1763 bus_enter_closing(bus);
/* Returns the file descriptor of the connection. Only available when input
 * and output use the same descriptor (-EPERM otherwise); -EINVAL for a NULL
 * bus, -ECHILD after a PID change. */
1770 _public_ int sd_bus_get_fd(sd_bus *bus) {
1772 assert_return(bus, -EINVAL);
1773 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1774 assert_return(!bus_pid_changed(bus), -ECHILD);
1776 return bus->input_fd;
/* Computes the I/O events the caller should wait for, depending on the
 * connection state: BUS_OPENING, BUS_AUTHENTICATING (where it matters
 * whether authentication currently needs to write) and
 * BUS_RUNNING/BUS_HELLO (where it matters whether the read queue is empty
 * and whether the write queue has pending data). The bus must be open or
 * closing (-ENOTCONN otherwise). */
1779 _public_ int sd_bus_get_events(sd_bus *bus) {
1782 assert_return(bus, -EINVAL);
1783 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1784 assert_return(!bus_pid_changed(bus), -ECHILD);
1786 if (bus->state == BUS_OPENING)
1788 else if (bus->state == BUS_AUTHENTICATING) {
1790 if (bus_socket_auth_needs_write(bus))
1795 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1796 if (bus->rqueue_size <= 0)
1798 if (bus->wqueue_size > 0)
/* Reports through *timeout_usec when the caller should next invoke
 * processing at the latest. (uint64_t) -1 is stored when there is no
 * deadline: in states other than BUS_RUNNING/BUS_HELLO, and when no reply
 * callback is queued. During authentication the authentication timeout is
 * reported; otherwise the deadline of the first entry of the reply timeout
 * queue. BUS_CLOSING and a non-empty read queue are special-cased. The bus
 * must be open or closing (-ENOTCONN otherwise). */
1805 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1806 struct reply_callback *c;
1808 assert_return(bus, -EINVAL);
1809 assert_return(timeout_usec, -EINVAL);
1810 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1811 assert_return(!bus_pid_changed(bus), -ECHILD);
1813 if (bus->state == BUS_CLOSING) {
1818 if (bus->state == BUS_AUTHENTICATING) {
1819 *timeout_usec = bus->auth_timeout;
1823 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1824 *timeout_usec = (uint64_t) -1;
1828 if (bus->rqueue_size > 0) {
1833 c = prioq_peek(bus->reply_callbacks_prioq);
1835 *timeout_usec = (uint64_t) -1;
1839 *timeout_usec = c->timeout;
/* Handles the reply callback at the head of the timeout queue: a synthetic
 * "Method call timed out" error (SD_BUS_ERROR_NO_REPLY) with sender
 * "org.freedesktop.DBus" is built and sealed, the callback is removed from
 * the priority queue and the reply hashmap, and it is then invoked with the
 * synthetic message. */
1843 static int process_timeout(sd_bus *bus) {
1844 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1845 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1846 struct reply_callback *c;
1852 c = prioq_peek(bus->reply_callbacks_prioq);
1856 n = now(CLOCK_MONOTONIC);
1860 r = bus_message_new_synthetic_error(
1863 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1868 m->sender = "org.freedesktop.DBus";
1870 r = bus_seal_synthetic_message(bus, m);
/* The entry peeked above must still be the head of the queue */
1874 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1875 hashmap_remove(bus->reply_callbacks, &c->serial);
1878 bus->iteration_counter ++;
1880 r = c->callback(bus, m, c->userdata, &error_buffer);
1881 r = bus_maybe_reply_error(m, r, &error_buffer);
1884 bus->current = NULL;
/* Gatekeeper used while the connection is in BUS_HELLO state (other states
 * are special-cased by the first check): the message is examined for being
 * a method return or method error whose reply serial equals the serial of
 * our Hello call. */
1889 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1893 if (bus->state != BUS_HELLO)
1896 /* Let's make sure the first message on the bus is the HELLO
1897 * reply. But note that we don't actually parse the message
1898 * here (we leave that to the usual handling), we just verify
1899 * we don't let any earlier msg through. */
1901 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1902 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1905 if (m->reply_serial != bus->hello_serial)
/* Dispatches a method return or method error to the reply callback
 * registered for its reply serial. The callback record is removed from the
 * reply hashmap (and from the timeout queue when it has a deadline), the
 * message is rewound, and the callback is invoked. Other message types are
 * special-cased by the first check. */
1911 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1912 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1913 struct reply_callback *c;
1919 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1920 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1923 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1927 if (c->timeout != 0)
1928 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1930 r = sd_bus_message_rewind(m, true);
1934 r = c->callback(bus, m, c->userdata, &error_buffer);
1935 r = bus_maybe_reply_error(m, r, &error_buffer);
/* Runs the registered filter callbacks on message m. The list walk is
 * repeated as long as a callback modified the filter list while it was being
 * walked; the per-callback last_iteration stamp ensures each filter still
 * runs at most once per processing iteration. The message is rewound before
 * every callback. */
1941 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1942 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1943 struct filter_callback *l;
1950 bus->filter_callbacks_modified = false;
1952 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1954 if (bus->filter_callbacks_modified)
1957 /* Don't run this more than once per iteration */
1958 if (l->last_iteration == bus->iteration_counter)
1961 l->last_iteration = bus->iteration_counter;
1963 r = sd_bus_message_rewind(m, true);
1967 r = l->callback(bus, m, l->userdata, &error_buffer);
1968 r = bus_maybe_reply_error(m, r, &error_buffer);
1974 } while (bus->filter_callbacks_modified);
/* Runs the match callbacks on message m via bus_match_run(), repeating the
 * run as long as the match tree was modified during the run. */
1979 static int process_match(sd_bus *bus, sd_bus_message *m) {
1986 bus->match_callbacks_modified = false;
1988 r = bus_match_run(bus, &bus->match_callbacks, m);
1992 } while (bus->match_callbacks_modified);
/* Implements the built-in "org.freedesktop.DBus.Peer" interface for method
 * calls: "Ping" gets an empty method return, "GetMachineId" gets a method
 * return carrying the machine ID as a string, and any other member gets an
 * SD_BUS_ERROR_UNKNOWN_METHOD error reply. Messages of other types, on other
 * interfaces, or flagged as not expecting a reply are special-cased by the
 * initial checks. */
1997 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1998 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2004 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2007 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2010 if (m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED)
2013 if (streq_ptr(m->member, "Ping"))
2014 r = sd_bus_message_new_method_return(m, &reply);
2015 else if (streq_ptr(m->member, "GetMachineId")) {
2019 r = sd_id128_get_machine(&id);
2023 r = sd_bus_message_new_method_return(m, &reply);
2027 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2029 r = sd_bus_message_new_method_errorf(
2031 SD_BUS_ERROR_UNKNOWN_METHOD,
2032 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2038 r = sd_bus_send(bus, reply, NULL);
/* Runs one incoming message through the processing stages in fixed order:
 * Hello gatekeeping, reply callbacks, filters, matches, built-in interfaces
 * and finally object dispatch. The iteration counter is advanced first, and
 * bus->current is cleared again at the end. */
2045 static int process_message(sd_bus *bus, sd_bus_message *m) {
2052 bus->iteration_counter++;
2054 log_debug("Got message sender=%s object=%s interface=%s member=%s",
2055 strna(sd_bus_message_get_sender(m)),
2056 strna(sd_bus_message_get_path(m)),
2057 strna(sd_bus_message_get_interface(m)),
2058 strna(sd_bus_message_get_member(m)));
2060 r = process_hello(bus, m);
2064 r = process_reply(bus, m);
2068 r = process_filter(bus, m);
2072 r = process_match(bus, m);
2076 r = process_builtin(bus, m);
2080 r = bus_process_object(bus, m);
2083 bus->current = NULL;
/* One processing step for a connection in BUS_RUNNING or BUS_HELLO state:
 * handle an expired reply timeout, flush the write queue, fetch one incoming
 * message and run it through process_message(). The message is rewound
 * afterwards; for a method call an SD_BUS_ERROR_UNKNOWN_OBJECT error reply
 * naming m->path is sent. */
2087 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2088 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2092 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2094 r = process_timeout(bus);
2098 r = dispatch_wqueue(bus);
2102 r = dispatch_rqueue(bus, &m);
2108 r = process_message(bus, m);
2113 r = sd_bus_message_rewind(m, true);
2122 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2124 r = sd_bus_reply_method_errorf(
2126 SD_BUS_ERROR_UNKNOWN_OBJECT,
2127 "Unknown object '%s'.", m->path);
/* Processing step for the BUS_CLOSING state (asserted).
 *
 * Takes the first entry of bus->reply_callbacks and fails it with a
 * synthetic SD_BUS_ERROR_NO_REPLY ("Connection terminated") error message:
 * the entry is dropped from the timeout prioq (only if it has a timeout)
 * and from the hashmap before its callback is invoked with that message.
 * A signal on path /org/freedesktop/DBus/Local, interface
 * org.freedesktop.DBus.Local, with the sender set to the same name, is
 * also synthesized and run through process_filter() and process_match().
 * bus->current is reset to NULL at the end. */
2141 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2142 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2143 struct reply_callback *c;
2147 assert(bus->state == BUS_CLOSING);
2149 c = hashmap_first(bus->reply_callbacks);
2151 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2153 /* First, fail all outstanding method calls */
2154 r = bus_message_new_synthetic_error(
2157 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2162 r = bus_seal_synthetic_message(bus, m);
/* Unregister the callback before invoking it. */
2166 if (c->timeout != 0)
2167 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2169 hashmap_remove(bus->reply_callbacks, &c->serial);
2172 bus->iteration_counter++;
2174 r = c->callback(bus, m, c->userdata, &error_buffer);
2175 r = bus_maybe_reply_error(m, r, &error_buffer);
2181 /* Then, synthesize a Disconnected message */
2182 r = sd_bus_message_new_signal(
2184 "/org/freedesktop/DBus/Local",
2185 "org.freedesktop.DBus.Local",
2191 m->sender = "org.freedesktop.DBus.Local";
2193 r = bus_seal_synthetic_message(bus, m);
2200 bus->iteration_counter++;
2202 r = process_filter(bus, m);
2206 r = process_match(bus, m);
2218 bus->current = NULL;
/* Public entry point that drives the connection one step, dispatching on
 * bus->state.
 *
 * Fails with -EINVAL for a NULL bus, -ECHILD if the PID changed since the
 * bus was created, and -EBUSY if called while bus->current is set (i.e.
 * recursively from a message handler). A result of -ECONNRESET, -EPIPE or
 * -ESHUTDOWN from bus_socket_process_opening(),
 * bus_socket_process_authenticating() or process_running() makes the bus
 * enter the closing state via bus_enter_closing(); the closing path
 * returns the result of process_closing() directly. */
2222 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2223 BUS_DONT_DESTROY(bus);
2226 /* Returns 0 when we didn't do anything. This should cause the
2227 * caller to invoke sd_bus_wait() before returning the next
2228 * time. Returns > 0 when we did something, which possibly
2229 * means *ret is filled in with an unprocessed message. */
2231 assert_return(bus, -EINVAL);
2232 assert_return(!bus_pid_changed(bus), -ECHILD);
2234 /* We don't allow recursively invoking sd_bus_process(). */
2235 assert_return(!bus->current, -EBUSY);
2237 switch (bus->state) {
2244 r = bus_socket_process_opening(bus);
2245 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2246 bus_enter_closing(bus);
2254 case BUS_AUTHENTICATING:
2255 r = bus_socket_process_authenticating(bus);
2256 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2257 bus_enter_closing(bus);
2269 r = process_running(bus, ret);
2270 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2271 bus_enter_closing(bus);
2281 return process_closing(bus, ret);
2284 assert_not_reached("Unknown state");
/* Waits with ppoll() on the bus file descriptors for the events reported
 * by sd_bus_get_events().
 *
 * need_more: per the in-line comments, true means the caller needs new
 *            data and does not care about data already read or about bus
 *            timeouts; false means the bus timeout from
 *            sd_bus_get_timeout() is taken into account.
 * timeout_usec: caller-supplied bound on the wait; (uint64_t) -1 means
 *            none. It presumably replaces the computed timeout m when it
 *            is the shorter of the two -- confirm.
 *
 * When input and output fds differ, POLLIN is polled on input_fd and
 * POLLOUT on output_fd. ppoll() gets a NULL timeout (wait indefinitely)
 * when m is (uint64_t) -1. Returns 1 if ppoll() reported at least one
 * ready descriptor and 0 otherwise; -ENOTCONN if the bus is not open. */
2287 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2288 struct pollfd p[2] = {};
2291 usec_t m = (usec_t) -1;
2295 if (bus->state == BUS_CLOSING)
2298 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2300 e = sd_bus_get_events(bus);
2305 /* The caller really needs some more data, he doesn't
2306 * care about what's already read, or any timeouts
2311 /* The caller wants to process if there's something to
2312 * process, but doesn't care otherwise */
2314 r = sd_bus_get_timeout(bus, &until);
2319 nw = now(CLOCK_MONOTONIC);
2320 m = until > nw ? until - nw : 0;
2324 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2327 p[0].fd = bus->input_fd;
2328 if (bus->output_fd == bus->input_fd) {
2332 p[0].events = e & POLLIN;
2333 p[1].fd = bus->output_fd;
2334 p[1].events = e & POLLOUT;
2338 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2342 return r > 0 ? 1 : 0;
/* Public wrapper around bus_poll(bus, false, timeout_usec).
 *
 * Fails with -EINVAL for a NULL bus, -ECHILD if the PID changed and
 * -ENOTCONN if the bus is not open. The BUS_CLOSING state and a non-empty
 * read queue are checked before polling.
 *
 * NOTE(review): both checks presumably return early without waiting --
 * confirm the returned values. */
2345 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2347 assert_return(bus, -EINVAL);
2348 assert_return(!bus_pid_changed(bus), -ECHILD);
2350 if (bus->state == BUS_CLOSING)
2353 assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
2355 if (bus->rqueue_size > 0)
2358 return bus_poll(bus, false, timeout_usec);
/* Writes out the pending outgoing messages of the bus.
 *
 * Fails with -EINVAL for a NULL bus, -ECHILD if the PID changed and
 * -ENOTCONN if the bus is not open. After bus_ensure_running(),
 * dispatch_wqueue() is called to send queued messages and bus_poll() is
 * used without a timeout to wait while bus->wqueue_size is still
 * non-zero. */
2361 _public_ int sd_bus_flush(sd_bus *bus) {
2364 assert_return(bus, -EINVAL);
2365 assert_return(!bus_pid_changed(bus), -ECHILD);
2367 if (bus->state == BUS_CLOSING)
2370 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2372 r = bus_ensure_running(bus);
2376 if (bus->wqueue_size <= 0)
2380 r = dispatch_wqueue(bus);
/* NOTE(review): sd_bus_process() treats -ECONNRESET, -EPIPE and
 * -ESHUTDOWN as a disconnect, while this check uses -EPIPE, -ENOTCONN and
 * -ESHUTDOWN. Confirm whether -ECONNRESET should enter closing here too. */
2382 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
2383 bus_enter_closing(bus);
2388 if (bus->wqueue_size <= 0)
2391 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback on the bus.
 *
 * Allocates a zero-initialized struct filter_callback, stores callback and
 * userdata in it and prepends it to bus->filter_callbacks.
 * bus->filter_callbacks_modified is set so that a filter pass that is
 * currently running can notice the change. Fails with -EINVAL for a NULL
 * bus or callback and -ECHILD if the PID changed. */
2397 _public_ int sd_bus_add_filter(sd_bus *bus,
2398 sd_bus_message_handler_t callback,
2401 struct filter_callback *f;
2403 assert_return(bus, -EINVAL);
2404 assert_return(callback, -EINVAL);
2405 assert_return(!bus_pid_changed(bus), -ECHILD);
2407 f = new0(struct filter_callback, 1);
2410 f->callback = callback;
2411 f->userdata = userdata;
2413 bus->filter_callbacks_modified = true;
2414 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
/* Unregisters a filter callback from the bus.
 *
 * Walks bus->filter_callbacks looking for an entry whose callback and
 * userdata both equal the arguments; a matching entry is unlinked from
 * the list and bus->filter_callbacks_modified is set. Fails with -EINVAL
 * for a NULL bus or callback and -ECHILD if the PID changed. */
2418 _public_ int sd_bus_remove_filter(sd_bus *bus,
2419 sd_bus_message_handler_t callback,
2422 struct filter_callback *f;
2424 assert_return(bus, -EINVAL);
2425 assert_return(callback, -EINVAL);
2426 assert_return(!bus_pid_changed(bus), -ECHILD);
2428 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2429 if (f->callback == callback && f->userdata == userdata) {
2430 bus->filter_callbacks_modified = true;
2431 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
/* Installs a match rule with an associated callback.
 *
 * The match string is parsed into components with bus_match_parse(). On a
 * bus client connection a fresh cookie is taken from bus->match_cookie and
 * the rule is registered through bus_add_match_internal(). The rule is
 * then added to the local match tree (bus->match_callbacks) with
 * bus_match_add(), with bus->match_callbacks_modified set beforehand. The
 * parsed components are released with bus_match_parse_free().
 *
 * Fails with -EINVAL for a NULL bus or match and -ECHILD if the PID
 * changed. */
2440 _public_ int sd_bus_add_match(sd_bus *bus,
2442 sd_bus_message_handler_t callback,
2445 struct bus_match_component *components = NULL;
2446 unsigned n_components = 0;
2447 uint64_t cookie = 0;
2450 assert_return(bus, -EINVAL);
2451 assert_return(match, -EINVAL);
2452 assert_return(!bus_pid_changed(bus), -ECHILD);
2454 r = bus_match_parse(match, &components, &n_components);
2458 if (bus->bus_client) {
2459 cookie = ++bus->match_cookie;
2461 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2466 bus->match_callbacks_modified = true;
2467 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
/* NOTE(review): presumably the roll-back of the registration above when
 * bus_match_add() failed -- confirm the guarding condition. */
2469 if (bus->bus_client)
2470 bus_remove_match_internal(bus, match, cookie);
2474 bus_match_parse_free(components, n_components);
/* Removes a match rule previously installed for callback/userdata.
 *
 * The match string is parsed with bus_match_parse(), the rule is removed
 * from the local match tree with bus_match_remove(), which also hands
 * back the cookie stored for it, and on a bus client connection that
 * cookie is passed to bus_remove_match_internal(). The parsed components
 * are released with bus_match_parse_free().
 *
 * Returns the bus_match_remove() result if it is negative, otherwise the
 * result q of the bus_remove_match_internal() step. Fails with -EINVAL
 * for a NULL bus or match and -ECHILD if the PID changed. */
2478 _public_ int sd_bus_remove_match(sd_bus *bus,
2480 sd_bus_message_handler_t callback,
2483 struct bus_match_component *components = NULL;
2484 unsigned n_components = 0;
2486 uint64_t cookie = 0;
2488 assert_return(bus, -EINVAL);
2489 assert_return(match, -EINVAL);
2490 assert_return(!bus_pid_changed(bus), -ECHILD);
2492 r = bus_match_parse(match, &components, &n_components);
2496 bus->match_callbacks_modified = true;
2497 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2499 if (bus->bus_client)
2500 q = bus_remove_match_internal(bus, match, cookie);
2502 bus_match_parse_free(components, n_components);
2504 return r < 0 ? r : q;
/* Returns true when the calling process' PID (getpid()) differs from
 * bus->original_pid, i.e. the bus object is being used from a different
 * process than the one recorded in it. */
2507 bool bus_pid_changed(sd_bus *bus) {
2510 /* We don't support people creating a bus connection and
2511 * keeping it around over a fork(). Let's complain. */
2513 return bus->original_pid != getpid();
/* IO event source handler (installed by sd_bus_attach_event()): userdata
 * is the bus, which is advanced with sd_bus_process(bus, NULL). */
2516 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2517 sd_bus *bus = userdata;
2522 r = sd_bus_process(bus, NULL);
/* Timer event source handler (installed by sd_bus_attach_event()):
 * userdata is the bus, which is advanced with sd_bus_process(bus, NULL). */
2529 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2530 sd_bus *bus = userdata;
2535 r = sd_bus_process(bus, NULL);
/* Prepare handler of the input IO event source; userdata is the bus.
 *
 * Refreshes the IO event masks from sd_bus_get_events(): with separate
 * input and output fds the input source gets the POLLIN part and the
 * output source the POLLOUT part, otherwise the input source gets the
 * whole mask. The timer source is then updated from sd_bus_get_timeout():
 * its time is set to "until" and it is enabled or disabled based on
 * r > 0.
 *
 * NOTE(review): r at that point presumably still holds the
 * sd_bus_get_timeout() result (the set_time result goes to j) --
 * confirm. */
2542 static int prepare_callback(sd_event_source *s, void *userdata) {
2543 sd_bus *bus = userdata;
2550 e = sd_bus_get_events(bus);
2554 if (bus->output_fd != bus->input_fd) {
2556 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2560 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2564 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2569 r = sd_bus_get_timeout(bus, &until);
2575 j = sd_event_source_set_time(bus->time_event_source, until);
2580 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
/* Quit event source handler (installed by sd_bus_attach_event() through
 * sd_event_add_quit()); userdata is the bus. */
2587 static int quit_callback(sd_event_source *event, void *userdata) {
2588 sd_bus *bus = userdata;
/* Attaches the bus to an sd_event loop.
 *
 * bus->event is set either to a new reference on the passed event or to
 * the default event loop from sd_event_default() (presumably when event
 * is NULL -- confirm). The following sources are then created, all with
 * the bus as userdata:
 *   - an IO source on input_fd (io_callback), with prepare_callback
 *     installed as its prepare handler,
 *   - a second IO source on output_fd if it differs from input_fd,
 *   - a monotonic timer source (time_callback),
 *   - a quit source (quit_callback).
 * The IO and timer sources are given the requested priority.
 *
 * Fails with -EINVAL for a NULL bus and -EBUSY if an event loop is
 * already attached. sd_bus_detach_event() is called to undo the setup
 * (presumably on the failure path -- confirm). */
2597 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2600 assert_return(bus, -EINVAL);
2601 assert_return(!bus->event, -EBUSY);
2603 assert(!bus->input_io_event_source);
2604 assert(!bus->output_io_event_source);
2605 assert(!bus->time_event_source);
2608 bus->event = sd_event_ref(event);
2610 r = sd_event_default(&bus->event);
2615 r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2619 r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2623 if (bus->output_fd != bus->input_fd) {
2624 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2628 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2633 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2637 r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2641 r = sd_event_source_set_priority(bus->time_event_source, priority);
2645 r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2652 sd_bus_detach_event(bus);
/* Detaches the bus from its event loop.
 *
 * Each of the four event sources (input IO, output IO, timer, quit) that
 * is set is first switched to SD_EVENT_OFF and then unreferenced, with
 * the result of sd_event_source_unref() stored back into the field.
 * Finally the reference on bus->event is dropped the same way. Fails with
 * -EINVAL for a NULL bus. */
2656 _public_ int sd_bus_detach_event(sd_bus *bus) {
2657 assert_return(bus, -EINVAL);
2662 if (bus->input_io_event_source) {
2663 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2664 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2667 if (bus->output_io_event_source) {
2668 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2669 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2672 if (bus->time_event_source) {
2673 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2674 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2677 if (bus->quit_event_source) {
2678 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2679 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2683 bus->event = sd_event_unref(bus->event);
/* Accessor for the event loop associated with the bus; returns NULL when
 * bus is NULL.
 * NOTE(review): presumably returns bus->event -- confirm. */
2688 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2689 assert_return(bus, NULL);
/* Returns bus->current, which the processing functions reset to NULL once
 * they are done with a message. Returns NULL when bus is NULL. No new
 * reference is taken here. */
2694 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2695 assert_return(bus, NULL);
2697 return bus->current;
/* Shared helper behind sd_bus_default_system() and sd_bus_default_user().
 *
 * bus_open:    function used to open a new connection.
 * default_bus: caller-owned slot holding the cached default connection
 *              (must not be NULL).
 * ret:         where the connection is returned.
 *
 * One path only reports whether a default bus exists (!!*default_bus),
 * presumably when ret is NULL -- confirm. A cached connection is handed
 * out as a new reference via sd_bus_ref(). For a connection b used as the
 * default, the slot address is recorded in b->default_bus_ptr. */
2700 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2705 assert(default_bus);
2708 return !!*default_bus;
2711 *ret = sd_bus_ref(*default_bus);
2719 b->default_bus_ptr = default_bus;
/* Returns the default system bus connection of the calling thread.
 * The cache slot is a static __thread variable, so every thread has its
 * own default connection; the work is delegated to bus_default() with
 * sd_bus_open_system() as the opener. */
2727 _public_ int sd_bus_default_system(sd_bus **ret) {
2728 static __thread sd_bus *default_system_bus = NULL;
2730 return bus_default(sd_bus_open_system, &default_system_bus, ret);
/* Returns the default user bus connection of the calling thread.
 * The cache slot is a static __thread variable, so every thread has its
 * own default connection; the work is delegated to bus_default() with
 * sd_bus_open_user() as the opener. */
2733 _public_ int sd_bus_default_user(sd_bus **ret) {
2734 static __thread sd_bus *default_user_bus = NULL;
2736 return bus_default(sd_bus_open_user, &default_user_bus, ret);
/* Queries the thread ID associated with the bus and stores it in *tid.
 *
 * Fails with -EINVAL for a NULL b or tid and -ECHILD if the PID changed.
 * One path forwards to sd_event_get_tid() on the attached event loop
 * (b->event).
 * NOTE(review): that path is presumably taken only when an event loop is
 * attached -- confirm the guarding condition and the fallback result. */
2739 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2740 assert_return(b, -EINVAL);
2741 assert_return(tid, -EINVAL);
2742 assert_return(!bus_pid_changed(b), -ECHILD);
2750 return sd_event_get_tid(b->event, tid);
/* Returns a newly allocated, escaped copy of s; returns NULL if s is
 * NULL.
 *
 * The output buffer is sized strlen(s)*3 + 1. Bytes that are kept
 * verbatim are A-Z, a-z, and 0-9 unless the digit is the first
 * character; for every other byte the hex digits of its high and low
 * nibble are emitted via hexchar(). */
2755 _public_ char *sd_bus_label_escape(const char *s) {
2759 assert_return(s, NULL);
2761 /* Escapes all chars that D-Bus' object path cannot deal
2762 * with. Can be reversed with sd_bus_label_unescape(). We special
2763 * case the empty string. */
2768 r = new(char, strlen(s)*3 + 1);
2772 for (f = s, t = r; *f; f++) {
2774 /* Escape everything that is not a-zA-Z0-9. We also
2775 * escape 0-9 if it's the first character */
2777 if (!(*f >= 'A' && *f <= 'Z') &&
2778 !(*f >= 'a' && *f <= 'z') &&
2779 !(f > s && *f >= '0' && *f <= '9')) {
2781 *(t++) = hexchar(*f >> 4);
2782 *(t++) = hexchar(*f);
/* Returns a newly allocated, unescaped copy of f; returns NULL if f is
 * NULL.
 *
 * The output buffer is sized strlen(f) + 1, so the result is never longer
 * than the input. For an escape sequence the two characters following the
 * current position (f[1], f[2]) are decoded with unhexchar() and combined
 * into one byte as (a << 4) | b; if either is not a valid hex digit the
 * sequence is copied literally instead. */
2792 _public_ char *sd_bus_label_unescape(const char *f) {
2795 assert_return(f, NULL);
2797 /* Special case for the empty string */
2801 r = new(char, strlen(f) + 1);
2805 for (t = r; *f; f++) {
2810 if ((a = unhexchar(f[1])) < 0 ||
2811 (b = unhexchar(f[2])) < 0) {
2812 /* Invalid escape code, let's take it literal then */
2815 *(t++) = (char) ((a << 4) | b);
2827 _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **ret) {
2832 assert_return(bus, -EINVAL);
2833 assert_return(mask <= _SD_BUS_CREDS_ALL, -ENOTSUP);
2834 assert_return(ret, -EINVAL);
2835 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2836 assert_return(!bus_pid_changed(bus), -ECHILD);
2837 assert_return(!bus->is_kernel, -ENOTSUP);
2839 if (!bus->ucred_valid && !isempty(bus->label))
2842 c = bus_creds_new();
2846 if (bus->ucred_valid) {
2847 pid = c->pid = bus->ucred.pid;
2848 c->uid = bus->ucred.uid;
2849 c->gid = bus->ucred.gid;
2851 c->mask |= (SD_BUS_CREDS_UID | SD_BUS_CREDS_PID | SD_BUS_CREDS_GID) & mask;
2854 if (!isempty(bus->label) && (mask & SD_BUS_CREDS_SELINUX_CONTEXT)) {
2855 c->label = strdup(bus->label);
2857 sd_bus_creds_unref(c);
2861 c->mask |= SD_BUS_CREDS_SELINUX_CONTEXT;
2864 r = bus_creds_add_more(c, mask, pid, 0);