1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 #include "bus-container.h"
50 #include "bus-protocol.h"
52 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
53 static int attach_io_events(sd_bus *b);
54 static void detach_io_events(sd_bus *b);
/* Closes the input and output file descriptors of b and marks both as
 * unset (-1). The output fd is closed separately only when it is valid and
 * differs from the input fd, so a shared descriptor is not closed twice. */
56 static void bus_close_fds(sd_bus *b) {
62 close_nointr_nofail(b->input_fd);
64 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
65 close_nointr_nofail(b->output_fd);
67 b->input_fd = b->output_fd = -1;
/* Tears down node n of bus b. The visible lines recurse into n->child,
 * unlink every entry of the node's callbacks, vtables and enumerators
 * lists, detach the node from its parent's list of children and remove it
 * from b->nodes, asserting that the entry stored under n->path is n.
 * NOTE(review): releasing the unlinked entries and the node itself is not
 * part of the lines shown here. */
70 static void bus_node_destroy(sd_bus *b, struct node *n) {
71 struct node_callback *c;
72 struct node_vtable *v;
73 struct node_enumerator *e;
81 bus_node_destroy(b, n->child);
83 while ((c = n->callbacks)) {
84 LIST_REMOVE(callbacks, n->callbacks, c);
88 while ((v = n->vtables)) {
89 LIST_REMOVE(vtables, n->vtables, v);
94 while ((e = n->enumerators)) {
95 LIST_REMOVE(enumerators, n->enumerators, e);
100 LIST_REMOVE(siblings, n->parent->child, n);
102 assert_se(hashmap_remove(b->nodes, n->path) == n);
/* Drops the reference held on every message in the read queue and in the
 * write queue of b, then resets both queue pointers to NULL and both queue
 * sizes to 0. */
107 static void bus_reset_queues(sd_bus *b) {
112 for (i = 0; i < b->rqueue_size; i++)
113 sd_bus_message_unref(b->rqueue[i]);
116 for (i = 0; i < b->wqueue_size; i++)
117 sd_bus_message_unref(b->wqueue[i]);
120 b->rqueue = b->wqueue = NULL;
121 b->rqueue_size = b->wqueue_size = 0;
/* Releases the resources owned by b. In order, the visible lines call
 * sd_bus_detach_event(), unmap b->kdbus_buffer (KDBUS_POOL_SIZE bytes), free
 * the unique name, the auth buffer and the exec argv, close the fds in
 * b->fds, free the reply callback hashmap and priority queue, unlink all
 * filter callbacks, free the match tree and the vtable hashmaps, destroy
 * every node still registered in b->nodes, flush the memfd cache and
 * destroy memfd_cache_mutex. */
124 static void bus_free(sd_bus *b) {
125 struct filter_callback *f;
130 sd_bus_detach_event(b);
135 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
138 free(b->unique_name);
139 free(b->auth_buffer);
145 strv_free(b->exec_argv);
147 close_many(b->fds, b->n_fds);
152 hashmap_free_free(b->reply_callbacks);
153 prioq_free(b->reply_callbacks_prioq);
155 while ((f = b->filter_callbacks)) {
156 LIST_REMOVE(callbacks, b->filter_callbacks, f);
160 bus_match_free(&b->match_callbacks);
162 hashmap_free_free(b->vtable_methods);
163 hashmap_free_free(b->vtable_properties);
/* bus_node_destroy() removes the node from b->nodes, so this loop ends once
 * the hashmap is empty. */
165 while ((n = hashmap_first(b->nodes)))
166 bus_node_destroy(b, n);
168 hashmap_free(b->nodes);
170 bus_kernel_flush_memfd(b);
172 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
/* Creates a new, unconnected bus object and hands it back through ret
 * (-EINVAL if ret is NULL). The visible initialization sets the reference
 * count to REFCNT_INIT, both fds to -1, the message version to 1, enables
 * the well-known-names and unique-name credentials, fd passing and name
 * attachment by default, records the creating PID in original_pid,
 * initializes memfd_cache_mutex and pre-allocates one write queue slot.
 * NOTE(review): the allocation of r and its failure handling are on lines
 * not shown here. */
177 _public_ int sd_bus_new(sd_bus **ret) {
180 assert_return(ret, -EINVAL);
186 r->n_ref = REFCNT_INIT;
187 r->input_fd = r->output_fd = -1;
188 r->message_version = 1;
189 r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
190 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
191 r->attach_flags |= KDBUS_ATTACH_NAMES;
192 r->original_pid = getpid();
194 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
196 /* We guarantee that wqueue always has space for at least one
198 r->wqueue = new(sd_bus_message*, 1);
/* Configures the address string the bus will connect to. Only the argument
 * checks are visible: bus and address must be non-NULL (-EINVAL), the bus
 * must still be in state BUS_UNSET (-EPERM) and must not be used after a
 * PID change (-ECHILD). */
208 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
211 assert_return(bus, -EINVAL);
212 assert_return(bus->state == BUS_UNSET, -EPERM);
213 assert_return(address, -EINVAL);
214 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Assigns already-open file descriptors as the input and output side of the
 * bus connection. Both fds must be >= 0 (-EINVAL); the bus must be non-NULL
 * (-EINVAL), still in state BUS_UNSET (-EPERM) and not used after a PID
 * change (-ECHILD). */
226 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
227 assert_return(bus, -EINVAL);
228 assert_return(bus->state == BUS_UNSET, -EPERM);
229 assert_return(input_fd >= 0, -EINVAL);
230 assert_return(output_fd >= 0, -EINVAL);
231 assert_return(!bus_pid_changed(bus), -ECHILD);
233 bus->input_fd = input_fd;
234 bus->output_fd = output_fd;
/* Configures a program (path plus argument vector) for the bus connection.
 * path must be non-NULL and argv non-empty (-EINVAL); the bus must be
 * non-NULL (-EINVAL), in state BUS_UNSET (-EPERM) and not used after a PID
 * change (-ECHILD). The previously stored exec_path and exec_argv are freed;
 * NOTE(review): the copying and assignment of the new values are on lines
 * not shown here. */
238 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
241 assert_return(bus, -EINVAL);
242 assert_return(bus->state == BUS_UNSET, -EPERM);
243 assert_return(path, -EINVAL);
244 assert_return(!strv_isempty(argv), -EINVAL);
245 assert_return(!bus_pid_changed(bus), -ECHILD);
257 free(bus->exec_path);
258 strv_free(bus->exec_argv);
/* Stores b, normalized to 0 or 1, in bus->bus_client. Elsewhere in this
 * file that flag decides whether a Hello call is sent after connecting.
 * Allowed only while the bus is in state BUS_UNSET (-EPERM); -EINVAL for a
 * NULL bus, -ECHILD after a PID change. */
266 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
267 assert_return(bus, -EINVAL);
268 assert_return(bus->state == BUS_UNSET, -EPERM);
269 assert_return(!bus_pid_changed(bus), -ECHILD);
271 bus->bus_client = !!b;
/* Sets or clears KDBUS_HELLO_ACCEPT_FD in bus->hello_flags according to b.
 * Allowed only while the bus is in state BUS_UNSET (-EPERM); -EINVAL for a
 * NULL bus, -ECHILD after a PID change. */
275 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
276 assert_return(bus, -EINVAL);
277 assert_return(bus->state == BUS_UNSET, -EPERM);
278 assert_return(!bus_pid_changed(bus), -ECHILD);
280 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
/* Sets or clears KDBUS_ATTACH_TIMESTAMP in bus->attach_flags according to
 * b. Allowed only while the bus is in state BUS_UNSET (-EPERM); -EINVAL for
 * a NULL bus, -ECHILD after a PID change. */
284 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
285 assert_return(bus, -EINVAL);
286 assert_return(bus->state == BUS_UNSET, -EPERM);
287 assert_return(!bus_pid_changed(bus), -ECHILD);
289 SET_FLAG(bus->attach_flags, KDBUS_ATTACH_TIMESTAMP, b);
/* Selects which credential fields are requested for incoming messages.
 * mask must not exceed _SD_BUS_CREDS_ALL (-EINVAL); the bus must be
 * non-NULL (-EINVAL), in state BUS_UNSET (-EPERM) and not used after a PID
 * change (-ECHILD). Returns the result of kdbus_translate_attach_flags().
 * NOTE(review): the translated flags are written back into bus->creds_mask,
 * the same field that is passed as input, while other code in this file
 * keeps KDBUS_ATTACH_* bits in bus->attach_flags -- confirm the intended
 * output field. */
293 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, uint64_t mask) {
294 assert_return(bus, -EINVAL);
295 assert_return(mask <= _SD_BUS_CREDS_ALL, -EINVAL);
296 assert_return(bus->state == BUS_UNSET, -EPERM);
297 assert_return(!bus_pid_changed(bus), -ECHILD);
299 /* The well knowns we need unconditionally, so that matches can work */
300 bus->creds_mask = mask | SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
302 return kdbus_translate_attach_flags(bus->creds_mask, &bus->creds_mask);
/* Switches the bus object between server and non-server mode and stores the
 * server ID. A non-null server_id is only accepted together with a true b
 * (-EINVAL otherwise). Allowed only while the bus is in state BUS_UNSET
 * (-EPERM); -EINVAL for a NULL bus, -ECHILD after a PID change. */
305 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
306 assert_return(bus, -EINVAL);
307 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
308 assert_return(bus->state == BUS_UNSET, -EPERM);
309 assert_return(!bus_pid_changed(bus), -ECHILD);
311 bus->is_server = !!b;
312 bus->server_id = server_id;
/* Stores b, normalized to 0 or 1, in bus->anonymous_auth. Allowed only
 * while the bus is in state BUS_UNSET (-EPERM); -EINVAL for a NULL bus,
 * -ECHILD after a PID change. */
316 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
317 assert_return(bus, -EINVAL);
318 assert_return(bus->state == BUS_UNSET, -EPERM);
319 assert_return(!bus_pid_changed(bus), -ECHILD);
321 bus->anonymous_auth = !!b;
/* Setter for the "trusted" property of the bus. Only the argument checks
 * are visible: -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, -ECHILD after a PID change. NOTE(review): the assignment of b
 * is on a line not shown here. */
325 _public_ int sd_bus_set_trusted(sd_bus *bus, int b) {
326 assert_return(bus, -EINVAL);
327 assert_return(bus->state == BUS_UNSET, -EPERM);
328 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Reply handler registered by bus_send_hello(). It reads one string from
 * the reply, requires it to be a valid service name starting with ':',
 * stores a copy in bus->unique_name and, if the bus is still in state
 * BUS_HELLO, moves it to BUS_RUNNING. The bus may already be in state
 * BUS_CLOSING when the reply arrives, in which case the state is left
 * alone. NOTE(review): the error returns for the individual checks are on
 * lines not shown here. */
334 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
339 assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
342 r = sd_bus_message_get_errno(reply);
348 r = sd_bus_message_read(reply, "s", &s);
352 if (!service_name_is_valid(s) || s[0] != ':')
355 bus->unique_name = strdup(s);
356 if (!bus->unique_name)
359 if (bus->state == BUS_HELLO)
360 bus->state = BUS_RUNNING;
/* Issues the asynchronous method call to org.freedesktop.DBus whose reply
 * is handled by hello_callback(); the call's serial is stored in
 * bus->hello_serial. The check on bus_client/is_kernel short-circuits for
 * connections that are not bus clients or that are kernel buses
 * (NOTE(review): the value returned in that case is on a line not shown). */
365 static int bus_send_hello(sd_bus *bus) {
366 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
371 if (!bus->bus_client || bus->is_kernel)
374 r = sd_bus_message_new_method_call(
376 "org.freedesktop.DBus",
378 "org.freedesktop.DBus",
384 return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Moves the bus into its post-authentication state: BUS_HELLO for bus
 * clients on non-kernel transports (which still await the Hello reply),
 * BUS_RUNNING otherwise. */
387 int bus_start_running(sd_bus *bus) {
390 if (bus->bus_client && !bus->is_kernel) {
391 bus->state = BUS_HELLO;
395 bus->state = BUS_RUNNING;
/* Parses one "key=value" element of a bus address at *p. The visible lines
 * compare the text at *p against key, scan the value up to the next ';',
 * ',' or end of string, combine two 4-bit values x and y into one byte
 * (presumably the decoding of a %XX escape -- confirm against the missing
 * lines) and grow the result buffer with realloc(). NOTE(review): return
 * values and the handling of a NULL key are not visible here. */
399 static int parse_address_key(const char **p, const char *key, char **value) {
410 if (strncmp(*p, key, l) != 0)
423 while (*a != ';' && *a != ',' && *a != 0) {
441 c = (char) ((x << 4) | y);
448 t = realloc(r, n + 2);
/* Advances *p to the next ',' in the address string, or to the terminating
 * NUL when there is none, skipping over the current element. */
476 static void skip_address_key(const char **p) {
480 *p += strcspn(*p, ",");
/* Parses the parameters of a "unix:" address up to the next ';' or the end
 * of the string. Recognized keys are "guid", "path" and "abstract"; both
 * the absence of path and abstract and the presence of both are checked
 * for. The result is stored in b->sockaddr.un as an AF_UNIX address, with
 * b->sockaddr_size covering only the bytes actually used. For abstract
 * names sun_path starts with a NUL byte followed by the name. */
486 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
487 _cleanup_free_ char *path = NULL, *abstract = NULL;
496 while (**p != 0 && **p != ';') {
497 r = parse_address_key(p, "guid", guid);
503 r = parse_address_key(p, "path", &path);
509 r = parse_address_key(p, "abstract", &abstract);
518 if (!path && !abstract)
521 if (path && abstract)
/* A path exactly as long as sun_path passes this check; strncpy() then
 * leaves no terminating NUL, and the length in sockaddr_size is what
 * delimits the name. */
526 if (l > sizeof(b->sockaddr.un.sun_path))
529 b->sockaddr.un.sun_family = AF_UNIX;
530 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
531 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
532 } else if (abstract) {
533 l = strlen(abstract);
534 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
537 b->sockaddr.un.sun_family = AF_UNIX;
538 b->sockaddr.un.sun_path[0] = 0;
539 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
540 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the parameters of a "tcp:" address up to the next ';' or the end
 * of the string. Recognized keys are "guid", "host", "port" and "family";
 * a family of "ipv4" or "ipv6" restricts the lookup to AF_INET or AF_INET6.
 * The host/port pair is resolved with getaddrinfo() (stream sockets,
 * AI_ADDRCONFIG), the first result is copied into b->sockaddr and the
 * result list is released. -EADDRNOTAVAIL is one of the error returns.
 * NOTE(review): no check of result->ai_addrlen against sizeof(b->sockaddr)
 * is visible before the memcpy() -- confirm against the missing lines. */
546 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
547 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
549 struct addrinfo *result, hints = {
550 .ai_socktype = SOCK_STREAM,
551 .ai_flags = AI_ADDRCONFIG,
559 while (**p != 0 && **p != ';') {
560 r = parse_address_key(p, "guid", guid);
566 r = parse_address_key(p, "host", &host);
572 r = parse_address_key(p, "port", &port);
578 r = parse_address_key(p, "family", &family);
591 if (streq(family, "ipv4"))
592 hints.ai_family = AF_INET;
593 else if (streq(family, "ipv6"))
594 hints.ai_family = AF_INET6;
599 r = getaddrinfo(host, port, &hints, &result);
603 return -EADDRNOTAVAIL;
605 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
606 b->sockaddr_size = result->ai_addrlen;
608 freeaddrinfo(result);
/* Parses the parameters of a "unixexec:" address up to the next ';' or the
 * end of the string. Recognized keys are "guid", "path" and "argvN", where
 * N is a decimal index of at most 256. The argv array is grown on demand
 * with newly added slots zeroed, each argvN value is stored at index N, and
 * a missing element 0 is filled with a copy of path. */
613 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
615 unsigned n_argv = 0, j;
624 while (**p != 0 && **p != ';') {
625 r = parse_address_key(p, "guid", guid);
631 r = parse_address_key(p, "path", &path);
637 if (startswith(*p, "argv")) {
/* strtoul() also advances *p past the digits; an '=' must follow.
 * NOTE(review): errno is presumably reset on a line not shown here. */
641 ul = strtoul(*p + 4, (char**) p, 10);
642 if (errno > 0 || **p != '=' || ul > 256) {
652 x = realloc(argv, sizeof(char*) * (ul + 2));
658 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
664 r = parse_address_key(p, NULL, argv + ul);
679 /* Make sure there are no holes in the array, with the
680 * exception of argv[0] */
681 for (j = 1; j < n_argv; j++)
687 if (argv && argv[0] == NULL) {
688 argv[0] = strdup(path);
700 for (j = 0; j < n_argv; j++)
/* Parses the parameters of a "kernel:" address up to the next ';' or the
 * end of the string. Recognized keys are "guid" and "path".
 * NOTE(review): what is done with the parsed path is on lines not shown. */
708 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
709 _cleanup_free_ char *path = NULL;
717 while (**p != 0 && **p != ';') {
718 r = parse_address_key(p, "guid", guid);
724 r = parse_address_key(p, "path", &path);
/* Parses the parameters of an "x-container:" address up to the next ';' or
 * the end of the string. Recognized keys are "guid" and "machine"; the
 * machine name is checked with filename_is_safe(). The name is stored in
 * b->machine and b->sockaddr.un is set to the fixed AF_UNIX path
 * /var/run/dbus/system_bus_socket, with sockaddr_size excluding the
 * terminating NUL. */
743 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
744 _cleanup_free_ char *machine = NULL;
752 while (**p != 0 && **p != ';') {
753 r = parse_address_key(p, "guid", guid);
759 r = parse_address_key(p, "machine", &machine);
771 if (!filename_is_safe(machine))
775 b->machine = machine;
778 b->sockaddr.un.sun_family = AF_UNIX;
779 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
780 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
/* Clears state left behind by a previously parsed address. The visible
 * lines zero sockaddr_size, free exec_argv and reset server_id to
 * SD_ID128_NULL; further fields may be reset on lines not shown here. */
785 static void bus_reset_parsed_address(sd_bus *b) {
789 b->sockaddr_size = 0;
790 strv_free(b->exec_argv);
794 b->server_id = SD_ID128_NULL;
/* Parses the next entry of the address list in b->address, starting at
 * offset b->address_index. State from a previous entry is reset first, then
 * the entry is handed to the parser matching its transport prefix ("unix:",
 * "tcp:", "unixexec:", "kernel:" or "x-container:"). A guid, if one was
 * parsed, is converted into b->server_id, and address_index is advanced to
 * where parsing stopped. The check for a NUL byte at address_index detects
 * the end of the list (NOTE(review): its return value is not visible). */
801 static int bus_parse_next_address(sd_bus *b) {
802 _cleanup_free_ char *guid = NULL;
810 if (b->address[b->address_index] == 0)
813 bus_reset_parsed_address(b);
815 a = b->address + b->address_index;
824 if (startswith(a, "unix:")) {
827 r = parse_unix_address(b, &a, &guid);
832 } else if (startswith(a, "tcp:")) {
835 r = parse_tcp_address(b, &a, &guid);
841 } else if (startswith(a, "unixexec:")) {
844 r = parse_exec_address(b, &a, &guid);
850 } else if (startswith(a, "kernel:")) {
853 r = parse_kernel_address(b, &a, &guid);
858 } else if (startswith(a, "x-container:")) {
861 r = parse_container_address(b, &a, &guid);
874 r = sd_id128_from_string(guid, &b->server_id);
879 b->address_index = a - b->address;
/* Attempts to connect using the currently parsed address and, as the
 * visible calls suggest, moves on to the next address of the list when an
 * attempt fails. Depending on what was parsed, the connection is made via
 * bus_socket_exec(), bus_kernel_connect(), bus_container_connect() or
 * bus_socket_connect(); after connecting, I/O event sources are attached.
 * The errno of a failed attempt is remembered (as a positive value) in
 * b->last_connect_error, and that error, or -ECONNREFUSED if none was
 * recorded, is what the final visible return yields. */
883 static int bus_start_address(sd_bus *b) {
889 bool skipped = false;
894 r = bus_socket_exec(b);
896 r = bus_kernel_connect(b);
898 r = bus_container_connect(b);
899 else if (b->sockaddr.sa.sa_family != AF_UNSPEC)
900 r = bus_socket_connect(b);
906 r = attach_io_events(b);
911 b->last_connect_error = -r;
914 r = bus_parse_next_address(b);
918 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Discards the currently parsed address and retries the connection with
 * bus_start_address(), whose result is returned. */
922 int bus_next_address(sd_bus *b) {
925 bus_reset_parsed_address(b);
926 return bus_start_address(b);
/* Starts a bus on caller-supplied file descriptors. Both fds are switched
 * to non-blocking and close-on-exec mode (the output fd only when it
 * differs from the input fd), the input fd is inspected with fstat(), and
 * the connection is handed to bus_kernel_take_fd() for character devices
 * or to bus_socket_take_fd() otherwise. */
929 static int bus_start_fd(sd_bus *b) {
934 assert(b->input_fd >= 0);
935 assert(b->output_fd >= 0);
937 r = fd_nonblock(b->input_fd, true);
941 r = fd_cloexec(b->input_fd, true);
945 if (b->input_fd != b->output_fd) {
946 r = fd_nonblock(b->output_fd, true);
950 r = fd_cloexec(b->output_fd, true);
955 if (fstat(b->input_fd, &st) < 0)
/* NOTE(review): S_ISCHR() is applied to the fd number here, not to the
 * st.st_mode filled in by the fstat() call above, so the result of that
 * call is unused by this test -- this looks like it should read
 * S_ISCHR(st.st_mode); confirm and fix. */
958 if (S_ISCHR(b->input_fd))
959 return bus_kernel_take_fd(b);
961 return bus_socket_take_fd(b);
/* Starts connecting a configured bus object. The bus must be non-NULL
 * (-EINVAL), in state BUS_UNSET (-EPERM) and not used after a PID change
 * (-ECHILD). The state becomes BUS_OPENING; the combination of server mode
 * and bus-client mode is checked for; then the connection is set up from
 * the configured fds if present, or otherwise from the configured address,
 * socket address, exec path, kernel path or machine. On the visible
 * success path the function returns the result of bus_send_hello(). */
964 _public_ int sd_bus_start(sd_bus *bus) {
967 assert_return(bus, -EINVAL);
968 assert_return(bus->state == BUS_UNSET, -EPERM);
969 assert_return(!bus_pid_changed(bus), -ECHILD);
971 bus->state = BUS_OPENING;
973 if (bus->is_server && bus->bus_client)
976 if (bus->input_fd >= 0)
977 r = bus_start_fd(bus);
978 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
979 r = bus_start_address(bus);
986 return bus_send_hello(bus);
/* Opens a connection to the system bus and returns it through ret (-EINVAL
 * if ret is NULL). The address comes from $DBUS_SYSTEM_BUS_ADDRESS (read
 * with secure_getenv()) or from one of two built-in defaults: the kdbus
 * node followed by the classic socket, or the classic socket alone
 * (NOTE(review): the condition selecting between them is not visible). The
 * bus is marked as a bus client, capabilities and credentials are
 * requested for incoming messages, and the connection is started. */
989 _public_ int sd_bus_open_system(sd_bus **ret) {
994 assert_return(ret, -EINVAL);
1000 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1002 r = sd_bus_set_address(b, e);
1005 r = sd_bus_set_address(b, "kernel:path=/dev/kdbus/0-system/bus;unix:path=/run/dbus/system_bus_socket");
1007 r = sd_bus_set_address(b, "unix:path=/run/dbus/system_bus_socket");
1013 b->bus_client = true;
1015 /* Let's do per-method access control on the system bus. We
1016 * need the caller's UID and capability set for that. */
1018 b->attach_flags |= KDBUS_ATTACH_CAPS | KDBUS_ATTACH_CREDS;
1020 r = sd_bus_start(b);
/* Opens a connection to the user bus and returns it through ret (-EINVAL if
 * ret is NULL). The address comes from $DBUS_SESSION_BUS_ADDRESS if set;
 * otherwise it is built from the escaped $XDG_RUNTIME_DIR and/or the
 * per-UID kdbus node, and -ECONNREFUSED is returned on one of the fallback
 * paths. The bus is marked as a bus client and started.
 * NOTE(review): the asprintf() return values are not checked on the visible
 * lines; glibc leaves the target pointer undefined when asprintf() fails,
 * so a later NULL check on b->address would not be reliable -- confirm
 * against the missing lines. */
1032 _public_ int sd_bus_open_user(sd_bus **ret) {
1037 assert_return(ret, -EINVAL);
1043 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1045 r = sd_bus_set_address(b, e);
1049 e = secure_getenv("XDG_RUNTIME_DIR");
1051 _cleanup_free_ char *ee = NULL;
1053 ee = bus_address_escape(e);
1060 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus;unix:path=%s/bus", (unsigned long) getuid(), ee);
1062 b->address = strjoin("unix:path=", ee, "/bus", NULL);
1066 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus", (unsigned long) getuid());
1068 return -ECONNREFUSED;
1078 b->bus_client = true;
1080 /* We don't do any per-method access control on the user
1084 r = sd_bus_start(b);
/* Opens a connection to the system bus of a remote host and returns it
 * through ret. host and ret must be non-NULL (-EINVAL). The escaped host
 * name is embedded in a "unixexec:" address that runs
 * "ssh -xT <host> systemd-stdio-bridge"; a new bus object is created,
 * marked as a bus client and started. */
1096 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1097 _cleanup_free_ char *e = NULL;
1102 assert_return(host, -EINVAL);
1103 assert_return(ret, -EINVAL);
1105 e = bus_address_escape(host);
1109 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1113 r = sd_bus_new(&bus);
1120 bus->bus_client = true;
1122 r = sd_bus_start(bus);
/* Opens a connection to the system bus of a local container and returns it
 * through ret. machine and ret must be non-NULL and machine must pass
 * filename_is_safe() (-EINVAL otherwise). The escaped machine name is
 * embedded either in a kdbus address followed by an "x-container:"
 * fallback, or in an "x-container:" address alone (NOTE(review): the
 * condition selecting between them is not visible). A new bus object is
 * created, marked as a bus client and started. */
1132 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1133 _cleanup_free_ char *e = NULL;
1138 assert_return(machine, -EINVAL);
1139 assert_return(ret, -EINVAL);
1140 assert_return(filename_is_safe(machine), -EINVAL);
1142 e = bus_address_escape(machine);
1147 p = strjoin("kernel:path=/dev/kdbus/ns/machine-", e, "/0-system/bus;x-container:machine=", e, NULL);
1149 p = strjoin("x-container:machine=", e, NULL);
1154 r = sd_bus_new(&bus);
1161 bus->bus_client = true;
1163 r = sd_bus_start(bus);
/* Closes the bus connection. The function checks for a bus that is already
 * in state BUS_CLOSED and for use after a PID change; otherwise it sets the
 * state to BUS_CLOSED, detaches the bus from its event loop and drops all
 * queued messages. What happens to the file descriptors depends on whether
 * this is a kernel bus, as the final comment explains. */
1173 _public_ void sd_bus_close(sd_bus *bus) {
1177 if (bus->state == BUS_CLOSED)
1179 if (bus_pid_changed(bus))
1182 bus->state = BUS_CLOSED;
1184 sd_bus_detach_event(bus);
1186 /* Drop all queued messages so that they drop references to
1187 * the bus object and the bus may be freed */
1188 bus_reset_queues(bus);
1190 if (!bus->is_kernel)
1193 /* We'll leave the fd open in case this is a kernel bus, since
1194 * there might still be memblocks around that reference this
1195 * bus, and they might need to invoke the KDBUS_CMD_FREE
1196 * ioctl on the fd when they are freed. */
/* Moves the bus into state BUS_CLOSING. The leading check singles out buses
 * that are in none of the states BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO
 * or BUS_RUNNING (NOTE(review): presumably to return early for them; the
 * statement is on a line not shown here). */
1199 static void bus_enter_closing(sd_bus *bus) {
1202 if (bus->state != BUS_OPENING &&
1203 bus->state != BUS_AUTHENTICATING &&
1204 bus->state != BUS_HELLO &&
1205 bus->state != BUS_RUNNING)
1208 bus->state = BUS_CLOSING;
/* Takes an additional reference on bus. Returns NULL when bus is NULL. The
 * assertion requires the incremented count to be at least 2, i.e. the
 * caller must already have held a reference. */
1211 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1212 assert_return(bus, NULL);
1214 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drops one reference on bus. The visible check detects the count reaching
 * zero (NOTE(review): the action taken then, presumably freeing the bus, is
 * on a line not shown here). */
1219 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1224 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Returns the result of BUS_IS_OPEN() for the current state of bus, or
 * -EINVAL for a NULL bus and -ECHILD after a PID change. */
1230 _public_ int sd_bus_is_open(sd_bus *bus) {
1232 assert_return(bus, -EINVAL);
1233 assert_return(!bus_pid_changed(bus), -ECHILD);
1235 return BUS_IS_OPEN(bus->state);
/* Reports whether values of D-Bus type `type` can be sent over bus. For
 * SD_BUS_TYPE_UNIX_FD the answer depends on the connection: fd passing must
 * have been requested in hello_flags, the bus is driven until it is
 * running, and bus->can_fds is returned. For every other type the result of
 * bus_type_is_valid() is returned. Errors: -EINVAL for a NULL bus,
 * -ENOTCONN while the bus is in state BUS_UNSET, -ECHILD after a PID
 * change. */
1238 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1241 assert_return(bus, -EINVAL);
1242 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1243 assert_return(!bus_pid_changed(bus), -ECHILD);
1245 if (type == SD_BUS_TYPE_UNIX_FD) {
1246 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1249 r = bus_ensure_running(bus);
1253 return bus->can_fds;
1256 return bus_type_is_valid(type);
/* Copies the server ID of the connection into *server_id after driving the
 * bus until it is running. Errors: -EINVAL for a NULL bus or NULL
 * server_id, -ECHILD after a PID change. */
1259 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1262 assert_return(bus, -EINVAL);
1263 assert_return(server_id, -EINVAL);
1264 assert_return(!bus_pid_changed(bus), -ECHILD);
1266 r = bus_ensure_running(bus);
1270 *server_id = bus->server_id;
/* Seals message m for sending on bus b. When the bus has a fixed message
 * version or endianness (non-zero fields), the message header is compared
 * against it. The bus serial counter is raised to at least the serial the
 * message already carries, BUS_DEFAULT_TIMEOUT is applied as timeout on one
 * path, and the message is sealed with the next serial (++b->serial).
 * NOTE(review): the conditions guarding the serial and timeout updates are
 * on lines not shown here. */
1274 static int bus_seal_message(sd_bus *b, sd_bus_message *m, usec_t timeout) {
1278 if (b->message_version != 0 &&
1279 m->header->version != b->message_version)
1282 if (b->message_endian != 0 &&
1283 m->header->endian != b->message_endian)
1287 /* If we copy the same message to multiple
1288 * destinations, avoid using the same serial
1290 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1295 timeout = BUS_DEFAULT_TIMEOUT;
1297 return bus_message_seal(m, ++b->serial, timeout);
/* Seals a locally generated (synthetic) message with the fixed serial
 * 0xFFFFFFFF and a timeout of 0, without touching the serial counter of
 * the bus. */
1300 int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
1304 /* The bus specification says the serial number cannot be 0,
1305 * hence let's fill something in for synthetic messages. Since
1306 * synthetic messages might have a fake sender and we don't
1307 * want to interfere with the real sender's serial numbers we
1308 * pick a fixed, artificial one. We use (uint32_t) -1 rather
1309 * than (uint64_t) -1 since dbus1 only had 32bit identifiers,
1310 * even though kdbus can do 64bit. */
1312 return bus_message_seal(m, 0xFFFFFFFFULL, 0);
/* Writes message to the transport of bus, through either the kernel or the
 * socket backend (NOTE(review): the selecting condition, presumably
 * bus->is_kernel, is on a line not shown here). Only the socket backend
 * takes idx, the write offset into the message. */
1315 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1320 return bus_kernel_write_message(bus, message);
1322 return bus_socket_write_message(bus, message, idx);
/* Flushes the write queue of a bus in state BUS_RUNNING or BUS_HELLO. While
 * messages are queued, the first one is written, resuming at offset
 * bus->windex. Once it is fully written (always assumed for kernel buses,
 * otherwise when windex has reached the message size) its reference is
 * dropped and the remaining entries are shifted down by one. */
1325 static int dispatch_wqueue(sd_bus *bus) {
1329 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1331 while (bus->wqueue_size > 0) {
1333 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1337 /* Didn't do anything this time */
1339 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1340 /* Fully written. Let's drop the entry from
1343 * This isn't particularly optimized, but
1344 * well, this is supposed to be our worst-case
1345 * buffer only, and the socket buffer is
1346 * supposed to be our primary buffer, and if
1347 * it got full, then all bets are off
1350 sd_bus_message_unref(bus->wqueue[0]);
1351 bus->wqueue_size --;
1352 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Reads a message from the transport of bus, through either the kernel or
 * the socket backend (NOTE(review): the selecting condition, presumably
 * bus->is_kernel, is on a line not shown here). */
1362 static int bus_read_message(sd_bus *bus) {
1366 return bus_kernel_read_message(bus);
1368 return bus_socket_read_message(bus);
/* Ensures the read queue array has room for one more message. Nothing needs
 * to be done when the allocated capacity already covers rqueue_size + 1;
 * a required capacity above BUS_RQUEUE_MAX is checked for; otherwise the
 * array is grown with realloc() to exactly the required number of entries
 * and rqueue_allocated is updated. */
1371 int bus_rqueue_make_room(sd_bus *bus) {
1375 x = bus->rqueue_size + 1;
1377 if (bus->rqueue_allocated >= x)
1380 if (x > BUS_RQUEUE_MAX)
1383 q = realloc(bus->rqueue, x * sizeof(sd_bus_message*));
1388 bus->rqueue_allocated = x;
/* Fetches the next incoming message of a bus in state BUS_RUNNING or
 * BUS_HELLO. If the read queue is non-empty its first entry is handed out
 * through *m and the remaining entries are shifted down by one; otherwise a
 * new message is read from the transport. */
1393 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1398 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1401 if (bus->rqueue_size > 0) {
1402 /* Dispatch a queued message */
1404 *m = bus->rqueue[0];
1405 bus->rqueue_size --;
1406 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1410 /* Try to read a new message */
1411 r = bus_read_message(bus);
/* Sends message m over bus and, when serial is non-NULL, reports the serial
 * assigned to the message. The message is sealed first. When the bus is in
 * state BUS_RUNNING or BUS_HELLO and nothing is queued, the message is
 * written directly; a partially written message on a socket transport is
 * parked in the pre-allocated first write queue slot. Otherwise the
 * message is appended to the write queue, which is bounded by
 * BUS_WQUEUE_MAX. Write errors -EPIPE, -ENOTCONN and -ESHUTDOWN move the
 * bus into the closing state. Argument errors: -EINVAL for a NULL bus or
 * message, -ENOTCONN when the bus is not open, -ECHILD after a PID
 * change. */
1421 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1424 assert_return(bus, -EINVAL);
1425 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1426 assert_return(m, -EINVAL);
1427 assert_return(!bus_pid_changed(bus), -ECHILD);
1430 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1437 /* If the serial number isn't kept, then we know that no reply
1439 if (!serial && !m->sealed)
1440 m->header->flags |= BUS_MESSAGE_NO_REPLY_EXPECTED;
1442 r = bus_seal_message(bus, m, 0);
1446 /* If this is a reply and no reply was requested, then let's
1447 * suppress this, if we can */
1448 if (m->dont_send && !serial)
1451 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1454 r = bus_write_message(bus, m, &idx);
1456 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1457 bus_enter_closing(bus);
1460 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1461 /* Wasn't fully written. So let's remember how
1462 * much was written. Note that the first entry
1463 * of the wqueue array is always allocated so
1464 * that we always can remember how much was
1466 bus->wqueue[0] = sd_bus_message_ref(m);
1467 bus->wqueue_size = 1;
1473 /* Just append it to the queue. */
1475 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1478 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1483 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1487 *serial = BUS_MESSAGE_SERIAL(m);
/* Sends m to the given destination. When the destination already set on the
 * message differs from the requested one, it is updated with
 * sd_bus_message_set_destination() before the message is passed to
 * sd_bus_send(). Argument errors: -EINVAL for a NULL bus or message,
 * -ENOTCONN when the bus is not open, -ECHILD after a PID change. */
1492 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1495 assert_return(bus, -EINVAL);
1496 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1497 assert_return(m, -EINVAL);
1498 assert_return(!bus_pid_changed(bus), -ECHILD);
1500 if (!streq_ptr(m->destination, destination)) {
1505 r = sd_bus_message_set_destination(m, destination);
1510 return sd_bus_send(bus, m, serial);
/* Converts a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC deadline. The value (uint64_t) -1 is treated specially
 * (NOTE(review): its result is on a line not shown here; other code in this
 * file treats a timeout of 0 as "no timeout"). */
1513 static usec_t calc_elapse(uint64_t usec) {
1514 if (usec == (uint64_t) -1)
1517 return now(CLOCK_MONOTONIC) + usec;
/* Comparison function used for the reply callback priority queue. It
 * compares two struct reply_callback entries by their timeout field, with
 * separate cases for exactly one of the timeouts being 0 and for either
 * timeout being smaller than the other. NOTE(review): the returned values
 * are on lines not shown here. */
1520 static int timeout_compare(const void *a, const void *b) {
1521 const struct reply_callback *x = a, *y = b;
1523 if (x->timeout != 0 && y->timeout == 0)
1526 if (x->timeout == 0 && y->timeout != 0)
1529 if (x->timeout < y->timeout)
1532 if (x->timeout > y->timeout)
/* Sends method call m and registers callback to be invoked with the reply.
 * m must be a method call that expects a reply, and callback must be
 * non-NULL (-EINVAL); the bus must be open (-ENOTCONN) and not used after a
 * PID change (-ECHILD). The message is sealed, a reply_callback entry keyed
 * by the message serial is stored in bus->reply_callbacks and, when it has
 * a timeout, also in the timeout priority queue. If queuing the timeout or
 * sending the message fails, the registration is cancelled again. */
1538 _public_ int sd_bus_call_async(
1541 sd_bus_message_handler_t callback,
1546 struct reply_callback *c;
1549 assert_return(bus, -EINVAL);
1550 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1551 assert_return(m, -EINVAL);
1552 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1553 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1554 assert_return(callback, -EINVAL);
1555 assert_return(!bus_pid_changed(bus), -ECHILD);
1557 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1561 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1565 r = bus_seal_message(bus, m, usec);
1569 c = new0(struct reply_callback, 1);
1573 c->callback = callback;
1574 c->userdata = userdata;
1575 c->serial = BUS_MESSAGE_SERIAL(m);
1576 c->timeout = calc_elapse(m->timeout);
1578 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1584 if (c->timeout != 0) {
1585 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1588 sd_bus_call_async_cancel(bus, c->serial);
1593 r = sd_bus_send(bus, m, serial);
1595 sd_bus_call_async_cancel(bus, c->serial);
/* Cancels the pending asynchronous call identified by serial: its entry is
 * removed from bus->reply_callbacks and, when it has a timeout, from the
 * timeout priority queue. Errors: -EINVAL for a NULL bus or a serial of 0,
 * -ECHILD after a PID change. */
1602 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1603 struct reply_callback *c;
1605 assert_return(bus, -EINVAL);
1606 assert_return(serial != 0, -EINVAL);
1607 assert_return(!bus_pid_changed(bus), -ECHILD);
1609 c = hashmap_remove(bus->reply_callbacks, &serial);
1613 if (c->timeout != 0)
1614 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the bus until it has reached state BUS_RUNNING, alternating
 * sd_bus_process() with an unbounded sd_bus_wait(). Buses in state
 * BUS_UNSET, BUS_CLOSED or BUS_CLOSING are checked for up front, as is a
 * bus that is already running. */
1620 int bus_ensure_running(sd_bus *bus) {
1625 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1627 if (bus->state == BUS_RUNNING)
1631 r = sd_bus_process(bus, NULL);
1634 if (bus->state == BUS_RUNNING)
1639 r = sd_bus_wait(bus, (uint64_t) -1);
/* Synchronous method call: sends m and waits for the matching reply. m must
 * be a method call that expects a reply and error must not already carry an
 * error (-EINVAL); the bus must be open (-ENOTCONN) and not used after a
 * PID change (-ECHILD). After the bus is running, the message is sealed
 * and sent, then the function loops: read queue entries added since the
 * call started (index i onwards) are scanned for a message whose
 * reply_serial equals the call's serial; a method return or a method error
 * (copied into error) ends the call. A queued message carrying the call's
 * own serial and this connection's unique name as sender is detected as a
 * call to ourselves and dropped. If nothing matches, more input is read;
 * otherwise the function polls and flushes the write queue. -EPIPE,
 * -ENOTCONN and -ESHUTDOWN from reading or writing move the bus into the
 * closing state. */
1645 _public_ int sd_bus_call(
1649 sd_bus_error *error,
1650 sd_bus_message **reply) {
1657 assert_return(bus, -EINVAL);
1658 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1659 assert_return(m, -EINVAL);
1660 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1661 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1662 assert_return(!bus_error_is_dirty(error), -EINVAL);
1663 assert_return(!bus_pid_changed(bus), -ECHILD);
1665 r = bus_ensure_running(bus);
1669 i = bus->rqueue_size;
1671 r = bus_seal_message(bus, m, usec);
1675 r = sd_bus_send(bus, m, &serial);
1679 timeout = calc_elapse(m->timeout);
1684 while (i < bus->rqueue_size) {
1685 sd_bus_message *incoming = NULL;
1687 incoming = bus->rqueue[i];
1689 if (incoming->reply_serial == serial) {
1690 /* Found a match! */
1692 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1695 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1700 sd_bus_message_unref(incoming);
1703 } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
1704 r = sd_bus_error_copy(error, &incoming->error);
1708 sd_bus_message_unref(incoming);
1711 } else if (incoming->header->serial == serial &&
1714 streq(bus->unique_name, incoming->sender)) {
1716 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1719 /* Our own message? Somebody is trying
1720 * to send its own client a message,
1721 * let's not dead-lock, let's fail
1724 sd_bus_message_unref(incoming);
1728 /* Try to read more, right-away */
1732 r = bus_read_message(bus);
1734 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1735 bus_enter_closing(bus);
1745 n = now(CLOCK_MONOTONIC);
1751 left = (uint64_t) -1;
1753 r = bus_poll(bus, true, left);
1759 r = dispatch_wqueue(bus);
1761 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1762 bus_enter_closing(bus);
/* Returns the file descriptor of the connection. This is only possible when
 * input and output use the same fd (-EPERM otherwise). Further errors:
 * -EINVAL for a NULL bus, -ECHILD after a PID change. */
1769 _public_ int sd_bus_get_fd(sd_bus *bus) {
1771 assert_return(bus, -EINVAL);
1772 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1773 assert_return(!bus_pid_changed(bus), -ECHILD);
1775 return bus->input_fd;
/* Computes the I/O events the caller should wait for on the bus fd,
 * depending on the connection state: opening, authenticating (where
 * bus_socket_auth_needs_write() is consulted), or running/hello (where an
 * empty read queue and a non-empty write queue are what matter).
 * NOTE(review): the event flags assigned in each case are on lines not
 * shown here. Errors: -EINVAL for a NULL bus, -ENOTCONN unless the bus is
 * open or closing, -ECHILD after a PID change. */
1778 _public_ int sd_bus_get_events(sd_bus *bus) {
1781 assert_return(bus, -EINVAL);
1782 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1783 assert_return(!bus_pid_changed(bus), -ECHILD);
1785 if (bus->state == BUS_OPENING)
1787 else if (bus->state == BUS_AUTHENTICATING) {
1789 if (bus_socket_auth_needs_write(bus))
1794 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1795 if (bus->rqueue_size <= 0)
1797 if (bus->wqueue_size > 0)
/* Stores in *timeout_usec the time the caller may wait before it has to
 * call into the bus again. While authenticating this is
 * bus->auth_timeout; in states other than running/hello it is
 * (uint64_t) -1; when running it is the timeout of the earliest pending
 * reply callback, or (uint64_t) -1 on the path where no such callback is
 * used. The closing state and a non-empty read queue have their own cases
 * (NOTE(review): the values stored there are on lines not shown here).
 * Errors: -EINVAL for a NULL bus or NULL timeout_usec, -ENOTCONN unless the
 * bus is open or closing, -ECHILD after a PID change. */
1804 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1805 struct reply_callback *c;
1807 assert_return(bus, -EINVAL);
1808 assert_return(timeout_usec, -EINVAL);
1809 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1810 assert_return(!bus_pid_changed(bus), -ECHILD);
1812 if (bus->state == BUS_CLOSING) {
1817 if (bus->state == BUS_AUTHENTICATING) {
1818 *timeout_usec = bus->auth_timeout;
1822 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1823 *timeout_usec = (uint64_t) -1;
1827 if (bus->rqueue_size > 0) {
1832 c = prioq_peek(bus->reply_callbacks_prioq);
1834 *timeout_usec = (uint64_t) -1;
1838 *timeout_usec = c->timeout;
/* Handles the pending method call whose timeout comes first in the
 * priority queue. A synthetic "Method call timed out" error
 * (SD_BUS_ERROR_NO_REPLY) is built with sender org.freedesktop.DBus and
 * sealed, the callback entry is removed from the priority queue and the
 * hashmap, the iteration counter is bumped and the callback is invoked
 * with the synthetic message; its result is passed through
 * bus_maybe_reply_error(). NOTE(review): the comparison of the timeout
 * against the current time is on lines not shown here. */
1842 static int process_timeout(sd_bus *bus) {
1843 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1844 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1845 struct reply_callback *c;
1851 c = prioq_peek(bus->reply_callbacks_prioq);
1855 n = now(CLOCK_MONOTONIC);
1859 r = bus_message_new_synthetic_error(
1862 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1867 m->sender = "org.freedesktop.DBus";
1869 r = bus_seal_synthetic_message(bus, m);
1873 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1874 hashmap_remove(bus->reply_callbacks, &c->serial);
1877 bus->iteration_counter ++;
1879 r = c->callback(bus, m, c->userdata, &error_buffer);
1880 r = bus_maybe_reply_error(m, r, &error_buffer);
1883 bus->current = NULL;
/* Gatekeeper used while the bus is in state BUS_HELLO: message m is checked
 * for being a method return or method error whose reply_serial equals
 * bus->hello_serial. NOTE(review): the values returned for each outcome
 * are on lines not shown here. */
1888 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1892 if (bus->state != BUS_HELLO)
1895 /* Let's make sure the first message on the bus is the HELLO
1896 * reply. But note that we don't actually parse the message
1897 * here (we leave that to the usual handling), we just verify
1898 * we don't let any earlier msg through. */
1900 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1901 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1904 if (m->reply_serial != bus->hello_serial)
/* Dispatches a method return or method error to the callback registered
 * for its reply_serial. The callback entry is removed from
 * bus->reply_callbacks and, when it has a timeout, from the priority
 * queue; the message is rewound, the callback is invoked and its result is
 * passed through bus_maybe_reply_error(). */
1910 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1911 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1912 struct reply_callback *c;
1918 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1919 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1922 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1926 if (c->timeout != 0)
1927 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1929 r = sd_bus_message_rewind(m, true);
1933 r = c->callback(bus, m, c->userdata, &error_buffer);
1934 r = bus_maybe_reply_error(m, r, &error_buffer);
/* Runs the registered filter callbacks on message m. The list walk is
 * repeated for as long as a callback run sets
 * bus->filter_callbacks_modified. Each filter remembers the iteration
 * counter of its last run so that it is invoked at most once per
 * iteration. The message is rewound before each callback and the callback
 * result is passed through bus_maybe_reply_error(). */
1940 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1941 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1942 struct filter_callback *l;
1949 bus->filter_callbacks_modified = false;
1951 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1953 if (bus->filter_callbacks_modified)
1956 /* Don't run this more than once per iteration */
1957 if (l->last_iteration == bus->iteration_counter)
1960 l->last_iteration = bus->iteration_counter;
1962 r = sd_bus_message_rewind(m, true);
1966 r = l->callback(bus, m, l->userdata, &error_buffer);
1967 r = bus_maybe_reply_error(m, r, &error_buffer);
1973 } while (bus->filter_callbacks_modified);
/* Runs the match callbacks on message m via bus_match_run(), repeating the
 * run for as long as bus->match_callbacks_modified is set afterwards. */
1978 static int process_match(sd_bus *bus, sd_bus_message *m) {
1985 bus->match_callbacks_modified = false;
1987 r = bus_match_run(bus, &bus->match_callbacks, m);
1991 } while (bus->match_callbacks_modified);
/* Implements the org.freedesktop.DBus.Peer interface for incoming method
 * calls. "Ping" is answered with an empty method return, "GetMachineId"
 * with a method return carrying the machine ID as a string, and any other
 * member with an SD_BUS_ERROR_UNKNOWN_METHOD error. The reply is sent with
 * sd_bus_send(). Messages that are not method calls, are for another
 * interface or do not expect a reply are checked for up front. */
1996 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1997 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2003 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2006 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2009 if (m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED)
2012 if (streq_ptr(m->member, "Ping"))
2013 r = sd_bus_message_new_method_return(m, &reply);
2014 else if (streq_ptr(m->member, "GetMachineId")) {
2018 r = sd_id128_get_machine(&id);
2022 r = sd_bus_message_new_method_return(m, &reply);
2026 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2028 r = sd_bus_message_new_method_errorf(
2030 SD_BUS_ERROR_UNKNOWN_METHOD,
2031 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2037 r = sd_bus_send(bus, reply, NULL);
/* Runs one incoming message through the processing stages in fixed order:
 * hello check, reply callbacks, filters, matches, built-in interfaces and
 * finally object dispatch. The iteration counter is bumped first and the
 * message's sender, path, interface and member are logged at debug level.
 * bus->current is cleared at the end. */
2044 static int process_message(sd_bus *bus, sd_bus_message *m) {
2051 bus->iteration_counter++;
2053 log_debug("Got message sender=%s object=%s interface=%s member=%s",
2054 strna(sd_bus_message_get_sender(m)),
2055 strna(sd_bus_message_get_path(m)),
2056 strna(sd_bus_message_get_interface(m)),
2057 strna(sd_bus_message_get_member(m)));
2059 r = process_hello(bus, m);
2063 r = process_reply(bus, m);
2067 r = process_filter(bus, m);
2071 r = process_match(bus, m);
2075 r = process_builtin(bus, m);
2079 r = bus_process_object(bus, m);
2082 bus->current = NULL;
/* One processing step for a bus in state BUS_RUNNING or BUS_HELLO: handle
 * an expired call timeout, flush the write queue, fetch the next incoming
 * message and run it through process_message(). On the path that reaches
 * the end of the visible lines, the message is rewound and, if it is a
 * method call, answered with an SD_BUS_ERROR_UNKNOWN_OBJECT error. */
2086 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2087 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2091 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2093 r = process_timeout(bus);
2097 r = dispatch_wqueue(bus);
2101 r = dispatch_rqueue(bus, &m);
2107 r = process_message(bus, m);
2112 r = sd_bus_message_rewind(m, true);
2121 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2123 r = sd_bus_reply_method_errorf(
2125 SD_BUS_ERROR_UNKNOWN_OBJECT,
2126 "Unknown object '%s'.", m->path);
/* One processing iteration for a bus in BUS_CLOSING state. If a reply
 * callback is still registered, it is unregistered and invoked with a
 * synthetic SD_BUS_ERROR_NO_REPLY ("Connection terminated") error.
 * Otherwise a synthetic signal from org.freedesktop.DBus.Local is built
 * and run through the filter and match callbacks.
 * NOTE(review): the branching between the two phases, the final state
 * transition and the handling of *ret are not visible in this excerpt. */
2140 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2141 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2142 struct reply_callback *c;
2146 assert(bus->state == BUS_CLOSING);
/* Pick any one outstanding reply callback */
2148 c = hashmap_first(bus->reply_callbacks);
2150 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2152 /* First, fail all outstanding method calls */
2153 r = bus_message_new_synthetic_error(
2156 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2161 r = bus_seal_synthetic_message(bus, m);
/* Only callbacks with a non-zero timeout are removed from the priority queue */
2165 if (c->timeout != 0)
2166 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Unregister before invoking the callback */
2168 hashmap_remove(bus->reply_callbacks, &c->serial);
2171 bus->iteration_counter++;
2173 r = c->callback(bus, m, c->userdata, &error_buffer);
2174 r = bus_maybe_reply_error(m, r, &error_buffer);
2180 /* Then, synthesize a Disconnected message */
2181 r = sd_bus_message_new_signal(
2183 "/org/freedesktop/DBus/Local",
2184 "org.freedesktop.DBus.Local",
/* The sender is pointed at a string literal rather than an allocated copy */
2190 m->sender = "org.freedesktop.DBus.Local";
2192 r = bus_seal_synthetic_message(bus, m);
2199 bus->iteration_counter++;
2201 r = process_filter(bus, m);
2205 r = process_match(bus, m);
2217 bus->current = NULL;
/* Public entry point that drives the connection one step, according to
 * bus->state. Fails with -EINVAL for a NULL bus, -ECHILD when called from
 * a different process than the one recorded for the bus, and -EBUSY when
 * called while a message is already being dispatched. In the opening,
 * authenticating and running steps a result of -ECONNRESET, -EPIPE or
 * -ESHUTDOWN makes the bus enter the closing state.
 * NOTE(review): most case labels and the returns after
 * bus_enter_closing() are not visible in this excerpt. */
2221 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2222 BUS_DONT_DESTROY(bus);
2225 /* Returns 0 when we didn't do anything. This should cause the
2226 * caller to invoke sd_bus_wait() before returning the next
2227 * time. Returns > 0 when we did something, which possibly
2228 * means *ret is filled in with an unprocessed message. */
2230 assert_return(bus, -EINVAL);
2231 assert_return(!bus_pid_changed(bus), -ECHILD);
2233 /* We don't allow recursively invoking sd_bus_process(). */
2234 assert_return(!bus->current, -EBUSY);
2236 switch (bus->state) {
2245 r = bus_socket_process_opening(bus);
2246 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2247 bus_enter_closing(bus);
2255 case BUS_AUTHENTICATING:
2256 r = bus_socket_process_authenticating(bus);
2257 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2258 bus_enter_closing(bus);
2270 r = process_running(bus, ret);
2271 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2272 bus_enter_closing(bus);
2282 return process_closing(bus, ret);
/* Every valid state is expected to be handled above */
2285 assert_not_reached("Unknown state");
/* Blocks in ppoll() on the bus file descriptors until there is I/O to do
 * or a timeout elapses. At the visible return it yields 1 when ppoll()
 * reported ready descriptors and 0 otherwise. Fails with -ENOTCONN when
 * the bus is not open. The wait time m starts out as (usec_t) -1, which
 * is passed to ppoll() as a NULL (unbounded) timeout.
 * NOTE(review): the need_more branching, the BUS_CLOSING result and the
 * error checks are not visible in this excerpt. */
2288 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2289 struct pollfd p[2] = {};
2292 usec_t m = (usec_t) -1;
2296 if (bus->state == BUS_CLOSING)
2299 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2301 e = sd_bus_get_events(bus);
2306 /* The caller really needs some more data, he doesn't
2307 * care about what's already read, or any timeouts
2312 /* The caller wants to process if there's something to
2313 * process, but doesn't care otherwise */
2315 r = sd_bus_get_timeout(bus, &until);
2320 nw = now(CLOCK_MONOTONIC);
/* Convert the absolute deadline into a relative wait, clamped at zero if it already passed */
2321 m = until > nw ? until - nw : 0;
/* True when the caller's timeout is finite and tighter than m (or m is unbounded) */
2325 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2328 p[0].fd = bus->input_fd;
/* With distinct input and output descriptors, input is polled for POLLIN and output for POLLOUT */
2329 if (bus->output_fd == bus->input_fd) {
2333 p[0].events = e & POLLIN;
2334 p[1].fd = bus->output_fd;
2335 p[1].events = e & POLLOUT;
2339 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2343 return r > 0 ? 1 : 0;
/* Public wrapper that waits for bus activity for at most timeout_usec by
 * calling bus_poll() with need_more set to false. Fails with -EINVAL for
 * a NULL bus, -ECHILD after a fork and -ENOTCONN when the bus is not open.
 * NOTE(review): the results of the BUS_CLOSING and non-empty read queue
 * shortcuts are not visible in this excerpt. */
2346 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2348 assert_return(bus, -EINVAL);
2349 assert_return(!bus_pid_changed(bus), -ECHILD);
2351 if (bus->state == BUS_CLOSING)
2354 assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
/* Messages are already queued for reading */
2356 if (bus->rqueue_size > 0)
2359 return bus_poll(bus, false, timeout_usec);
/* Writes out the queued outgoing messages, alternating between
 * dispatch_wqueue() and an unbounded bus_poll() while the write queue is
 * non-empty. Fails with -EINVAL for a NULL bus, -ECHILD after a fork and
 * -ENOTCONN when the bus is not open.
 * NOTE(review): the loop construct, the early returns and the error
 * checks are not visible in this excerpt. */
2362 _public_ int sd_bus_flush(sd_bus *bus) {
2365 assert_return(bus, -EINVAL);
2366 assert_return(!bus_pid_changed(bus), -ECHILD);
2368 if (bus->state == BUS_CLOSING)
2371 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
/* The connection setup has to be complete before anything can be flushed */
2373 r = bus_ensure_running(bus);
/* Nothing queued */
2377 if (bus->wqueue_size <= 0)
2381 r = dispatch_wqueue(bus);
/* A write error that signals a lost peer moves the bus to the closing
 * state. -ECONNRESET is included so that a connection reset is treated
 * as a disconnect here just as it is in sd_bus_process(). */
2383 if (r == -ECONNRESET || r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
2384 bus_enter_closing(bus);
/* Queue drained */
2389 if (bus->wqueue_size <= 0)
/* Wait without a caller-imposed timeout until the bus needs servicing again */
2392 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback with its userdata on the bus. A zeroed
 * filter_callback record is allocated, filled in and prepended to
 * bus->filter_callbacks. Fails with -EINVAL for a NULL bus or callback
 * and -ECHILD after a fork.
 * NOTE(review): the allocation-failure check and the return value are
 * not visible in this excerpt. */
2398 _public_ int sd_bus_add_filter(sd_bus *bus,
2399 sd_bus_message_handler_t callback,
2402 struct filter_callback *f;
2404 assert_return(bus, -EINVAL);
2405 assert_return(callback, -EINVAL);
2406 assert_return(!bus_pid_changed(bus), -ECHILD);
2408 f = new0(struct filter_callback, 1);
2411 f->callback = callback;
2412 f->userdata = userdata;
/* Flag the list as changed; the filter dispatch loop re-checks this flag after each pass */
2414 bus->filter_callbacks_modified = true;
2415 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
/* Unregisters a filter: walks bus->filter_callbacks for an entry whose
 * callback and userdata both equal the arguments and unlinks it from the
 * list. Fails with -EINVAL for a NULL bus or callback and -ECHILD after
 * a fork.
 * NOTE(review): freeing of the entry and the return values are not
 * visible in this excerpt. */
2419 _public_ int sd_bus_remove_filter(sd_bus *bus,
2420 sd_bus_message_handler_t callback,
2423 struct filter_callback *f;
2425 assert_return(bus, -EINVAL);
2426 assert_return(callback, -EINVAL);
2427 assert_return(!bus_pid_changed(bus), -ECHILD);
2429 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2430 if (f->callback == callback && f->userdata == userdata) {
/* Flag the list as changed before unlinking the entry */
2431 bus->filter_callbacks_modified = true;
2432 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
/* Installs a match rule with its callback. The match string is parsed
 * into components; on a bus client connection the rule is additionally
 * registered through bus_add_match_internal() under a freshly allocated
 * cookie; then the rule is added to the local match tree with
 * bus_match_add(). Fails with -EINVAL for a NULL bus or match string and
 * -ECHILD after a fork.
 * NOTE(review): the error checks and the return value are not visible in
 * this excerpt. */
2441 _public_ int sd_bus_add_match(sd_bus *bus,
2443 sd_bus_message_handler_t callback,
2446 struct bus_match_component *components = NULL;
2447 unsigned n_components = 0;
2448 uint64_t cookie = 0;
2451 assert_return(bus, -EINVAL);
2452 assert_return(match, -EINVAL);
2453 assert_return(!bus_pid_changed(bus), -ECHILD);
2455 r = bus_match_parse(match, &components, &n_components);
2459 if (bus->bus_client) {
/* Pre-increment, so the first cookie handed out is 1; cookie stays 0 when not a bus client */
2460 cookie = ++bus->match_cookie;
2462 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2467 bus->match_callbacks_modified = true;
2468 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
/* Undo the earlier registration; presumably only taken when bus_match_add() failed - TODO confirm */
2470 if (bus->bus_client)
2471 bus_remove_match_internal(bus, match, cookie);
2475 bus_match_parse_free(components, n_components);
/* Removes a match rule. The match string is parsed, the rule is removed
 * from the local match tree with bus_match_remove(), which also reports
 * the rule's cookie, and on a bus client connection it is removed through
 * bus_remove_match_internal() using that cookie. Returns the local
 * removal result when it is negative, otherwise q. Fails with -EINVAL for
 * a NULL bus or match string and -ECHILD after a fork.
 * NOTE(review): the declaration and initial value of q are not visible
 * in this excerpt; confirm q is initialized when bus_client is unset. */
2479 _public_ int sd_bus_remove_match(sd_bus *bus,
2481 sd_bus_message_handler_t callback,
2484 struct bus_match_component *components = NULL;
2485 unsigned n_components = 0;
2487 uint64_t cookie = 0;
2489 assert_return(bus, -EINVAL);
2490 assert_return(match, -EINVAL);
2491 assert_return(!bus_pid_changed(bus), -ECHILD);
2493 r = bus_match_parse(match, &components, &n_components);
2497 bus->match_callbacks_modified = true;
2498 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2500 if (bus->bus_client)
2501 q = bus_remove_match_internal(bus, match, cookie);
2503 bus_match_parse_free(components, n_components);
/* The local removal error takes precedence over the result stored in q */
2505 return r < 0 ? r : q;
/* Returns true when the calling process is not the one whose PID is
 * stored in bus->original_pid. */
2508 bool bus_pid_changed(sd_bus *bus) {
2511 /* We don't support people creating a bus connection and
2512 * keeping it around over a fork(). Let's complain. */
2514 return bus->original_pid != getpid();
/* I/O event handler, installed with sd_event_add_io() in
 * attach_io_events(). Runs one sd_bus_process() step on the bus passed
 * as userdata, discarding any returned message (ret is NULL). */
2517 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2518 sd_bus *bus = userdata;
2523 r = sd_bus_process(bus, NULL);
/* Timer event handler, installed with sd_event_add_monotonic() in
 * sd_bus_attach_event(). Runs one sd_bus_process() step on the bus
 * passed as userdata, discarding any returned message (ret is NULL). */
2530 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2531 sd_bus *bus = userdata;
2536 r = sd_bus_process(bus, NULL);
/* Prepare handler, installed on the input I/O event source in
 * attach_io_events(). Updates the poll events of the I/O event sources
 * from sd_bus_get_events() and the timer event source from
 * sd_bus_get_timeout().
 * NOTE(review): the error checks between the calls are not visible in
 * this excerpt. */
2543 static int prepare_callback(sd_event_source *s, void *userdata) {
2544 sd_bus *bus = userdata;
2551 e = sd_bus_get_events(bus);
/* Separate descriptors: reads are watched on the input source, writes on the output source */
2555 if (bus->output_fd != bus->input_fd) {
2557 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2561 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
/* Shared descriptor: the input source carries the full event mask */
2565 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2570 r = sd_bus_get_timeout(bus, &until);
/* Stored in j rather than r; presumably so r keeps the sd_bus_get_timeout() result for the enable decision below - TODO confirm */
2576 j = sd_event_source_set_time(bus->time_event_source, until);
2581 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
/* Exit handler, installed with sd_event_add_exit() in
 * sd_bus_attach_event(); userdata is the bus.
 * NOTE(review): the body is not visible in this excerpt. */
2588 static int quit_callback(sd_event_source *event, void *userdata) {
2589 sd_bus *bus = userdata;
/* Creates or re-targets the I/O event sources for the bus descriptors.
 * The input source is created with io_callback() as handler, gets
 * prepare_callback() as prepare handler and the bus's event priority; an
 * already existing source is pointed at the current input_fd instead. A
 * separate output source is handled the same way, without a prepare
 * handler, when output_fd differs from input_fd.
 * NOTE(review): the early returns, else branches and error checks are
 * not visible in this excerpt. */
2598 static int attach_io_events(sd_bus *bus) {
/* No valid input descriptor */
2603 if (bus->input_fd < 0)
2609 if (!bus->input_io_event_source) {
/* Created with an empty event mask; prepare_callback() sets the mask later */
2610 r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2614 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2618 r = sd_event_source_set_priority(bus->input_io_event_source, bus->event_priority);
2620 r = sd_event_source_set_io_fd(bus->input_io_event_source, bus->input_fd);
2625 if (bus->output_fd != bus->input_fd) {
2626 assert(bus->output_fd >= 0);
2628 if (!bus->output_io_event_source) {
2629 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2633 r = sd_event_source_set_priority(bus->output_io_event_source, bus->event_priority);
2635 r = sd_event_source_set_io_fd(bus->output_io_event_source, bus->output_fd);
/* Disables and releases the input and output I/O event sources of the
 * bus, if present. Each field is overwritten with the value returned by
 * sd_event_source_unref(). */
2644 static void detach_io_events(sd_bus *bus) {
2647 if (bus->input_io_event_source) {
2648 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2649 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2652 if (bus->output_io_event_source) {
2653 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2654 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
/* Hooks the bus into an sd_event loop at the given priority. It installs
 * a monotonic timer source (time_callback), an exit source
 * (quit_callback) and the I/O sources (attach_io_events). Fails with
 * -EINVAL for a NULL bus and -EBUSY when an event loop is already
 * attached.
 * NOTE(review): the condition choosing between the passed event and
 * sd_event_default(), the error checks and the return value are not
 * visible in this excerpt. */
2658 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2661 assert_return(bus, -EINVAL);
2662 assert_return(!bus->event, -EBUSY);
/* With no event loop attached, no event sources may exist either */
2664 assert(!bus->input_io_event_source);
2665 assert(!bus->output_io_event_source);
2666 assert(!bus->time_event_source);
/* Either take a reference on the caller's loop or acquire the default loop */
2669 bus->event = sd_event_ref(event);
2671 r = sd_event_default(&bus->event);
2676 bus->event_priority = priority;
2678 r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2682 r = sd_event_source_set_priority(bus->time_event_source, priority);
2686 r = sd_event_add_exit(bus->event, quit_callback, bus, &bus->quit_event_source);
2690 r = attach_io_events(bus);
/* Presumably the failure path, rolling back whatever was set up - TODO confirm */
2697 sd_bus_detach_event(bus);
/* Detaches the bus from its event loop: releases the I/O sources via
 * detach_io_events(), disables and releases the timer and exit sources
 * if present, and drops the reference on the event loop. Fails with
 * -EINVAL for a NULL bus.
 * NOTE(review): any early return for an unattached bus and the return
 * value are not visible in this excerpt. */
2701 _public_ int sd_bus_detach_event(sd_bus *bus) {
2702 assert_return(bus, -EINVAL);
2707 detach_io_events(bus);
2709 if (bus->time_event_source) {
2710 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2711 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2714 if (bus->quit_event_source) {
2715 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2716 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
/* The field is overwritten with the value returned by sd_event_unref() */
2720 bus->event = sd_event_unref(bus->event);
/* Accessor for the event loop of the bus. Yields NULL for a NULL bus.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably it returns bus->event - TODO confirm. */
2725 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2726 assert_return(bus, NULL);
/* Returns bus->current, or NULL for a NULL bus. No reference is taken
 * here. */
2731 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2732 assert_return(bus, NULL);
2734 return bus->current;
/* Shared implementation of the "default bus" getters. default_bus points
 * at the cached connection slot, bus_open is the function used to open a
 * connection, and ret receives a reference.
 * NOTE(review): the conditions around the statements below and the call
 * to bus_open are not visible in this excerpt. */
2737 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2742 assert(default_bus);
/* Reports only whether a cached connection exists: 1 if so, 0 if not */
2745 return !!*default_bus;
/* Hands out a new reference to the cached connection */
2748 *ret = sd_bus_ref(*default_bus);
/* Records the slot in the bus; presumably so the slot can be cleared when the bus goes away - TODO confirm */
2756 b->default_bus_ptr = default_bus;
/* Gets the default system bus connection through bus_default(), using
 * sd_bus_open_system() as the opener. The cached pointer is declared
 * __thread, so each thread has its own slot. */
2764 _public_ int sd_bus_default_system(sd_bus **ret) {
2765 static __thread sd_bus *default_system_bus = NULL;
2767 return bus_default(sd_bus_open_system, &default_system_bus, ret);
/* Gets the default user bus connection through bus_default(), using
 * sd_bus_open_user() as the opener. The cached pointer is declared
 * __thread, so each thread has its own slot. */
2770 _public_ int sd_bus_default_user(sd_bus **ret) {
2771 static __thread sd_bus *default_user_bus = NULL;
2773 return bus_default(sd_bus_open_user, &default_user_bus, ret);
/* Queries the thread ID associated with the bus by delegating to
 * sd_event_get_tid() on the attached event loop. Fails with -EINVAL for
 * a NULL bus or tid pointer and -ECHILD after a fork.
 * NOTE(review): the statements between the checks and the final return
 * are not visible in this excerpt. */
2776 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2777 assert_return(b, -EINVAL);
2778 assert_return(tid, -EINVAL);
2779 assert_return(!bus_pid_changed(b), -ECHILD);
2787 return sd_event_get_tid(b->event, tid);
/* Returns a newly allocated escaped copy of s. Characters other than
 * A-Z and a-z, and digits outside the first position, are left as they
 * are only if they pass the test below; everything else is replaced by
 * an escape sequence containing the two hex digits of the byte. Yields
 * NULL for a NULL argument. sd_bus_label_unescape() in this file decodes
 * such hex pairs.
 * NOTE(review): the empty-string case, the allocation-failure check, the
 * escape prefix character and the copy of unescaped characters are not
 * visible in this excerpt. */
2792 _public_ char *sd_bus_label_escape(const char *s) {
2796 assert_return(s, NULL);
2798 /* Escapes all chars that D-Bus' object path cannot deal
2799 * with. Can be reversed with bus_path_unescape(). We special
2800 * case the empty string. */
/* Room for three output bytes per input byte, plus the terminating NUL */
2805 r = new(char, strlen(s)*3 + 1);
2809 for (f = s, t = r; *f; f++) {
2811 /* Escape everything that is not a-zA-Z0-9. We also
2812 * escape 0-9 if it's the first character */
2814 if (!(*f >= 'A' && *f <= 'Z') &&
2815 !(*f >= 'a' && *f <= 'z') &&
2816 !(f > s && *f >= '0' && *f <= '9')) {
/* High nibble first, then the low nibble; presumably hexchar() only uses the low four bits of its argument - TODO confirm */
2818 *(t++) = hexchar(*f >> 4);
2819 *(t++) = hexchar(*f);
/* Returns a newly allocated unescaped copy of f. An escape sequence is
 * decoded by reading the two characters after the current one as hex
 * digits and combining them into one byte; a sequence with invalid hex
 * digits is kept literally. Yields NULL for a NULL argument.
 * NOTE(review): the empty-string case, the allocation-failure check, the
 * test for the escape character and the pointer advance past the hex
 * digits are not visible in this excerpt. */
2829 _public_ char *sd_bus_label_unescape(const char *f) {
2832 assert_return(f, NULL);
2834 /* Special case for the empty string */
/* The output is never longer than the input */
2838 r = new(char, strlen(f) + 1);
2842 for (t = r; *f; f++) {
2847 if ((a = unhexchar(f[1])) < 0 ||
2848 (b = unhexchar(f[2])) < 0) {
2849 /* Invalid escape code, let's take it literal then */
/* a is the high nibble, b the low nibble */
2852 *(t++) = (char) ((a << 4) | b);
2864 _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **ret) {
2869 assert_return(bus, -EINVAL);
2870 assert_return(mask <= _SD_BUS_CREDS_ALL, -ENOTSUP);
2871 assert_return(ret, -EINVAL);
2872 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2873 assert_return(!bus_pid_changed(bus), -ECHILD);
2874 assert_return(!bus->is_kernel, -ENOTSUP);
2876 if (!bus->ucred_valid && !isempty(bus->label))
2879 c = bus_creds_new();
2883 if (bus->ucred_valid) {
2884 pid = c->pid = bus->ucred.pid;
2885 c->uid = bus->ucred.uid;
2886 c->gid = bus->ucred.gid;
2888 c->mask |= (SD_BUS_CREDS_UID | SD_BUS_CREDS_PID | SD_BUS_CREDS_GID) & mask;
2891 if (!isempty(bus->label) && (mask & SD_BUS_CREDS_SELINUX_CONTEXT)) {
2892 c->label = strdup(bus->label);
2894 sd_bus_creds_unref(c);
2898 c->mask |= SD_BUS_CREDS_SELINUX_CONTEXT;
2901 r = bus_creds_add_more(c, mask, pid, 0);