1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
50 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Closes the connection's file descriptors.
 *
 * Closes b->input_fd, closes b->output_fd only when it is valid and is not
 * the very same descriptor as input_fd (so a shared fd is closed once), and
 * finally resets both fields to -1.
 * NOTE(review): whatever guards the input_fd close is not visible in this
 * listing -- confirm against the full file. */
52 static void bus_close_fds(sd_bus *b) {
56 close_nointr_nofail(b->input_fd);
58 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
59 close_nointr_nofail(b->output_fd);
/* -1 is the "no descriptor" marker for both fields. */
61 b->input_fd = b->output_fd = -1;
/* Tears down one object-tree node 'n' that belongs to bus 'b'.
 *
 * Visible steps: recurse into n->child, unlink every entry of the node's
 * callbacks, vtables and enumerators lists, unlink the node from its
 * parent's child list and remove it from the b->nodes hashmap, which is
 * keyed by n->path.
 * NOTE(review): how the unlinked entries and the node itself are released,
 * and any guard around the recursion or the parent access, are not visible
 * in this listing -- confirm against the full file. */
64 static void bus_node_destroy(sd_bus *b, struct node *n) {
65 struct node_callback *c;
66 struct node_vtable *v;
67 struct node_enumerator *e;
75 bus_node_destroy(b, n->child);
/* Each loop pops the list head until the list is empty. */
77 while ((c = n->callbacks)) {
78 LIST_REMOVE(callbacks, n->callbacks, c);
82 while ((v = n->vtables)) {
83 LIST_REMOVE(vtables, n->vtables, v);
88 while ((e = n->enumerators)) {
89 LIST_REMOVE(enumerators, n->enumerators, e);
94 LIST_REMOVE(siblings, n->parent->child, n);
/* The map entry stored under this path must be exactly this node. */
96 assert_se(hashmap_remove(b->nodes, n->path) == n);
/* Releases the resources owned by bus object 'b'.
 *
 * In visible order: detaches the bus from its event loop, unmaps
 * b->kdbus_buffer (KDBUS_POOL_SIZE bytes), frees the unique name, the auth
 * buffer and the exec argv vector, closes the b->n_fds descriptors held in
 * b->fds, drops one reference on every message in the read and write
 * queues, releases the reply-callback hashmap and its priority queue,
 * unlinks all filter callbacks, frees the match tree and the vtable
 * hashmaps, destroys every node still registered in b->nodes, flushes the
 * memfd cache and destroys its mutex.
 * NOTE(review): some statements (for example how each unlinked filter
 * callback and the bus object itself are released) are not visible in this
 * listing -- confirm against the full file. */
101 static void bus_free(sd_bus *b) {
102 struct filter_callback *f;
108 sd_bus_detach_event(b);
113 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
116 free(b->unique_name);
117 free(b->auth_buffer);
122 strv_free(b->exec_argv);
124 close_many(b->fds, b->n_fds);
/* Queued messages each hold a reference that the bus owns. */
127 for (i = 0; i < b->rqueue_size; i++)
128 sd_bus_message_unref(b->rqueue[i]);
131 for (i = 0; i < b->wqueue_size; i++)
132 sd_bus_message_unref(b->wqueue[i]);
135 hashmap_free_free(b->reply_callbacks);
136 prioq_free(b->reply_callbacks_prioq);
138 while ((f = b->filter_callbacks)) {
139 LIST_REMOVE(callbacks, b->filter_callbacks, f);
143 bus_match_free(&b->match_callbacks);
145 hashmap_free_free(b->vtable_methods);
146 hashmap_free_free(b->vtable_properties);
/* bus_node_destroy() removes the node from b->nodes, so this loop drains
 * the map before the (then empty) map itself is freed. */
148 while ((n = hashmap_first(b->nodes)))
149 bus_node_destroy(b, n);
151 hashmap_free(b->nodes);
153 bus_kernel_flush_memfd(b);
155 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
/* Creates a new, not yet connected bus object.
 *
 * ret: must be non-NULL, otherwise -EINVAL is returned.
 *      NOTE(review): presumably receives the new object; the store is not
 *      visible in this listing.
 *
 * Visible initial state: reference count REFCNT_INIT, both descriptors
 * unset (-1), message version 1, KDBUS_HELLO_ACCEPT_FD enabled in
 * hello_flags, the creating process id recorded in original_pid, the memfd
 * cache mutex initialized, and a write queue with room for one message
 * pointer allocated. */
160 int sd_bus_new(sd_bus **ret) {
163 assert_return(ret, -EINVAL);
169 r->n_ref = REFCNT_INIT;
170 r->input_fd = r->output_fd = -1;
171 r->message_version = 1;
172 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
173 r->original_pid = getpid();
175 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
177 /* We guarantee that wqueue always has space for at least one
179 r->wqueue = new(sd_bus_message*, 1);
/* Configures the address string the bus should connect to.
 *
 * Visible preconditions and their error codes:
 *   bus and address non-NULL            -> -EINVAL
 *   bus still in state BUS_UNSET        -> -EPERM
 *   called from the creating process    -> -ECHILD
 * NOTE(review): the code that stores the address is not visible in this
 * listing -- confirm ownership/copy semantics against the full file. */
189 int sd_bus_set_address(sd_bus *bus, const char *address) {
192 assert_return(bus, -EINVAL);
193 assert_return(bus->state == BUS_UNSET, -EPERM);
194 assert_return(address, -EINVAL);
195 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Hands already-open descriptors to an unstarted bus.
 *
 * input_fd / output_fd: descriptors used for reading and writing; both must
 * be >= 0 (-EINVAL otherwise). They are stored as-is in the bus object.
 * Fails with -EINVAL for a NULL bus, -EPERM unless the bus is still in
 * state BUS_UNSET, and -ECHILD when called from a process other than the
 * one that created the bus. */
207 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
208 assert_return(bus, -EINVAL);
209 assert_return(bus->state == BUS_UNSET, -EPERM);
210 assert_return(input_fd >= 0, -EINVAL);
211 assert_return(output_fd >= 0, -EINVAL);
212 assert_return(!bus_pid_changed(bus), -ECHILD);
214 bus->input_fd = input_fd;
215 bus->output_fd = output_fd;
/* Configures a program to execute as the bus transport.
 *
 * path: program path, must be non-NULL (-EINVAL).
 * argv: argument vector, must not be empty (-EINVAL).
 * Also fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process.
 *
 * The previously configured exec_path / exec_argv are freed here.
 * NOTE(review): the statements that install the new values are not visible
 * in this listing -- presumably copies of path/argv; confirm. */
219 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
222 assert_return(bus, -EINVAL);
223 assert_return(bus->state == BUS_UNSET, -EPERM);
224 assert_return(path, -EINVAL);
225 assert_return(!strv_isempty(argv), -EINVAL);
226 assert_return(!bus_pid_changed(bus), -ECHILD);
238 free(bus->exec_path);
239 strv_free(bus->exec_argv);
/* Selects whether this connection acts as a bus client.
 *
 * b: treated as a boolean; any non-zero value enables the flag.
 * Fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process. */
247 int sd_bus_set_bus_client(sd_bus *bus, int b) {
248 assert_return(bus, -EINVAL);
249 assert_return(bus->state == BUS_UNSET, -EPERM);
250 assert_return(!bus_pid_changed(bus), -ECHILD);
/* !! normalizes any non-zero int to 1. */
252 bus->bus_client = !!b;
/* Controls the KDBUS_HELLO_ACCEPT_FD bit of bus->hello_flags.
 *
 * b: boolean selector handed to SET_FLAG (presumably: non-zero sets the
 *    bit, zero clears it -- the macro is defined elsewhere).
 * Fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process. */
256 int sd_bus_negotiate_fds(sd_bus *bus, int b) {
257 assert_return(bus, -EINVAL);
258 assert_return(bus->state == BUS_UNSET, -EPERM);
259 assert_return(!bus_pid_changed(bus), -ECHILD);
261 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
/* Controls the KDBUS_HELLO_ATTACH_COMM bit of bus->hello_flags.
 *
 * b: boolean selector handed to SET_FLAG (presumably: non-zero sets the
 *    bit, zero clears it -- the macro is defined elsewhere).
 * Fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process. */
265 int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
266 assert_return(bus, -EINVAL);
267 assert_return(bus->state == BUS_UNSET, -EPERM);
268 assert_return(!bus_pid_changed(bus), -ECHILD);
270 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
/* Controls the KDBUS_HELLO_ATTACH_EXE bit of bus->hello_flags.
 *
 * b: boolean selector handed to SET_FLAG (presumably: non-zero sets the
 *    bit, zero clears it -- the macro is defined elsewhere).
 * Fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process. */
274 int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
275 assert_return(bus, -EINVAL);
276 assert_return(bus->state == BUS_UNSET, -EPERM);
277 assert_return(!bus_pid_changed(bus), -ECHILD);
279 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
/* Controls the KDBUS_HELLO_ATTACH_CMDLINE bit of bus->hello_flags.
 *
 * b: boolean selector handed to SET_FLAG (presumably: non-zero sets the
 *    bit, zero clears it -- the macro is defined elsewhere).
 * Fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process. */
283 int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
284 assert_return(bus, -EINVAL);
285 assert_return(bus->state == BUS_UNSET, -EPERM);
286 assert_return(!bus_pid_changed(bus), -ECHILD);
288 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
/* Controls the KDBUS_HELLO_ATTACH_CGROUP bit of bus->hello_flags.
 *
 * b: boolean selector handed to SET_FLAG (presumably: non-zero sets the
 *    bit, zero clears it -- the macro is defined elsewhere).
 * Fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process. */
292 int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
293 assert_return(bus, -EINVAL);
294 assert_return(bus->state == BUS_UNSET, -EPERM);
295 assert_return(!bus_pid_changed(bus), -ECHILD);
297 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
/* Controls the KDBUS_HELLO_ATTACH_CAPS bit of bus->hello_flags.
 *
 * b: boolean selector handed to SET_FLAG (presumably: non-zero sets the
 *    bit, zero clears it -- the macro is defined elsewhere).
 * Fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process. */
301 int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
302 assert_return(bus, -EINVAL);
303 assert_return(bus->state == BUS_UNSET, -EPERM);
304 assert_return(!bus_pid_changed(bus), -ECHILD);
306 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
/* Controls the KDBUS_HELLO_ATTACH_SECLABEL bit of bus->hello_flags.
 *
 * b: boolean selector handed to SET_FLAG (presumably: non-zero sets the
 *    bit, zero clears it -- the macro is defined elsewhere).
 * Fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process. */
310 int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
311 assert_return(bus, -EINVAL);
312 assert_return(bus->state == BUS_UNSET, -EPERM);
313 assert_return(!bus_pid_changed(bus), -ECHILD);
315 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
/* Controls the KDBUS_HELLO_ATTACH_AUDIT bit of bus->hello_flags.
 *
 * b: boolean selector handed to SET_FLAG (presumably: non-zero sets the
 *    bit, zero clears it -- the macro is defined elsewhere).
 * Fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process. */
319 int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
320 assert_return(bus, -EINVAL);
321 assert_return(bus->state == BUS_UNSET, -EPERM);
322 assert_return(!bus_pid_changed(bus), -ECHILD);
324 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
/* Selects server mode for an unstarted bus and records the server id.
 *
 * b:         boolean; non-zero enables server mode.
 * server_id: id stored in bus->server_id. A non-null id is only accepted
 *            together with b != 0; otherwise -EINVAL is returned.
 * Also fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process. */
328 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
329 assert_return(bus, -EINVAL);
330 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
331 assert_return(bus->state == BUS_UNSET, -EPERM);
332 assert_return(!bus_pid_changed(bus), -ECHILD);
334 bus->is_server = !!b;
335 bus->server_id = server_id;
/* Enables or disables the bus's anonymous_auth flag.
 *
 * b: treated as a boolean; any non-zero value enables the flag.
 * Fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process. */
339 int sd_bus_set_anonymous(sd_bus *bus, int b) {
340 assert_return(bus, -EINVAL);
341 assert_return(bus->state == BUS_UNSET, -EPERM);
342 assert_return(!bus_pid_changed(bus), -ECHILD);
344 bus->anonymous_auth = !!b;
/* Reply handler for the Hello() call issued by bus_send_hello().
 *
 * Must only run while the bus is in state BUS_HELLO (asserted). Reads a
 * single string from the reply, requires it to be a valid service name
 * starting with ':', stores a copy as bus->unique_name and moves the bus to
 * BUS_RUNNING.
 * NOTE(review): the handling of an error reply and the return values of
 * the failure branches are not visible in this listing -- confirm. */
348 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
353 assert(bus->state == BUS_HELLO);
356 r = sd_bus_message_get_errno(reply);
362 r = sd_bus_message_read(reply, "s", &s);
/* Only names with a leading ':' are accepted as unique names. */
366 if (!service_name_is_valid(s) || s[0] != ':')
369 bus->unique_name = strdup(s);
370 if (!bus->unique_name)
373 bus->state = BUS_RUNNING;
/* Queues the initial method call to "org.freedesktop.DBus" on a client
 * connection.
 *
 * Nothing needs to be sent unless the bus is a bus client that is not a
 * kernel bus (the condition on the first visible line). The call is sent
 * with hello_callback() as reply handler and its serial is stored in
 * bus->hello_serial.
 * NOTE(review): the object path and member arguments of the method call
 * are not visible in this listing. */
378 static int bus_send_hello(sd_bus *bus) {
379 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
384 if (!bus->bus_client || bus->is_kernel)
387 r = sd_bus_message_new_method_call(
389 "org.freedesktop.DBus",
391 "org.freedesktop.DBus",
397 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Moves the bus into its post-connection state.
 *
 * A bus client that is not a kernel bus enters BUS_HELLO; the other visible
 * assignment sets BUS_RUNNING.
 * NOTE(review): presumably the second assignment is the non-client path;
 * the control flow between the two is not visible in this listing. */
400 int bus_start_running(sd_bus *bus) {
403 if (bus->bus_client && !bus->is_kernel) {
404 bus->state = BUS_HELLO;
408 bus->state = BUS_RUNNING;
/* Parses one "key=value" element of a bus address at *p.
 *
 * p:     in/out cursor into the address string.
 * key:   key name compared against the text at *p with strncmp().
 * value: output for the parsed value.
 *
 * Visible behavior: the value runs until ';', ',' or the end of the string;
 * two decoded nibbles x and y are combined into one byte ((x << 4) | y),
 * and the result buffer is grown with realloc() to n + 2 bytes.
 * NOTE(review): most of the body (escape detection, return values, how a
 * NULL key is treated) is not visible in this listing -- confirm. */
412 static int parse_address_key(const char **p, const char *key, char **value) {
423 if (strncmp(*p, key, l) != 0)
436 while (*a != ';' && *a != ',' && *a != 0) {
454 c = (char) ((x << 4) | y);
461 t = realloc(r, n + 2);
/* Advances *p past the current address element.
 *
 * strcspn() yields the number of characters before the first ',' (or before
 * the terminating NUL when there is none), so *p ends up on that character.
 * NOTE(review): any further adjustment of *p is not visible in this
 * listing. */
489 static void skip_address_key(const char **p) {
493 *p += strcspn(*p, ",");
/* Parses the keys of a unix socket address into b->sockaddr.
 *
 * b:    bus whose sockaddr.un / sockaddr_size are filled in.
 * p:    in/out cursor; keys are consumed until the end of the string or ';'.
 * guid: receives the value of a "guid" key.
 *
 * Recognized keys: "guid", "path" and "abstract". The checks on path and
 * abstract cover "neither given" and "both given".
 * For "path" the name is copied into sun_path and sockaddr_size is the
 * offset of sun_path plus the name length. For "abstract" sun_path[0] is
 * set to 0 and the name is copied behind it, so the size additionally
 * counts that leading byte.
 * NOTE(review): the return values of the failing checks are not visible in
 * this listing. */
499 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
500 _cleanup_free_ char *path = NULL, *abstract = NULL;
509 while (**p != 0 && **p != ';') {
510 r = parse_address_key(p, "guid", guid);
516 r = parse_address_key(p, "path", &path);
522 r = parse_address_key(p, "abstract", &abstract);
531 if (!path && !abstract)
534 if (path && abstract)
/* A path may fill sun_path completely. */
539 if (l > sizeof(b->sockaddr.un.sun_path))
542 b->sockaddr.un.sun_family = AF_UNIX;
543 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
544 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
545 } else if (abstract) {
546 l = strlen(abstract);
/* One byte of sun_path is taken by the leading 0 written below. */
547 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
550 b->sockaddr.un.sun_family = AF_UNIX;
551 b->sockaddr.un.sun_path[0] = 0;
552 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
553 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the keys of a tcp address and resolves it into b->sockaddr.
 *
 * b:    bus whose sockaddr / sockaddr_size are filled in.
 * p:    in/out cursor; keys are consumed until the end of the string or ';'.
 * guid: receives the value of a "guid" key.
 *
 * Recognized keys: "guid", "host", "port" and "family". A family of "ipv4"
 * or "ipv6" restricts the lookup to AF_INET or AF_INET6. host and port are
 * resolved with getaddrinfo() (stream sockets, AI_ADDRCONFIG); the first
 * result is copied into b->sockaddr and the result list is freed.
 * -EADDRNOTAVAIL is one of the visible error returns.
 * NOTE(review): the guard that makes 'family' safe to pass to streq() and
 * the exact condition for -EADDRNOTAVAIL are not visible in this listing. */
559 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
560 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
562 struct addrinfo *result, hints = {
563 .ai_socktype = SOCK_STREAM,
564 .ai_flags = AI_ADDRCONFIG,
572 while (**p != 0 && **p != ';') {
573 r = parse_address_key(p, "guid", guid);
579 r = parse_address_key(p, "host", &host);
585 r = parse_address_key(p, "port", &port);
591 r = parse_address_key(p, "family", &family);
604 if (streq(family, "ipv4"))
605 hints.ai_family = AF_INET;
606 else if (streq(family, "ipv6"))
607 hints.ai_family = AF_INET6;
612 r = getaddrinfo(host, port, &hints, &result);
616 return -EADDRNOTAVAIL;
/* Only the first entry of the result list is used. */
618 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
619 b->sockaddr_size = result->ai_addrlen;
621 freeaddrinfo(result);
/* Parses the keys of a unixexec address (program path plus argvN keys).
 *
 * b:    bus being configured.
 * p:    in/out cursor; keys are consumed until the end of the string or ';'.
 * guid: receives the value of a "guid" key.
 *
 * Recognized keys: "guid", "path" and "argv<N>". For argv<N> the index is
 * parsed with strtoul(); the parse is rejected when errno is set, when the
 * number is not followed by '=', or when the index exceeds 256. The argv
 * array is grown to ul + 2 pointers, the newly added tail is zeroed, and
 * the value is parsed into slot ul. Afterwards slots 1..n_argv-1 are
 * checked for holes, and an empty argv[0] is filled with a copy of path.
 * NOTE(review): where errno is reset before strtoul(), the guard around
 * the realloc/memset pair, the updates of n_argv and the purpose of the
 * final loop over argv are not visible in this listing -- confirm. */
626 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
628 unsigned n_argv = 0, j;
637 while (**p != 0 && **p != ';') {
638 r = parse_address_key(p, "guid", guid);
644 r = parse_address_key(p, "path", &path);
650 if (startswith(*p, "argv")) {
/* *p + 4 skips the literal "argv"; strtoul() leaves *p behind the digits. */
654 ul = strtoul(*p + 4, (char**) p, 10);
655 if (errno > 0 || **p != '=' || ul > 256) {
665 x = realloc(argv, sizeof(char*) * (ul + 2));
671 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
677 r = parse_address_key(p, NULL, argv + ul);
692 /* Make sure there are no holes in the array, with the
693 * exception of argv[0] */
694 for (j = 1; j < n_argv; j++)
700 if (argv && argv[0] == NULL) {
701 argv[0] = strdup(path);
713 for (j = 0; j < n_argv; j++)
/* Parses the keys of a kernel bus address.
 *
 * b:    bus being configured.
 * p:    in/out cursor; keys are consumed until the end of the string or ';'.
 * guid: receives the value of a "guid" key.
 *
 * Recognized keys: "guid" and "path".
 * NOTE(review): what is done with the parsed path is not visible in this
 * listing. */
721 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
722 _cleanup_free_ char *path = NULL;
730 while (**p != 0 && **p != ';') {
731 r = parse_address_key(p, "guid", guid);
737 r = parse_address_key(p, "path", &path);
/* Discards the result of a previously parsed address.
 *
 * Visible resets: sockaddr_size becomes 0, the exec argv vector is freed
 * and server_id is set to SD_ID128_NULL.
 * NOTE(review): further fields are presumably reset on lines not visible
 * in this listing. */
756 static void bus_reset_parsed_address(sd_bus *b) {
760 b->sockaddr_size = 0;
761 strv_free(b->exec_argv);
765 b->server_id = SD_ID128_NULL;
/* Parses the next entry of b->address, starting at b->address_index.
 *
 * Stops when the index already points at the terminating NUL. Otherwise the
 * previous parse result is reset and the entry is dispatched on its prefix:
 * "unix:", "tcp:", "unixexec:" or "kernel:", each handled by the matching
 * parse_*_address() helper. A guid collected by the helper is converted
 * into b->server_id, and address_index is advanced to where parsing
 * stopped.
 * NOTE(review): return values, the handling of unknown prefixes and the
 * condition guarding the guid conversion are not visible in this listing. */
770 static int bus_parse_next_address(sd_bus *b) {
771 _cleanup_free_ char *guid = NULL;
779 if (b->address[b->address_index] == 0)
782 bus_reset_parsed_address(b);
784 a = b->address + b->address_index;
793 if (startswith(a, "unix:")) {
796 r = parse_unix_address(b, &a, &guid);
801 } else if (startswith(a, "tcp:")) {
804 r = parse_tcp_address(b, &a, &guid);
810 } else if (startswith(a, "unixexec:")) {
813 r = parse_exec_address(b, &a, &guid);
819 } else if (startswith(a, "kernel:")) {
822 r = parse_kernel_address(b, &a, &guid);
835 r = sd_id128_from_string(guid, &b->server_id);
/* Remember how far the address string has been consumed. */
840 b->address_index = a - b->address;
/* Tries to connect using the currently parsed address, falling back to
 * further entries of the address list.
 *
 * The transport is chosen from what has been parsed: a socket address
 * (family other than AF_UNSPEC), an exec path, or a kernel bus path. A
 * failed attempt is remembered as a positive errno in
 * b->last_connect_error. bus_parse_next_address() supplies the next
 * candidate; when none is left the last recorded error is returned, or
 * -ECONNREFUSED if no error was recorded.
 * NOTE(review): the enclosing loop and the success paths are not visible
 * in this listing. */
844 static int bus_start_address(sd_bus *b) {
852 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
854 r = bus_socket_connect(b);
/* r is a negative errno here, so -r stores the positive code. */
858 b->last_connect_error = -r;
860 } else if (b->exec_path) {
862 r = bus_socket_exec(b);
866 b->last_connect_error = -r;
867 } else if (b->kernel) {
869 r = bus_kernel_connect(b);
873 b->last_connect_error = -r;
876 r = bus_parse_next_address(b);
880 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Drops the currently parsed address and restarts connecting through
 * bus_start_address(); returns that function's result. */
884 int bus_next_address(sd_bus *b) {
887 bus_reset_parsed_address(b);
888 return bus_start_address(b);
/* Starts a connection on descriptors supplied by the caller.
 *
 * b: bus whose input_fd and output_fd are both valid (asserted).
 *
 * Both descriptors are switched to non-blocking mode and marked
 * close-on-exec; the output descriptor is only touched separately when it
 * differs from the input descriptor. The input descriptor is then examined
 * with fstat(): a character device is taken over as a kernel bus,
 * everything else as a socket.
 * Returns the result of bus_kernel_take_fd() or bus_socket_take_fd().
 * NOTE(review): the error checks after each call are on lines not visible
 * in this listing and are left untouched. */
891 static int bus_start_fd(sd_bus *b) {
896 assert(b->input_fd >= 0);
897 assert(b->output_fd >= 0);
899 r = fd_nonblock(b->input_fd, true);
903 r = fd_cloexec(b->input_fd, true);
907 if (b->input_fd != b->output_fd) {
908 r = fd_nonblock(b->output_fd, true);
912 r = fd_cloexec(b->output_fd, true);
/* The file type must be read from the st_mode that fstat() stored in 'st';
 * S_ISCHR() operates on a mode value, not on a descriptor number. */
917 if (fstat(b->input_fd, &st) < 0)
920 if (S_ISCHR(st.st_mode))
921 return bus_kernel_take_fd(b);
923 return bus_socket_take_fd(b);
/* Starts connecting a configured bus object.
 *
 * Fails with -EINVAL for a NULL bus, -EPERM unless the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process.
 *
 * The state becomes BUS_OPENING. Being both server and bus client is
 * checked for explicitly. If descriptors were supplied they are used via
 * bus_start_fd(); otherwise, if an address string, a socket address, an
 * exec path or a kernel path is configured, bus_start_address() is used.
 * Finally the hello message is sent through bus_send_hello().
 * NOTE(review): the return value for the server+client conflict, the case
 * where nothing is configured and the error check before bus_send_hello()
 * are not visible in this listing. */
926 int sd_bus_start(sd_bus *bus) {
929 assert_return(bus, -EINVAL);
930 assert_return(bus->state == BUS_UNSET, -EPERM);
931 assert_return(!bus_pid_changed(bus), -ECHILD);
933 bus->state = BUS_OPENING;
935 if (bus->is_server && bus->bus_client)
938 if (bus->input_fd >= 0)
939 r = bus_start_fd(bus);
940 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
941 r = bus_start_address(bus);
948 return bus_send_hello(bus);
/* Opens a client connection to the system bus.
 *
 * ret: must be non-NULL (-EINVAL).
 *      NOTE(review): presumably receives the connected bus; the store is
 *      not visible in this listing.
 *
 * The address is taken from the DBUS_SYSTEM_BUS_ADDRESS environment
 * variable (read with secure_getenv()); the other visible path targets the
 * unix socket "/run/dbus/system_bus_socket". The connection is marked as a
 * bus client.
 * NOTE(review): the condition choosing between the two paths and the
 * creation/start of the bus object are not visible in this listing. */
951 int sd_bus_open_system(sd_bus **ret) {
956 assert_return(ret, -EINVAL);
962 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
964 r = sd_bus_set_address(b, e);
968 b->sockaddr.un.sun_family = AF_UNIX;
969 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
/* sizeof of the literal counts its terminating NUL, hence the - 1. */
970 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
973 b->bus_client = true;
/* Opens a client connection to the user (session) bus.
 *
 * ret: must be non-NULL (-EINVAL).
 *      NOTE(review): presumably receives the connected bus; the store is
 *      not visible in this listing.
 *
 * The address is taken from DBUS_SESSION_BUS_ADDRESS; the other visible
 * path reads XDG_RUNTIME_DIR and builds the unix socket path
 * "<XDG_RUNTIME_DIR>/bus", provided the directory name plus the 4-byte
 * suffix fits into sun_path. The connection is marked as a bus client and
 * started with sd_bus_start().
 * NOTE(review): the condition choosing between the two paths and the error
 * returns are not visible in this listing. */
987 int sd_bus_open_user(sd_bus **ret) {
993 assert_return(ret, -EINVAL);
999 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1001 r = sd_bus_set_address(b, e);
1005 e = secure_getenv("XDG_RUNTIME_DIR");
1012 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1017 b->sockaddr.un.sun_family = AF_UNIX;
/* mempcpy() returns the position just behind the copied directory name, so
 * "/bus" is appended directly after it (4 bytes, no NUL copied). */
1018 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1019 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1022 b->bus_client = true;
1024 r = sd_bus_start(b);
/* Opens a client connection to the system bus of a remote host by running
 * ssh as the transport.
 *
 * host: remote host name, must be non-NULL (-EINVAL).
 * ret:  must be non-NULL (-EINVAL).
 *       NOTE(review): presumably receives the connected bus; the store is
 *       not visible in this listing.
 *
 * The host is escaped with bus_address_escape() and embedded into a
 * unixexec address that runs "ssh -xT <host> systemd-stdio-bridge". A new
 * bus object is created, marked as a bus client and started.
 * NOTE(review): how the address string is attached to the bus and the
 * error handling are not visible in this listing. */
1036 int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1037 _cleanup_free_ char *e = NULL;
1042 assert_return(host, -EINVAL);
1043 assert_return(ret, -EINVAL);
1045 e = bus_address_escape(host);
1049 p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1053 r = sd_bus_new(&bus);
1060 bus->bus_client = true;
1062 r = sd_bus_start(bus);
/* Closes the connection without destroying the bus object.
 *
 * Checks for an already closed bus and for a change of process first, then
 * sets the state to BUS_CLOSED and detaches the bus from its event loop.
 * The remaining visible statement depends on whether this is a kernel bus;
 * the rationale is given in the comment at the end of the function.
 * NOTE(review): the statements executed by the three conditions are not
 * visible in this listing -- presumably early returns and the fd close. */
1072 void sd_bus_close(sd_bus *bus) {
1075 if (bus->state == BUS_CLOSED)
1077 if (bus_pid_changed(bus))
1080 bus->state = BUS_CLOSED;
1082 sd_bus_detach_event(bus);
1084 if (!bus->is_kernel)
1087 /* We'll leave the fd open in case this is a kernel bus, since
1088 * there might still be memblocks around that reference this
1089 * bus, and they might need to invoke the
1090 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
/* Takes an additional reference on the bus.
 *
 * The incremented counter must be at least 2, i.e. the caller must already
 * have held a reference (asserted).
 * NOTE(review): the NULL handling and the return statement are not visible
 * in this listing. */
1094 sd_bus *sd_bus_ref(sd_bus *bus) {
1098 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drops one reference on the bus.
 *
 * The visible condition detects the counter reaching zero (or below).
 * NOTE(review): presumably the object is freed in that case and NULL is
 * returned; neither is visible in this listing -- confirm. */
1103 sd_bus *sd_bus_unref(sd_bus *bus) {
1107 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Reports whether the bus is in one of the states BUS_IS_OPEN() accepts.
 *
 * Returns the value of BUS_IS_OPEN(bus->state), -EINVAL for a NULL bus and
 * -ECHILD when called from a process other than the creator. */
1113 int sd_bus_is_open(sd_bus *bus) {
1115 assert_return(bus, -EINVAL);
1116 assert_return(!bus_pid_changed(bus), -ECHILD);
1118 return BUS_IS_OPEN(bus->state);
/* Reports whether a value of the given D-Bus type can be sent on this bus.
 *
 * type: D-Bus type character.
 * Fails with -EINVAL for a NULL bus, -ENOTCONN while the bus is in state
 * BUS_UNSET, and -ECHILD after a change of process.
 *
 * For SD_BUS_TYPE_UNIX_FD the answer depends on fd passing: it requires
 * KDBUS_HELLO_ACCEPT_FD in hello_flags, waits until the connection is
 * running and then returns bus->can_fds. Every other type is answered by
 * bus_type_is_valid().
 * NOTE(review): the return value when fd passing was not requested is not
 * visible in this listing. */
1121 int sd_bus_can_send(sd_bus *bus, char type) {
1124 assert_return(bus, -EINVAL);
1125 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1126 assert_return(!bus_pid_changed(bus), -ECHILD);
1128 if (type == SD_BUS_TYPE_UNIX_FD) {
1129 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1132 r = bus_ensure_running(bus);
1136 return bus->can_fds;
1139 return bus_type_is_valid(type);
/* Retrieves the server id of the peer.
 *
 * server_id: output, must be non-NULL (-EINVAL); receives bus->server_id.
 * Fails with -EINVAL for a NULL bus and -ECHILD after a change of process.
 * The connection is first driven to the running state with
 * bus_ensure_running(), so this call may block. */
1142 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1145 assert_return(bus, -EINVAL);
1146 assert_return(server_id, -EINVAL);
1147 assert_return(!bus_pid_changed(bus), -ECHILD);
1149 r = bus_ensure_running(bus);
1153 *server_id = bus->server_id;
/* Seals message 'm' for sending on bus 'b'.
 *
 * A message whose header version is newer than the bus's message_version
 * is checked for first. The message is then sealed with the next serial
 * number: b->serial is pre-incremented, so every seal attempt made here
 * consumes a serial.
 * NOTE(review): the result of the version check and any handling of
 * already sealed messages are not visible in this listing. */
1157 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1160 if (m->header->version > b->message_version)
1166 return bus_message_seal(m, ++b->serial);
/* Tries to write out the queued outgoing messages, front first.
 *
 * Only valid in state BUS_RUNNING or BUS_HELLO (asserted). While the queue
 * is not empty the head message is passed to the kernel or the socket
 * writer; for sockets bus->windex is passed along and later compared with
 * the message size. When the head counts as complete (always for kernel
 * buses, otherwise once windex has reached BUS_MESSAGE_SIZE) its reference
 * is dropped and the remaining entries are moved down by one slot.
 * NOTE(review): the condition selecting the writer, the error handling and
 * the return values are not visible in this listing. */
1169 static int dispatch_wqueue(sd_bus *bus) {
1173 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1175 while (bus->wqueue_size > 0) {
1178 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1180 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1186 /* Didn't do anything this time */
1188 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1189 /* Fully written. Let's drop the entry from
1192 * This isn't particularly optimized, but
1193 * well, this is supposed to be our worst-case
1194 * buffer only, and the socket buffer is
1195 * supposed to be our primary buffer, and if
1196 * it got full, then all bets are off
1199 sd_bus_message_unref(bus->wqueue[0]);
1200 bus->wqueue_size --;
1201 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetches the next incoming message.
 *
 * m: output; receives the message taken from the read queue.
 *
 * Only valid in state BUS_RUNNING or BUS_HELLO (asserted). A message that
 * is already queued is handed out first: the head is stored in *m and the
 * remaining entries are moved down by one slot. Otherwise a new message is
 * read from the kernel or the socket into a local variable.
 * NOTE(review): the condition selecting the reader, how a freshly read
 * message reaches *m and the return values are not visible in this
 * listing. */
1211 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1212 sd_bus_message *z = NULL;
1217 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1219 if (bus->rqueue_size > 0) {
1220 /* Dispatch a queued message */
1222 *m = bus->rqueue[0];
1223 bus->rqueue_size --;
1224 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1228 /* Try to read a new message */
1231 r = bus_kernel_read_message(bus, &z);
1233 r = bus_socket_read_message(bus, &z);
/* Sends message 'm' on the bus, queuing whatever cannot be written at once.
 *
 * bus:    open connection (-EINVAL if NULL, -ENOTCONN if not open, -ECHILD
 *         after a change of process).
 * m:      message to send, must be non-NULL (-EINVAL).
 * serial: optional output for the serial number assigned to the message.
 *         Passing NULL for a message that is not yet sealed marks it with
 *         SD_BUS_MESSAGE_NO_REPLY_EXPECTED.
 *
 * The message is sealed, and when the connection is in state BUS_RUNNING
 * or BUS_HELLO with an empty write queue it is written immediately. A
 * partially written socket message is stored in slot 0 of the write queue.
 * In the other visible path the message is appended to the write queue,
 * which is limited to BUS_WQUEUE_MAX entries and grown by one slot per
 * message. The queue holds its own reference on each queued message.
 * NOTE(review): the condition for the fd-passing check, the bookkeeping of
 * the partial write offset and the return values are not visible in this
 * listing. */
1249 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1252 assert_return(bus, -EINVAL);
1253 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1254 assert_return(m, -EINVAL);
1255 assert_return(!bus_pid_changed(bus), -ECHILD);
1258 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1265 /* If the serial number isn't kept, then we know that no reply
1267 if (!serial && !m->sealed)
1268 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1270 r = bus_seal_message(bus, m);
1274 /* If this is a reply and no reply was requested, then let's
1275 * suppress this, if we can */
1276 if (m->dont_send && !serial)
1279 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1283 r = bus_kernel_write_message(bus, m);
1285 r = bus_socket_write_message(bus, m, &idx);
1290 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1291 /* Wasn't fully written. So let's remember how
1292 * much was written. Note that the first entry
1293 * of the wqueue array is always allocated so
1294 * that we always can remember how much was
1296 bus->wqueue[0] = sd_bus_message_ref(m);
1297 bus->wqueue_size = 1;
1303 /* Just append it to the queue. */
1305 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1308 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1313 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1317 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout into an absolute CLOCK_MONOTONIC time.
 *
 * usec: relative timeout in microseconds. (uint64_t) -1 is treated
 *       specially, and BUS_DEFAULT_TIMEOUT is substituted in one case.
 * Returns the current monotonic time plus the (possibly substituted)
 * timeout.
 * NOTE(review): the result for (uint64_t) -1 and the condition selecting
 * the default are not visible in this listing; callers in this file treat
 * a result of 0 as "no timeout". */
1322 static usec_t calc_elapse(uint64_t usec) {
1323 if (usec == (uint64_t) -1)
1327 usec = BUS_DEFAULT_TIMEOUT;
1329 return now(CLOCK_MONOTONIC) + usec;
/* Ordering function for two struct reply_callback entries by deadline.
 *
 * The first two tests separate entries that have a timeout (non-zero) from
 * entries that have none (0); the last two compare the timeout values.
 * NOTE(review): the value returned by each branch is not visible in this
 * listing -- presumably entries with a timeout and earlier deadlines sort
 * first; confirm. */
1332 static int timeout_compare(const void *a, const void *b) {
1333 const struct reply_callback *x = a, *y = b;
1335 if (x->timeout != 0 && y->timeout == 0)
1338 if (x->timeout == 0 && y->timeout != 0)
1341 if (x->timeout < y->timeout)
1344 if (x->timeout > y->timeout)
/* Sends a method call and registers a callback for its reply.
 *
 * Visible preconditions and their error codes:
 *   bus, m and callback non-NULL                      -> -EINVAL
 *   bus open                                          -> -ENOTCONN
 *   m is a method call                                -> -EINVAL
 *   m does not carry NO_REPLY_EXPECTED                -> -EINVAL
 *   called from the creating process                  -> -ECHILD
 *
 * The reply-callback hashmap is allocated on demand, and so is the timeout
 * priority queue unless the timeout is (uint64_t) -1. The message is
 * sealed to obtain its serial, a reply_callback record is filled in and
 * stored in the hashmap under that serial, and, when it has a deadline,
 * also in the priority queue. The message is then sent; the visible
 * failure paths undo the registration through
 * sd_bus_send_with_reply_cancel().
 * NOTE(review): part of the parameter list and the error checks are not
 * visible in this listing. */
1350 int sd_bus_send_with_reply(
1353 sd_bus_message_handler_t callback,
1358 struct reply_callback *c;
1361 assert_return(bus, -EINVAL);
1362 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1363 assert_return(m, -EINVAL);
1364 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1365 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1366 assert_return(callback, -EINVAL);
1367 assert_return(!bus_pid_changed(bus), -ECHILD);
1369 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1373 if (usec != (uint64_t) -1) {
1374 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1379 r = bus_seal_message(bus, m);
1383 c = new0(struct reply_callback, 1);
1387 c->callback = callback;
1388 c->userdata = userdata;
1389 c->serial = BUS_MESSAGE_SERIAL(m);
1390 c->timeout = calc_elapse(usec);
/* The map key points into the record itself (&c->serial). */
1392 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
/* Only records with a deadline are tracked in the priority queue. */
1398 if (c->timeout != 0) {
1399 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1402 sd_bus_send_with_reply_cancel(bus, c->serial);
1407 r = sd_bus_send(bus, m, serial);
1409 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply registration.
 *
 * serial: serial of the method call, must be non-zero (-EINVAL).
 * Fails with -EINVAL for a NULL bus and -ECHILD after a change of process.
 *
 * The record is removed from the reply-callback hashmap and, when it has a
 * deadline, from the timeout priority queue as well.
 * NOTE(review): the handling of an unknown serial, the release of the
 * record and the return value are not visible in this listing. */
1416 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1417 struct reply_callback *c;
1419 assert_return(bus, -EINVAL);
1420 assert_return(serial != 0, -EINVAL);
1421 assert_return(!bus_pid_changed(bus), -ECHILD);
1423 c = hashmap_remove(bus->reply_callbacks, &serial);
1427 if (c->timeout != 0)
1428 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the connection until it has reached state BUS_RUNNING.
 *
 * An unset or closed bus and an already running bus are checked for up
 * front. Otherwise the bus is processed with sd_bus_process(); once the
 * state is BUS_RUNNING the work is done, and sd_bus_wait() is called with
 * an infinite timeout, so this function may block.
 * NOTE(review): the surrounding loop and the return values are not visible
 * in this listing. */
1434 int bus_ensure_running(sd_bus *bus) {
1439 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1441 if (bus->state == BUS_RUNNING)
1445 r = sd_bus_process(bus, NULL);
1448 if (bus->state == BUS_RUNNING)
1453 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and blocks until its reply arrives.
 *
 * Visible preconditions and their error codes:
 *   bus and m non-NULL                                -> -EINVAL
 *   bus open                                          -> -ENOTCONN
 *   m is a method call                                -> -EINVAL
 *   m does not carry NO_REPLY_EXPECTED                -> -EINVAL
 *   error is not already set ("dirty")                -> -EINVAL
 *   called from the creating process                  -> -ECHILD
 *
 * error: receives a copy of the error carried by an error reply.
 * reply: output for the reply message.
 *
 * After the connection is running the message is sent and its serial
 * remembered. Incoming messages are then read directly from the transport:
 * a message whose reply_serial matches is the answer (method return or
 * method error); any other message is appended to the read queue, for
 * which room is reserved before reading and which is limited to
 * BUS_RQUEUE_MAX entries. Between reads the function polls for more data
 * and flushes the write queue.
 * NOTE(review): part of the parameter list, the timeout arithmetic and the
 * return values are not visible in this listing. */
1459 int sd_bus_send_with_reply_and_block(
1463 sd_bus_error *error,
1464 sd_bus_message **reply) {
1471 assert_return(bus, -EINVAL);
1472 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1473 assert_return(m, -EINVAL);
1474 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1475 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1476 assert_return(!bus_error_is_dirty(error), -EINVAL);
1477 assert_return(!bus_pid_changed(bus), -ECHILD);
1479 r = bus_ensure_running(bus);
1483 r = sd_bus_send(bus, m, &serial);
1487 timeout = calc_elapse(usec);
1491 sd_bus_message *incoming = NULL;
1496 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1499 /* Make sure there's room for queuing this
1500 * locally, before we read the message */
1502 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1511 r = bus_kernel_read_message(bus, &incoming);
1513 r = bus_socket_read_message(bus, &incoming);
1518 if (incoming->reply_serial == serial) {
1519 /* Found a match! */
1521 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1526 sd_bus_message_unref(incoming);
1531 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1534 r = sd_bus_error_copy(error, &incoming->error);
1536 sd_bus_message_unref(incoming);
1540 k = sd_bus_error_get_errno(&incoming->error);
1541 sd_bus_message_unref(incoming);
1545 sd_bus_message_unref(incoming);
1549 /* There's already guaranteed to be room for
1550 * this, so no need to resize things here */
1551 bus->rqueue[bus->rqueue_size ++] = incoming;
1554 /* Try to read more, right-away */
1563 n = now(CLOCK_MONOTONIC);
1569 left = (uint64_t) -1;
/* need_more = true: only new input counts, see bus_poll(). */
1571 r = bus_poll(bus, true, left);
1575 r = dispatch_wqueue(bus);
/* Returns the descriptor of the connection for use in an external poll
 * loop.
 *
 * Only available when a single descriptor serves both directions: differing
 * input and output descriptors yield -EPERM. Also fails with -EINVAL for a
 * NULL bus, -ENOTCONN if the bus is not open, and -ECHILD after a change of
 * process. */
1581 int sd_bus_get_fd(sd_bus *bus) {
1583 assert_return(bus, -EINVAL);
1584 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1585 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1586 assert_return(!bus_pid_changed(bus), -ECHILD);
1588 return bus->input_fd;
/* Computes the poll events the connection currently waits for.
 *
 * Fails with -EINVAL for a NULL bus, -ENOTCONN if the bus is not open, and
 * -ECHILD after a change of process.
 *
 * The result depends on the state: BUS_OPENING, BUS_AUTHENTICATING (where
 * bus_socket_auth_needs_write() is consulted), and BUS_RUNNING/BUS_HELLO
 * (where an empty read queue and a non-empty write queue are tested).
 * NOTE(review): the event flags set in each branch are not visible in this
 * listing -- presumably POLLIN/POLLOUT, which is how callers in this file
 * mask the result; confirm. */
1591 int sd_bus_get_events(sd_bus *bus) {
1594 assert_return(bus, -EINVAL);
1595 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1596 assert_return(!bus_pid_changed(bus), -ECHILD);
1598 if (bus->state == BUS_OPENING)
1600 else if (bus->state == BUS_AUTHENTICATING) {
1602 if (bus_socket_auth_needs_write(bus))
1607 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1608 if (bus->rqueue_size <= 0)
1610 if (bus->wqueue_size > 0)
/* Reports the time at which the connection next needs processing.
 *
 * timeout_usec: output, must be non-NULL (-EINVAL).
 * Fails with -EINVAL for a NULL bus, -ENOTCONN if the bus is not open, and
 * -ECHILD after a change of process.
 *
 * While authenticating the value is bus->auth_timeout. In any state other
 * than BUS_RUNNING/BUS_HELLO it is (uint64_t) -1. A non-empty read queue
 * is handled as a separate case. Otherwise the entry at the front of the
 * reply timeout priority queue determines the result: (uint64_t) -1 in one
 * case, that entry's timeout in the other.
 * NOTE(review): the value reported for a non-empty read queue, the
 * condition on the peeked entry and the return values are not visible in
 * this listing. */
1617 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1618 struct reply_callback *c;
1620 assert_return(bus, -EINVAL);
1621 assert_return(timeout_usec, -EINVAL);
1622 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1623 assert_return(!bus_pid_changed(bus), -ECHILD);
1625 if (bus->state == BUS_AUTHENTICATING) {
1626 *timeout_usec = bus->auth_timeout;
1630 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1631 *timeout_usec = (uint64_t) -1;
1635 if (bus->rqueue_size > 0) {
1640 c = prioq_peek(bus->reply_callbacks_prioq);
1642 *timeout_usec = (uint64_t) -1;
1646 *timeout_usec = c->timeout;
/* Handles the reply registration at the front of the timeout queue.
 *
 * Looks at the entry at the front of the reply timeout priority queue,
 * reads the current monotonic time, builds a synthetic
 * SD_BUS_ERROR_NO_REPLY error message ("Method call timed out"), removes
 * the entry from the priority queue and the hashmap and invokes its
 * callback with the synthetic message.
 * Returns the callback's negative result on failure, otherwise 1.
 * NOTE(review): the comparison against the deadline, the early returns and
 * the release of the record are not visible in this listing. */
1650 static int process_timeout(sd_bus *bus) {
1651 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1652 struct reply_callback *c;
1658 c = prioq_peek(bus->reply_callbacks_prioq);
1662 n = now(CLOCK_MONOTONIC);
1666 r = bus_message_new_synthetic_error(
1669 &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
/* The peeked entry must still be the one popped. */
1674 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1675 hashmap_remove(bus->reply_callbacks, &c->serial);
1677 r = c->callback(bus, m, c->userdata);
1680 return r < 0 ? r : 1;
/* Gatekeeper applied to incoming messages while the bus is in BUS_HELLO.
 *
 * Outside of that state the visible first test applies. Inside it, the
 * message must be a method return or method error whose reply_serial
 * equals bus->hello_serial.
 * NOTE(review): the return values of the three tests are not visible in
 * this listing. */
1683 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1687 if (bus->state != BUS_HELLO)
1690 /* Let's make sure the first message on the bus is the HELLO
1691 * reply. But note that we don't actually parse the message
1692 * here (we leave that to the usual handling), we just verify
1693 * we don't let any earlier msg through. */
1695 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1696 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1699 if (m->reply_serial != bus->hello_serial)
/* Delivers a reply message to the callback registered for it.
 *
 * Only method returns and method errors are considered. The registration
 * is looked up (and removed) by the message's reply_serial, taken out of
 * the timeout priority queue when it has a deadline, the message is
 * rewound to its beginning and the callback is invoked.
 * NOTE(review): the handling of unknown serials, the release of the record
 * and the return values are not visible in this listing. */
1705 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1706 struct reply_callback *c;
1712 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1713 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1716 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1720 if (c->timeout != 0)
1721 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1723 r = sd_bus_message_rewind(m, true);
1727 r = c->callback(bus, m, c->userdata);
/* Runs the registered filter callbacks on message 'm'.
 *
 * The filter list may be changed by the callbacks themselves:
 * filter_callbacks_modified is cleared before a pass, checked before each
 * entry, and the pass is repeated while it is set. Each entry records the
 * bus's iteration_counter in last_iteration so that it is not run twice
 * for the same message. The message is rewound before every callback.
 * NOTE(review): the reactions to the checks and to the callback result are
 * not visible in this listing. */
1733 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1734 struct filter_callback *l;
1741 bus->filter_callbacks_modified = false;
1743 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1745 if (bus->filter_callbacks_modified)
1748 /* Don't run this more than once per iteration */
1749 if (l->last_iteration == bus->iteration_counter)
1752 l->last_iteration = bus->iteration_counter;
1754 r = sd_bus_message_rewind(m, true);
1758 r = l->callback(bus, m, l->userdata);
1764 } while (bus->filter_callbacks_modified);
/* Runs the match callbacks on message 'm'.
 *
 * match_callbacks_modified is cleared before bus_match_run() is invoked on
 * the match tree, and the run is repeated while a callback has set the
 * flag again.
 * NOTE(review): the handling of the bus_match_run() result is not visible
 * in this listing. */
1769 static int process_match(sd_bus *bus, sd_bus_message *m) {
1776 bus->match_callbacks_modified = false;
1778 r = bus_match_run(bus, &bus->match_callbacks, m);
1782 } while (bus->match_callbacks_modified);
/* Answers method calls on the built-in "org.freedesktop.DBus.Peer"
 * interface.
 *
 * Only method calls on that interface are considered, and callers that do
 * not expect a reply are checked for separately. "Ping" is answered with
 * an empty method return, "GetMachineId" with a method return carrying the
 * machine id as a string, and any other member with an
 * SD_BUS_ERROR_UNKNOWN_METHOD error. The reply is sent without requesting
 * a serial.
 * NOTE(review): the return values of the early checks and the error
 * handling are not visible in this listing. */
1787 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1788 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1794 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1797 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1800 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1803 if (streq_ptr(m->member, "Ping"))
1804 r = sd_bus_message_new_method_return(bus, m, &reply);
1805 else if (streq_ptr(m->member, "GetMachineId")) {
1809 r = sd_id128_get_machine(&id);
1813 r = sd_bus_message_new_method_return(bus, m, &reply);
1817 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1819 r = sd_bus_message_new_method_errorf(
1821 SD_BUS_ERROR_UNKNOWN_METHOD,
1822 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1828 r = sd_bus_send(bus, reply, NULL);
/* Runs one incoming message through the processing stages.
 *
 * The bus's iteration_counter is advanced first (process_filter() uses it
 * to run each filter once per message) and the message is logged at debug
 * level. The stages are tried in this order: hello gatekeeper, reply
 * callbacks, filters, matches, built-in interface, and finally the object
 * tree via bus_process_object(), whose result is returned.
 * NOTE(review): the checks between the stages are not visible in this
 * listing -- presumably a non-zero result ends processing early. */
1835 static int process_message(sd_bus *bus, sd_bus_message *m) {
1841 bus->iteration_counter++;
1843 log_debug("Got message sender=%s object=%s interface=%s member=%s",
1844 strna(sd_bus_message_get_sender(m)),
1845 strna(sd_bus_message_get_path(m)),
1846 strna(sd_bus_message_get_interface(m)),
1847 strna(sd_bus_message_get_member(m)));
1849 r = process_hello(bus, m);
1853 r = process_reply(bus, m);
1857 r = process_filter(bus, m);
1861 r = process_match(bus, m);
1865 r = process_builtin(bus, m);
1869 return bus_process_object(bus, m);
/* Performs one processing step of a running connection.
 *
 * Only valid in state BUS_RUNNING or BUS_HELLO (asserted). In order:
 * handles an expired reply timeout, flushes the write queue, fetches the
 * next incoming message and processes it. The message is rewound
 * afterwards; a method call reaching the last visible branch is answered
 * with an SD_BUS_ERROR_UNKNOWN_OBJECT error naming its path.
 * NOTE(review): the checks between the steps, how the message is handed to
 * *ret and the return values are not visible in this listing. */
1872 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1873 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1877 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1879 r = process_timeout(bus);
1883 r = dispatch_wqueue(bus);
1887 r = dispatch_rqueue(bus, &m);
1893 r = process_message(bus, m);
1898 r = sd_bus_message_rewind(m, true);
1907 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
1909 r = sd_bus_reply_method_errorf(
1911 SD_BUS_ERROR_UNKNOWN_OBJECT,
1912 "Unknown object '%s'.", m->path);
/* Performs one step of connection processing, depending on the state.
 *
 * ret: handed on to process_running() for the running states.
 * Fails with -EINVAL for a NULL bus, -ECHILD after a change of process and
 * -EBUSY when called while a previous call is still processing.
 *
 * Opening and authenticating connections are advanced by the socket layer;
 * running connections are handled by process_running(), with
 * bus->processing set for the duration of the call so that recursion is
 * detected. An unknown state is a programming error.
 * NOTE(review): BUS_DONT_DESTROY presumably keeps the bus alive for the
 * duration of the call; the macro and most case labels are not visible in
 * this listing. */
1926 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1927 BUS_DONT_DESTROY(bus);
1930 /* Returns 0 when we didn't do anything. This should cause the
1931 * caller to invoke sd_bus_wait() before returning the next
1932 * time. Returns > 0 when we did something, which possibly
1933 * means *ret is filled in with an unprocessed message. */
1935 assert_return(bus, -EINVAL);
1936 assert_return(!bus_pid_changed(bus), -ECHILD);
1938 /* We don't allow recursively invoking sd_bus_process(). */
1939 assert_return(!bus->processing, -EBUSY);
1941 switch (bus->state) {
1948 r = bus_socket_process_opening(bus);
1955 case BUS_AUTHENTICATING:
1957 r = bus_socket_process_authenticating(bus);
1967 bus->processing = true;
1968 r = process_running(bus, ret);
1969 bus->processing = false;
1974 assert_not_reached("Unknown state");
/* Waits for activity on the connection's descriptors.
 *
 * need_more:    distinguishes the two cases described by the comments in
 *               the body (caller needs new data vs. caller only wants to
 *               process what is pending).
 * timeout_usec: relative limit in microseconds; (uint64_t) -1 means no
 *               limit from the caller.
 * Fails with -ENOTCONN if the bus is not open.
 * Returns 1 when ppoll() reported at least one ready descriptor, 0
 * otherwise.
 *
 * The wait time m starts out unlimited. In the second case it is derived
 * from sd_bus_get_timeout() as the time remaining until that absolute
 * deadline (0 if already passed). The caller's timeout is applied when it
 * is finite and shorter than m. One pollfd is used when input and output
 * share a descriptor, two otherwise (input for POLLIN, output for
 * POLLOUT).
 * NOTE(review): the assignments made in several branches and the error
 * handling are not visible in this listing. */
1977 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1978 struct pollfd p[2] = {};
1981 usec_t m = (usec_t) -1;
1984 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1986 e = sd_bus_get_events(bus);
1991 /* The caller really needs some more data, he doesn't
1992 * care about what's already read, or any timeouts
1997 /* The caller wants to process if there's something to
1998 * process, but doesn't care otherwise */
2000 r = sd_bus_get_timeout(bus, &until);
2005 nw = now(CLOCK_MONOTONIC);
2006 m = until > nw ? until - nw : 0;
2010 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2013 p[0].fd = bus->input_fd;
2014 if (bus->output_fd == bus->input_fd) {
2018 p[0].events = e & POLLIN;
2019 p[1].fd = bus->output_fd;
2020 p[1].events = e & POLLOUT;
/* A NULL timespec makes ppoll() wait without a time limit. */
2024 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2028 return r > 0 ? 1 : 0;
/* Waits until the connection has something to process or the timeout
 * expires.
 *
 * timeout_usec: relative limit in microseconds, passed on to bus_poll().
 * Fails with -EINVAL for a NULL bus, -ENOTCONN if the bus is not open, and
 * -ECHILD after a change of process.
 * Messages already sitting in the read queue are checked for before
 * polling.
 * NOTE(review): the value returned in that case is not visible in this
 * listing -- presumably the call returns without waiting. */
2031 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2033 assert_return(bus, -EINVAL);
2034 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2035 assert_return(!bus_pid_changed(bus), -ECHILD);
2037 if (bus->rqueue_size > 0)
2040 return bus_poll(bus, false, timeout_usec);
/* Blocks until the write queue has been written out.
 *
 * Fails with -EINVAL for a NULL bus, -ENOTCONN if the bus is not open, and
 * -ECHILD after a change of process.
 *
 * The connection is first driven to the running state. With an empty write
 * queue there is nothing to do; otherwise the queue is dispatched and,
 * while entries remain, the function polls without a time limit.
 * NOTE(review): the surrounding loop and the return values are not visible
 * in this listing. */
2043 int sd_bus_flush(sd_bus *bus) {
2046 assert_return(bus, -EINVAL);
2047 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2048 assert_return(!bus_pid_changed(bus), -ECHILD);
2050 r = bus_ensure_running(bus);
2054 if (bus->wqueue_size <= 0)
2058 r = dispatch_wqueue(bus);
2062 if (bus->wqueue_size <= 0)
2065 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback that is run on incoming messages.
 *
 * callback: handler, must be non-NULL (-EINVAL).
 * userdata: opaque pointer stored with the handler.
 * Fails with -EINVAL for a NULL bus and -ECHILD after a change of process.
 *
 * A zero-initialized record is allocated, filled in and put at the front
 * of the filter list. filter_callbacks_modified is set so that a filter
 * pass that is currently running notices the change.
 * NOTE(review): the allocation failure path and the return value are not
 * visible in this listing. */
2071 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2072 struct filter_callback *f;
2074 assert_return(bus, -EINVAL);
2075 assert_return(callback, -EINVAL);
2076 assert_return(!bus_pid_changed(bus), -ECHILD);
2078 f = new0(struct filter_callback, 1);
2081 f->callback = callback;
2082 f->userdata = userdata;
2084 bus->filter_callbacks_modified = true;
2085 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
/* Unregisters a filter callback.
 *
 * callback / userdata: the pair to look for; both must match an entry.
 * Fails with -EINVAL for a NULL bus or callback and -ECHILD after a change
 * of process.
 *
 * The matching entry is unlinked from the filter list and
 * filter_callbacks_modified is set so that a running filter pass notices
 * the change.
 * NOTE(review): the release of the entry and the return values (found /
 * not found) are not visible in this listing. */
2089 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2090 struct filter_callback *f;
2092 assert_return(bus, -EINVAL);
2093 assert_return(callback, -EINVAL);
2094 assert_return(!bus_pid_changed(bus), -ECHILD);
2096 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2097 if (f->callback == callback && f->userdata == userdata) {
2098 bus->filter_callbacks_modified = true;
2099 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
/* Installs a match rule and the callback to run for matching messages.
 *
 * match:    match rule string, must be non-NULL (-EINVAL).
 * callback: handler stored with the rule.
 * userdata: opaque pointer stored with the handler.
 * Fails with -EINVAL for a NULL bus and -ECHILD after a change of process.
 *
 * The rule is parsed into components. On a bus client a new cookie is
 * taken from bus->match_cookie and the rule is also registered through
 * bus_add_match_internal(). The rule is then added to the local match
 * tree; the visible failure path removes the remote registration again.
 * The parsed components are freed before returning.
 * NOTE(review): the error checks and the return value are not visible in
 * this listing. */
2108 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2109 struct bus_match_component *components = NULL;
2110 unsigned n_components = 0;
2111 uint64_t cookie = 0;
2114 assert_return(bus, -EINVAL);
2115 assert_return(match, -EINVAL);
2116 assert_return(!bus_pid_changed(bus), -ECHILD);
2118 r = bus_match_parse(match, &components, &n_components);
2122 if (bus->bus_client) {
2123 cookie = ++bus->match_cookie;
2125 r = bus_add_match_internal(bus, match, components, n_components, cookie);
/* Lets a running match pass notice the change, see process_match(). */
2130 bus->match_callbacks_modified = true;
2131 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2133 if (bus->bus_client)
2134 bus_remove_match_internal(bus, match, cookie);
2138 bus_match_parse_free(components, n_components);
/* Removes a match rule that was installed with the same rule string,
 * callback and userdata.
 *
 * Fails with -EINVAL for a NULL bus or match and -ECHILD after a change of
 * process.
 *
 * The rule is parsed and removed from the local match tree, which yields
 * the cookie stored with it. On a bus client the registration is also
 * removed through bus_remove_match_internal() using that cookie. The
 * local result takes precedence: a negative r is returned, otherwise q.
 * NOTE(review): the declaration and initial value of q and the check after
 * parsing are not visible in this listing -- confirm q is initialized for
 * the non-client case. */
2142 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2143 struct bus_match_component *components = NULL;
2144 unsigned n_components = 0;
2146 uint64_t cookie = 0;
2148 assert_return(bus, -EINVAL);
2149 assert_return(match, -EINVAL);
2150 assert_return(!bus_pid_changed(bus), -ECHILD);
2152 r = bus_match_parse(match, &components, &n_components);
2156 bus->match_callbacks_modified = true;
2157 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2159 if (bus->bus_client)
2160 q = bus_remove_match_internal(bus, match, cookie);
2162 bus_match_parse_free(components, n_components);
2164 return r < 0 ? r : q;
/* Tells whether the calling process differs from the one that created the
 * bus.
 *
 * Compares the pid recorded in bus->original_pid with the current
 * getpid(). The public entry points in this file use it to fail with
 * -ECHILD. */
2167 bool bus_pid_changed(sd_bus *bus) {
2170 /* We don't support people creating a bus connection and
2171 * keeping it around over a fork(). Let's complain. */
2173 return bus->original_pid != getpid();
/* I/O event source handler installed by sd_bus_attach_event().
 *
 * s, fd, revents: supplied by the event loop; not used by the visible code.
 * userdata:       the sd_bus object that was registered with the source.
 *
 * Runs one processing step of the bus with sd_bus_process().
 * The local is declared with its real type (sd_bus *) instead of void *,
 * matching the other event callbacks for this bus and letting the compiler
 * check the sd_bus_process() argument.
 * NOTE(review): the return statements are on lines not visible in this
 * listing and are left untouched. */
2176 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2177 sd_bus *bus = userdata;
2182 r = sd_bus_process(bus, NULL);
/* Timer event source handler installed by sd_bus_attach_event().
 *
 * s, usec:  supplied by the event loop; not used by the visible code.
 * userdata: the sd_bus object that was registered with the source.
 *
 * Runs one processing step of the bus with sd_bus_process().
 * The local is declared with its real type (sd_bus *) instead of void *,
 * matching the other event callbacks for this bus and letting the compiler
 * check the sd_bus_process() argument.
 * NOTE(review): the return statements are on lines not visible in this
 * listing and are left untouched. */
2189 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2190 sd_bus *bus = userdata;
2195 r = sd_bus_process(bus, NULL);
/* Prepare handler of the bus's input event source: refreshes what the
 * event loop should wait for.
 *
 * The wanted poll events are obtained from sd_bus_get_events(). With
 * separate input and output descriptors the POLLIN part is applied to the
 * input source and the POLLOUT part to the output source; with a shared
 * descriptor all events go to the input source. The timer source is set to
 * the deadline reported by sd_bus_get_timeout() and enabled or disabled
 * depending on whether r is positive.
 * NOTE(review): the error checks, the condition around the set_time call
 * and the return value are not visible in this listing. */
2202 static int prepare_callback(sd_event_source *s, void *userdata) {
2203 sd_bus *bus = userdata;
2210 e = sd_bus_get_events(bus);
2214 if (bus->output_fd != bus->input_fd) {
2216 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2220 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2224 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2229 r = sd_bus_get_timeout(bus, &until);
2235 j = sd_event_source_set_time(bus->time_event_source, until);
2240 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
/* Quit handler registered with sd_event_add_quit() by
 * sd_bus_attach_event().
 *
 * userdata: the sd_bus object that was registered with the source.
 * NOTE(review): the body is not visible in this listing -- presumably it
 * flushes and/or closes the bus when the event loop quits; confirm. */
2247 static int quit_callback(sd_event_source *event, void *userdata) {
2248 sd_bus *bus = userdata;
/* Attaches the bus to an sd_event loop so that it is processed
 * automatically.
 *
 * event:    event loop, must be non-NULL (-EINVAL); the bus keeps its own
 *           reference to it.
 * priority: priority applied to the I/O and timer sources created here.
 * Fails with -EINVAL for a NULL bus and -EBUSY when the bus is already
 * attached to an event loop.
 *
 * Sources created: an I/O source for the input descriptor, a second I/O
 * source for the output descriptor when it differs, a monotonic timer
 * source and a quit source. The input source also gets
 * prepare_callback() as prepare handler, which updates the watched events
 * and the timer. The I/O sources start with an empty event mask. The
 * visible failure path undoes everything through sd_bus_detach_event().
 * NOTE(review): the error checks and the return values are not visible in
 * this listing. */
2257 int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2260 assert_return(bus, -EINVAL);
2261 assert_return(event, -EINVAL);
2262 assert_return(!bus->event, -EBUSY);
2264 assert(!bus->input_io_event_source);
2265 assert(!bus->output_io_event_source);
2266 assert(!bus->time_event_source);
2268 bus->event = sd_event_ref(event);
2270 r = sd_event_add_io(event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2274 r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2278 if (bus->output_fd != bus->input_fd) {
2279 r = sd_event_add_io(event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2283 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2288 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2292 r = sd_event_add_monotonic(event, 0, 0, time_callback, bus, &bus->time_event_source);
2296 r = sd_event_source_set_priority(bus->time_event_source, priority);
2300 r = sd_event_add_quit(event, quit_callback, bus, &bus->quit_event_source);
2307 sd_bus_detach_event(bus);
/* Detaches the bus from the event loop it is attached to.
 *
 * Fails with -EINVAL for a NULL bus and -ENXIO when the bus is not
 * attached to any event loop.
 *
 * Every event source that exists (input I/O, output I/O, timer, quit) is
 * unreferenced and its field is overwritten with the value returned by
 * sd_event_source_unref(); the reference on the event loop is dropped the
 * same way.
 * NOTE(review): the end of the function, including its return value, is
 * not visible in this listing. */
2311 int sd_bus_detach_event(sd_bus *bus) {
2312 assert_return(bus, -EINVAL);
2313 assert_return(bus->event, -ENXIO);
2315 if (bus->input_io_event_source)
2316 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2318 if (bus->output_io_event_source)
2319 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2321 if (bus->time_event_source)
2322 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2324 if (bus->quit_event_source)
2325 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2328 bus->event = sd_event_unref(bus->event);