1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 #include "bus-container.h"
50 #include "bus-protocol.h"
52 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
53 static int attach_io_events(sd_bus *b);
54 static void detach_io_events(sd_bus *b);
56 static void bus_close_fds(sd_bus *b) {
62 close_nointr_nofail(b->input_fd);
64 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
65 close_nointr_nofail(b->output_fd);
67 b->input_fd = b->output_fd = -1;
70 static void bus_node_destroy(sd_bus *b, struct node *n) {
71 struct node_callback *c;
72 struct node_vtable *v;
73 struct node_enumerator *e;
81 bus_node_destroy(b, n->child);
83 while ((c = n->callbacks)) {
84 LIST_REMOVE(callbacks, n->callbacks, c);
88 while ((v = n->vtables)) {
89 LIST_REMOVE(vtables, n->vtables, v);
94 while ((e = n->enumerators)) {
95 LIST_REMOVE(enumerators, n->enumerators, e);
100 LIST_REMOVE(siblings, n->parent->child, n);
102 assert_se(hashmap_remove(b->nodes, n->path) == n);
107 static void bus_reset_queues(sd_bus *b) {
112 for (i = 0; i < b->rqueue_size; i++)
113 sd_bus_message_unref(b->rqueue[i]);
116 for (i = 0; i < b->wqueue_size; i++)
117 sd_bus_message_unref(b->wqueue[i]);
120 b->rqueue = b->wqueue = NULL;
121 b->rqueue_size = b->wqueue_size = 0;
124 static void bus_free(sd_bus *b) {
125 struct filter_callback *f;
130 sd_bus_detach_event(b);
135 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
138 free(b->unique_name);
139 free(b->auth_buffer);
145 strv_free(b->exec_argv);
147 close_many(b->fds, b->n_fds);
152 hashmap_free_free(b->reply_callbacks);
153 prioq_free(b->reply_callbacks_prioq);
155 while ((f = b->filter_callbacks)) {
156 LIST_REMOVE(callbacks, b->filter_callbacks, f);
160 bus_match_free(&b->match_callbacks);
162 hashmap_free_free(b->vtable_methods);
163 hashmap_free_free(b->vtable_properties);
165 while ((n = hashmap_first(b->nodes)))
166 bus_node_destroy(b, n);
168 hashmap_free(b->nodes);
170 bus_kernel_flush_memfd(b);
172 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
177 _public_ int sd_bus_new(sd_bus **ret) {
180 assert_return(ret, -EINVAL);
186 r->n_ref = REFCNT_INIT;
187 r->input_fd = r->output_fd = -1;
188 r->message_version = 1;
189 r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
190 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
191 r->attach_flags |= KDBUS_ATTACH_NAMES;
192 r->original_pid = getpid();
194 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
196 /* We guarantee that wqueue always has space for at least one
198 r->wqueue = new(sd_bus_message*, 1);
208 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
211 assert_return(bus, -EINVAL);
212 assert_return(bus->state == BUS_UNSET, -EPERM);
213 assert_return(address, -EINVAL);
214 assert_return(!bus_pid_changed(bus), -ECHILD);
226 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
227 assert_return(bus, -EINVAL);
228 assert_return(bus->state == BUS_UNSET, -EPERM);
229 assert_return(input_fd >= 0, -EINVAL);
230 assert_return(output_fd >= 0, -EINVAL);
231 assert_return(!bus_pid_changed(bus), -ECHILD);
233 bus->input_fd = input_fd;
234 bus->output_fd = output_fd;
238 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
241 assert_return(bus, -EINVAL);
242 assert_return(bus->state == BUS_UNSET, -EPERM);
243 assert_return(path, -EINVAL);
244 assert_return(!strv_isempty(argv), -EINVAL);
245 assert_return(!bus_pid_changed(bus), -ECHILD);
257 free(bus->exec_path);
258 strv_free(bus->exec_argv);
266 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
267 assert_return(bus, -EINVAL);
268 assert_return(bus->state == BUS_UNSET, -EPERM);
269 assert_return(!bus_pid_changed(bus), -ECHILD);
271 bus->bus_client = !!b;
275 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
276 assert_return(bus, -EINVAL);
277 assert_return(bus->state == BUS_UNSET, -EPERM);
278 assert_return(!bus_pid_changed(bus), -ECHILD);
280 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
284 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
285 assert_return(bus, -EINVAL);
286 assert_return(bus->state == BUS_UNSET, -EPERM);
287 assert_return(!bus_pid_changed(bus), -ECHILD);
289 SET_FLAG(bus->attach_flags, KDBUS_ATTACH_TIMESTAMP, b);
293 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, uint64_t mask) {
294 assert_return(bus, -EINVAL);
295 assert_return(mask <= _SD_BUS_CREDS_ALL, -EINVAL);
296 assert_return(bus->state == BUS_UNSET, -EPERM);
297 assert_return(!bus_pid_changed(bus), -ECHILD);
299 /* The well knowns we need unconditionally, so that matches can work */
300 bus->creds_mask = mask | SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
302 return kdbus_translate_attach_flags(bus->creds_mask, &bus->creds_mask);
305 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
306 assert_return(bus, -EINVAL);
307 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
308 assert_return(bus->state == BUS_UNSET, -EPERM);
309 assert_return(!bus_pid_changed(bus), -ECHILD);
311 bus->is_server = !!b;
312 bus->server_id = server_id;
316 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
317 assert_return(bus, -EINVAL);
318 assert_return(bus->state == BUS_UNSET, -EPERM);
319 assert_return(!bus_pid_changed(bus), -ECHILD);
321 bus->anonymous_auth = !!b;
325 _public_ int sd_bus_set_trusted(sd_bus *bus, int b) {
326 assert_return(bus, -EINVAL);
327 assert_return(bus->state == BUS_UNSET, -EPERM);
328 assert_return(!bus_pid_changed(bus), -ECHILD);
334 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
339 assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
342 r = sd_bus_message_get_errno(reply);
348 r = sd_bus_message_read(reply, "s", &s);
352 if (!service_name_is_valid(s) || s[0] != ':')
355 bus->unique_name = strdup(s);
356 if (!bus->unique_name)
359 if (bus->state == BUS_HELLO)
360 bus->state = BUS_RUNNING;
365 static int bus_send_hello(sd_bus *bus) {
366 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
371 if (!bus->bus_client || bus->is_kernel)
374 r = sd_bus_message_new_method_call(
376 "org.freedesktop.DBus",
378 "org.freedesktop.DBus",
384 return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
387 int bus_start_running(sd_bus *bus) {
390 if (bus->bus_client && !bus->is_kernel) {
391 bus->state = BUS_HELLO;
395 bus->state = BUS_RUNNING;
399 static int parse_address_key(const char **p, const char *key, char **value) {
410 if (strncmp(*p, key, l) != 0)
423 while (*a != ';' && *a != ',' && *a != 0) {
441 c = (char) ((x << 4) | y);
448 t = realloc(r, n + 2);
476 static void skip_address_key(const char **p) {
480 *p += strcspn(*p, ",");
486 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
487 _cleanup_free_ char *path = NULL, *abstract = NULL;
496 while (**p != 0 && **p != ';') {
497 r = parse_address_key(p, "guid", guid);
503 r = parse_address_key(p, "path", &path);
509 r = parse_address_key(p, "abstract", &abstract);
518 if (!path && !abstract)
521 if (path && abstract)
526 if (l > sizeof(b->sockaddr.un.sun_path))
529 b->sockaddr.un.sun_family = AF_UNIX;
530 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
531 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
532 } else if (abstract) {
533 l = strlen(abstract);
534 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
537 b->sockaddr.un.sun_family = AF_UNIX;
538 b->sockaddr.un.sun_path[0] = 0;
539 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
540 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
546 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
547 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
549 struct addrinfo *result, hints = {
550 .ai_socktype = SOCK_STREAM,
551 .ai_flags = AI_ADDRCONFIG,
559 while (**p != 0 && **p != ';') {
560 r = parse_address_key(p, "guid", guid);
566 r = parse_address_key(p, "host", &host);
572 r = parse_address_key(p, "port", &port);
578 r = parse_address_key(p, "family", &family);
591 if (streq(family, "ipv4"))
592 hints.ai_family = AF_INET;
593 else if (streq(family, "ipv6"))
594 hints.ai_family = AF_INET6;
599 r = getaddrinfo(host, port, &hints, &result);
603 return -EADDRNOTAVAIL;
605 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
606 b->sockaddr_size = result->ai_addrlen;
608 freeaddrinfo(result);
613 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
615 unsigned n_argv = 0, j;
624 while (**p != 0 && **p != ';') {
625 r = parse_address_key(p, "guid", guid);
631 r = parse_address_key(p, "path", &path);
637 if (startswith(*p, "argv")) {
641 ul = strtoul(*p + 4, (char**) p, 10);
642 if (errno > 0 || **p != '=' || ul > 256) {
652 x = realloc(argv, sizeof(char*) * (ul + 2));
658 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
664 r = parse_address_key(p, NULL, argv + ul);
679 /* Make sure there are no holes in the array, with the
680 * exception of argv[0] */
681 for (j = 1; j < n_argv; j++)
687 if (argv && argv[0] == NULL) {
688 argv[0] = strdup(path);
700 for (j = 0; j < n_argv; j++)
708 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
709 _cleanup_free_ char *path = NULL;
717 while (**p != 0 && **p != ';') {
718 r = parse_address_key(p, "guid", guid);
724 r = parse_address_key(p, "path", &path);
743 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
744 _cleanup_free_ char *machine = NULL;
752 while (**p != 0 && **p != ';') {
753 r = parse_address_key(p, "guid", guid);
759 r = parse_address_key(p, "machine", &machine);
771 if (!filename_is_safe(machine))
775 b->machine = machine;
778 b->sockaddr.un.sun_family = AF_UNIX;
779 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
780 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
785 static void bus_reset_parsed_address(sd_bus *b) {
789 b->sockaddr_size = 0;
790 strv_free(b->exec_argv);
794 b->server_id = SD_ID128_NULL;
801 static int bus_parse_next_address(sd_bus *b) {
802 _cleanup_free_ char *guid = NULL;
810 if (b->address[b->address_index] == 0)
813 bus_reset_parsed_address(b);
815 a = b->address + b->address_index;
824 if (startswith(a, "unix:")) {
827 r = parse_unix_address(b, &a, &guid);
832 } else if (startswith(a, "tcp:")) {
835 r = parse_tcp_address(b, &a, &guid);
841 } else if (startswith(a, "unixexec:")) {
844 r = parse_exec_address(b, &a, &guid);
850 } else if (startswith(a, "kernel:")) {
853 r = parse_kernel_address(b, &a, &guid);
858 } else if (startswith(a, "x-container:")) {
861 r = parse_container_address(b, &a, &guid);
874 r = sd_id128_from_string(guid, &b->server_id);
879 b->address_index = a - b->address;
883 static int bus_start_address(sd_bus *b) {
889 bool skipped = false;
894 r = bus_socket_exec(b);
896 r = bus_kernel_connect(b);
898 r = bus_container_connect(b);
899 else if (b->sockaddr.sa.sa_family != AF_UNSPEC)
900 r = bus_socket_connect(b);
906 r = attach_io_events(b);
911 b->last_connect_error = -r;
914 r = bus_parse_next_address(b);
918 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
922 int bus_next_address(sd_bus *b) {
925 bus_reset_parsed_address(b);
926 return bus_start_address(b);
929 static int bus_start_fd(sd_bus *b) {
934 assert(b->input_fd >= 0);
935 assert(b->output_fd >= 0);
937 r = fd_nonblock(b->input_fd, true);
941 r = fd_cloexec(b->input_fd, true);
945 if (b->input_fd != b->output_fd) {
946 r = fd_nonblock(b->output_fd, true);
950 r = fd_cloexec(b->output_fd, true);
955 if (fstat(b->input_fd, &st) < 0)
958 if (S_ISCHR(b->input_fd))
959 return bus_kernel_take_fd(b);
961 return bus_socket_take_fd(b);
964 _public_ int sd_bus_start(sd_bus *bus) {
967 assert_return(bus, -EINVAL);
968 assert_return(bus->state == BUS_UNSET, -EPERM);
969 assert_return(!bus_pid_changed(bus), -ECHILD);
971 bus->state = BUS_OPENING;
973 if (bus->is_server && bus->bus_client)
976 if (bus->input_fd >= 0)
977 r = bus_start_fd(bus);
978 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
979 r = bus_start_address(bus);
986 return bus_send_hello(bus);
989 _public_ int sd_bus_open_system(sd_bus **ret) {
994 assert_return(ret, -EINVAL);
1000 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1002 r = sd_bus_set_address(b, e);
1005 r = sd_bus_set_address(b, "kernel:path=/dev/kdbus/0-system/bus;unix:path=/run/dbus/system_bus_socket");
1007 r = sd_bus_set_address(b, "unix:path=/run/dbus/system_bus_socket");
1013 b->bus_client = true;
1015 /* Let's do per-method access control on the system bus. We
1016 * need the caller's UID and capability set for that. */
1018 b->attach_flags |= KDBUS_ATTACH_CAPS | KDBUS_ATTACH_CREDS;
1020 r = sd_bus_start(b);
1032 _public_ int sd_bus_open_user(sd_bus **ret) {
1037 assert_return(ret, -EINVAL);
1043 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1045 r = sd_bus_set_address(b, e);
1049 e = secure_getenv("XDG_RUNTIME_DIR");
1051 _cleanup_free_ char *ee = NULL;
1053 ee = bus_address_escape(e);
1060 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus;unix:path=%s/bus", (unsigned long) getuid(), ee);
1062 b->address = strjoin("unix:path=", ee, "/bus", NULL);
1066 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus", (unsigned long) getuid());
1068 return -ECONNREFUSED;
1078 b->bus_client = true;
1080 /* We don't do any per-method access control on the user
1084 r = sd_bus_start(b);
1096 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1097 _cleanup_free_ char *e = NULL;
1102 assert_return(host, -EINVAL);
1103 assert_return(ret, -EINVAL);
1105 e = bus_address_escape(host);
1109 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1113 r = sd_bus_new(&bus);
1120 bus->bus_client = true;
1122 r = sd_bus_start(bus);
1132 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1133 _cleanup_free_ char *e = NULL;
1138 assert_return(machine, -EINVAL);
1139 assert_return(ret, -EINVAL);
1140 assert_return(filename_is_safe(machine), -EINVAL);
1142 e = bus_address_escape(machine);
1147 p = strjoin("kernel:path=/dev/kdbus/ns/machine-", e, "/0-system/bus;x-container:machine=", e, NULL);
1149 p = strjoin("x-container:machine=", e, NULL);
1154 r = sd_bus_new(&bus);
1161 bus->bus_client = true;
1163 r = sd_bus_start(bus);
1173 _public_ void sd_bus_close(sd_bus *bus) {
1177 if (bus->state == BUS_CLOSED)
1179 if (bus_pid_changed(bus))
1182 bus->state = BUS_CLOSED;
1184 sd_bus_detach_event(bus);
1186 /* Drop all queued messages so that they drop references to
1187 * the bus object and the bus may be freed */
1188 bus_reset_queues(bus);
1190 if (!bus->is_kernel)
1193 /* We'll leave the fd open in case this is a kernel bus, since
1194 * there might still be memblocks around that reference this
1195 * bus, and they might need to invoke the * KDBUS_CMD_FREE
1196 * ioctl on the fd when they are freed. */
1199 static void bus_enter_closing(sd_bus *bus) {
1202 if (bus->state != BUS_OPENING &&
1203 bus->state != BUS_AUTHENTICATING &&
1204 bus->state != BUS_HELLO &&
1205 bus->state != BUS_RUNNING)
1208 bus->state = BUS_CLOSING;
1211 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1212 assert_return(bus, NULL);
1214 assert_se(REFCNT_INC(bus->n_ref) >= 2);
1219 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1224 if (REFCNT_DEC(bus->n_ref) <= 0)
1230 _public_ int sd_bus_is_open(sd_bus *bus) {
1232 assert_return(bus, -EINVAL);
1233 assert_return(!bus_pid_changed(bus), -ECHILD);
1235 return BUS_IS_OPEN(bus->state);
1238 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1241 assert_return(bus, -EINVAL);
1242 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1243 assert_return(!bus_pid_changed(bus), -ECHILD);
1245 if (type == SD_BUS_TYPE_UNIX_FD) {
1246 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1249 r = bus_ensure_running(bus);
1253 return bus->can_fds;
1256 return bus_type_is_valid(type);
1259 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1262 assert_return(bus, -EINVAL);
1263 assert_return(server_id, -EINVAL);
1264 assert_return(!bus_pid_changed(bus), -ECHILD);
1266 r = bus_ensure_running(bus);
1270 *server_id = bus->server_id;
1274 static int bus_seal_message(sd_bus *b, sd_bus_message *m, usec_t timeout) {
1278 if (b->message_version != 0 &&
1279 m->header->version != b->message_version)
1282 if (b->message_endian != 0 &&
1283 m->header->endian != b->message_endian)
1287 /* If we copy the same message to multiple
1288 * destinations, avoid using the same serial
1290 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1295 timeout = BUS_DEFAULT_TIMEOUT;
1297 return bus_message_seal(m, ++b->serial, timeout);
1300 int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
1304 /* The bus specification says the serial number cannot be 0,
1305 * hence let's fill something in for synthetic messages. Since
1306 * synthetic messages might have a fake sender and we don't
1307 * want to interfere with the real sender's serial numbers we
1308 * pick a fixed, artifical one. We use (uint32_t) -1 rather
1309 * than (uint64_t) -1 since dbus1 only had 32bit identifiers,
1310 * even though kdbus can do 64bit. */
1312 return bus_message_seal(m, 0xFFFFFFFFULL, 0);
1315 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1320 return bus_kernel_write_message(bus, message);
1322 return bus_socket_write_message(bus, message, idx);
1325 static int dispatch_wqueue(sd_bus *bus) {
1329 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1331 while (bus->wqueue_size > 0) {
1333 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1337 /* Didn't do anything this time */
1339 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1340 /* Fully written. Let's drop the entry from
1343 * This isn't particularly optimized, but
1344 * well, this is supposed to be our worst-case
1345 * buffer only, and the socket buffer is
1346 * supposed to be our primary buffer, and if
1347 * it got full, then all bets are off
1350 sd_bus_message_unref(bus->wqueue[0]);
1351 bus->wqueue_size --;
1352 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1362 static int bus_read_message(sd_bus *bus) {
1366 return bus_kernel_read_message(bus);
1368 return bus_socket_read_message(bus);
1371 int bus_rqueue_make_room(sd_bus *bus) {
1375 x = bus->rqueue_size + 1;
1377 if (bus->rqueue_allocated >= x)
1380 if (x > BUS_RQUEUE_MAX)
1383 q = realloc(bus->rqueue, x * sizeof(sd_bus_message*));
1388 bus->rqueue_allocated = x;
1393 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1398 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1401 if (bus->rqueue_size > 0) {
1402 /* Dispatch a queued message */
1404 *m = bus->rqueue[0];
1405 bus->rqueue_size --;
1406 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1410 /* Try to read a new message */
1411 r = bus_read_message(bus);
1421 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1424 assert_return(bus, -EINVAL);
1425 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1426 assert_return(m, -EINVAL);
1427 assert_return(!bus_pid_changed(bus), -ECHILD);
1430 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1437 /* If the serial number isn't kept, then we know that no reply
1439 if (!serial && !m->sealed)
1440 m->header->flags |= BUS_MESSAGE_NO_REPLY_EXPECTED;
1442 r = bus_seal_message(bus, m, 0);
1446 /* If this is a reply and no reply was requested, then let's
1447 * suppress this, if we can */
1448 if (m->dont_send && !serial)
1451 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1454 r = bus_write_message(bus, m, &idx);
1456 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1457 bus_enter_closing(bus);
1460 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1461 /* Wasn't fully written. So let's remember how
1462 * much was written. Note that the first entry
1463 * of the wqueue array is always allocated so
1464 * that we always can remember how much was
1466 bus->wqueue[0] = sd_bus_message_ref(m);
1467 bus->wqueue_size = 1;
1473 /* Just append it to the queue. */
1475 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1478 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1483 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1487 *serial = BUS_MESSAGE_SERIAL(m);
1492 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1495 assert_return(bus, -EINVAL);
1496 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1497 assert_return(m, -EINVAL);
1498 assert_return(!bus_pid_changed(bus), -ECHILD);
1500 if (!streq_ptr(m->destination, destination)) {
1505 r = sd_bus_message_set_destination(m, destination);
1510 return sd_bus_send(bus, m, serial);
1513 static usec_t calc_elapse(uint64_t usec) {
1514 if (usec == (uint64_t) -1)
1517 return now(CLOCK_MONOTONIC) + usec;
1520 static int timeout_compare(const void *a, const void *b) {
1521 const struct reply_callback *x = a, *y = b;
1523 if (x->timeout != 0 && y->timeout == 0)
1526 if (x->timeout == 0 && y->timeout != 0)
1529 if (x->timeout < y->timeout)
1532 if (x->timeout > y->timeout)
1538 _public_ int sd_bus_call_async(
1541 sd_bus_message_handler_t callback,
1546 struct reply_callback *c;
1549 assert_return(bus, -EINVAL);
1550 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1551 assert_return(m, -EINVAL);
1552 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1553 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1554 assert_return(callback, -EINVAL);
1555 assert_return(!bus_pid_changed(bus), -ECHILD);
1557 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1561 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1565 r = bus_seal_message(bus, m, usec);
1569 c = new0(struct reply_callback, 1);
1573 c->callback = callback;
1574 c->userdata = userdata;
1575 c->serial = BUS_MESSAGE_SERIAL(m);
1576 c->timeout = calc_elapse(m->timeout);
1578 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1584 if (c->timeout != 0) {
1585 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1588 sd_bus_call_async_cancel(bus, c->serial);
1593 r = sd_bus_send(bus, m, serial);
1595 sd_bus_call_async_cancel(bus, c->serial);
1602 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1603 struct reply_callback *c;
1605 assert_return(bus, -EINVAL);
1606 assert_return(serial != 0, -EINVAL);
1607 assert_return(!bus_pid_changed(bus), -ECHILD);
1609 c = hashmap_remove(bus->reply_callbacks, &serial);
1613 if (c->timeout != 0)
1614 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1620 int bus_ensure_running(sd_bus *bus) {
1625 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1627 if (bus->state == BUS_RUNNING)
1631 r = sd_bus_process(bus, NULL);
1634 if (bus->state == BUS_RUNNING)
1639 r = sd_bus_wait(bus, (uint64_t) -1);
1645 _public_ int sd_bus_call(
1649 sd_bus_error *error,
1650 sd_bus_message **reply) {
1657 assert_return(bus, -EINVAL);
1658 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1659 assert_return(m, -EINVAL);
1660 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1661 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1662 assert_return(!bus_error_is_dirty(error), -EINVAL);
1663 assert_return(!bus_pid_changed(bus), -ECHILD);
1665 r = bus_ensure_running(bus);
1669 i = bus->rqueue_size;
1671 r = bus_seal_message(bus, m, usec);
1675 r = sd_bus_send(bus, m, &serial);
1679 timeout = calc_elapse(m->timeout);
1684 while (i < bus->rqueue_size) {
1685 sd_bus_message *incoming = NULL;
1687 incoming = bus->rqueue[i];
1689 if (incoming->reply_serial == serial) {
1690 /* Found a match! */
1692 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1695 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1700 sd_bus_message_unref(incoming);
1703 } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
1704 r = sd_bus_error_copy(error, &incoming->error);
1708 sd_bus_message_unref(incoming);
1711 } else if (incoming->header->serial == serial &&
1714 streq(bus->unique_name, incoming->sender)) {
1716 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1719 /* Our own message? Somebody is trying
1720 * to send its own client a message,
1721 * let's not dead-lock, let's fail
1724 sd_bus_message_unref(incoming);
1728 /* Try to read more, right-away */
1732 r = bus_read_message(bus);
1734 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1735 bus_enter_closing(bus);
1745 n = now(CLOCK_MONOTONIC);
1751 left = (uint64_t) -1;
1753 r = bus_poll(bus, true, left);
1759 r = dispatch_wqueue(bus);
1761 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1762 bus_enter_closing(bus);
1769 _public_ int sd_bus_get_fd(sd_bus *bus) {
1771 assert_return(bus, -EINVAL);
1772 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1773 assert_return(!bus_pid_changed(bus), -ECHILD);
1775 return bus->input_fd;
1778 _public_ int sd_bus_get_events(sd_bus *bus) {
1781 assert_return(bus, -EINVAL);
1782 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1783 assert_return(!bus_pid_changed(bus), -ECHILD);
1785 if (bus->state == BUS_OPENING)
1787 else if (bus->state == BUS_AUTHENTICATING) {
1789 if (bus_socket_auth_needs_write(bus))
1794 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1795 if (bus->rqueue_size <= 0)
1797 if (bus->wqueue_size > 0)
1804 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1805 struct reply_callback *c;
1807 assert_return(bus, -EINVAL);
1808 assert_return(timeout_usec, -EINVAL);
1809 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1810 assert_return(!bus_pid_changed(bus), -ECHILD);
1812 if (bus->state == BUS_CLOSING) {
1817 if (bus->state == BUS_AUTHENTICATING) {
1818 *timeout_usec = bus->auth_timeout;
1822 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1823 *timeout_usec = (uint64_t) -1;
1827 if (bus->rqueue_size > 0) {
1832 c = prioq_peek(bus->reply_callbacks_prioq);
1834 *timeout_usec = (uint64_t) -1;
1838 *timeout_usec = c->timeout;
1842 static int process_timeout(sd_bus *bus) {
1843 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1844 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1845 struct reply_callback *c;
1851 c = prioq_peek(bus->reply_callbacks_prioq);
1855 n = now(CLOCK_MONOTONIC);
1859 r = bus_message_new_synthetic_error(
1862 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1867 m->sender = "org.freedesktop.DBus";
1869 r = bus_seal_synthetic_message(bus, m);
1873 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1874 hashmap_remove(bus->reply_callbacks, &c->serial);
1877 bus->iteration_counter ++;
1879 r = c->callback(bus, m, c->userdata, &error_buffer);
1880 r = bus_maybe_reply_error(m, r, &error_buffer);
1883 bus->current = NULL;
1888 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1892 if (bus->state != BUS_HELLO)
1895 /* Let's make sure the first message on the bus is the HELLO
1896 * reply. But note that we don't actually parse the message
1897 * here (we leave that to the usual handling), we just verify
1898 * we don't let any earlier msg through. */
1900 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1901 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1904 if (m->reply_serial != bus->hello_serial)
1910 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1911 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1912 struct reply_callback *c;
1918 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1919 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1922 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1926 if (c->timeout != 0)
1927 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1929 r = sd_bus_message_rewind(m, true);
1933 r = c->callback(bus, m, c->userdata, &error_buffer);
1934 r = bus_maybe_reply_error(m, r, &error_buffer);
1940 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1941 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1942 struct filter_callback *l;
1949 bus->filter_callbacks_modified = false;
1951 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1953 if (bus->filter_callbacks_modified)
1956 /* Don't run this more than once per iteration */
1957 if (l->last_iteration == bus->iteration_counter)
1960 l->last_iteration = bus->iteration_counter;
1962 r = sd_bus_message_rewind(m, true);
1966 r = l->callback(bus, m, l->userdata, &error_buffer);
1967 r = bus_maybe_reply_error(m, r, &error_buffer);
1973 } while (bus->filter_callbacks_modified);
1978 static int process_match(sd_bus *bus, sd_bus_message *m) {
1985 bus->match_callbacks_modified = false;
1987 r = bus_match_run(bus, &bus->match_callbacks, m);
1991 } while (bus->match_callbacks_modified);
1996 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1997 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2003 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2006 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2009 if (m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED)
2012 if (streq_ptr(m->member, "Ping"))
2013 r = sd_bus_message_new_method_return(m, &reply);
2014 else if (streq_ptr(m->member, "GetMachineId")) {
2018 r = sd_id128_get_machine(&id);
2022 r = sd_bus_message_new_method_return(m, &reply);
2026 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2028 r = sd_bus_message_new_method_errorf(
2030 SD_BUS_ERROR_UNKNOWN_METHOD,
2031 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2037 r = sd_bus_send(bus, reply, NULL);
2044 static int process_message(sd_bus *bus, sd_bus_message *m) {
2051 bus->iteration_counter++;
2053 log_debug("Got message sender=%s object=%s interface=%s member=%s",
2054 strna(sd_bus_message_get_sender(m)),
2055 strna(sd_bus_message_get_path(m)),
2056 strna(sd_bus_message_get_interface(m)),
2057 strna(sd_bus_message_get_member(m)));
2059 r = process_hello(bus, m);
2063 r = process_reply(bus, m);
2067 r = process_filter(bus, m);
2071 r = process_match(bus, m);
2075 r = process_builtin(bus, m);
2079 r = bus_process_object(bus, m);
2082 bus->current = NULL;
2086 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2087 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2091 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2093 r = process_timeout(bus);
2097 r = dispatch_wqueue(bus);
2101 r = dispatch_rqueue(bus, &m);
2107 r = process_message(bus, m);
2112 r = sd_bus_message_rewind(m, true);
2121 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2123 r = sd_bus_reply_method_errorf(
2125 SD_BUS_ERROR_UNKNOWN_OBJECT,
2126 "Unknown object '%s'.", m->path);
2140 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2141 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2142 struct reply_callback *c;
2146 assert(bus->state == BUS_CLOSING);
2148 c = hashmap_first(bus->reply_callbacks);
2150 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2152 /* First, fail all outstanding method calls */
2153 r = bus_message_new_synthetic_error(
2156 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2161 r = bus_seal_synthetic_message(bus, m);
2165 if (c->timeout != 0)
2166 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2168 hashmap_remove(bus->reply_callbacks, &c->serial);
2171 bus->iteration_counter++;
2173 r = c->callback(bus, m, c->userdata, &error_buffer);
2174 r = bus_maybe_reply_error(m, r, &error_buffer);
2180 /* Then, synthesize a Disconnected message */
2181 r = sd_bus_message_new_signal(
2183 "/org/freedesktop/DBus/Local",
2184 "org.freedesktop.DBus.Local",
2190 m->sender = "org.freedesktop.DBus.Local";
2192 r = bus_seal_synthetic_message(bus, m);
2199 bus->iteration_counter++;
2201 r = process_filter(bus, m);
2205 r = process_match(bus, m);
2217 bus->current = NULL;
2221 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2222 BUS_DONT_DESTROY(bus);
2225 /* Returns 0 when we didn't do anything. This should cause the
2226 * caller to invoke sd_bus_wait() before returning the next
2227 * time. Returns > 0 when we did something, which possibly
2228 * means *ret is filled in with an unprocessed message. */
2230 assert_return(bus, -EINVAL);
2231 assert_return(!bus_pid_changed(bus), -ECHILD);
2233 /* We don't allow recursively invoking sd_bus_process(). */
2234 assert_return(!bus->current, -EBUSY);
2236 switch (bus->state) {
2243 r = bus_socket_process_opening(bus);
2244 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2245 bus_enter_closing(bus);
2253 case BUS_AUTHENTICATING:
2254 r = bus_socket_process_authenticating(bus);
2255 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2256 bus_enter_closing(bus);
2268 r = process_running(bus, ret);
2269 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2270 bus_enter_closing(bus);
2280 return process_closing(bus, ret);
2283 assert_not_reached("Unknown state");
2286 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2287 struct pollfd p[2] = {};
2290 usec_t m = (usec_t) -1;
2294 if (bus->state == BUS_CLOSING)
2297 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2299 e = sd_bus_get_events(bus);
2304 /* The caller really needs some more data, he doesn't
2305 * care about what's already read, or any timeouts
2310 /* The caller wants to process if there's something to
2311 * process, but doesn't care otherwise */
2313 r = sd_bus_get_timeout(bus, &until);
2318 nw = now(CLOCK_MONOTONIC);
2319 m = until > nw ? until - nw : 0;
2323 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2326 p[0].fd = bus->input_fd;
2327 if (bus->output_fd == bus->input_fd) {
2331 p[0].events = e & POLLIN;
2332 p[1].fd = bus->output_fd;
2333 p[1].events = e & POLLOUT;
2337 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2341 return r > 0 ? 1 : 0;
2344 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2346 assert_return(bus, -EINVAL);
2347 assert_return(!bus_pid_changed(bus), -ECHILD);
2349 if (bus->state == BUS_CLOSING)
2352 assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
2354 if (bus->rqueue_size > 0)
2357 return bus_poll(bus, false, timeout_usec);
2360 _public_ int sd_bus_flush(sd_bus *bus) {
2363 assert_return(bus, -EINVAL);
2364 assert_return(!bus_pid_changed(bus), -ECHILD);
2366 if (bus->state == BUS_CLOSING)
2369 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2371 r = bus_ensure_running(bus);
2375 if (bus->wqueue_size <= 0)
2379 r = dispatch_wqueue(bus);
2381 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
2382 bus_enter_closing(bus);
2387 if (bus->wqueue_size <= 0)
2390 r = bus_poll(bus, false, (uint64_t) -1);
2396 _public_ int sd_bus_add_filter(sd_bus *bus,
2397 sd_bus_message_handler_t callback,
2400 struct filter_callback *f;
2402 assert_return(bus, -EINVAL);
2403 assert_return(callback, -EINVAL);
2404 assert_return(!bus_pid_changed(bus), -ECHILD);
2406 f = new0(struct filter_callback, 1);
2409 f->callback = callback;
2410 f->userdata = userdata;
2412 bus->filter_callbacks_modified = true;
2413 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2417 _public_ int sd_bus_remove_filter(sd_bus *bus,
2418 sd_bus_message_handler_t callback,
2421 struct filter_callback *f;
2423 assert_return(bus, -EINVAL);
2424 assert_return(callback, -EINVAL);
2425 assert_return(!bus_pid_changed(bus), -ECHILD);
2427 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2428 if (f->callback == callback && f->userdata == userdata) {
2429 bus->filter_callbacks_modified = true;
2430 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2439 _public_ int sd_bus_add_match(sd_bus *bus,
2441 sd_bus_message_handler_t callback,
2444 struct bus_match_component *components = NULL;
2445 unsigned n_components = 0;
2446 uint64_t cookie = 0;
2449 assert_return(bus, -EINVAL);
2450 assert_return(match, -EINVAL);
2451 assert_return(!bus_pid_changed(bus), -ECHILD);
2453 r = bus_match_parse(match, &components, &n_components);
2457 if (bus->bus_client) {
2458 cookie = ++bus->match_cookie;
2460 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2465 bus->match_callbacks_modified = true;
2466 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2468 if (bus->bus_client)
2469 bus_remove_match_internal(bus, match, cookie);
2473 bus_match_parse_free(components, n_components);
2477 _public_ int sd_bus_remove_match(sd_bus *bus,
2479 sd_bus_message_handler_t callback,
2482 struct bus_match_component *components = NULL;
2483 unsigned n_components = 0;
2485 uint64_t cookie = 0;
2487 assert_return(bus, -EINVAL);
2488 assert_return(match, -EINVAL);
2489 assert_return(!bus_pid_changed(bus), -ECHILD);
2491 r = bus_match_parse(match, &components, &n_components);
2495 bus->match_callbacks_modified = true;
2496 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2498 if (bus->bus_client)
2499 q = bus_remove_match_internal(bus, match, cookie);
2501 bus_match_parse_free(components, n_components);
2503 return r < 0 ? r : q;
2506 bool bus_pid_changed(sd_bus *bus) {
2509 /* We don't support people creating a bus connection and
2510 * keeping it around over a fork(). Let's complain. */
2512 return bus->original_pid != getpid();
2515 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2516 sd_bus *bus = userdata;
2521 r = sd_bus_process(bus, NULL);
2528 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2529 sd_bus *bus = userdata;
2534 r = sd_bus_process(bus, NULL);
2541 static int prepare_callback(sd_event_source *s, void *userdata) {
2542 sd_bus *bus = userdata;
2549 e = sd_bus_get_events(bus);
2553 if (bus->output_fd != bus->input_fd) {
2555 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2559 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2563 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2568 r = sd_bus_get_timeout(bus, &until);
2574 j = sd_event_source_set_time(bus->time_event_source, until);
2579 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2586 static int quit_callback(sd_event_source *event, void *userdata) {
2587 sd_bus *bus = userdata;
2596 static int attach_io_events(sd_bus *bus) {
2601 if (bus->input_fd < 0)
2607 if (!bus->input_io_event_source) {
2608 r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2612 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2616 r = sd_event_source_set_priority(bus->input_io_event_source, bus->event_priority);
2618 r = sd_event_source_set_io_fd(bus->input_io_event_source, bus->input_fd);
2623 if (bus->output_fd != bus->input_fd) {
2624 assert(bus->output_fd >= 0);
2626 if (!bus->output_io_event_source) {
2627 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2631 r = sd_event_source_set_priority(bus->output_io_event_source, bus->event_priority);
2633 r = sd_event_source_set_io_fd(bus->output_io_event_source, bus->output_fd);
2642 static void detach_io_events(sd_bus *bus) {
2645 if (bus->input_io_event_source) {
2646 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2647 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2650 if (bus->output_io_event_source) {
2651 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2652 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2656 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2659 assert_return(bus, -EINVAL);
2660 assert_return(!bus->event, -EBUSY);
2662 assert(!bus->input_io_event_source);
2663 assert(!bus->output_io_event_source);
2664 assert(!bus->time_event_source);
2667 bus->event = sd_event_ref(event);
2669 r = sd_event_default(&bus->event);
2674 bus->event_priority = priority;
2676 r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2680 r = sd_event_source_set_priority(bus->time_event_source, priority);
2684 r = sd_event_add_exit(bus->event, quit_callback, bus, &bus->quit_event_source);
2688 r = attach_io_events(bus);
2695 sd_bus_detach_event(bus);
2699 _public_ int sd_bus_detach_event(sd_bus *bus) {
2700 assert_return(bus, -EINVAL);
2705 detach_io_events(bus);
2707 if (bus->time_event_source) {
2708 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2709 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2712 if (bus->quit_event_source) {
2713 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2714 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2718 bus->event = sd_event_unref(bus->event);
2723 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2724 assert_return(bus, NULL);
2729 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2730 assert_return(bus, NULL);
2732 return bus->current;
2735 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2740 assert(default_bus);
2743 return !!*default_bus;
2746 *ret = sd_bus_ref(*default_bus);
2754 b->default_bus_ptr = default_bus;
2762 _public_ int sd_bus_default_system(sd_bus **ret) {
2763 static __thread sd_bus *default_system_bus = NULL;
2765 return bus_default(sd_bus_open_system, &default_system_bus, ret);
2768 _public_ int sd_bus_default_user(sd_bus **ret) {
2769 static __thread sd_bus *default_user_bus = NULL;
2771 return bus_default(sd_bus_open_user, &default_user_bus, ret);
2774 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2775 assert_return(b, -EINVAL);
2776 assert_return(tid, -EINVAL);
2777 assert_return(!bus_pid_changed(b), -ECHILD);
2785 return sd_event_get_tid(b->event, tid);
2790 _public_ char *sd_bus_label_escape(const char *s) {
2794 assert_return(s, NULL);
2796 /* Escapes all chars that D-Bus' object path cannot deal
2797 * with. Can be reversed with bus_path_unescape(). We special
2798 * case the empty string. */
2803 r = new(char, strlen(s)*3 + 1);
2807 for (f = s, t = r; *f; f++) {
2809 /* Escape everything that is not a-zA-Z0-9. We also
2810 * escape 0-9 if it's the first character */
2812 if (!(*f >= 'A' && *f <= 'Z') &&
2813 !(*f >= 'a' && *f <= 'z') &&
2814 !(f > s && *f >= '0' && *f <= '9')) {
2816 *(t++) = hexchar(*f >> 4);
2817 *(t++) = hexchar(*f);
2827 _public_ char *sd_bus_label_unescape(const char *f) {
2830 assert_return(f, NULL);
2832 /* Special case for the empty string */
2836 r = new(char, strlen(f) + 1);
2840 for (t = r; *f; f++) {
2845 if ((a = unhexchar(f[1])) < 0 ||
2846 (b = unhexchar(f[2])) < 0) {
2847 /* Invalid escape code, let's take it literal then */
2850 *(t++) = (char) ((a << 4) | b);
2862 _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **ret) {
2867 assert_return(bus, -EINVAL);
2868 assert_return(mask <= _SD_BUS_CREDS_ALL, -ENOTSUP);
2869 assert_return(ret, -EINVAL);
2870 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2871 assert_return(!bus_pid_changed(bus), -ECHILD);
2872 assert_return(!bus->is_kernel, -ENOTSUP);
2874 if (!bus->ucred_valid && !isempty(bus->label))
2877 c = bus_creds_new();
2881 if (bus->ucred_valid) {
2882 pid = c->pid = bus->ucred.pid;
2883 c->uid = bus->ucred.uid;
2884 c->gid = bus->ucred.gid;
2886 c->mask |= (SD_BUS_CREDS_UID | SD_BUS_CREDS_PID | SD_BUS_CREDS_GID) & mask;
2889 if (!isempty(bus->label) && (mask & SD_BUS_CREDS_SELINUX_CONTEXT)) {
2890 c->label = strdup(bus->label);
2892 sd_bus_creds_unref(c);
2896 c->mask |= SD_BUS_CREDS_SELINUX_CONTEXT;
2899 r = bus_creds_add_more(c, mask, pid, 0);