1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 #include "bus-container.h"
50 #include "bus-protocol.h"
52 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
53 static int attach_io_events(sd_bus *b);
54 static void detach_io_events(sd_bus *b);
/* Closes the bus' input fd and, if it is a valid fd distinct from the
 * input fd, the output fd as well; afterwards both are reset to -1.
 * NOTE(review): any guard in front of the first close is not visible here. */
56 static void bus_close_fds(sd_bus *b) {
62 close_nointr_nofail(b->input_fd);
64 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
65 close_nointr_nofail(b->output_fd);
67 b->input_fd = b->output_fd = -1;
/* Tears down one node of the object tree: recurses into n->child, unlinks
 * every entry of the node's callbacks, vtables and enumerators lists,
 * removes the node from its parent's list of children and finally from the
 * b->nodes hashmap (asserting that the entry stored under n->path is n). */
70 static void bus_node_destroy(sd_bus *b, struct node *n) {
71 struct node_callback *c;
72 struct node_vtable *v;
73 struct node_enumerator *e;
81 bus_node_destroy(b, n->child);
83 while ((c = n->callbacks)) {
84 LIST_REMOVE(callbacks, n->callbacks, c);
88 while ((v = n->vtables)) {
89 LIST_REMOVE(vtables, n->vtables, v);
94 while ((e = n->enumerators)) {
95 LIST_REMOVE(enumerators, n->enumerators, e);
100 LIST_REMOVE(siblings, n->parent->child, n);
102 assert_se(hashmap_remove(b->nodes, n->path) == n);
/* Drops the reference held on every message in the read and write queues
 * and resets both queues to the empty state (NULL array, nothing allocated,
 * size 0). */
107 static void bus_reset_queues(sd_bus *b) {
112 for (i = 0; i < b->rqueue_size; i++)
113 sd_bus_message_unref(b->rqueue[i]);
116 for (i = 0; i < b->wqueue_size; i++)
117 sd_bus_message_unref(b->wqueue[i]);
120 b->rqueue = b->wqueue = NULL;
121 b->rqueue_allocated = b->wqueue_allocated = 0;
122 b->rqueue_size = b->wqueue_size = 0;
/* Releases everything the bus object owns: detaches it from its event loop,
 * unmaps the kdbus pool buffer, frees owned strings and the exec argv,
 * closes the collected fds, frees the reply-callback hashmap and priority
 * queue, unlinks all filter callbacks, frees the match tree and the vtable
 * lookup maps, destroys every object node, flushes the memfd cache and
 * destroys the mutex protecting it. */
125 static void bus_free(sd_bus *b) {
126 struct filter_callback *f;
131 sd_bus_detach_event(b);
136 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
139 free(b->unique_name);
140 free(b->auth_buffer);
145 free(b->cgroup_root);
148 strv_free(b->exec_argv);
150 close_many(b->fds, b->n_fds);
155 hashmap_free_free(b->reply_callbacks);
156 prioq_free(b->reply_callbacks_prioq);
158 while ((f = b->filter_callbacks)) {
159 LIST_REMOVE(callbacks, b->filter_callbacks, f);
163 bus_match_free(&b->match_callbacks);
165 hashmap_free_free(b->vtable_methods);
166 hashmap_free_free(b->vtable_properties);
/* bus_node_destroy() removes the node from b->nodes, so this loop ends */
168 while ((n = hashmap_first(b->nodes)))
169 bus_node_destroy(b, n);
171 hashmap_free(b->nodes);
173 bus_kernel_flush_memfd(b);
175 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
/* Sets up a new bus object with its defaults: initial reference count, no
 * fds (-1), message version 1, a credentials mask covering well-known and
 * unique names, fd passing enabled in the hello flags, name attachment
 * requested, and the PID of the creating process recorded. The memfd cache
 * mutex is initialized and one write queue slot is pre-allocated.
 * The ret argument is checked with assert_return (-EINVAL). */
180 _public_ int sd_bus_new(sd_bus **ret) {
183 assert_return(ret, -EINVAL);
189 r->n_ref = REFCNT_INIT;
190 r->input_fd = r->output_fd = -1;
191 r->message_version = 1;
192 r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
193 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
194 r->attach_flags |= KDBUS_ATTACH_NAMES;
195 r->original_pid = getpid();
197 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
199 /* We guarantee that wqueue always has space for at least one
201 if (!GREEDY_REALLOC(r->wqueue, r->wqueue_allocated, 1)) {
/* Sets the address string to connect to. Checked via assert_return: bus and
 * address must be non-NULL (-EINVAL), the bus must still be in BUS_UNSET
 * (-EPERM) and must not be used from a different PID (-ECHILD).
 * NOTE(review): the assignment of the address itself is not visible here. */
210 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
213 assert_return(bus, -EINVAL);
214 assert_return(bus->state == BUS_UNSET, -EPERM);
215 assert_return(address, -EINVAL);
216 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Stores the file descriptors the bus shall read from and write to. Both
 * fds must be >= 0 (-EINVAL); the bus must be non-NULL (-EINVAL), still in
 * BUS_UNSET (-EPERM) and used from the PID that created it (-ECHILD). */
228 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
229 assert_return(bus, -EINVAL);
230 assert_return(bus->state == BUS_UNSET, -EPERM);
231 assert_return(input_fd >= 0, -EINVAL);
232 assert_return(output_fd >= 0, -EINVAL);
233 assert_return(!bus_pid_changed(bus), -ECHILD);
235 bus->input_fd = input_fd;
236 bus->output_fd = output_fd;
/* Configures a program (path plus argv) to execute in order to reach the
 * bus. Requires a non-NULL bus and path and a non-empty argv (-EINVAL), a
 * bus still in BUS_UNSET (-EPERM) and an unchanged PID (-ECHILD). The
 * previously stored exec_path and exec_argv are freed.
 * NOTE(review): presumably they are replaced by copies of the arguments on
 * lines not visible here - confirm. */
240 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
243 assert_return(bus, -EINVAL);
244 assert_return(bus->state == BUS_UNSET, -EPERM);
245 assert_return(path, -EINVAL);
246 assert_return(!strv_isempty(argv), -EINVAL);
247 assert_return(!bus_pid_changed(bus), -ECHILD);
259 free(bus->exec_path);
260 strv_free(bus->exec_argv);
/* Sets the bus_client flag of the connection; b is normalized to 0/1.
 * Only allowed on a non-NULL bus (-EINVAL) that is still in BUS_UNSET
 * (-EPERM) and used from the PID that created it (-ECHILD). */
268 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
269 assert_return(bus, -EINVAL);
270 assert_return(bus->state == BUS_UNSET, -EPERM);
271 assert_return(!bus_pid_changed(bus), -ECHILD);
273 bus->bus_client = !!b;
/* Turns KDBUS_HELLO_ACCEPT_FD in the hello flags on or off according to b.
 * Only allowed on a non-NULL bus (-EINVAL) that is still in BUS_UNSET
 * (-EPERM) and used from the PID that created it (-ECHILD). */
277 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
278 assert_return(bus, -EINVAL);
279 assert_return(bus->state == BUS_UNSET, -EPERM);
280 assert_return(!bus_pid_changed(bus), -ECHILD);
282 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
/* Turns KDBUS_ATTACH_TIMESTAMP in the attach flags on or off according to b.
 * Only allowed on a non-NULL bus (-EINVAL) that is still in BUS_UNSET
 * (-EPERM) and used from the PID that created it (-ECHILD). */
286 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
287 assert_return(bus, -EINVAL);
288 assert_return(bus->state == BUS_UNSET, -EPERM);
289 assert_return(!bus_pid_changed(bus), -ECHILD);
291 SET_FLAG(bus->attach_flags, KDBUS_ATTACH_TIMESTAMP, b);
/* Selects which credentials shall be attached to incoming messages. mask
 * must not exceed _SD_BUS_CREDS_ALL (-EINVAL); the bus must be non-NULL
 * (-EINVAL), still in BUS_UNSET (-EPERM) and used from the PID that created
 * it (-ECHILD). The well-known and unique name bits are always added. */
295 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, uint64_t mask) {
296 assert_return(bus, -EINVAL);
297 assert_return(mask <= _SD_BUS_CREDS_ALL, -EINVAL);
298 assert_return(bus->state == BUS_UNSET, -EPERM);
299 assert_return(!bus_pid_changed(bus), -ECHILD);
301 /* The well knowns we need unconditionally, so that matches can work */
302 bus->creds_mask = mask | SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
/* NOTE(review): the same field is passed as input and as output here, so
 * the SD_BUS_CREDS_* mask just stored would be replaced by whatever the
 * translation writes, while bus->attach_flags stays untouched. This looks
 * unintended - verify against kdbus_translate_attach_flags(). */
304 return kdbus_translate_attach_flags(bus->creds_mask, &bus->creds_mask);
/* Sets the is_server flag (b normalized to 0/1) and stores the server ID.
 * A non-null server_id is only accepted together with b != 0 (-EINVAL).
 * The bus must be non-NULL (-EINVAL), still in BUS_UNSET (-EPERM) and used
 * from the PID that created it (-ECHILD). */
307 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
308 assert_return(bus, -EINVAL);
309 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
310 assert_return(bus->state == BUS_UNSET, -EPERM);
311 assert_return(!bus_pid_changed(bus), -ECHILD);
313 bus->is_server = !!b;
314 bus->server_id = server_id;
/* Sets the anonymous_auth flag of the connection; b is normalized to 0/1.
 * Only allowed on a non-NULL bus (-EINVAL) that is still in BUS_UNSET
 * (-EPERM) and used from the PID that created it (-ECHILD). */
318 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
319 assert_return(bus, -EINVAL);
320 assert_return(bus->state == BUS_UNSET, -EPERM);
321 assert_return(!bus_pid_changed(bus), -ECHILD);
323 bus->anonymous_auth = !!b;
/* Setter that is only allowed on a non-NULL bus (-EINVAL) that is still in
 * BUS_UNSET (-EPERM) and used from the PID that created it (-ECHILD).
 * NOTE(review): presumably stores b as the connection's "trusted" flag; the
 * assignment itself is not visible here - confirm. */
327 _public_ int sd_bus_set_trusted(sd_bus *bus, int b) {
328 assert_return(bus, -EINVAL);
329 assert_return(bus->state == BUS_UNSET, -EPERM);
330 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Reply handler for the Hello() call issued by bus_send_hello(). Looks at
 * the errno carried by the reply, reads the assigned name as a string,
 * requires it to be a valid service name starting with ':', stores a copy
 * in bus->unique_name and, if the bus is still in BUS_HELLO, moves it to
 * BUS_RUNNING. May also be invoked while the bus is in BUS_CLOSING. */
336 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
341 assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
344 r = sd_bus_message_get_errno(reply);
350 r = sd_bus_message_read(reply, "s", &s);
354 if (!service_name_is_valid(s) || s[0] != ':')
357 bus->unique_name = strdup(s);
358 if (!bus->unique_name)
361 if (bus->state == BUS_HELLO)
362 bus->state = BUS_RUNNING;
/* Builds a method call addressed to org.freedesktop.DBus at
 * /org/freedesktop/DBus and sends it asynchronously with hello_callback()
 * as reply handler; the serial of the call is remembered in
 * bus->hello_serial. The check on bus_client/is_kernel gates this
 * (presumably an early return for non-client or kernel buses - confirm). */
367 static int bus_send_hello(sd_bus *bus) {
368 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
373 if (!bus->bus_client || bus->is_kernel)
376 r = sd_bus_message_new_method_call(
378 "org.freedesktop.DBus",
379 "/org/freedesktop/DBus",
380 "org.freedesktop.DBus",
386 return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Picks the state to enter once the connection is set up: BUS_HELLO for a
 * bus client that is not on a kernel bus, BUS_RUNNING otherwise. */
389 int bus_start_running(sd_bus *bus) {
392 if (bus->bus_client && !bus->is_kernel) {
393 bus->state = BUS_HELLO;
397 bus->state = BUS_RUNNING;
/* Parses one item of a bus address at *p. The text at *p is compared
 * against key, then the value is scanned up to the next ';', ',' or the end
 * of the string. Two hex digits are combined into a single byte (the
 * escape introducer is not visible here - presumably '%'), and the result
 * buffer is grown as needed with room for a trailing NUL. */
401 static int parse_address_key(const char **p, const char *key, char **value) {
402 size_t l, n = 0, allocated = 0;
412 if (strncmp(*p, key, l) != 0)
425 while (*a != ';' && *a != ',' && *a != 0) {
443 c = (char) ((x << 4) | y);
450 if (!GREEDY_REALLOC(r, allocated, n + 2))
/* Advances *p to the next ',' in the string, or to its terminating NUL if
 * there is none. */
474 static void skip_address_key(const char **p) {
478 *p += strcspn(*p, ",");
/* Parses the keys of a "unix:" address (guid, path, abstract) up to the
 * next ';' or the end of the string and fills in b->sockaddr.un and
 * b->sockaddr_size. Having neither path nor abstract, or both, is checked
 * for explicitly. A path may be as long as sun_path itself, so sun_path is
 * not necessarily NUL-terminated; sockaddr_size carries the exact length.
 * Abstract names are stored after a leading NUL byte and hence may be at
 * most sizeof(sun_path) - 1 bytes long. */
484 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
485 _cleanup_free_ char *path = NULL, *abstract = NULL;
494 while (**p != 0 && **p != ';') {
495 r = parse_address_key(p, "guid", guid);
501 r = parse_address_key(p, "path", &path);
507 r = parse_address_key(p, "abstract", &abstract);
516 if (!path && !abstract)
519 if (path && abstract)
524 if (l > sizeof(b->sockaddr.un.sun_path))
527 b->sockaddr.un.sun_family = AF_UNIX;
528 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
529 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
530 } else if (abstract) {
531 l = strlen(abstract);
532 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
535 b->sockaddr.un.sun_family = AF_UNIX;
536 b->sockaddr.un.sun_path[0] = 0;
537 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
538 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the keys of a "tcp:" address (guid, host, port, family) up to the
 * next ';' or the end of the string. A family of "ipv4" or "ipv6" selects
 * AF_INET or AF_INET6 as lookup hint. Host and port are resolved with
 * getaddrinfo() (stream sockets, AI_ADDRCONFIG); the first result is copied
 * into b->sockaddr and its length into b->sockaddr_size before the result
 * list is released. -EADDRNOTAVAIL is returned on one of the failure paths
 * (presumably when resolution fails - confirm). */
544 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
545 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
547 struct addrinfo *result, hints = {
548 .ai_socktype = SOCK_STREAM,
549 .ai_flags = AI_ADDRCONFIG,
557 while (**p != 0 && **p != ';') {
558 r = parse_address_key(p, "guid", guid);
564 r = parse_address_key(p, "host", &host);
570 r = parse_address_key(p, "port", &port);
576 r = parse_address_key(p, "family", &family);
589 if (streq(family, "ipv4"))
590 hints.ai_family = AF_INET;
591 else if (streq(family, "ipv6"))
592 hints.ai_family = AF_INET6;
597 r = getaddrinfo(host, port, &hints, &result);
601 return -EADDRNOTAVAIL;
603 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
604 b->sockaddr_size = result->ai_addrlen;
606 freeaddrinfo(result);
/* Parses the keys of a "unixexec:" address up to the next ';' or the end of
 * the string: guid, path and numbered argvN items. The index N is parsed
 * with strtoul() and must be followed by '=' and be no larger than 256; the
 * zero-initialized argv array is grown to hold index N plus a terminating
 * entry. Afterwards the array is checked for holes (argv[0] excepted), and
 * a missing argv[0] defaults to a copy of path. */
611 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
613 unsigned n_argv = 0, j;
615 size_t allocated = 0;
623 while (**p != 0 && **p != ';') {
624 r = parse_address_key(p, "guid", guid);
630 r = parse_address_key(p, "path", &path);
636 if (startswith(*p, "argv")) {
640 ul = strtoul(*p + 4, (char**) p, 10);
641 if (errno > 0 || **p != '=' || ul > 256) {
649 if (!GREEDY_REALLOC0(argv, allocated, ul + 2)) {
657 r = parse_address_key(p, NULL, argv + ul);
672 /* Make sure there are no holes in the array, with the
673 * exception of argv[0] */
674 for (j = 1; j < n_argv; j++)
680 if (argv && argv[0] == NULL) {
681 argv[0] = strdup(path);
693 for (j = 0; j < n_argv; j++)
/* Parses the keys of a "kernel:" address (guid, path) up to the next ';' or
 * the end of the string. What is done with the parsed path is not visible
 * here. */
701 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
702 _cleanup_free_ char *path = NULL;
710 while (**p != 0 && **p != ';') {
711 r = parse_address_key(p, "guid", guid);
717 r = parse_address_key(p, "path", &path);
/* Parses the keys of an "x-container-unix:" address (guid, machine) up to
 * the next ';' or the end of the string. The machine name is checked with
 * filename_is_safe() and stored in b->machine; the socket address is set to
 * the AF_UNIX path /var/run/dbus/system_bus_socket. */
736 static int parse_container_unix_address(sd_bus *b, const char **p, char **guid) {
737 _cleanup_free_ char *machine = NULL;
745 while (**p != 0 && **p != ';') {
746 r = parse_address_key(p, "guid", guid);
752 r = parse_address_key(p, "machine", &machine);
764 if (!filename_is_safe(machine))
768 b->machine = machine;
771 b->sockaddr.un.sun_family = AF_UNIX;
772 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
773 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
/* Parses the keys of an "x-container-kernel:" address (guid, machine) up to
 * the next ';' or the end of the string. The machine name is checked with
 * filename_is_safe() and stored in b->machine; b->kernel is set to a copy
 * of the path /dev/kdbus/0-system/bus. */
778 static int parse_container_kernel_address(sd_bus *b, const char **p, char **guid) {
779 _cleanup_free_ char *machine = NULL;
787 while (**p != 0 && **p != ';') {
788 r = parse_address_key(p, "guid", guid);
794 r = parse_address_key(p, "machine", &machine);
806 if (!filename_is_safe(machine))
810 b->machine = machine;
814 b->kernel = strdup("/dev/kdbus/0-system/bus");
/* Forgets what was parsed from the previous address entry: the socket
 * address size is reset to 0, the exec argv is freed and the server ID is
 * cleared (further fields may be reset on lines not visible here). */
821 static void bus_reset_parsed_address(sd_bus *b) {
825 b->sockaddr_size = 0;
826 strv_free(b->exec_argv);
830 b->server_id = SD_ID128_NULL;
/* Parses the next entry of b->address, starting at b->address_index. The
 * previously parsed state is reset, then the entry is dispatched by prefix
 * ("unix:", "tcp:", "unixexec:", "kernel:", "x-container-unix:",
 * "x-container-kernel:") to the matching parser. A guid found in the entry
 * is converted into b->server_id, and b->address_index is advanced to where
 * parsing stopped. */
837 static int bus_parse_next_address(sd_bus *b) {
838 _cleanup_free_ char *guid = NULL;
846 if (b->address[b->address_index] == 0)
849 bus_reset_parsed_address(b);
851 a = b->address + b->address_index;
860 if (startswith(a, "unix:")) {
863 r = parse_unix_address(b, &a, &guid);
868 } else if (startswith(a, "tcp:")) {
871 r = parse_tcp_address(b, &a, &guid);
877 } else if (startswith(a, "unixexec:")) {
880 r = parse_exec_address(b, &a, &guid);
886 } else if (startswith(a, "kernel:")) {
889 r = parse_kernel_address(b, &a, &guid);
894 } else if (startswith(a, "x-container-unix:")) {
897 r = parse_container_unix_address(b, &a, &guid);
902 } else if (startswith(a, "x-container-kernel:")) {
905 r = parse_container_kernel_address(b, &a, &guid);
918 r = sd_id128_from_string(guid, &b->server_id);
923 b->address_index = a - b->address;
/* Tries to connect using the currently parsed address, choosing the
 * transport from what was parsed: exec, container kernel bus, container
 * socket, kernel bus or plain socket. I/O event sources are attached via
 * attach_io_events(). The negated error of a failed attempt is remembered
 * in b->last_connect_error and the next address entry is parsed; when the
 * function gives up it returns the last connect error, or -ECONNREFUSED if
 * none was recorded. */
927 static int bus_start_address(sd_bus *b) {
933 bool skipped = false;
938 r = bus_socket_exec(b);
939 else if (b->machine && b->kernel)
940 r = bus_container_connect_kernel(b);
941 else if (b->machine && b->sockaddr.sa.sa_family != AF_UNSPEC)
942 r = bus_container_connect_socket(b);
944 r = bus_kernel_connect(b);
945 else if (b->sockaddr.sa.sa_family != AF_UNSPEC)
946 r = bus_socket_connect(b);
952 r = attach_io_events(b);
957 b->last_connect_error = -r;
960 r = bus_parse_next_address(b);
964 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Discards the currently parsed address and starts connecting again via
 * bus_start_address(). */
968 int bus_next_address(sd_bus *b) {
971 bus_reset_parsed_address(b);
972 return bus_start_address(b);
975 static int bus_start_fd(sd_bus *b) {
980 assert(b->input_fd >= 0);
981 assert(b->output_fd >= 0);
983 r = fd_nonblock(b->input_fd, true);
987 r = fd_cloexec(b->input_fd, true);
991 if (b->input_fd != b->output_fd) {
992 r = fd_nonblock(b->output_fd, true);
996 r = fd_cloexec(b->output_fd, true);
1001 if (fstat(b->input_fd, &st) < 0)
1004 if (S_ISCHR(b->input_fd))
1005 return bus_kernel_take_fd(b);
1007 return bus_socket_take_fd(b);
/* Starts the connection of a configured bus. The bus must be non-NULL
 * (-EINVAL), in BUS_UNSET (-EPERM) and used from the PID that created it
 * (-ECHILD). The state is moved to BUS_OPENING; being both a server and a
 * bus client is checked for explicitly. Preconfigured fds take precedence
 * over an address, socket address, exec path, kernel path or machine.
 * Finally the Hello() call is sent via bus_send_hello(). */
1010 _public_ int sd_bus_start(sd_bus *bus) {
1013 assert_return(bus, -EINVAL);
1014 assert_return(bus->state == BUS_UNSET, -EPERM);
1015 assert_return(!bus_pid_changed(bus), -ECHILD);
1017 bus->state = BUS_OPENING;
1019 if (bus->is_server && bus->bus_client)
1022 if (bus->input_fd >= 0)
1023 r = bus_start_fd(bus);
1024 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1025 r = bus_start_address(bus);
1032 return bus_send_hello(bus);
/* Opens a connection to the system bus. The address is taken from the
 * DBUS_SYSTEM_BUS_ADDRESS environment variable (read with secure_getenv())
 * or one of two built-in defaults: the kernel bus followed by the
 * /run/dbus/system_bus_socket fallback, or the socket alone (which default
 * applies is decided on lines not visible here). The connection is marked
 * as bus client, asks for capabilities and credentials to be attached, and
 * is then started. ret must be non-NULL (-EINVAL). */
1035 _public_ int sd_bus_open_system(sd_bus **ret) {
1040 assert_return(ret, -EINVAL);
1046 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1048 r = sd_bus_set_address(b, e);
1051 r = sd_bus_set_address(b, "kernel:path=/dev/kdbus/0-system/bus;unix:path=/run/dbus/system_bus_socket");
1053 r = sd_bus_set_address(b, "unix:path=/run/dbus/system_bus_socket");
1059 b->bus_client = true;
1061 /* Let's do per-method access control on the system bus. We
1062 * need the caller's UID and capability set for that. */
1064 b->attach_flags |= KDBUS_ATTACH_CAPS | KDBUS_ATTACH_CREDS;
1066 r = sd_bus_start(b);
/* Opens a connection to the user bus. The address is taken from the
 * DBUS_SESSION_BUS_ADDRESS environment variable if set; otherwise it is
 * built from the escaped XDG_RUNTIME_DIR ("<dir>/bus" socket, optionally
 * preceded by the per-UID kernel bus path) or, in one variant, from the
 * per-UID kernel bus path alone; -ECONNREFUSED is returned on one of the
 * fallback paths. The connection is marked as bus client and started.
 * ret must be non-NULL (-EINVAL).
 * NOTE(review): the return value of asprintf() is not checked on the
 * visible lines; on failure the contents of b->address are undefined per
 * asprintf(3), so a later NULL check would not be reliable - confirm. */
1078 _public_ int sd_bus_open_user(sd_bus **ret) {
1083 assert_return(ret, -EINVAL);
1089 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1091 r = sd_bus_set_address(b, e);
1095 e = secure_getenv("XDG_RUNTIME_DIR");
1097 _cleanup_free_ char *ee = NULL;
1099 ee = bus_address_escape(e);
1106 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus;unix:path=%s/bus", (unsigned long) getuid(), ee);
1108 b->address = strjoin("unix:path=", ee, "/bus", NULL);
1112 asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus", (unsigned long) getuid());
1114 return -ECONNREFUSED;
1124 b->bus_client = true;
1126 /* We don't do any per-method access control on the user
1130 r = sd_bus_start(b);
/* Opens a connection to the system bus of a remote host by building a
 * "unixexec:" address that runs "ssh -xT <host> systemd-stdio-bridge"; the
 * host name is escaped with bus_address_escape() first. The new bus is
 * marked as bus client and started. host and ret must be non-NULL
 * (-EINVAL). */
1142 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1143 _cleanup_free_ char *e = NULL;
1148 assert_return(host, -EINVAL);
1149 assert_return(ret, -EINVAL);
1151 e = bus_address_escape(host);
1155 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1159 r = sd_bus_new(&bus);
1166 bus->bus_client = true;
1168 r = sd_bus_start(bus);
/* Opens a connection to the system bus of a local container. The machine
 * name must be non-NULL and pass filename_is_safe() (-EINVAL); it is
 * escaped and placed into an "x-container-unix:" address, optionally
 * preceded by an "x-container-kernel:" entry (which variant is used is
 * decided on lines not visible here). The new bus is marked as bus client
 * and started. ret must be non-NULL (-EINVAL). */
1178 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1179 _cleanup_free_ char *e = NULL;
1184 assert_return(machine, -EINVAL);
1185 assert_return(ret, -EINVAL);
1186 assert_return(filename_is_safe(machine), -EINVAL);
1188 e = bus_address_escape(machine);
1193 p = strjoin("x-container-kernel:machine=", e, ";x-container-unix:machine=", e, NULL);
1195 p = strjoin("x-container-unix:machine=", e, NULL);
1200 r = sd_bus_new(&bus);
1207 bus->bus_client = true;
1209 r = sd_bus_start(bus);
/* Closes the connection: the state becomes BUS_CLOSED, the bus is detached
 * from its event loop and all queued messages are dropped. An already
 * closed bus and a changed PID are checked for before anything is touched.
 * The is_kernel check guards the step that follows it (presumably closing
 * the fds, as the comment below suggests - confirm). */
1219 _public_ void sd_bus_close(sd_bus *bus) {
1223 if (bus->state == BUS_CLOSED)
1225 if (bus_pid_changed(bus))
1228 bus->state = BUS_CLOSED;
1230 sd_bus_detach_event(bus);
1232 /* Drop all queued messages so that they drop references to
1233 * the bus object and the bus may be freed */
1234 bus_reset_queues(bus);
1236 if (!bus->is_kernel)
1239 /* We'll leave the fd open in case this is a kernel bus, since
1240 * there might still be memblocks around that reference this
1241 * bus, and they might need to invoke the KDBUS_CMD_FREE
1242 * ioctl on the fd when they are freed. */
/* Moves the bus into BUS_CLOSING. The state is first compared against
 * BUS_OPENING, BUS_AUTHENTICATING, BUS_HELLO and BUS_RUNNING; the action
 * taken for any other state is not visible here (presumably an early
 * return - confirm). */
1245 static void bus_enter_closing(sd_bus *bus) {
1248 if (bus->state != BUS_OPENING &&
1249 bus->state != BUS_AUTHENTICATING &&
1250 bus->state != BUS_HELLO &&
1251 bus->state != BUS_RUNNING)
1254 bus->state = BUS_CLOSING;
/* Takes an additional reference on the bus. A NULL bus yields NULL. The
 * incremented counter is asserted to be at least 2, i.e. the caller must
 * already have held a reference. */
1257 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1258 assert_return(bus, NULL);
1260 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drops one reference on the bus and acts when the counter reaches zero
 * (presumably by freeing the bus; that step is not visible here). */
1265 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1270 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Returns the result of BUS_IS_OPEN() for the current state, -EINVAL for a
 * NULL bus, or -ECHILD if the bus is used from a different PID than the
 * one that created it. */
1276 _public_ int sd_bus_is_open(sd_bus *bus) {
1278 assert_return(bus, -EINVAL);
1279 assert_return(!bus_pid_changed(bus), -ECHILD);
1281 return BUS_IS_OPEN(bus->state);
/* Tells whether values of the given type can be sent over this connection.
 * For SD_BUS_TYPE_UNIX_FD the locally configured KDBUS_HELLO_ACCEPT_FD
 * flag is checked, the connection is brought to the running state and
 * bus->can_fds is returned; for every other type the answer is
 * bus_type_is_valid(type). Errors: -EINVAL for a NULL bus, -ENOTCONN while
 * the bus is still in BUS_UNSET, -ECHILD after a PID change. */
1284 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1287 assert_return(bus, -EINVAL);
1288 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1289 assert_return(!bus_pid_changed(bus), -ECHILD);
1291 if (type == SD_BUS_TYPE_UNIX_FD) {
1292 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1295 r = bus_ensure_running(bus);
1299 return bus->can_fds;
1302 return bus_type_is_valid(type);
/* Copies the server ID of the connection into *server_id after making sure
 * the connection is running. bus and server_id must be non-NULL (-EINVAL);
 * -ECHILD is returned after a PID change. */
1305 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1308 assert_return(bus, -EINVAL);
1309 assert_return(server_id, -EINVAL);
1310 assert_return(!bus_pid_changed(bus), -ECHILD);
1312 r = bus_ensure_running(bus);
1316 *server_id = bus->server_id;
/* Seals message m for sending on bus b: the bus' serial counter is kept at
 * least as high as a serial the message already carries, a default timeout
 * (BUS_DEFAULT_TIMEOUT) is substituted on one path, and the message is
 * sealed with the next serial number of the bus. */
1320 static int bus_seal_message(sd_bus *b, sd_bus_message *m, usec_t timeout) {
1325 /* If we copy the same message to multiple
1326 * destinations, avoid using the same serial
1328 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1333 timeout = BUS_DEFAULT_TIMEOUT;
1335 return bus_message_seal(m, ++b->serial, timeout);
/* Converts *m to the message version and endianness used by the bus if
 * they differ. A value of 0 in b->message_version or b->message_endian
 * matches anything. bus_message_remarshal() gets the pointer to *m and so
 * may replace the message. */
1338 static int bus_remarshal_message(sd_bus *b, sd_bus_message **m) {
1341 /* Do packet version and endianness already match? */
1342 if ((b->message_version == 0 || b->message_version == (*m)->header->version) &&
1343 (b->message_endian == 0 || b->message_endian == (*m)->header->endian))
1346 /* No? Then remarshal! */
1347 return bus_message_remarshal(b, m);
/* Seals a locally generated (synthetic) message with the fixed serial
 * 0xFFFFFFFF and a timeout of 0. */
1350 int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
1354 /* The bus specification says the serial number cannot be 0,
1355 * hence let's fill something in for synthetic messages. Since
1356 * synthetic messages might have a fake sender and we don't
1357 * want to interfere with the real sender's serial numbers we
1358 * pick a fixed, artificial one. We use (uint32_t) -1 rather
1359 * than (uint64_t) -1 since dbus1 only had 32bit identifiers,
1360 * even though kdbus can do 64bit. */
1362 return bus_message_seal(m, 0xFFFFFFFFULL, 0);
/* Writes a message through one of the two transports: the kernel transport
 * (which takes no write index) or the socket transport, which tracks the
 * write progress in *idx. The condition selecting between them is not
 * visible here (presumably bus->is_kernel - confirm). */
1365 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1370 return bus_kernel_write_message(bus, message);
1372 return bus_socket_write_message(bus, message, idx);
/* Flushes the write queue while the bus is in BUS_RUNNING or BUS_HELLO.
 * The head message is written with bus->windex as progress marker; once it
 * is fully written (always the case after a successful write on kernel
 * buses, otherwise when windex reaches the message size) its reference is
 * dropped and the remaining entries are shifted one slot forward. */
1375 static int dispatch_wqueue(sd_bus *bus) {
1379 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1381 while (bus->wqueue_size > 0) {
1383 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1387 /* Didn't do anything this time */
1389 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1390 /* Fully written. Let's drop the entry from
1393 * This isn't particularly optimized, but
1394 * well, this is supposed to be our worst-case
1395 * buffer only, and the socket buffer is
1396 * supposed to be our primary buffer, and if
1397 * it got full, then all bets are off
1400 sd_bus_message_unref(bus->wqueue[0]);
1401 bus->wqueue_size --;
1402 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Reads a message through one of the two transports, kernel or socket. The
 * condition selecting between them is not visible here (presumably
 * bus->is_kernel - confirm). */
1412 static int bus_read_message(sd_bus *bus) {
1416 return bus_kernel_read_message(bus);
1418 return bus_socket_read_message(bus);
/* Makes sure the read queue can take one more message: the current size is
 * checked against BUS_RQUEUE_MAX and the array is grown to hold
 * rqueue_size + 1 entries. */
1421 int bus_rqueue_make_room(sd_bus *bus) {
1424 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1427 if (!GREEDY_REALLOC(bus->rqueue, bus->rqueue_allocated, bus->rqueue_size + 1))
/* Fetches the next incoming message while the bus is in BUS_RUNNING or
 * BUS_HELLO. If the read queue is not empty its head is handed out in *m
 * and the remaining entries are shifted one slot forward; otherwise a new
 * message is read from the transport. */
1433 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1438 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1441 if (bus->rqueue_size > 0) {
1442 /* Dispatch a queued message */
1444 *m = bus->rqueue[0];
1445 bus->rqueue_size --;
1446 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1450 /* Try to read a new message */
1451 r = bus_read_message(bus);
/* Sends a message on the bus. A reference on the message is held for the
 * duration of the call. If the caller does not ask for the serial and the
 * message is not sealed yet, it is flagged as not expecting a reply. The
 * message is sealed and, if needed, remarshalled. When the bus is in
 * BUS_RUNNING or BUS_HELLO and nothing is queued, it is written right away;
 * a partially written message on a non-kernel bus is stored in the first
 * write queue slot. Otherwise it is appended to the write queue, which is
 * limited to BUS_WQUEUE_MAX entries. -EPIPE, -ENOTCONN and -ESHUTDOWN from
 * the write move the bus into the closing state. The serial of the message
 * is reported through *serial.
 * Errors visible here: -EINVAL for a NULL bus or message, -ENOTCONN if the
 * bus is not open, -ECHILD after a PID change. */
1461 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *_m, uint64_t *serial) {
1462 _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1465 assert_return(bus, -EINVAL);
1466 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1467 assert_return(m, -EINVAL);
1468 assert_return(!bus_pid_changed(bus), -ECHILD);
1471 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1478 /* If the serial number isn't kept, then we know that no reply
1480 if (!serial && !m->sealed)
1481 m->header->flags |= BUS_MESSAGE_NO_REPLY_EXPECTED;
1483 r = bus_seal_message(bus, m, 0);
1487 /* Remarshall if we have to. This will possible unref the
1488 * message and place a replacement in m */
1489 r = bus_remarshal_message(bus, &m);
1493 /* If this is a reply and no reply was requested, then let's
1494 * suppress this, if we can */
1495 if (m->dont_send && !serial)
1498 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1501 r = bus_write_message(bus, m, &idx);
1503 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1504 bus_enter_closing(bus);
1507 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1508 /* Wasn't fully written. So let's remember how
1509 * much was written. Note that the first entry
1510 * of the wqueue array is always allocated so
1511 * that we always can remember how much was
1513 bus->wqueue[0] = sd_bus_message_ref(m);
1514 bus->wqueue_size = 1;
1518 /* Just append it to the queue. */
1520 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1523 if (!GREEDY_REALLOC(bus->wqueue, bus->wqueue_allocated, bus->wqueue_size + 1))
1526 bus->wqueue[bus->wqueue_size ++] = sd_bus_message_ref(m);
1530 *serial = BUS_MESSAGE_SERIAL(m);
/* Sends a message to an explicit destination: if the destination stored in
 * the message differs from the requested one it is set on the message
 * first, then the message is passed to sd_bus_send().
 * Errors visible here: -EINVAL for a NULL bus or message, -ENOTCONN if the
 * bus is not open, -ECHILD after a PID change. */
1535 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1538 assert_return(bus, -EINVAL);
1539 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1540 assert_return(m, -EINVAL);
1541 assert_return(!bus_pid_changed(bus), -ECHILD);
1543 if (!streq_ptr(m->destination, destination)) {
1548 r = sd_bus_message_set_destination(m, destination);
1553 return sd_bus_send(bus, m, serial);
/* Turns a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC deadline. The value (uint64_t) -1 is special-cased
 * (the value returned for it is not visible here). */
1556 static usec_t calc_elapse(uint64_t usec) {
1557 if (usec == (uint64_t) -1)
1560 return now(CLOCK_MONOTONIC) + usec;
/* Comparison function for the reply callback priority queue. Entries are
 * first distinguished by whether their timeout is zero or non-zero, then
 * compared by timeout value (the returned values are not visible here). */
1563 static int timeout_compare(const void *a, const void *b) {
1564 const struct reply_callback *x = a, *y = b;
1566 if (x->timeout != 0 && y->timeout == 0)
1569 if (x->timeout == 0 && y->timeout != 0)
1572 if (x->timeout < y->timeout)
1575 if (x->timeout > y->timeout)
/* Sends a method call and registers a callback for its reply. The message
 * must be a method call that expects a reply, and a callback is mandatory
 * (-EINVAL otherwise). The message is sealed with the given timeout and
 * remarshalled if needed; a reply_callback record keyed by the message
 * serial is put into bus->reply_callbacks and, if it has a non-zero
 * deadline, into the timeout priority queue. The registration is cancelled
 * again on the failure paths following prioq_put() and sd_bus_send().
 * Further errors visible here: -EINVAL for a NULL bus or message,
 * -ENOTCONN if the bus is not open, -ECHILD after a PID change. */
1581 _public_ int sd_bus_call_async(
1584 sd_bus_message_handler_t callback,
1589 _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1590 struct reply_callback *c;
1593 assert_return(bus, -EINVAL);
1594 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1595 assert_return(m, -EINVAL);
1596 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1597 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1598 assert_return(callback, -EINVAL);
1599 assert_return(!bus_pid_changed(bus), -ECHILD);
1601 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1605 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1609 r = bus_seal_message(bus, m, usec);
1613 r = bus_remarshal_message(bus, &m);
1617 c = new0(struct reply_callback, 1);
1621 c->callback = callback;
1622 c->userdata = userdata;
1623 c->serial = BUS_MESSAGE_SERIAL(m);
1624 c->timeout = calc_elapse(m->timeout);
1626 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1632 if (c->timeout != 0) {
1633 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1636 sd_bus_call_async_cancel(bus, c->serial);
1641 r = sd_bus_send(bus, m, serial);
1643 sd_bus_call_async_cancel(bus, c->serial);
/* Cancels a pending asynchronous call: the reply callback registered under
 * the given serial is removed from bus->reply_callbacks and, if it has a
 * non-zero timeout, from the timeout priority queue.
 * Errors visible here: -EINVAL for a NULL bus or a serial of 0, -ECHILD
 * after a PID change. */
1650 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1651 struct reply_callback *c;
1653 assert_return(bus, -EINVAL);
1654 assert_return(serial != 0, -EINVAL);
1655 assert_return(!bus_pid_changed(bus), -ECHILD);
1657 c = hashmap_remove(bus->reply_callbacks, &serial);
1661 if (c->timeout != 0)
1662 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the connection until it reaches BUS_RUNNING by alternating
 * sd_bus_process() and sd_bus_wait() (waiting without timeout). The states
 * BUS_UNSET, BUS_CLOSED and BUS_CLOSING are checked for up front, as is an
 * already running bus. */
1668 int bus_ensure_running(sd_bus *bus) {
1673 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1675 if (bus->state == BUS_RUNNING)
1679 r = sd_bus_process(bus, NULL);
1682 if (bus->state == BUS_RUNNING)
1687 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and synchronously waits for its reply. After making
 * sure the bus is running, the message is sealed, remarshalled if needed
 * and sent; the current read queue size is remembered beforehand in i,
 * which is where the scan of the read queue starts. The queue is searched
 * for a message whose reply_serial matches the call: a method return or
 * method error ends the wait (an error is copied into *error). A queued
 * message carrying the call's own serial and this connection's unique name
 * as sender means the call was addressed to ourselves and is treated as a
 * failure instead of dead-locking. While nothing matches, more messages
 * are read, the bus is polled for the remaining time and the write queue
 * is flushed; -EPIPE, -ENOTCONN and -ESHUTDOWN move the bus into the
 * closing state.
 * Argument errors visible here: -EINVAL for a NULL bus or message, for a
 * message that is not a method call or does not expect a reply, and for an
 * error structure that is already set; -ENOTCONN if the bus is not open;
 * -ECHILD after a PID change. */
1693 _public_ int sd_bus_call(
1697 sd_bus_error *error,
1698 sd_bus_message **reply) {
1700 _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1706 assert_return(bus, -EINVAL);
1707 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1708 assert_return(m, -EINVAL);
1709 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1710 assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1711 assert_return(!bus_error_is_dirty(error), -EINVAL);
1712 assert_return(!bus_pid_changed(bus), -ECHILD);
1714 r = bus_ensure_running(bus);
1718 i = bus->rqueue_size;
1720 r = bus_seal_message(bus, m, usec);
1724 r = bus_remarshal_message(bus, &m);
1728 r = sd_bus_send(bus, m, &serial);
1732 timeout = calc_elapse(m->timeout);
1737 while (i < bus->rqueue_size) {
1738 sd_bus_message *incoming = NULL;
1740 incoming = bus->rqueue[i];
1742 if (incoming->reply_serial == serial) {
1743 /* Found a match! */
1745 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1748 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1753 sd_bus_message_unref(incoming);
1756 } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
1757 r = sd_bus_error_copy(error, &incoming->error);
1761 sd_bus_message_unref(incoming);
1764 } else if (incoming->header->serial == serial &&
1767 streq(bus->unique_name, incoming->sender)) {
1769 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1772 /* Our own message? Somebody is trying
1773 * to send its own client a message,
1774 * let's not dead-lock, let's fail
1777 sd_bus_message_unref(incoming);
1781 /* Try to read more, right-away */
1785 r = bus_read_message(bus);
1787 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1788 bus_enter_closing(bus);
1798 n = now(CLOCK_MONOTONIC);
1804 left = (uint64_t) -1;
1806 r = bus_poll(bus, true, left);
1812 r = dispatch_wqueue(bus);
1814 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1815 bus_enter_closing(bus);
/* Returns the file descriptor of the connection. This only works when
 * input and output use the same fd (-EPERM otherwise). -EINVAL is returned
 * for a NULL bus and -ECHILD after a PID change. */
1822 _public_ int sd_bus_get_fd(sd_bus *bus) {
1824 assert_return(bus, -EINVAL);
1825 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1826 assert_return(!bus_pid_changed(bus), -ECHILD);
1828 return bus->input_fd;
/* Determines which I/O events the caller should wait for, depending on the
 * connection state: BUS_OPENING, BUS_AUTHENTICATING (taking into account
 * whether the authentication logic needs to write) and
 * BUS_RUNNING/BUS_HELLO (taking into account whether the read queue is
 * empty and whether the write queue has entries). The concrete event flags
 * are set on lines not visible here.
 * Errors visible here: -EINVAL for a NULL bus, -ENOTCONN unless the bus is
 * open or closing, -ECHILD after a PID change. */
1831 _public_ int sd_bus_get_events(sd_bus *bus) {
1834 assert_return(bus, -EINVAL);
1835 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1836 assert_return(!bus_pid_changed(bus), -ECHILD);
1838 if (bus->state == BUS_OPENING)
1840 else if (bus->state == BUS_AUTHENTICATING) {
1842 if (bus_socket_auth_needs_write(bus))
1847 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1848 if (bus->rqueue_size <= 0)
1850 if (bus->wqueue_size > 0)
/* Reports in *timeout_usec when the bus next needs to be processed.
 * BUS_CLOSING and a non-empty read queue are handled as special cases
 * (values not visible here); during authentication the authentication
 * timeout is used; in any state other than BUS_RUNNING/BUS_HELLO the
 * result is (uint64_t) -1. Otherwise the timeout of the reply callback at
 * the head of the priority queue is reported, with (uint64_t) -1 used on
 * the path where that does not apply.
 * Errors visible here: -EINVAL for a NULL bus or timeout_usec, -ENOTCONN
 * unless the bus is open or closing, -ECHILD after a PID change. */
1857 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1858 struct reply_callback *c;
1860 assert_return(bus, -EINVAL);
1861 assert_return(timeout_usec, -EINVAL);
1862 assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1863 assert_return(!bus_pid_changed(bus), -ECHILD);
1865 if (bus->state == BUS_CLOSING) {
1870 if (bus->state == BUS_AUTHENTICATING) {
1871 *timeout_usec = bus->auth_timeout;
1875 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1876 *timeout_usec = (uint64_t) -1;
1880 if (bus->rqueue_size > 0) {
1885 c = prioq_peek(bus->reply_callbacks_prioq);
1887 *timeout_usec = (uint64_t) -1;
1891 *timeout_usec = c->timeout;
/* Handles the reply callback with the earliest deadline. The head of the
 * timeout priority queue is compared against the current monotonic time; a
 * synthetic SD_BUS_ERROR_NO_REPLY error message ("Method call timed out")
 * with sender org.freedesktop.DBus is created and sealed, the callback is
 * removed from both the priority queue and the hashmap, the iteration
 * counter is bumped and the callback is invoked with the synthetic
 * message. Its result is passed through bus_maybe_reply_error() and
 * bus->current is cleared afterwards. */
1895 static int process_timeout(sd_bus *bus) {
1896 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1897 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1898 struct reply_callback *c;
1904 c = prioq_peek(bus->reply_callbacks_prioq);
1908 n = now(CLOCK_MONOTONIC);
1912 r = bus_message_new_synthetic_error(
1915 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1920 m->sender = "org.freedesktop.DBus";
1922 r = bus_seal_synthetic_message(bus, m);
1926 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1927 hashmap_remove(bus->reply_callbacks, &c->serial);
1930 bus->iteration_counter ++;
1932 r = c->callback(bus, m, c->userdata, &error_buffer);
1933 r = bus_maybe_reply_error(m, r, &error_buffer);
1936 bus->current = NULL;
/* While the bus is in BUS_HELLO, checks that the incoming message is a
 * method return or method error whose reply_serial matches the serial of
 * our Hello() call. The values returned for the individual cases are not
 * visible here. */
1941 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1945 if (bus->state != BUS_HELLO)
1948 /* Let's make sure the first message on the bus is the HELLO
1949 * reply. But note that we don't actually parse the message
1950 * here (we leave that to the usual handling), we just verify
1951 * we don't let any earlier msg through. */
1953 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1954 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1957 if (m->reply_serial != bus->hello_serial)
/* Dispatches a method return or method error to the reply callback that
 * was registered for its reply_serial. The callback record is removed from
 * the hashmap and, if it has a non-zero timeout, from the timeout priority
 * queue; the message is rewound and the callback invoked, with its result
 * passed through bus_maybe_reply_error(). */
1963 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1964 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1965 struct reply_callback *c;
1971 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1972 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1975 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1979 if (c->timeout != 0)
1980 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1982 r = sd_bus_message_rewind(m, true);
1986 r = c->callback(bus, m, c->userdata, &error_buffer);
1987 r = bus_maybe_reply_error(m, r, &error_buffer);
/* Runs the registered filter callbacks on a message. The walk over the
 * list is repeated for as long as the list gets flagged as modified during
 * a pass; last_iteration, compared against the bus' iteration counter,
 * keeps each filter from running more than once per message. The message
 * is rewound before each callback and the callback result is passed
 * through bus_maybe_reply_error(). */
1993 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1994 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1995 struct filter_callback *l;
2002 bus->filter_callbacks_modified = false;
2004 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
2006 if (bus->filter_callbacks_modified)
2009 /* Don't run this more than once per iteration */
2010 if (l->last_iteration == bus->iteration_counter)
2013 l->last_iteration = bus->iteration_counter;
2015 r = sd_bus_message_rewind(m, true);
2019 r = l->callback(bus, m, l->userdata, &error_buffer);
2020 r = bus_maybe_reply_error(m, r, &error_buffer);
2026 } while (bus->filter_callbacks_modified);
/* Runs the match callbacks on a message via bus_match_run(), repeating the
 * run for as long as the match tree gets flagged as modified while it is
 * being processed. */
2031 static int process_match(sd_bus *bus, sd_bus_message *m) {
2038 bus->match_callbacks_modified = false;
2040 r = bus_match_run(bus, &bus->match_callbacks, m);
2044 } while (bus->match_callbacks_modified);
/* Implements the built-in org.freedesktop.DBus.Peer interface. Only method
 * calls on that interface are considered, and the no-reply-expected flag
 * is checked. "Ping" is answered with an empty method return,
 * "GetMachineId" with the machine ID formatted as a string, and any other
 * member with an SD_BUS_ERROR_UNKNOWN_METHOD error. The reply is sent with
 * sd_bus_send(). */
2049 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
2050 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2056 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2059 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2062 if (m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED)
2065 if (streq_ptr(m->member, "Ping"))
2066 r = sd_bus_message_new_method_return(m, &reply);
2067 else if (streq_ptr(m->member, "GetMachineId")) {
2071 r = sd_id128_get_machine(&id);
2075 r = sd_bus_message_new_method_return(m, &reply);
2079 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2081 r = sd_bus_message_new_method_errorf(
2083 SD_BUS_ERROR_UNKNOWN_METHOD,
2084 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2090 r = sd_bus_send(bus, reply, NULL);
/* Feeds one incoming message through the processing stages, in this order:
 * hello check, reply callbacks, filters, matches, built-in Peer interface,
 * registered objects. bus->current is reset to NULL at the end. */
2097 static int process_message(sd_bus *bus, sd_bus_message *m) {
/* A new iteration starts here; process_filter() compares against this
 * counter so a filter runs at most once per iteration */
2104 bus->iteration_counter++;
2106 log_debug("Got message sender=%s object=%s interface=%s member=%s",
2107 strna(sd_bus_message_get_sender(m)),
2108 strna(sd_bus_message_get_path(m)),
2109 strna(sd_bus_message_get_interface(m)),
2110 strna(sd_bus_message_get_member(m)));
2112 r = process_hello(bus, m);
2116 r = process_reply(bus, m);
2120 r = process_filter(bus, m);
2124 r = process_match(bus, m);
2128 r = process_builtin(bus, m);
2132 r = bus_process_object(bus, m);
2135 bus->current = NULL;
/* One processing step for a bus in BUS_RUNNING or BUS_HELLO state: handles
 * timeouts, dispatches the write queue, then takes a message from the read
 * queue and runs it through process_message(). */
2139 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2140 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2144 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2146 r = process_timeout(bus);
2150 r = dispatch_wqueue(bus);
2154 r = dispatch_rqueue(bus, &m);
2160 r = process_message(bus, m);
2165 r = sd_bus_message_rewind(m, true);
/* For method calls: log the call as unprocessed and answer it with an
 * UnknownObject error */
2174 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2176 log_debug("Unprocessed message call sender=%s object=%s interface=%s member=%s",
2177 strna(sd_bus_message_get_sender(m)),
2178 strna(sd_bus_message_get_path(m)),
2179 strna(sd_bus_message_get_interface(m)),
2180 strna(sd_bus_message_get_member(m)));
2182 r = sd_bus_reply_method_errorf(
2184 SD_BUS_ERROR_UNKNOWN_OBJECT,
2185 "Unknown object '%s'.", m->path);
/* One processing step for a bus in BUS_CLOSING state. Each pending reply
 * callback is failed with a synthetic "Connection terminated" NoReply
 * error; afterwards a synthetic signal from org.freedesktop.DBus.Local is
 * run through the filter and match callbacks. */
2199 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2200 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2201 struct reply_callback *c;
2205 assert(bus->state == BUS_CLOSING);
/* Take one outstanding reply callback, if there is any */
2207 c = hashmap_first(bus->reply_callbacks);
2209 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2211 /* First, fail all outstanding method calls */
2212 r = bus_message_new_synthetic_error(
2215 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2220 r = bus_seal_synthetic_message(bus, m);
/* Unregister the callback from the timeout prioq and the hashmap before
 * invoking it */
2224 if (c->timeout != 0)
2225 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2227 hashmap_remove(bus->reply_callbacks, &c->serial);
2230 bus->iteration_counter++;
2232 r = c->callback(bus, m, c->userdata, &error_buffer);
2233 r = bus_maybe_reply_error(m, r, &error_buffer);
2239 /* Then, synthesize a Disconnected message */
2240 r = sd_bus_message_new_signal(
2242 "/org/freedesktop/DBus/Local",
2243 "org.freedesktop.DBus.Local",
/* The sender field is pointed at a string literal */
2249 m->sender = "org.freedesktop.DBus.Local";
2251 r = bus_seal_synthetic_message(bus, m);
2258 bus->iteration_counter++;
/* The synthetic signal only goes through the filter and match stages */
2260 r = process_filter(bus, m);
2264 r = process_match(bus, m);
2276 bus->current = NULL;
/* Public entry point that advances the connection by one step, dispatching
 * on bus->state. A connection reset, broken pipe or shutdown error from a
 * state handler moves the bus into the closing state. */
2280 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2281 BUS_DONT_DESTROY(bus);
2284 /* Returns 0 when we didn't do anything. This should cause the
2285 * caller to invoke sd_bus_wait() before returning the next
2286 * time. Returns > 0 when we did something, which possibly
2287 * means *ret is filled in with an unprocessed message. */
2289 assert_return(bus, -EINVAL);
2290 assert_return(!bus_pid_changed(bus), -ECHILD);
2292 /* We don't allow recursively invoking sd_bus_process(). */
2293 assert_return(!bus->current, -EBUSY);
2295 switch (bus->state) {
2304 r = bus_socket_process_opening(bus);
2305 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2306 bus_enter_closing(bus);
2314 case BUS_AUTHENTICATING:
2315 r = bus_socket_process_authenticating(bus);
2316 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2317 bus_enter_closing(bus);
2329 r = process_running(bus, ret);
2330 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2331 bus_enter_closing(bus);
2341 return process_closing(bus, ret);
2344 assert_not_reached("Unknown state");
/* Waits with ppoll() on the bus file descriptors.
 *
 * need_more:    selects between the two waiting modes described by the
 *               comments in the body.
 * timeout_usec: caller-supplied timeout; (uint64_t) -1 means "none".
 *
 * Returns 1 when ppoll() reported at least one ready descriptor, 0
 * otherwise (as far as the final return statement is concerned). */
2347 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2348 struct pollfd p[2] = {};
/* m is the effective timeout; (usec_t) -1 stands for "wait forever" */
2351 usec_t m = (usec_t) -1;
2355 if (bus->state == BUS_CLOSING)
2358 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2360 e = sd_bus_get_events(bus);
2365 /* The caller really needs some more data, he doesn't
2366 * care about what's already read, or any timeouts
2371 /* The caller wants to process if there's something to
2372 * process, but doesn't care otherwise */
2374 r = sd_bus_get_timeout(bus, &until);
/* Convert the absolute deadline into a relative wait, clamped at 0 */
2379 nw = now(CLOCK_MONOTONIC);
2380 m = until > nw ? until - nw : 0;
/* True when the caller's timeout is set and is shorter than m (or m is unset) */
2384 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2387 p[0].fd = bus->input_fd;
2388 if (bus->output_fd == bus->input_fd) {
/* p[0] watches the input fd for POLLIN, p[1] the output fd for POLLOUT */
2392 p[0].events = e & POLLIN;
2393 p[1].fd = bus->output_fd;
2394 p[1].events = e & POLLOUT;
/* A NULL timespec makes ppoll() block without a timeout */
2398 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2402 return r > 0 ? 1 : 0;
/* Public wrapper that waits for bus activity by calling bus_poll() with
 * need_more == false and the caller's timeout. The closing state and a
 * non-empty read queue are checked before polling. */
2405 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2407 assert_return(bus, -EINVAL);
2408 assert_return(!bus_pid_changed(bus), -ECHILD);
2410 if (bus->state == BUS_CLOSING)
2413 assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
/* Messages already sitting in the read queue are checked before polling */
2415 if (bus->rqueue_size > 0)
2418 return bus_poll(bus, false, timeout_usec);
/* Dispatches the write queue, polling without a timeout in between and
 * checking bus->wqueue_size for emptiness. The connection is first brought
 * up via bus_ensure_running(). */
2421 _public_ int sd_bus_flush(sd_bus *bus) {
2424 assert_return(bus, -EINVAL);
2425 assert_return(!bus_pid_changed(bus), -ECHILD);
2427 if (bus->state == BUS_CLOSING)
2430 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2432 r = bus_ensure_running(bus);
2436 if (bus->wqueue_size <= 0)
2440 r = dispatch_wqueue(bus);
/* NOTE(review): this set (-EPIPE, -ENOTCONN, -ESHUTDOWN) differs from the
 * one sd_bus_process() uses (-ECONNRESET, -EPIPE, -ESHUTDOWN) - confirm
 * whether -ECONNRESET should be handled here as well */
2442 if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
2443 bus_enter_closing(bus);
2448 if (bus->wqueue_size <= 0)
/* (uint64_t) -1: wait without a timeout */
2451 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback: allocates a zeroed filter_callback entry,
 * stores callback and userdata in it and prepends it to
 * bus->filter_callbacks. */
2457 _public_ int sd_bus_add_filter(sd_bus *bus,
2458 sd_bus_message_handler_t callback,
2461 struct filter_callback *f;
2463 assert_return(bus, -EINVAL);
2464 assert_return(callback, -EINVAL);
2465 assert_return(!bus_pid_changed(bus), -ECHILD);
2467 f = new0(struct filter_callback, 1);
2470 f->callback = callback;
2471 f->userdata = userdata;
/* Tell a process_filter() pass that may be in progress that the list changed */
2473 bus->filter_callbacks_modified = true;
2474 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
/* Unregisters a filter callback: searches bus->filter_callbacks for an
 * entry whose callback and userdata both match and unlinks it. */
2478 _public_ int sd_bus_remove_filter(sd_bus *bus,
2479 sd_bus_message_handler_t callback,
2482 struct filter_callback *f;
2484 assert_return(bus, -EINVAL);
2485 assert_return(callback, -EINVAL);
2486 assert_return(!bus_pid_changed(bus), -ECHILD);
2488 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2489 if (f->callback == callback && f->userdata == userdata) {
/* Tell a process_filter() pass that may be in progress that the list changed */
2490 bus->filter_callbacks_modified = true;
2491 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
/* Registers a match callback. The match string is parsed into components;
 * on a bus client connection the match is additionally installed via
 * bus_add_match_internal() under a freshly allocated cookie, and the same
 * cookie is stored with the local entry in bus->match_callbacks. */
2500 _public_ int sd_bus_add_match(sd_bus *bus,
2502 sd_bus_message_handler_t callback,
2505 struct bus_match_component *components = NULL;
2506 unsigned n_components = 0;
2507 uint64_t cookie = 0;
2510 assert_return(bus, -EINVAL);
2511 assert_return(match, -EINVAL);
2512 assert_return(!bus_pid_changed(bus), -ECHILD);
2514 r = bus_match_parse(match, &components, &n_components);
2518 if (bus->bus_client) {
/* Cookies come from a per-bus counter that is incremented on each use */
2519 cookie = ++bus->match_cookie;
2521 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2526 bus->match_callbacks_modified = true;
2527 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
/* Undo the bus_add_match_internal() registration again */
2529 if (bus->bus_client)
2530 bus_remove_match_internal(bus, match, cookie);
2534 bus_match_parse_free(components, n_components);
/* Unregisters a match callback. The match string is parsed, the local entry
 * is removed from bus->match_callbacks (which yields the cookie it was
 * registered with), and on a bus client connection the match is also
 * removed via bus_remove_match_internal(). An error from the local removal
 * takes precedence over the result of the internal removal. */
2538 _public_ int sd_bus_remove_match(sd_bus *bus,
2540 sd_bus_message_handler_t callback,
2543 struct bus_match_component *components = NULL;
2544 unsigned n_components = 0;
2546 uint64_t cookie = 0;
2548 assert_return(bus, -EINVAL);
2549 assert_return(match, -EINVAL);
2550 assert_return(!bus_pid_changed(bus), -ECHILD);
2552 r = bus_match_parse(match, &components, &n_components);
2556 bus->match_callbacks_modified = true;
2557 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2559 if (bus->bus_client)
2560 q = bus_remove_match_internal(bus, match, cookie);
2562 bus_match_parse_free(components, n_components);
2564 return r < 0 ? r : q;
/* Returns true when the calling process' PID differs from the PID recorded
 * in bus->original_pid. */
2567 bool bus_pid_changed(sd_bus *bus) {
2570 /* We don't support people creating a bus connection and
2571 * keeping it around over a fork(). Let's complain. */
2573 return bus->original_pid != getpid();
/* I/O event source handler installed by attach_io_events(); userdata is the
 * bus. Runs one sd_bus_process() step without requesting the message back. */
2576 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2577 sd_bus *bus = userdata;
2582 r = sd_bus_process(bus, NULL);
/* Timer event source handler installed by sd_bus_attach_event(); userdata
 * is the bus. Runs one sd_bus_process() step without requesting the message
 * back. */
2589 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2590 sd_bus *bus = userdata;
2595 r = sd_bus_process(bus, NULL);
/* Prepare handler installed on the input I/O event source by
 * attach_io_events(). Refreshes the poll event masks of the I/O sources and
 * the deadline/enabled state of the timer source from the bus' current
 * sd_bus_get_events()/sd_bus_get_timeout() values. */
2602 static int prepare_callback(sd_event_source *s, void *userdata) {
2603 sd_bus *bus = userdata;
2610 e = sd_bus_get_events(bus);
/* With separate fds the input source only gets POLLIN and the output
 * source only POLLOUT */
2614 if (bus->output_fd != bus->input_fd) {
2616 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2620 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
/* A single source gets the full event mask */
2624 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2629 r = sd_bus_get_timeout(bus, &until);
2635 j = sd_event_source_set_time(bus->time_event_source, until);
/* r > 0 decides whether the timer source is enabled */
2640 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
/* Exit handler registered with sd_event_add_exit() by
 * sd_bus_attach_event(); userdata is the bus. */
2647 static int quit_callback(sd_event_source *event, void *userdata) {
2648 sd_bus *bus = userdata;
/* Hooks the bus file descriptors into the attached event loop. An I/O event
 * source is created for the input fd when none exists yet; the
 * sd_event_source_set_io_fd() calls update the fd on a source. A second
 * source is handled the same way for the output fd when it differs from the
 * input fd. */
2657 static int attach_io_events(sd_bus *bus) {
2662 if (bus->input_fd < 0)
2668 if (!bus->input_io_event_source) {
/* Created with an empty event mask; prepare_callback() sets the real mask */
2669 r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2673 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2677 r = sd_event_source_set_priority(bus->input_io_event_source, bus->event_priority);
2679 r = sd_event_source_set_io_fd(bus->input_io_event_source, bus->input_fd);
2684 if (bus->output_fd != bus->input_fd) {
2685 assert(bus->output_fd >= 0);
2687 if (!bus->output_io_event_source) {
2688 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2692 r = sd_event_source_set_priority(bus->output_io_event_source, bus->event_priority);
2694 r = sd_event_source_set_io_fd(bus->output_io_event_source, bus->output_fd);
/* Disables and drops the input and output I/O event sources, if present.
 * Each pointer is overwritten with the return value of
 * sd_event_source_unref(). */
2703 static void detach_io_events(sd_bus *bus) {
2706 if (bus->input_io_event_source) {
2707 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2708 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2711 if (bus->output_io_event_source) {
2712 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2713 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
/* Attaches the bus to an sd_event loop. Either a reference to the passed
 * event loop is taken or the default event loop is acquired; then a timer
 * source, an exit source and the I/O sources are registered with the given
 * priority. Fails with -EBUSY when the bus already has an event loop. */
2717 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2720 assert_return(bus, -EINVAL);
2721 assert_return(!bus->event, -EBUSY);
/* With no event loop attached there must be no leftover event sources */
2723 assert(!bus->input_io_event_source);
2724 assert(!bus->output_io_event_source);
2725 assert(!bus->time_event_source);
2728 bus->event = sd_event_ref(event);
2730 r = sd_event_default(&bus->event);
2735 bus->event_priority = priority;
2737 r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2741 r = sd_event_source_set_priority(bus->time_event_source, priority);
2745 r = sd_event_add_exit(bus->event, quit_callback, bus, &bus->quit_event_source);
2749 r = attach_io_events(bus);
/* Tear down whatever was set up so far */
2756 sd_bus_detach_event(bus);
/* Detaches the bus from its event loop: drops the I/O sources, disables and
 * unrefs the timer and exit sources, and releases the event loop
 * reference. */
2760 _public_ int sd_bus_detach_event(sd_bus *bus) {
2761 assert_return(bus, -EINVAL);
2766 detach_io_events(bus);
2768 if (bus->time_event_source) {
2769 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2770 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2773 if (bus->quit_event_source) {
2774 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2775 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
/* bus->event is overwritten with the return value of sd_event_unref() */
2779 bus->event = sd_event_unref(bus->event);
/* Public accessor returning an sd_event pointer for the bus; a NULL bus is
 * rejected with a NULL result. */
2784 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2785 assert_return(bus, NULL);
/* Public accessor for bus->current. A NULL bus is rejected with a NULL
 * result. No reference is taken here. */
2790 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2791 assert_return(bus, NULL);
2793 return bus->current;
/* Shared implementation behind the sd_bus_default_*() calls.
 *
 * bus_open:    function used to open the connection.
 * default_bus: slot holding the cached default connection.
 * ret:         where a reference to the connection is returned. */
2796 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2801 assert(default_bus);
/* Reports whether a default bus is currently cached */
2804 return !!*default_bus;
/* Hand out a new reference to the cached bus */
2807 *ret = sd_bus_ref(*default_bus);
/* The bus records which slot it is cached in */
2815 b->default_bus_ptr = default_bus;
/* Returns the default system bus connection. The cache slot is a
 * thread_local static, so it is kept per thread; connections are opened
 * with sd_bus_open_system(). */
2823 _public_ int sd_bus_default_system(sd_bus **ret) {
2824 static thread_local sd_bus *default_system_bus = NULL;
2826 return bus_default(sd_bus_open_system, &default_system_bus, ret);
/* Returns the default user bus connection. The cache slot is a thread_local
 * static, so it is kept per thread; connections are opened with
 * sd_bus_open_user(). */
2829 _public_ int sd_bus_default_user(sd_bus **ret) {
2830 static thread_local sd_bus *default_user_bus = NULL;
2832 return bus_default(sd_bus_open_user, &default_user_bus, ret);
/* Queries the thread ID associated with the bus; the visible path delegates
 * to sd_event_get_tid() on the attached event loop. */
2835 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2836 assert_return(b, -EINVAL);
2837 assert_return(tid, -EINVAL);
2838 assert_return(!bus_pid_changed(b), -ECHILD);
2846 return sd_event_get_tid(b->event, tid);
/* Returns a newly allocated, escaped copy of s that is usable as a D-Bus
 * object path label. A NULL s is rejected with a NULL result. */
2851 _public_ char *sd_bus_label_escape(const char *s) {
2855 assert_return(s, NULL);
2857 /* Escapes all chars that D-Bus' object path cannot deal
2858 * with. Can be reversed with sd_bus_label_unescape(). We special
2859 * case the empty string. */
/* Sized for three output bytes per input byte plus one more byte */
2864 r = new(char, strlen(s)*3 + 1);
2868 for (f = s, t = r; *f; f++) {
2870 /* Escape everything that is not a-zA-Z0-9. We also
2871 * escape 0-9 if it's the first character */
2873 if (!(*f >= 'A' && *f <= 'Z') &&
2874 !(*f >= 'a' && *f <= 'z') &&
2875 !(f > s && *f >= '0' && *f <= '9')) {
/* Emit the byte as two hex digits, high nibble first */
2877 *(t++) = hexchar(*f >> 4);
2878 *(t++) = hexchar(*f);
/* Returns a newly allocated copy of f with its two-hex-digit escape
 * sequences decoded back into single bytes. A NULL f is rejected with a
 * NULL result. */
2888 _public_ char *sd_bus_label_unescape(const char *f) {
2891 assert_return(f, NULL);
2893 /* Special case for the empty string */
/* The buffer is sized for the input length plus one more byte */
2897 r = new(char, strlen(f) + 1);
2901 for (t = r; *f; f++) {
/* The two characters following the current one are parsed as hex digits */
2906 if ((a = unhexchar(f[1])) < 0 ||
2907 (b = unhexchar(f[2])) < 0) {
2908 /* Invalid escape code, let's take it literal then */
/* Combine high and low nibble into one byte */
2911 *(t++) = (char) ((a << 4) | b);
/* Builds an sd_bus_creds object describing the peer of the connection. It
 * is filled from the cached ucred (pid/uid/gid) and the cached label, both
 * restricted to the fields requested in mask, and then augmented through
 * bus_creds_add_more(). Rejected with -ENOTSUP on kernel buses and for
 * masks beyond _SD_BUS_CREDS_ALL. */
2923 _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **ret) {
2928 assert_return(bus, -EINVAL);
2929 assert_return(mask <= _SD_BUS_CREDS_ALL, -ENOTSUP);
2930 assert_return(ret, -EINVAL);
2931 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2932 assert_return(!bus_pid_changed(bus), -ECHILD);
2933 assert_return(!bus->is_kernel, -ENOTSUP);
/* Check of whether both credential sources are present */
2935 if (!bus->ucred_valid && !isempty(bus->label))
2938 c = bus_creds_new();
2942 if (bus->ucred_valid) {
/* pid is also kept in a local for the bus_creds_add_more() call below */
2943 pid = c->pid = bus->ucred.pid;
2944 c->uid = bus->ucred.uid;
2945 c->gid = bus->ucred.gid;
/* Only advertise the fields the caller asked for */
2947 c->mask |= (SD_BUS_CREDS_UID | SD_BUS_CREDS_PID | SD_BUS_CREDS_GID) & mask;
2950 if (!isempty(bus->label) && (mask & SD_BUS_CREDS_SELINUX_CONTEXT)) {
/* The creds object gets its own copy of the label */
2951 c->label = strdup(bus->label);
2953 sd_bus_creds_unref(c);
2957 c->mask |= SD_BUS_CREDS_SELINUX_CONTEXT;
2960 r = bus_creds_add_more(c, mask, pid, 0);
2968 _public_ int sd_bus_try_close(sd_bus *bus) {
2971 assert_return(bus, -EINVAL);
2972 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2973 assert_return(!bus_pid_changed(bus), -ECHILD);
2974 assert_return(bus->is_kernel, -ENOTSUP);
2976 if (bus->rqueue_size > 0)
2979 if (bus->wqueue_size > 0)
2982 r = bus_kernel_try_close(bus);