1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 #include "bus-container.h"
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
53 static void bus_close_fds(sd_bus *b) {
57 close_nointr_nofail(b->input_fd);
59 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60 close_nointr_nofail(b->output_fd);
62 b->input_fd = b->output_fd = -1;
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66 struct node_callback *c;
67 struct node_vtable *v;
68 struct node_enumerator *e;
76 bus_node_destroy(b, n->child);
78 while ((c = n->callbacks)) {
79 LIST_REMOVE(callbacks, n->callbacks, c);
83 while ((v = n->vtables)) {
84 LIST_REMOVE(vtables, n->vtables, v);
89 while ((e = n->enumerators)) {
90 LIST_REMOVE(enumerators, n->enumerators, e);
95 LIST_REMOVE(siblings, n->parent->child, n);
97 assert_se(hashmap_remove(b->nodes, n->path) == n);
102 static void bus_free(sd_bus *b) {
103 struct filter_callback *f;
109 sd_bus_detach_event(b);
114 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
117 free(b->unique_name);
118 free(b->auth_buffer);
124 strv_free(b->exec_argv);
126 close_many(b->fds, b->n_fds);
129 for (i = 0; i < b->rqueue_size; i++)
130 sd_bus_message_unref(b->rqueue[i]);
133 for (i = 0; i < b->wqueue_size; i++)
134 sd_bus_message_unref(b->wqueue[i]);
137 hashmap_free_free(b->reply_callbacks);
138 prioq_free(b->reply_callbacks_prioq);
140 while ((f = b->filter_callbacks)) {
141 LIST_REMOVE(callbacks, b->filter_callbacks, f);
145 bus_match_free(&b->match_callbacks);
147 hashmap_free_free(b->vtable_methods);
148 hashmap_free_free(b->vtable_properties);
150 while ((n = hashmap_first(b->nodes)))
151 bus_node_destroy(b, n);
153 hashmap_free(b->nodes);
155 bus_kernel_flush_memfd(b);
157 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
162 _public_ int sd_bus_new(sd_bus **ret) {
165 assert_return(ret, -EINVAL);
171 r->n_ref = REFCNT_INIT;
172 r->input_fd = r->output_fd = -1;
173 r->message_version = 1;
174 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
175 r->original_pid = getpid();
177 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
179 /* We guarantee that wqueue always has space for at least one
181 r->wqueue = new(sd_bus_message*, 1);
191 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
194 assert_return(bus, -EINVAL);
195 assert_return(bus->state == BUS_UNSET, -EPERM);
196 assert_return(address, -EINVAL);
197 assert_return(!bus_pid_changed(bus), -ECHILD);
209 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
210 assert_return(bus, -EINVAL);
211 assert_return(bus->state == BUS_UNSET, -EPERM);
212 assert_return(input_fd >= 0, -EINVAL);
213 assert_return(output_fd >= 0, -EINVAL);
214 assert_return(!bus_pid_changed(bus), -ECHILD);
216 bus->input_fd = input_fd;
217 bus->output_fd = output_fd;
221 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
224 assert_return(bus, -EINVAL);
225 assert_return(bus->state == BUS_UNSET, -EPERM);
226 assert_return(path, -EINVAL);
227 assert_return(!strv_isempty(argv), -EINVAL);
228 assert_return(!bus_pid_changed(bus), -ECHILD);
240 free(bus->exec_path);
241 strv_free(bus->exec_argv);
249 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
250 assert_return(bus, -EINVAL);
251 assert_return(bus->state == BUS_UNSET, -EPERM);
252 assert_return(!bus_pid_changed(bus), -ECHILD);
254 bus->bus_client = !!b;
258 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
259 assert_return(bus, -EINVAL);
260 assert_return(bus->state == BUS_UNSET, -EPERM);
261 assert_return(!bus_pid_changed(bus), -ECHILD);
263 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
267 _public_ int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
268 assert_return(bus, -EINVAL);
269 assert_return(bus->state == BUS_UNSET, -EPERM);
270 assert_return(!bus_pid_changed(bus), -ECHILD);
272 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
276 _public_ int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
277 assert_return(bus, -EINVAL);
278 assert_return(bus->state == BUS_UNSET, -EPERM);
279 assert_return(!bus_pid_changed(bus), -ECHILD);
281 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
285 _public_ int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
286 assert_return(bus, -EINVAL);
287 assert_return(bus->state == BUS_UNSET, -EPERM);
288 assert_return(!bus_pid_changed(bus), -ECHILD);
290 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
294 _public_ int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
295 assert_return(bus, -EINVAL);
296 assert_return(bus->state == BUS_UNSET, -EPERM);
297 assert_return(!bus_pid_changed(bus), -ECHILD);
299 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
303 _public_ int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
304 assert_return(bus, -EINVAL);
305 assert_return(bus->state == BUS_UNSET, -EPERM);
306 assert_return(!bus_pid_changed(bus), -ECHILD);
308 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
312 _public_ int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
313 assert_return(bus, -EINVAL);
314 assert_return(bus->state == BUS_UNSET, -EPERM);
315 assert_return(!bus_pid_changed(bus), -ECHILD);
317 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
321 _public_ int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
322 assert_return(bus, -EINVAL);
323 assert_return(bus->state == BUS_UNSET, -EPERM);
324 assert_return(!bus_pid_changed(bus), -ECHILD);
326 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
330 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
331 assert_return(bus, -EINVAL);
332 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
333 assert_return(bus->state == BUS_UNSET, -EPERM);
334 assert_return(!bus_pid_changed(bus), -ECHILD);
336 bus->is_server = !!b;
337 bus->server_id = server_id;
341 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
342 assert_return(bus, -EINVAL);
343 assert_return(bus->state == BUS_UNSET, -EPERM);
344 assert_return(!bus_pid_changed(bus), -ECHILD);
346 bus->anonymous_auth = !!b;
350 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
355 assert(bus->state == BUS_HELLO);
358 r = sd_bus_message_get_errno(reply);
364 r = sd_bus_message_read(reply, "s", &s);
368 if (!service_name_is_valid(s) || s[0] != ':')
371 bus->unique_name = strdup(s);
372 if (!bus->unique_name)
375 bus->state = BUS_RUNNING;
380 static int bus_send_hello(sd_bus *bus) {
381 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
386 if (!bus->bus_client || bus->is_kernel)
389 r = sd_bus_message_new_method_call(
391 "org.freedesktop.DBus",
393 "org.freedesktop.DBus",
399 return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
402 int bus_start_running(sd_bus *bus) {
405 if (bus->bus_client && !bus->is_kernel) {
406 bus->state = BUS_HELLO;
410 bus->state = BUS_RUNNING;
414 static int parse_address_key(const char **p, const char *key, char **value) {
425 if (strncmp(*p, key, l) != 0)
438 while (*a != ';' && *a != ',' && *a != 0) {
456 c = (char) ((x << 4) | y);
463 t = realloc(r, n + 2);
491 static void skip_address_key(const char **p) {
495 *p += strcspn(*p, ",");
501 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
502 _cleanup_free_ char *path = NULL, *abstract = NULL;
511 while (**p != 0 && **p != ';') {
512 r = parse_address_key(p, "guid", guid);
518 r = parse_address_key(p, "path", &path);
524 r = parse_address_key(p, "abstract", &abstract);
533 if (!path && !abstract)
536 if (path && abstract)
541 if (l > sizeof(b->sockaddr.un.sun_path))
544 b->sockaddr.un.sun_family = AF_UNIX;
545 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
546 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
547 } else if (abstract) {
548 l = strlen(abstract);
549 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
552 b->sockaddr.un.sun_family = AF_UNIX;
553 b->sockaddr.un.sun_path[0] = 0;
554 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
555 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
561 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
562 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
564 struct addrinfo *result, hints = {
565 .ai_socktype = SOCK_STREAM,
566 .ai_flags = AI_ADDRCONFIG,
574 while (**p != 0 && **p != ';') {
575 r = parse_address_key(p, "guid", guid);
581 r = parse_address_key(p, "host", &host);
587 r = parse_address_key(p, "port", &port);
593 r = parse_address_key(p, "family", &family);
606 if (streq(family, "ipv4"))
607 hints.ai_family = AF_INET;
608 else if (streq(family, "ipv6"))
609 hints.ai_family = AF_INET6;
614 r = getaddrinfo(host, port, &hints, &result);
618 return -EADDRNOTAVAIL;
620 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
621 b->sockaddr_size = result->ai_addrlen;
623 freeaddrinfo(result);
628 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
630 unsigned n_argv = 0, j;
639 while (**p != 0 && **p != ';') {
640 r = parse_address_key(p, "guid", guid);
646 r = parse_address_key(p, "path", &path);
652 if (startswith(*p, "argv")) {
656 ul = strtoul(*p + 4, (char**) p, 10);
657 if (errno > 0 || **p != '=' || ul > 256) {
667 x = realloc(argv, sizeof(char*) * (ul + 2));
673 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
679 r = parse_address_key(p, NULL, argv + ul);
694 /* Make sure there are no holes in the array, with the
695 * exception of argv[0] */
696 for (j = 1; j < n_argv; j++)
702 if (argv && argv[0] == NULL) {
703 argv[0] = strdup(path);
715 for (j = 0; j < n_argv; j++)
723 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
724 _cleanup_free_ char *path = NULL;
732 while (**p != 0 && **p != ';') {
733 r = parse_address_key(p, "guid", guid);
739 r = parse_address_key(p, "path", &path);
758 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
759 _cleanup_free_ char *machine = NULL;
767 while (**p != 0 && **p != ';') {
768 r = parse_address_key(p, "guid", guid);
774 r = parse_address_key(p, "machine", &machine);
787 b->machine = machine;
790 b->sockaddr.un.sun_family = AF_UNIX;
791 strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
792 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
797 static void bus_reset_parsed_address(sd_bus *b) {
801 b->sockaddr_size = 0;
802 strv_free(b->exec_argv);
806 b->server_id = SD_ID128_NULL;
813 static int bus_parse_next_address(sd_bus *b) {
814 _cleanup_free_ char *guid = NULL;
822 if (b->address[b->address_index] == 0)
825 bus_reset_parsed_address(b);
827 a = b->address + b->address_index;
836 if (startswith(a, "unix:")) {
839 r = parse_unix_address(b, &a, &guid);
844 } else if (startswith(a, "tcp:")) {
847 r = parse_tcp_address(b, &a, &guid);
853 } else if (startswith(a, "unixexec:")) {
856 r = parse_exec_address(b, &a, &guid);
862 } else if (startswith(a, "kernel:")) {
865 r = parse_kernel_address(b, &a, &guid);
870 } else if (startswith(a, "x-container:")) {
873 r = parse_container_address(b, &a, &guid);
886 r = sd_id128_from_string(guid, &b->server_id);
891 b->address_index = a - b->address;
895 static int bus_start_address(sd_bus *b) {
905 r = bus_socket_exec(b);
909 b->last_connect_error = -r;
910 } else if (b->kernel) {
912 r = bus_kernel_connect(b);
916 b->last_connect_error = -r;
918 } else if (b->machine) {
920 r = bus_container_connect(b);
924 b->last_connect_error = -r;
926 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
928 r = bus_socket_connect(b);
932 b->last_connect_error = -r;
935 r = bus_parse_next_address(b);
939 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
943 int bus_next_address(sd_bus *b) {
946 bus_reset_parsed_address(b);
947 return bus_start_address(b);
950 static int bus_start_fd(sd_bus *b) {
955 assert(b->input_fd >= 0);
956 assert(b->output_fd >= 0);
958 r = fd_nonblock(b->input_fd, true);
962 r = fd_cloexec(b->input_fd, true);
966 if (b->input_fd != b->output_fd) {
967 r = fd_nonblock(b->output_fd, true);
971 r = fd_cloexec(b->output_fd, true);
976 if (fstat(b->input_fd, &st) < 0)
979 if (S_ISCHR(b->input_fd))
980 return bus_kernel_take_fd(b);
982 return bus_socket_take_fd(b);
985 _public_ int sd_bus_start(sd_bus *bus) {
988 assert_return(bus, -EINVAL);
989 assert_return(bus->state == BUS_UNSET, -EPERM);
990 assert_return(!bus_pid_changed(bus), -ECHILD);
992 bus->state = BUS_OPENING;
994 if (bus->is_server && bus->bus_client)
997 if (bus->input_fd >= 0)
998 r = bus_start_fd(bus);
999 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1000 r = bus_start_address(bus);
1007 return bus_send_hello(bus);
1010 _public_ int sd_bus_open_system(sd_bus **ret) {
1015 assert_return(ret, -EINVAL);
1021 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1023 r = sd_bus_set_address(b, e);
1027 b->sockaddr.un.sun_family = AF_UNIX;
1028 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1029 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1032 b->bus_client = true;
1034 r = sd_bus_start(b);
1046 _public_ int sd_bus_open_user(sd_bus **ret) {
1052 assert_return(ret, -EINVAL);
1058 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1060 r = sd_bus_set_address(b, e);
1064 e = secure_getenv("XDG_RUNTIME_DIR");
1071 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1076 b->sockaddr.un.sun_family = AF_UNIX;
1077 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1078 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1081 b->bus_client = true;
1083 r = sd_bus_start(b);
1095 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1096 _cleanup_free_ char *e = NULL;
1101 assert_return(host, -EINVAL);
1102 assert_return(ret, -EINVAL);
1104 e = bus_address_escape(host);
1108 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1112 r = sd_bus_new(&bus);
1119 bus->bus_client = true;
1121 r = sd_bus_start(bus);
1131 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1132 _cleanup_free_ char *e = NULL;
1137 assert_return(machine, -EINVAL);
1138 assert_return(ret, -EINVAL);
1140 e = bus_address_escape(machine);
1144 p = strjoin("x-container:machine=", e, NULL);
1148 r = sd_bus_new(&bus);
1155 bus->bus_client = true;
1157 r = sd_bus_start(bus);
1167 _public_ void sd_bus_close(sd_bus *bus) {
1170 if (bus->state == BUS_CLOSED)
1172 if (bus_pid_changed(bus))
1175 bus->state = BUS_CLOSED;
1177 sd_bus_detach_event(bus);
1179 if (!bus->is_kernel)
1182 /* We'll leave the fd open in case this is a kernel bus, since
1183 * there might still be memblocks around that reference this
1184 * bus, and they might need to invoke the
1185 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1189 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1190 assert_return(bus, NULL);
1192 assert_se(REFCNT_INC(bus->n_ref) >= 2);
1197 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1198 assert_return(bus, NULL);
1200 if (REFCNT_DEC(bus->n_ref) <= 0)
1206 _public_ int sd_bus_is_open(sd_bus *bus) {
1208 assert_return(bus, -EINVAL);
1209 assert_return(!bus_pid_changed(bus), -ECHILD);
1211 return BUS_IS_OPEN(bus->state);
1214 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1217 assert_return(bus, -EINVAL);
1218 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1219 assert_return(!bus_pid_changed(bus), -ECHILD);
1221 if (type == SD_BUS_TYPE_UNIX_FD) {
1222 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1225 r = bus_ensure_running(bus);
1229 return bus->can_fds;
1232 return bus_type_is_valid(type);
1235 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1238 assert_return(bus, -EINVAL);
1239 assert_return(server_id, -EINVAL);
1240 assert_return(!bus_pid_changed(bus), -ECHILD);
1242 r = bus_ensure_running(bus);
1246 *server_id = bus->server_id;
1250 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1253 if (m->header->version > b->message_version)
1259 return bus_message_seal(m, ++b->serial);
1262 static int dispatch_wqueue(sd_bus *bus) {
1266 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1268 while (bus->wqueue_size > 0) {
1271 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1273 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1279 /* Didn't do anything this time */
1281 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1282 /* Fully written. Let's drop the entry from
1285 * This isn't particularly optimized, but
1286 * well, this is supposed to be our worst-case
1287 * buffer only, and the socket buffer is
1288 * supposed to be our primary buffer, and if
1289 * it got full, then all bets are off
1292 sd_bus_message_unref(bus->wqueue[0]);
1293 bus->wqueue_size --;
1294 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1304 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1305 sd_bus_message *z = NULL;
1310 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1312 if (bus->rqueue_size > 0) {
1313 /* Dispatch a queued message */
1315 *m = bus->rqueue[0];
1316 bus->rqueue_size --;
1317 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1321 /* Try to read a new message */
1324 r = bus_kernel_read_message(bus, &z);
1326 r = bus_socket_read_message(bus, &z);
1342 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1345 assert_return(bus, -EINVAL);
1346 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1347 assert_return(m, -EINVAL);
1348 assert_return(!bus_pid_changed(bus), -ECHILD);
1351 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1358 /* If the serial number isn't kept, then we know that no reply
1360 if (!serial && !m->sealed)
1361 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1363 r = bus_seal_message(bus, m);
1367 /* If this is a reply and no reply was requested, then let's
1368 * suppress this, if we can */
1369 if (m->dont_send && !serial)
1372 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1376 r = bus_kernel_write_message(bus, m);
1378 r = bus_socket_write_message(bus, m, &idx);
1383 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1384 /* Wasn't fully written. So let's remember how
1385 * much was written. Note that the first entry
1386 * of the wqueue array is always allocated so
1387 * that we always can remember how much was
1389 bus->wqueue[0] = sd_bus_message_ref(m);
1390 bus->wqueue_size = 1;
1396 /* Just append it to the queue. */
1398 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1401 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1406 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1410 *serial = BUS_MESSAGE_SERIAL(m);
1415 static usec_t calc_elapse(uint64_t usec) {
1416 if (usec == (uint64_t) -1)
1420 usec = BUS_DEFAULT_TIMEOUT;
1422 return now(CLOCK_MONOTONIC) + usec;
1425 static int timeout_compare(const void *a, const void *b) {
1426 const struct reply_callback *x = a, *y = b;
1428 if (x->timeout != 0 && y->timeout == 0)
1431 if (x->timeout == 0 && y->timeout != 0)
1434 if (x->timeout < y->timeout)
1437 if (x->timeout > y->timeout)
1443 _public_ int sd_bus_call_async(
1446 sd_bus_message_handler_t callback,
1451 struct reply_callback *c;
1454 assert_return(bus, -EINVAL);
1455 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1456 assert_return(m, -EINVAL);
1457 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1458 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1459 assert_return(callback, -EINVAL);
1460 assert_return(!bus_pid_changed(bus), -ECHILD);
1462 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1466 if (usec != (uint64_t) -1) {
1467 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1472 r = bus_seal_message(bus, m);
1476 c = new0(struct reply_callback, 1);
1480 c->callback = callback;
1481 c->userdata = userdata;
1482 c->serial = BUS_MESSAGE_SERIAL(m);
1483 c->timeout = calc_elapse(usec);
1485 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1491 if (c->timeout != 0) {
1492 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1495 sd_bus_call_async_cancel(bus, c->serial);
1500 r = sd_bus_send(bus, m, serial);
1502 sd_bus_call_async_cancel(bus, c->serial);
1509 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1510 struct reply_callback *c;
1512 assert_return(bus, -EINVAL);
1513 assert_return(serial != 0, -EINVAL);
1514 assert_return(!bus_pid_changed(bus), -ECHILD);
1516 c = hashmap_remove(bus->reply_callbacks, &serial);
1520 if (c->timeout != 0)
1521 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1527 int bus_ensure_running(sd_bus *bus) {
1532 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1534 if (bus->state == BUS_RUNNING)
1538 r = sd_bus_process(bus, NULL);
1541 if (bus->state == BUS_RUNNING)
1546 r = sd_bus_wait(bus, (uint64_t) -1);
1552 _public_ int sd_bus_call(
1556 sd_bus_error *error,
1557 sd_bus_message **reply) {
1564 assert_return(bus, -EINVAL);
1565 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1566 assert_return(m, -EINVAL);
1567 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1568 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1569 assert_return(!bus_error_is_dirty(error), -EINVAL);
1570 assert_return(!bus_pid_changed(bus), -ECHILD);
1572 r = bus_ensure_running(bus);
1576 r = sd_bus_send(bus, m, &serial);
1580 timeout = calc_elapse(usec);
1584 sd_bus_message *incoming = NULL;
1589 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1592 /* Make sure there's room for queuing this
1593 * locally, before we read the message */
1595 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1604 r = bus_kernel_read_message(bus, &incoming);
1606 r = bus_socket_read_message(bus, &incoming);
1611 if (incoming->reply_serial == serial) {
1612 /* Found a match! */
1614 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1619 sd_bus_message_unref(incoming);
1624 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1627 r = sd_bus_error_copy(error, &incoming->error);
1629 sd_bus_message_unref(incoming);
1633 k = sd_bus_error_get_errno(&incoming->error);
1634 sd_bus_message_unref(incoming);
1638 sd_bus_message_unref(incoming);
1641 } else if (incoming->header->serial == serial &&
1644 streq(bus->unique_name, incoming->sender)) {
1646 /* Our own message? Somebody is trying
1647 * to send its own client a message,
1648 * let's not dead-lock, let's fail
1651 sd_bus_message_unref(incoming);
1655 /* There's already guaranteed to be room for
1656 * this, so need to resize things here */
1657 bus->rqueue[bus->rqueue_size ++] = incoming;
1660 /* Try to read more, right-away */
1669 n = now(CLOCK_MONOTONIC);
1675 left = (uint64_t) -1;
1677 r = bus_poll(bus, true, left);
1681 r = dispatch_wqueue(bus);
1687 _public_ int sd_bus_get_fd(sd_bus *bus) {
1689 assert_return(bus, -EINVAL);
1690 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1691 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1692 assert_return(!bus_pid_changed(bus), -ECHILD);
1694 return bus->input_fd;
1697 _public_ int sd_bus_get_events(sd_bus *bus) {
1700 assert_return(bus, -EINVAL);
1701 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1702 assert_return(!bus_pid_changed(bus), -ECHILD);
1704 if (bus->state == BUS_OPENING)
1706 else if (bus->state == BUS_AUTHENTICATING) {
1708 if (bus_socket_auth_needs_write(bus))
1713 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1714 if (bus->rqueue_size <= 0)
1716 if (bus->wqueue_size > 0)
1723 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1724 struct reply_callback *c;
1726 assert_return(bus, -EINVAL);
1727 assert_return(timeout_usec, -EINVAL);
1728 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1729 assert_return(!bus_pid_changed(bus), -ECHILD);
1731 if (bus->state == BUS_AUTHENTICATING) {
1732 *timeout_usec = bus->auth_timeout;
1736 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1737 *timeout_usec = (uint64_t) -1;
1741 if (bus->rqueue_size > 0) {
1746 c = prioq_peek(bus->reply_callbacks_prioq);
1748 *timeout_usec = (uint64_t) -1;
1752 *timeout_usec = c->timeout;
1756 static int process_timeout(sd_bus *bus) {
1757 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1758 struct reply_callback *c;
1764 c = prioq_peek(bus->reply_callbacks_prioq);
1768 n = now(CLOCK_MONOTONIC);
1772 r = bus_message_new_synthetic_error(
1775 &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1780 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1781 hashmap_remove(bus->reply_callbacks, &c->serial);
1783 r = c->callback(bus, m, c->userdata);
1786 return r < 0 ? r : 1;
1789 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1793 if (bus->state != BUS_HELLO)
1796 /* Let's make sure the first message on the bus is the HELLO
1797 * reply. But note that we don't actually parse the message
1798 * here (we leave that to the usual handling), we just verify
1799 * we don't let any earlier msg through. */
1801 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1802 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1805 if (m->reply_serial != bus->hello_serial)
1811 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1812 struct reply_callback *c;
1818 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1819 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1822 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1826 if (c->timeout != 0)
1827 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1829 r = sd_bus_message_rewind(m, true);
1833 r = c->callback(bus, m, c->userdata);
1839 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1840 struct filter_callback *l;
1847 bus->filter_callbacks_modified = false;
1849 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1851 if (bus->filter_callbacks_modified)
1854 /* Don't run this more than once per iteration */
1855 if (l->last_iteration == bus->iteration_counter)
1858 l->last_iteration = bus->iteration_counter;
1860 r = sd_bus_message_rewind(m, true);
1864 r = l->callback(bus, m, l->userdata);
1870 } while (bus->filter_callbacks_modified);
1875 static int process_match(sd_bus *bus, sd_bus_message *m) {
1882 bus->match_callbacks_modified = false;
1884 r = bus_match_run(bus, &bus->match_callbacks, m);
1888 } while (bus->match_callbacks_modified);
1893 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1894 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1900 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1903 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1906 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1909 if (streq_ptr(m->member, "Ping"))
1910 r = sd_bus_message_new_method_return(bus, m, &reply);
1911 else if (streq_ptr(m->member, "GetMachineId")) {
1915 r = sd_id128_get_machine(&id);
1919 r = sd_bus_message_new_method_return(bus, m, &reply);
1923 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1925 r = sd_bus_message_new_method_errorf(
1927 SD_BUS_ERROR_UNKNOWN_METHOD,
1928 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1934 r = sd_bus_send(bus, reply, NULL);
1941 static int process_message(sd_bus *bus, sd_bus_message *m) {
1948 bus->iteration_counter++;
1950 log_debug("Got message sender=%s object=%s interface=%s member=%s",
1951 strna(sd_bus_message_get_sender(m)),
1952 strna(sd_bus_message_get_path(m)),
1953 strna(sd_bus_message_get_interface(m)),
1954 strna(sd_bus_message_get_member(m)));
1956 r = process_hello(bus, m);
1960 r = process_reply(bus, m);
1964 r = process_filter(bus, m);
1968 r = process_match(bus, m);
1972 r = process_builtin(bus, m);
1976 r = bus_process_object(bus, m);
1979 bus->current = NULL;
1983 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1984 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1988 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1990 r = process_timeout(bus);
1994 r = dispatch_wqueue(bus);
1998 r = dispatch_rqueue(bus, &m);
2004 r = process_message(bus, m);
2009 r = sd_bus_message_rewind(m, true);
2018 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2020 r = sd_bus_reply_method_errorf(
2022 SD_BUS_ERROR_UNKNOWN_OBJECT,
2023 "Unknown object '%s'.", m->path);
2037 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2038 BUS_DONT_DESTROY(bus);
2041 /* Returns 0 when we didn't do anything. This should cause the
2042 * caller to invoke sd_bus_wait() before returning the next
2043 * time. Returns > 0 when we did something, which possibly
2044 * means *ret is filled in with an unprocessed message. */
2046 assert_return(bus, -EINVAL);
2047 assert_return(!bus_pid_changed(bus), -ECHILD);
2049 /* We don't allow recursively invoking sd_bus_process(). */
2050 assert_return(!bus->processing, -EBUSY);
2052 switch (bus->state) {
2059 r = bus_socket_process_opening(bus);
2066 case BUS_AUTHENTICATING:
2068 r = bus_socket_process_authenticating(bus);
2078 bus->processing = true;
2079 r = process_running(bus, ret);
2080 bus->processing = false;
2085 assert_not_reached("Unknown state");
2088 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2089 struct pollfd p[2] = {};
2092 usec_t m = (usec_t) -1;
2095 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2097 e = sd_bus_get_events(bus);
2102 /* The caller really needs some more data, he doesn't
2103 * care about what's already read, or any timeouts
2108 /* The caller wants to process if there's something to
2109 * process, but doesn't care otherwise */
2111 r = sd_bus_get_timeout(bus, &until);
2116 nw = now(CLOCK_MONOTONIC);
2117 m = until > nw ? until - nw : 0;
2121 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2124 p[0].fd = bus->input_fd;
2125 if (bus->output_fd == bus->input_fd) {
2129 p[0].events = e & POLLIN;
2130 p[1].fd = bus->output_fd;
2131 p[1].events = e & POLLOUT;
2135 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2139 return r > 0 ? 1 : 0;
2142 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2144 assert_return(bus, -EINVAL);
2145 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2146 assert_return(!bus_pid_changed(bus), -ECHILD);
2148 if (bus->rqueue_size > 0)
2151 return bus_poll(bus, false, timeout_usec);
2154 _public_ int sd_bus_flush(sd_bus *bus) {
2157 assert_return(bus, -EINVAL);
2158 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2159 assert_return(!bus_pid_changed(bus), -ECHILD);
2161 r = bus_ensure_running(bus);
2165 if (bus->wqueue_size <= 0)
2169 r = dispatch_wqueue(bus);
2173 if (bus->wqueue_size <= 0)
2176 r = bus_poll(bus, false, (uint64_t) -1);
2182 _public_ int sd_bus_add_filter(sd_bus *bus,
2183 sd_bus_message_handler_t callback,
2186 struct filter_callback *f;
2188 assert_return(bus, -EINVAL);
2189 assert_return(callback, -EINVAL);
2190 assert_return(!bus_pid_changed(bus), -ECHILD);
2192 f = new0(struct filter_callback, 1);
2195 f->callback = callback;
2196 f->userdata = userdata;
2198 bus->filter_callbacks_modified = true;
2199 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2203 _public_ int sd_bus_remove_filter(sd_bus *bus,
2204 sd_bus_message_handler_t callback,
2207 struct filter_callback *f;
2209 assert_return(bus, -EINVAL);
2210 assert_return(callback, -EINVAL);
2211 assert_return(!bus_pid_changed(bus), -ECHILD);
2213 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2214 if (f->callback == callback && f->userdata == userdata) {
2215 bus->filter_callbacks_modified = true;
2216 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2225 _public_ int sd_bus_add_match(sd_bus *bus,
2227 sd_bus_message_handler_t callback,
2230 struct bus_match_component *components = NULL;
2231 unsigned n_components = 0;
2232 uint64_t cookie = 0;
2235 assert_return(bus, -EINVAL);
2236 assert_return(match, -EINVAL);
2237 assert_return(!bus_pid_changed(bus), -ECHILD);
2239 r = bus_match_parse(match, &components, &n_components);
2243 if (bus->bus_client) {
2244 cookie = ++bus->match_cookie;
2246 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2251 bus->match_callbacks_modified = true;
2252 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2254 if (bus->bus_client)
2255 bus_remove_match_internal(bus, match, cookie);
2259 bus_match_parse_free(components, n_components);
2263 _public_ int sd_bus_remove_match(sd_bus *bus,
2265 sd_bus_message_handler_t callback,
2268 struct bus_match_component *components = NULL;
2269 unsigned n_components = 0;
2271 uint64_t cookie = 0;
2273 assert_return(bus, -EINVAL);
2274 assert_return(match, -EINVAL);
2275 assert_return(!bus_pid_changed(bus), -ECHILD);
2277 r = bus_match_parse(match, &components, &n_components);
2281 bus->match_callbacks_modified = true;
2282 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2284 if (bus->bus_client)
2285 q = bus_remove_match_internal(bus, match, cookie);
2287 bus_match_parse_free(components, n_components);
2289 return r < 0 ? r : q;
2292 bool bus_pid_changed(sd_bus *bus) {
2295 /* We don't support people creating a bus connection and
2296 * keeping it around over a fork(). Let's complain. */
2298 return bus->original_pid != getpid();
2301 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2302 void *bus = userdata;
2307 r = sd_bus_process(bus, NULL);
2314 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2315 void *bus = userdata;
2320 r = sd_bus_process(bus, NULL);
2327 static int prepare_callback(sd_event_source *s, void *userdata) {
2328 sd_bus *bus = userdata;
2335 e = sd_bus_get_events(bus);
2339 if (bus->output_fd != bus->input_fd) {
2341 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2345 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2349 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2354 r = sd_bus_get_timeout(bus, &until);
2360 j = sd_event_source_set_time(bus->time_event_source, until);
2365 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2372 static int quit_callback(sd_event_source *event, void *userdata) {
2373 sd_bus *bus = userdata;
2382 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2385 assert_return(bus, -EINVAL);
2386 assert_return(!bus->event, -EBUSY);
2388 assert(!bus->input_io_event_source);
2389 assert(!bus->output_io_event_source);
2390 assert(!bus->time_event_source);
2393 bus->event = sd_event_ref(event);
2395 r = sd_event_default(&bus->event);
2400 r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2404 r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2408 if (bus->output_fd != bus->input_fd) {
2409 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2413 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2418 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2422 r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2426 r = sd_event_source_set_priority(bus->time_event_source, priority);
2430 r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2437 sd_bus_detach_event(bus);
2441 _public_ int sd_bus_detach_event(sd_bus *bus) {
2442 assert_return(bus, -EINVAL);
2443 assert_return(bus->event, -ENXIO);
2445 if (bus->input_io_event_source)
2446 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2448 if (bus->output_io_event_source)
2449 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2451 if (bus->time_event_source)
2452 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2454 if (bus->quit_event_source)
2455 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2458 bus->event = sd_event_unref(bus->event);
2463 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2464 assert_return(bus, NULL);
2466 return bus->current;
2469 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2474 assert(default_bus);
2477 return !!*default_bus;
2480 *ret = sd_bus_ref(*default_bus);
2488 b->default_bus_ptr = default_bus;
2496 _public_ int sd_bus_default_system(sd_bus **ret) {
2497 static __thread sd_bus *default_system_bus = NULL;
2499 return bus_default(sd_bus_open_system, &default_system_bus, ret);
2502 _public_ int sd_bus_default_user(sd_bus **ret) {
2503 static __thread sd_bus *default_user_bus = NULL;
2505 return bus_default(sd_bus_open_user, &default_user_bus, ret);
2508 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2509 assert_return(b, -EINVAL);
2510 assert_return(tid, -EINVAL);
2511 assert_return(!bus_pid_changed(b), -ECHILD);
2519 return sd_event_get_tid(b->event, tid);