1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 #include "bus-container.h"
50 #include "bus-protocol.h"
52 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
54 static void bus_close_fds(sd_bus *b) {
58 close_nointr_nofail(b->input_fd);
60 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
61 close_nointr_nofail(b->output_fd);
63 b->input_fd = b->output_fd = -1;
66 static void bus_node_destroy(sd_bus *b, struct node *n) {
67 struct node_callback *c;
68 struct node_vtable *v;
69 struct node_enumerator *e;
77 bus_node_destroy(b, n->child);
79 while ((c = n->callbacks)) {
80 LIST_REMOVE(callbacks, n->callbacks, c);
84 while ((v = n->vtables)) {
85 LIST_REMOVE(vtables, n->vtables, v);
90 while ((e = n->enumerators)) {
91 LIST_REMOVE(enumerators, n->enumerators, e);
96 LIST_REMOVE(siblings, n->parent->child, n);
98 assert_se(hashmap_remove(b->nodes, n->path) == n);
103 static void bus_reset_queues(sd_bus *b) {
108 for (i = 0; i < b->rqueue_size; i++)
109 sd_bus_message_unref(b->rqueue[i]);
112 for (i = 0; i < b->wqueue_size; i++)
113 sd_bus_message_unref(b->wqueue[i]);
116 b->rqueue = b->wqueue = NULL;
117 b->rqueue_size = b->wqueue_size = 0;
120 static void bus_free(sd_bus *b) {
121 struct filter_callback *f;
126 sd_bus_detach_event(b);
131 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
134 free(b->unique_name);
135 free(b->auth_buffer);
141 strv_free(b->exec_argv);
143 close_many(b->fds, b->n_fds);
148 hashmap_free_free(b->reply_callbacks);
149 prioq_free(b->reply_callbacks_prioq);
151 while ((f = b->filter_callbacks)) {
152 LIST_REMOVE(callbacks, b->filter_callbacks, f);
156 bus_match_free(&b->match_callbacks);
158 hashmap_free_free(b->vtable_methods);
159 hashmap_free_free(b->vtable_properties);
161 while ((n = hashmap_first(b->nodes)))
162 bus_node_destroy(b, n);
164 hashmap_free(b->nodes);
166 bus_kernel_flush_memfd(b);
168 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
173 _public_ int sd_bus_new(sd_bus **ret) {
176 assert_return(ret, -EINVAL);
182 r->n_ref = REFCNT_INIT;
183 r->input_fd = r->output_fd = -1;
184 r->message_version = 1;
185 r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
186 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
187 r->attach_flags |= KDBUS_ATTACH_NAMES;
188 r->original_pid = getpid();
190 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
192 /* We guarantee that wqueue always has space for at least one
194 r->wqueue = new(sd_bus_message*, 1);
204 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
207 assert_return(bus, -EINVAL);
208 assert_return(bus->state == BUS_UNSET, -EPERM);
209 assert_return(address, -EINVAL);
210 assert_return(!bus_pid_changed(bus), -ECHILD);
222 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
223 assert_return(bus, -EINVAL);
224 assert_return(bus->state == BUS_UNSET, -EPERM);
225 assert_return(input_fd >= 0, -EINVAL);
226 assert_return(output_fd >= 0, -EINVAL);
227 assert_return(!bus_pid_changed(bus), -ECHILD);
229 bus->input_fd = input_fd;
230 bus->output_fd = output_fd;
234 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
237 assert_return(bus, -EINVAL);
238 assert_return(bus->state == BUS_UNSET, -EPERM);
239 assert_return(path, -EINVAL);
240 assert_return(!strv_isempty(argv), -EINVAL);
241 assert_return(!bus_pid_changed(bus), -ECHILD);
253 free(bus->exec_path);
254 strv_free(bus->exec_argv);
262 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
263 assert_return(bus, -EINVAL);
264 assert_return(bus->state == BUS_UNSET, -EPERM);
265 assert_return(!bus_pid_changed(bus), -ECHILD);
267 bus->bus_client = !!b;
271 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
272 assert_return(bus, -EINVAL);
273 assert_return(bus->state == BUS_UNSET, -EPERM);
274 assert_return(!bus_pid_changed(bus), -ECHILD);
276 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
280 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
281 assert_return(bus, -EINVAL);
282 assert_return(bus->state == BUS_UNSET, -EPERM);
283 assert_return(!bus_pid_changed(bus), -ECHILD);
285 SET_FLAG(bus->attach_flags, KDBUS_ATTACH_TIMESTAMP, b);
289 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, uint64_t mask) {
290 assert_return(bus, -EINVAL);
291 assert_return(mask <= _SD_BUS_CREDS_ALL, -EINVAL);
292 assert_return(bus->state == BUS_UNSET, -EPERM);
293 assert_return(!bus_pid_changed(bus), -ECHILD);
295 /* The well knowns we need unconditionally, so that matches can work */
296 bus->creds_mask = mask | SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
298 return kdbus_translate_attach_flags(bus->creds_mask, &bus->creds_mask);
301 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
302 assert_return(bus, -EINVAL);
303 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
304 assert_return(bus->state == BUS_UNSET, -EPERM);
305 assert_return(!bus_pid_changed(bus), -ECHILD);
307 bus->is_server = !!b;
308 bus->server_id = server_id;
312 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
313 assert_return(bus, -EINVAL);
314 assert_return(bus->state == BUS_UNSET, -EPERM);
315 assert_return(!bus_pid_changed(bus), -ECHILD);
317 bus->anonymous_auth = !!b;
321 _public_ int sd_bus_set_trusted(sd_bus *bus, int b) {
322 assert_return(bus, -EINVAL);
323 assert_return(bus->state == BUS_UNSET, -EPERM);
324 assert_return(!bus_pid_changed(bus), -ECHILD);
330 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
335 assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
338 r = sd_bus_message_get_errno(reply);
344 r = sd_bus_message_read(reply, "s", &s);
348 if (!service_name_is_valid(s) || s[0] != ':')
351 bus->unique_name = strdup(s);
352 if (!bus->unique_name)
355 if (bus->state == BUS_HELLO)
356 bus->state = BUS_RUNNING;
361 static int bus_send_hello(sd_bus *bus) {
362 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
367 if (!bus->bus_client || bus->is_kernel)
370 r = sd_bus_message_new_method_call(
372 "org.freedesktop.DBus",
374 "org.freedesktop.DBus",
380 return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
383 int bus_start_running(sd_bus *bus) {
386 if (bus->bus_client && !bus->is_kernel) {
387 bus->state = BUS_HELLO;
391 bus->state = BUS_RUNNING;
395 static int parse_address_key(const char **p, const char *key, char **value) {
406 if (strncmp(*p, key, l) != 0)
419 while (*a != ';' && *a != ',' && *a != 0) {
437 c = (char) ((x << 4) | y);
444 t = realloc(r, n + 2);
472 static void skip_address_key(const char **p) {
476 *p += strcspn(*p, ",");
482 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
483 _cleanup_free_ char *path = NULL, *abstract = NULL;
492 while (**p != 0 && **p != ';') {
493 r = parse_address_key(p, "guid", guid);
499 r = parse_address_key(p, "path", &path);
505 r = parse_address_key(p, "abstract", &abstract);
514 if (!path && !abstract)
517 if (path && abstract)
522 if (l > sizeof(b->sockaddr.un.sun_path))
525 b->sockaddr.un.sun_family = AF_UNIX;
526 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
527 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
528 } else if (abstract) {
529 l = strlen(abstract);
530 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
533 b->sockaddr.un.sun_family = AF_UNIX;
534 b->sockaddr.un.sun_path[0] = 0;
535 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
536 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
542 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
543 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
545 struct addrinfo *result, hints = {
546 .ai_socktype = SOCK_STREAM,
547 .ai_flags = AI_ADDRCONFIG,
555 while (**p != 0 && **p != ';') {
556 r = parse_address_key(p, "guid", guid);
562 r = parse_address_key(p, "host", &host);
568 r = parse_address_key(p, "port", &port);
574 r = parse_address_key(p, "family", &family);
587 if (streq(family, "ipv4"))
588 hints.ai_family = AF_INET;
589 else if (streq(family, "ipv6"))
590 hints.ai_family = AF_INET6;
595 r = getaddrinfo(host, port, &hints, &result);
599 return -EADDRNOTAVAIL;
601 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
602 b->sockaddr_size = result->ai_addrlen;
604 freeaddrinfo(result);
609 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
611 unsigned n_argv = 0, j;
620 while (**p != 0 && **p != ';') {
621 r = parse_address_key(p, "guid", guid);
627 r = parse_address_key(p, "path", &path);
633 if (startswith(*p, "argv")) {
637 ul = strtoul(*p + 4, (char**) p, 10);
638 if (errno > 0 || **p != '=' || ul > 256) {
648 x = realloc(argv, sizeof(char*) * (ul + 2));
654 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
660 r = parse_address_key(p, NULL, argv + ul);
675 /* Make sure there are no holes in the array, with the
676 * exception of argv[0] */
677 for (j = 1; j < n_argv; j++)
683 if (argv && argv[0] == NULL) {
684 argv[0] = strdup(path);
696 for (j = 0; j < n_argv; j++)
704 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
705 _cleanup_free_ char *path = NULL;
713 while (**p != 0 && **p != ';') {
714 r = parse_address_key(p, "guid", guid);
720 r = parse_address_key(p, "path", &path);
739 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
740 _cleanup_free_ char *machine = NULL;
748 while (**p != 0 && **p != ';') {
749 r = parse_address_key(p, "guid", guid);
755 r = parse_address_key(p, "machine", &machine);
767 if (!filename_is_safe(machine))
771 b->machine = machine;
774 b->sockaddr.un.sun_family = AF_UNIX;
775 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
776 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
781 static void bus_reset_parsed_address(sd_bus *b) {
785 b->sockaddr_size = 0;
786 strv_free(b->exec_argv);
790 b->server_id = SD_ID128_NULL;
797 static int bus_parse_next_address(sd_bus *b) {
798 _cleanup_free_ char *guid = NULL;
806 if (b->address[b->address_index] == 0)
809 bus_reset_parsed_address(b);
811 a = b->address + b->address_index;
820 if (startswith(a, "unix:")) {
823 r = parse_unix_address(b, &a, &guid);
828 } else if (startswith(a, "tcp:")) {
831 r = parse_tcp_address(b, &a, &guid);
837 } else if (startswith(a, "unixexec:")) {
840 r = parse_exec_address(b, &a, &guid);
846 } else if (startswith(a, "kernel:")) {
849 r = parse_kernel_address(b, &a, &guid);
854 } else if (startswith(a, "x-container:")) {
857 r = parse_container_address(b, &a, &guid);
870 r = sd_id128_from_string(guid, &b->server_id);
875 b->address_index = a - b->address;
879 static int bus_start_address(sd_bus *b) {
889 r = bus_socket_exec(b);
893 b->last_connect_error = -r;
894 } else if (b->kernel) {
896 r = bus_kernel_connect(b);
900 b->last_connect_error = -r;
902 } else if (b->machine) {
904 r = bus_container_connect(b);
908 b->last_connect_error = -r;
910 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
912 r = bus_socket_connect(b);
916 b->last_connect_error = -r;
919 r = bus_parse_next_address(b);
923 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
927 int bus_next_address(sd_bus *b) {
930 bus_reset_parsed_address(b);
931 return bus_start_address(b);
934 static int bus_start_fd(sd_bus *b) {
939 assert(b->input_fd >= 0);
940 assert(b->output_fd >= 0);
942 r = fd_nonblock(b->input_fd, true);
946 r = fd_cloexec(b->input_fd, true);
950 if (b->input_fd != b->output_fd) {
951 r = fd_nonblock(b->output_fd, true);
955 r = fd_cloexec(b->output_fd, true);
960 if (fstat(b->input_fd, &st) < 0)
963 if (S_ISCHR(b->input_fd))
964 return bus_kernel_take_fd(b);
966 return bus_socket_take_fd(b);
969 _public_ int sd_bus_start(sd_bus *bus) {
972 assert_return(bus, -EINVAL);
973 assert_return(bus->state == BUS_UNSET, -EPERM);
974 assert_return(!bus_pid_changed(bus), -ECHILD);
976 bus->state = BUS_OPENING;
978 if (bus->is_server && bus->bus_client)
981 if (bus->input_fd >= 0)
982 r = bus_start_fd(bus);
983 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
984 r = bus_start_address(bus);
991 return bus_send_hello(bus);
994 _public_ int sd_bus_open_system(sd_bus **ret) {
999 assert_return(ret, -EINVAL);
1005 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1007 r = sd_bus_set_address(b, e);
1010 r = sd_bus_set_address(b, "kernel:path=/dev/kdbus/0-system/bus;unix:path=/run/dbus/system_bus_socket");
1012 r = sd_bus_set_address(b, "unix:path=/run/dbus/system_bus_socket");
1018 b->bus_client = true;
1020 /* Let's do per-method access control on the system bus. We
1021 * need the caller's UID and capability set for that. */
1023 b->attach_flags |= KDBUS_ATTACH_CAPS | KDBUS_ATTACH_CREDS;
1025 r = sd_bus_start(b);
1037 _public_ int sd_bus_open_user(sd_bus **ret) {
1042 assert_return(ret, -EINVAL);
1048 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1050 r = sd_bus_set_address(b, e);
1054 e = secure_getenv("XDG_RUNTIME_DIR");
1056 _cleanup_free_ char *ee = NULL;
1058 ee = bus_address_escape(e);
1065 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus;unix:path=%s/bus", (unsigned long) getuid(), ee);
1067 b->address = strjoin("unix:path=", ee, "/bus", NULL);
1071 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus", (unsigned long) getuid());
1073 return -ECONNREFUSED;
1083 b->bus_client = true;
1085 /* We don't do any per-method access control on the user
1089 r = sd_bus_start(b);
1101 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1102 _cleanup_free_ char *e = NULL;
1107 assert_return(host, -EINVAL);
1108 assert_return(ret, -EINVAL);
1110 e = bus_address_escape(host);
1114 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1118 r = sd_bus_new(&bus);
1125 bus->bus_client = true;
1127 r = sd_bus_start(bus);
1137 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1138 _cleanup_free_ char *e = NULL;
1143 assert_return(machine, -EINVAL);
1144 assert_return(ret, -EINVAL);
1145 assert_return(filename_is_safe(machine), -EINVAL);
1147 e = bus_address_escape(machine);
1152 p = strjoin("kernel:path=/dev/kdbus/ns/machine-", e, "/0-system/bus;x-container:machine=", e, NULL);
1154 p = strjoin("x-container:machine=", e, NULL);
1159 r = sd_bus_new(&bus);
1166 bus->bus_client = true;
1168 r = sd_bus_start(bus);
1178 _public_ void sd_bus_close(sd_bus *bus) {
1182 if (bus->state == BUS_CLOSED)
1184 if (bus_pid_changed(bus))
1187 bus->state = BUS_CLOSED;
1189 sd_bus_detach_event(bus);
1191 /* Drop all queued messages so that they drop references to
1192 * the bus object and the bus may be freed */
1193 bus_reset_queues(bus);
1195 if (!bus->is_kernel)
1198 /* We'll leave the fd open in case this is a kernel bus, since
1199 * there might still be memblocks around that reference this
1200 * bus, and they might need to invoke the * KDBUS_CMD_FREE
1201 * ioctl on the fd when they are freed. */
1204 static void bus_enter_closing(sd_bus *bus) {
1207 if (bus->state != BUS_OPENING &&
1208 bus->state != BUS_AUTHENTICATING &&
1209 bus->state != BUS_HELLO &&
1210 bus->state != BUS_RUNNING)
1213 bus->state = BUS_CLOSING;
1216 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1217 assert_return(bus, NULL);
1219 assert_se(REFCNT_INC(bus->n_ref) >= 2);
1224 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1229 if (REFCNT_DEC(bus->n_ref) <= 0)
1235 _public_ int sd_bus_is_open(sd_bus *bus) {
1237 assert_return(bus, -EINVAL);
1238 assert_return(!bus_pid_changed(bus), -ECHILD);
1240 return BUS_IS_OPEN(bus->state);
1243 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1246 assert_return(bus, -EINVAL);
1247 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1248 assert_return(!bus_pid_changed(bus), -ECHILD);
1250 if (type == SD_BUS_TYPE_UNIX_FD) {
1251 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1254 r = bus_ensure_running(bus);
1258 return bus->can_fds;
1261 return bus_type_is_valid(type);
1264 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1267 assert_return(bus, -EINVAL);
1268 assert_return(server_id, -EINVAL);
1269 assert_return(!bus_pid_changed(bus), -ECHILD);
1271 r = bus_ensure_running(bus);
1275 *server_id = bus->server_id;
1279 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1283 if (m->header->version > b->message_version)
1287 /* If we copy the same message to multiple
1288 * destinations, avoid using the same serial
1290 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1294 return bus_message_seal(m, ++b->serial);
1297 int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
1301 if (m->header->version > b->message_version)
1304 /* The bus specification says the serial number cannot be 0,
1305 * hence let's fill something in for synthetic messages. Since
1306 * synthetic messages might have a fake sender and we don't
1307 * want to interfere with the real sender's serial numbers we
1308 * pick a fixed, artifical one. We use (uint32_t) -1 rather
1309 * than (uint64_t) -1 since dbus1 only had 32bit identifiers,
1310 * even though kdbus can do 64bit. */
1312 return bus_message_seal(m, 0xFFFFFFFFULL);
1315 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1320 return bus_kernel_write_message(bus, message);
1322 return bus_socket_write_message(bus, message, idx);
1325 static int dispatch_wqueue(sd_bus *bus) {
1329 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1331 while (bus->wqueue_size > 0) {
1333 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1337 /* Didn't do anything this time */
1339 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1340 /* Fully written. Let's drop the entry from
1343 * This isn't particularly optimized, but
1344 * well, this is supposed to be our worst-case
1345 * buffer only, and the socket buffer is
1346 * supposed to be our primary buffer, and if
1347 * it got full, then all bets are off
1350 sd_bus_message_unref(bus->wqueue[0]);
1351 bus->wqueue_size --;
1352 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1362 static int bus_read_message(sd_bus *bus) {
1366 return bus_kernel_read_message(bus);
1368 return bus_socket_read_message(bus);
1371 int bus_rqueue_make_room(sd_bus *bus) {
1375 x = bus->rqueue_size + 1;
1377 if (bus->rqueue_allocated >= x)
1380 if (x > BUS_RQUEUE_MAX)
1383 q = realloc(bus->rqueue, x * sizeof(sd_bus_message*));
1388 bus->rqueue_allocated = x;
1393 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1398 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1401 if (bus->rqueue_size > 0) {
1402 /* Dispatch a queued message */
1404 *m = bus->rqueue[0];
1405 bus->rqueue_size --;
1406 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1410 /* Try to read a new message */
1411 r = bus_read_message(bus);
1421 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1424 assert_return(bus, -EINVAL);
1425 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1426 assert_return(m, -EINVAL);
1427 assert_return(!bus_pid_changed(bus), -ECHILD);
1430 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1437 /* If the serial number isn't kept, then we know that no reply
1439 if (!serial && !m->sealed)
1440 m->header->flags |= BUS_MESSAGE_NO_REPLY_EXPECTED;
1442 r = bus_seal_message(bus, m);
1446 /* If this is a reply and no reply was requested, then let's
1447 * suppress this, if we can */
1448 if (m->dont_send && !serial)
1451 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1454 r = bus_write_message(bus, m, &idx);
1456 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1457 bus_enter_closing(bus);
1460 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1461 /* Wasn't fully written. So let's remember how
1462 * much was written. Note that the first entry
1463 * of the wqueue array is always allocated so
1464 * that we always can remember how much was
1466 bus->wqueue[0] = sd_bus_message_ref(m);
1467 bus->wqueue_size = 1;
1473 /* Just append it to the queue. */
1475 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1478 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1483 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1487 *serial = BUS_MESSAGE_SERIAL(m);
1492 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1495 assert_return(bus, -EINVAL);
1496 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1497 assert_return(m, -EINVAL);
1498 assert_return(!bus_pid_changed(bus), -ECHILD);
1500 if (!streq_ptr(m->destination, destination)) {
1505 r = sd_bus_message_set_destination(m, destination);
1510 return sd_bus_send(bus, m, serial);
1513 static usec_t calc_elapse(uint64_t usec) {
1514 if (usec == (uint64_t) -1)
1518 usec = BUS_DEFAULT_TIMEOUT;
1520 return now(CLOCK_MONOTONIC) + usec;
1523 static int timeout_compare(const void *a, const void *b) {
1524 const struct reply_callback *x = a, *y = b;
1526 if (x->timeout != 0 && y->timeout == 0)
1529 if (x->timeout == 0 && y->timeout != 0)
1532 if (x->timeout < y->timeout)
1535 if (x->timeout > y->timeout)
1541 _public_ int sd_bus_call_async(
1544 sd_bus_message_handler_t callback,
1549 struct reply_callback *c;
1552 assert_return(bus, -EINVAL);
1553 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1554 assert_return(m, -EINVAL);
1555 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1556 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1557 assert_return(callback, -EINVAL);
1558 assert_return(!bus_pid_changed(bus), -ECHILD);
1560 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1564 if (usec != (uint64_t) -1) {
1565 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1570 r = bus_seal_message(bus, m);
1574 c = new0(struct reply_callback, 1);
1578 c->callback = callback;
1579 c->userdata = userdata;
1580 c->serial = BUS_MESSAGE_SERIAL(m);
1581 c->timeout = calc_elapse(usec);
1583 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1589 if (c->timeout != 0) {
1590 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1593 sd_bus_call_async_cancel(bus, c->serial);
1598 r = sd_bus_send(bus, m, serial);
1600 sd_bus_call_async_cancel(bus, c->serial);
1607 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1608 struct reply_callback *c;
1610 assert_return(bus, -EINVAL);
1611 assert_return(serial != 0, -EINVAL);
1612 assert_return(!bus_pid_changed(bus), -ECHILD);
1614 c = hashmap_remove(bus->reply_callbacks, &serial);
1618 if (c->timeout != 0)
1619 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1625 int bus_ensure_running(sd_bus *bus) {
1630 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1632 if (bus->state == BUS_RUNNING)
1636 r = sd_bus_process(bus, NULL);
1639 if (bus->state == BUS_RUNNING)
1644 r = sd_bus_wait(bus, (uint64_t) -1);
1650 _public_ int sd_bus_call(
1654 sd_bus_error *error,
1655 sd_bus_message **reply) {
1662 assert_return(bus, -EINVAL);
1663 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1664 assert_return(m, -EINVAL);
1665 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1666 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1667 assert_return(!bus_error_is_dirty(error), -EINVAL);
1668 assert_return(!bus_pid_changed(bus), -ECHILD);
1670 r = bus_ensure_running(bus);
1674 i = bus->rqueue_size;
1676 r = sd_bus_send(bus, m, &serial);
1680 timeout = calc_elapse(usec);
1685 while (i < bus->rqueue_size) {
1686 sd_bus_message *incoming = NULL;
1688 incoming = bus->rqueue[i];
1690 if (incoming->reply_serial == serial) {
1691 /* Found a match! */
1693 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1696 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1701 sd_bus_message_unref(incoming);
1704 } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
1705 r = sd_bus_error_copy(error, &incoming->error);
1709 sd_bus_message_unref(incoming);
1712 } else if (incoming->header->serial == serial &&
1715 streq(bus->unique_name, incoming->sender)) {
1717 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1720 /* Our own message? Somebody is trying
1721 * to send its own client a message,
1722 * let's not dead-lock, let's fail
1725 sd_bus_message_unref(incoming);
1729 /* Try to read more, right-away */
1733 r = bus_read_message(bus);
1735 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1736 bus_enter_closing(bus);
1746 n = now(CLOCK_MONOTONIC);
1752 left = (uint64_t) -1;
1754 r = bus_poll(bus, true, left);
1758 r = dispatch_wqueue(bus);
1760 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1761 bus_enter_closing(bus);
1768 _public_ int sd_bus_get_fd(sd_bus *bus) {
1770 assert_return(bus, -EINVAL);
1771 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1772 assert_return(!bus_pid_changed(bus), -ECHILD);
1774 return bus->input_fd;
1777 _public_ int sd_bus_get_events(sd_bus *bus) {
1780 assert_return(bus, -EINVAL);
1781 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1782 assert_return(!bus_pid_changed(bus), -ECHILD);
1784 if (bus->state == BUS_OPENING)
1786 else if (bus->state == BUS_AUTHENTICATING) {
1788 if (bus_socket_auth_needs_write(bus))
1793 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1794 if (bus->rqueue_size <= 0)
1796 if (bus->wqueue_size > 0)
1803 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1804 struct reply_callback *c;
1806 assert_return(bus, -EINVAL);
1807 assert_return(timeout_usec, -EINVAL);
1808 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1809 assert_return(!bus_pid_changed(bus), -ECHILD);
1811 if (bus->state == BUS_CLOSING) {
1816 if (bus->state == BUS_AUTHENTICATING) {
1817 *timeout_usec = bus->auth_timeout;
1821 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1822 *timeout_usec = (uint64_t) -1;
1826 if (bus->rqueue_size > 0) {
1831 c = prioq_peek(bus->reply_callbacks_prioq);
1833 *timeout_usec = (uint64_t) -1;
1837 *timeout_usec = c->timeout;
1841 static int process_timeout(sd_bus *bus) {
1842 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1843 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1844 struct reply_callback *c;
1850 c = prioq_peek(bus->reply_callbacks_prioq);
1854 n = now(CLOCK_MONOTONIC);
1858 r = bus_message_new_synthetic_error(
1861 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1866 m->sender = "org.freedesktop.DBus";
1868 r = bus_seal_synthetic_message(bus, m);
1872 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1873 hashmap_remove(bus->reply_callbacks, &c->serial);
1876 bus->iteration_counter ++;
1878 r = c->callback(bus, m, c->userdata, &error_buffer);
1879 r = bus_maybe_reply_error(m, r, &error_buffer);
1882 bus->current = NULL;
1887 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1891 if (bus->state != BUS_HELLO)
1894 /* Let's make sure the first message on the bus is the HELLO
1895 * reply. But note that we don't actually parse the message
1896 * here (we leave that to the usual handling), we just verify
1897 * we don't let any earlier msg through. */
1899 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1900 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1903 if (m->reply_serial != bus->hello_serial)
1909 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1910 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1911 struct reply_callback *c;
1917 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1918 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1921 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1925 if (c->timeout != 0)
1926 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1928 r = sd_bus_message_rewind(m, true);
1932 r = c->callback(bus, m, c->userdata, &error_buffer);
1933 r = bus_maybe_reply_error(m, r, &error_buffer);
1939 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1940 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1941 struct filter_callback *l;
1948 bus->filter_callbacks_modified = false;
1950 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1952 if (bus->filter_callbacks_modified)
1955 /* Don't run this more than once per iteration */
1956 if (l->last_iteration == bus->iteration_counter)
1959 l->last_iteration = bus->iteration_counter;
1961 r = sd_bus_message_rewind(m, true);
1965 r = l->callback(bus, m, l->userdata, &error_buffer);
1966 r = bus_maybe_reply_error(m, r, &error_buffer);
1972 } while (bus->filter_callbacks_modified);
1977 static int process_match(sd_bus *bus, sd_bus_message *m) {
1984 bus->match_callbacks_modified = false;
1986 r = bus_match_run(bus, &bus->match_callbacks, m);
1990 } while (bus->match_callbacks_modified);
1995 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1996 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2002 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2005 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2008 if (m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED)
2011 if (streq_ptr(m->member, "Ping"))
2012 r = sd_bus_message_new_method_return(m, &reply);
2013 else if (streq_ptr(m->member, "GetMachineId")) {
2017 r = sd_id128_get_machine(&id);
2021 r = sd_bus_message_new_method_return(m, &reply);
2025 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2027 r = sd_bus_message_new_method_errorf(
2029 SD_BUS_ERROR_UNKNOWN_METHOD,
2030 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2036 r = sd_bus_send(bus, reply, NULL);
2043 static int process_message(sd_bus *bus, sd_bus_message *m) {
2050 bus->iteration_counter++;
2052 log_debug("Got message sender=%s object=%s interface=%s member=%s",
2053 strna(sd_bus_message_get_sender(m)),
2054 strna(sd_bus_message_get_path(m)),
2055 strna(sd_bus_message_get_interface(m)),
2056 strna(sd_bus_message_get_member(m)));
2058 r = process_hello(bus, m);
2062 r = process_reply(bus, m);
2066 r = process_filter(bus, m);
2070 r = process_match(bus, m);
2074 r = process_builtin(bus, m);
2078 r = bus_process_object(bus, m);
2081 bus->current = NULL;
2085 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2086 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2090 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2092 r = process_timeout(bus);
2096 r = dispatch_wqueue(bus);
2100 r = dispatch_rqueue(bus, &m);
2106 r = process_message(bus, m);
2111 r = sd_bus_message_rewind(m, true);
2120 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2122 r = sd_bus_reply_method_errorf(
2124 SD_BUS_ERROR_UNKNOWN_OBJECT,
2125 "Unknown object '%s'.", m->path);
2139 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2140 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2141 struct reply_callback *c;
2145 assert(bus->state == BUS_CLOSING);
2147 c = hashmap_first(bus->reply_callbacks);
2149 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2151 /* First, fail all outstanding method calls */
2152 r = bus_message_new_synthetic_error(
2155 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2160 r = bus_seal_synthetic_message(bus, m);
2164 if (c->timeout != 0)
2165 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2167 hashmap_remove(bus->reply_callbacks, &c->serial);
2170 bus->iteration_counter++;
2172 r = c->callback(bus, m, c->userdata, &error_buffer);
2173 r = bus_maybe_reply_error(m, r, &error_buffer);
2179 /* Then, synthesize a Disconnected message */
2180 r = sd_bus_message_new_signal(
2182 "/org/freedesktop/DBus/Local",
2183 "org.freedesktop.DBus.Local",
2189 m->sender = "org.freedesktop.DBus.Local";
2191 r = bus_seal_synthetic_message(bus, m);
2198 bus->iteration_counter++;
2200 r = process_filter(bus, m);
2204 r = process_match(bus, m);
2216 bus->current = NULL;
2220 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2221 BUS_DONT_DESTROY(bus);
2224 /* Returns 0 when we didn't do anything. This should cause the
2225 * caller to invoke sd_bus_wait() before returning the next
2226 * time. Returns > 0 when we did something, which possibly
2227 * means *ret is filled in with an unprocessed message. */
2229 assert_return(bus, -EINVAL);
2230 assert_return(!bus_pid_changed(bus), -ECHILD);
2232 /* We don't allow recursively invoking sd_bus_process(). */
2233 assert_return(!bus->current, -EBUSY);
2235 switch (bus->state) {
2242 r = bus_socket_process_opening(bus);
2243 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2244 bus_enter_closing(bus);
2252 case BUS_AUTHENTICATING:
2253 r = bus_socket_process_authenticating(bus);
2254 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2255 bus_enter_closing(bus);
2267 r = process_running(bus, ret);
2268 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2269 bus_enter_closing(bus);
2279 return process_closing(bus, ret);
2282 assert_not_reached("Unknown state");
2285 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2286 struct pollfd p[2] = {};
2289 usec_t m = (usec_t) -1;
2293 if (bus->state == BUS_CLOSING)
2296 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2298 e = sd_bus_get_events(bus);
2303 /* The caller really needs some more data, he doesn't
2304 * care about what's already read, or any timeouts
2309 /* The caller wants to process if there's something to
2310 * process, but doesn't care otherwise */
2312 r = sd_bus_get_timeout(bus, &until);
2317 nw = now(CLOCK_MONOTONIC);
2318 m = until > nw ? until - nw : 0;
2322 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2325 p[0].fd = bus->input_fd;
2326 if (bus->output_fd == bus->input_fd) {
2330 p[0].events = e & POLLIN;
2331 p[1].fd = bus->output_fd;
2332 p[1].events = e & POLLOUT;
2336 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2340 return r > 0 ? 1 : 0;
2343 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2345 assert_return(bus, -EINVAL);
2346 assert_return(!bus_pid_changed(bus), -ECHILD);
2348 if (bus->state == BUS_CLOSING)
2351 assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
2353 if (bus->rqueue_size > 0)
2356 return bus_poll(bus, false, timeout_usec);
2359 _public_ int sd_bus_flush(sd_bus *bus) {
2362 assert_return(bus, -EINVAL);
2363 assert_return(!bus_pid_changed(bus), -ECHILD);
2365 if (bus->state == BUS_CLOSING)
2368 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2370 r = bus_ensure_running(bus);
2374 if (bus->wqueue_size <= 0)
2378 r = dispatch_wqueue(bus);
2380 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
2381 bus_enter_closing(bus);
2386 if (bus->wqueue_size <= 0)
2389 r = bus_poll(bus, false, (uint64_t) -1);
2395 _public_ int sd_bus_add_filter(sd_bus *bus,
2396 sd_bus_message_handler_t callback,
2399 struct filter_callback *f;
2401 assert_return(bus, -EINVAL);
2402 assert_return(callback, -EINVAL);
2403 assert_return(!bus_pid_changed(bus), -ECHILD);
2405 f = new0(struct filter_callback, 1);
2408 f->callback = callback;
2409 f->userdata = userdata;
2411 bus->filter_callbacks_modified = true;
2412 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2416 _public_ int sd_bus_remove_filter(sd_bus *bus,
2417 sd_bus_message_handler_t callback,
2420 struct filter_callback *f;
2422 assert_return(bus, -EINVAL);
2423 assert_return(callback, -EINVAL);
2424 assert_return(!bus_pid_changed(bus), -ECHILD);
2426 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2427 if (f->callback == callback && f->userdata == userdata) {
2428 bus->filter_callbacks_modified = true;
2429 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2438 _public_ int sd_bus_add_match(sd_bus *bus,
2440 sd_bus_message_handler_t callback,
2443 struct bus_match_component *components = NULL;
2444 unsigned n_components = 0;
2445 uint64_t cookie = 0;
2448 assert_return(bus, -EINVAL);
2449 assert_return(match, -EINVAL);
2450 assert_return(!bus_pid_changed(bus), -ECHILD);
2452 r = bus_match_parse(match, &components, &n_components);
2456 if (bus->bus_client) {
2457 cookie = ++bus->match_cookie;
2459 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2464 bus->match_callbacks_modified = true;
2465 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2467 if (bus->bus_client)
2468 bus_remove_match_internal(bus, match, cookie);
2472 bus_match_parse_free(components, n_components);
2476 _public_ int sd_bus_remove_match(sd_bus *bus,
2478 sd_bus_message_handler_t callback,
2481 struct bus_match_component *components = NULL;
2482 unsigned n_components = 0;
2484 uint64_t cookie = 0;
2486 assert_return(bus, -EINVAL);
2487 assert_return(match, -EINVAL);
2488 assert_return(!bus_pid_changed(bus), -ECHILD);
2490 r = bus_match_parse(match, &components, &n_components);
2494 bus->match_callbacks_modified = true;
2495 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2497 if (bus->bus_client)
2498 q = bus_remove_match_internal(bus, match, cookie);
2500 bus_match_parse_free(components, n_components);
2502 return r < 0 ? r : q;
2505 bool bus_pid_changed(sd_bus *bus) {
2508 /* We don't support people creating a bus connection and
2509 * keeping it around over a fork(). Let's complain. */
2511 return bus->original_pid != getpid();
2514 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2515 sd_bus *bus = userdata;
2520 r = sd_bus_process(bus, NULL);
2527 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2528 sd_bus *bus = userdata;
2533 r = sd_bus_process(bus, NULL);
2540 static int prepare_callback(sd_event_source *s, void *userdata) {
2541 sd_bus *bus = userdata;
2548 e = sd_bus_get_events(bus);
2552 if (bus->output_fd != bus->input_fd) {
2554 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2558 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2562 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2567 r = sd_bus_get_timeout(bus, &until);
2573 j = sd_event_source_set_time(bus->time_event_source, until);
2578 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2585 static int quit_callback(sd_event_source *event, void *userdata) {
2586 sd_bus *bus = userdata;
2595 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2598 assert_return(bus, -EINVAL);
2599 assert_return(!bus->event, -EBUSY);
2601 assert(!bus->input_io_event_source);
2602 assert(!bus->output_io_event_source);
2603 assert(!bus->time_event_source);
2606 bus->event = sd_event_ref(event);
2608 r = sd_event_default(&bus->event);
2613 r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2617 r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2621 if (bus->output_fd != bus->input_fd) {
2622 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2626 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2631 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2635 r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2639 r = sd_event_source_set_priority(bus->time_event_source, priority);
2643 r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2650 sd_bus_detach_event(bus);
2654 _public_ int sd_bus_detach_event(sd_bus *bus) {
2655 assert_return(bus, -EINVAL);
2660 if (bus->input_io_event_source) {
2661 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2662 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2665 if (bus->output_io_event_source) {
2666 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2667 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2670 if (bus->time_event_source) {
2671 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2672 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2675 if (bus->quit_event_source) {
2676 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2677 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2681 bus->event = sd_event_unref(bus->event);
2686 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2687 assert_return(bus, NULL);
2692 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2693 assert_return(bus, NULL);
2695 return bus->current;
2698 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2703 assert(default_bus);
2706 return !!*default_bus;
2709 *ret = sd_bus_ref(*default_bus);
2717 b->default_bus_ptr = default_bus;
2725 _public_ int sd_bus_default_system(sd_bus **ret) {
2726 static __thread sd_bus *default_system_bus = NULL;
2728 return bus_default(sd_bus_open_system, &default_system_bus, ret);
2731 _public_ int sd_bus_default_user(sd_bus **ret) {
2732 static __thread sd_bus *default_user_bus = NULL;
2734 return bus_default(sd_bus_open_user, &default_user_bus, ret);
2737 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2738 assert_return(b, -EINVAL);
2739 assert_return(tid, -EINVAL);
2740 assert_return(!bus_pid_changed(b), -ECHILD);
2748 return sd_event_get_tid(b->event, tid);
2753 _public_ char *sd_bus_label_escape(const char *s) {
2757 assert_return(s, NULL);
2759 /* Escapes all chars that D-Bus' object path cannot deal
2760 * with. Can be reversed with bus_path_unescape(). We special
2761 * case the empty string. */
2766 r = new(char, strlen(s)*3 + 1);
2770 for (f = s, t = r; *f; f++) {
2772 /* Escape everything that is not a-zA-Z0-9. We also
2773 * escape 0-9 if it's the first character */
2775 if (!(*f >= 'A' && *f <= 'Z') &&
2776 !(*f >= 'a' && *f <= 'z') &&
2777 !(f > s && *f >= '0' && *f <= '9')) {
2779 *(t++) = hexchar(*f >> 4);
2780 *(t++) = hexchar(*f);
2790 _public_ char *sd_bus_label_unescape(const char *f) {
2793 assert_return(f, NULL);
2795 /* Special case for the empty string */
2799 r = new(char, strlen(f) + 1);
2803 for (t = r; *f; f++) {
2808 if ((a = unhexchar(f[1])) < 0 ||
2809 (b = unhexchar(f[2])) < 0) {
2810 /* Invalid escape code, let's take it literal then */
2813 *(t++) = (char) ((a << 4) | b);
2825 _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **ret) {
2830 assert_return(bus, -EINVAL);
2831 assert_return(mask <= _SD_BUS_CREDS_ALL, -ENOTSUP);
2832 assert_return(ret, -EINVAL);
2833 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2834 assert_return(!bus_pid_changed(bus), -ECHILD);
2835 assert_return(!bus->is_kernel, -ENOTSUP);
2837 if (!bus->ucred_valid && !isempty(bus->label))
2840 c = bus_creds_new();
2844 if (bus->ucred_valid) {
2845 pid = c->pid = bus->ucred.pid;
2846 c->uid = bus->ucred.uid;
2847 c->gid = bus->ucred.gid;
2849 c->mask |= (SD_BUS_CREDS_UID | SD_BUS_CREDS_PID | SD_BUS_CREDS_GID) & mask;
2852 if (!isempty(bus->label) && (mask & SD_BUS_CREDS_SELINUX_CONTEXT)) {
2853 c->label = strdup(bus->label);
2855 sd_bus_creds_unref(c);
2859 c->mask |= SD_BUS_CREDS_SELINUX_CONTEXT;
2862 r = bus_creds_add_more(c, mask, pid, 0);