1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
/* Forward declaration: bus_poll() is defined further down in this file but
 * is already called by functions that appear before its definition. */
50 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Closes the connection's file descriptors and resets both fields to -1.
 * The output fd is closed on its own only when it is valid and differs from
 * the input fd, so a descriptor shared by both directions is closed once. */
52 static void bus_close_fds(sd_bus *b) {
56 close_nointr_nofail(b->input_fd);
58 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
59 close_nointr_nofail(b->output_fd);
61 b->input_fd = b->output_fd = -1;
/* Tears down one object-tree node: recurses into n->child, unlinks every
 * entry of the node's callbacks, vtables and enumerators lists, removes the
 * node from its parent's child list and finally from the b->nodes hashmap,
 * asserting that the entry stored under n->path really was this node. */
64 static void bus_node_destroy(sd_bus *b, struct node *n) {
65 struct node_callback *c;
66 struct node_vtable *v;
67 struct node_enumerator *e;
75 bus_node_destroy(b, n->child);
77 while ((c = n->callbacks)) {
78 LIST_REMOVE(callbacks, n->callbacks, c);
82 while ((v = n->vtables)) {
83 LIST_REMOVE(vtables, n->vtables, v);
88 while ((e = n->enumerators)) {
89 LIST_REMOVE(enumerators, n->enumerators, e);
94 LIST_REMOVE(siblings, n->parent->child, n);
96 assert_se(hashmap_remove(b->nodes, n->path) == n);
/* Releases everything owned by a bus object: detaches it from its event
 * loop, unmaps the kdbus buffer, frees owned strings and the exec argv,
 * closes pending fds, drops the references held by the read and write
 * queues, frees reply/filter/match/vtable bookkeeping, destroys all object
 * nodes, flushes the memfd cache and destroys the memfd cache mutex. */
101 static void bus_free(sd_bus *b) {
102 struct filter_callback *f;
108 sd_bus_detach_event(b);
113 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
116 free(b->unique_name);
117 free(b->auth_buffer);
122 strv_free(b->exec_argv);
124 close_many(b->fds, b->n_fds);
127 for (i = 0; i < b->rqueue_size; i++)
128 sd_bus_message_unref(b->rqueue[i]);
131 for (i = 0; i < b->wqueue_size; i++)
132 sd_bus_message_unref(b->wqueue[i]);
135 hashmap_free_free(b->reply_callbacks);
136 prioq_free(b->reply_callbacks_prioq);
138 while ((f = b->filter_callbacks)) {
139 LIST_REMOVE(callbacks, b->filter_callbacks, f);
143 bus_match_free(&b->match_callbacks);
145 hashmap_free_free(b->vtable_methods);
146 hashmap_free_free(b->vtable_properties);
148 while ((n = hashmap_first(b->nodes)))
149 bus_node_destroy(b, n);
151 hashmap_free(b->nodes);
153 bus_kernel_flush_memfd(b);
155 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
/* Sets up a fresh bus object: reference count REFCNT_INIT, both fds -1,
 * message version 1, fd passing requested in hello_flags, the creating PID
 * recorded (bus_pid_changed() compares against it), the memfd cache mutex
 * initialized and a one-entry write queue preallocated.
 * Returns -EINVAL when ret is NULL. */
160 int sd_bus_new(sd_bus **ret) {
163 assert_return(ret, -EINVAL);
169 r->n_ref = REFCNT_INIT;
170 r->input_fd = r->output_fd = -1;
171 r->message_version = 1;
172 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
173 r->original_pid = getpid();
175 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
177 /* We guarantee that wqueue always has space for at least one
179 r->wqueue = new(sd_bus_message*, 1);
/* Configures the address of a bus that has not been started yet.
 * Returns -EINVAL for a NULL bus or address, -EPERM unless the bus is in
 * state BUS_UNSET, and -ECHILD when the process PID differs from the one
 * that created the bus. */
189 int sd_bus_set_address(sd_bus *bus, const char *address) {
192 assert_return(bus, -EINVAL);
193 assert_return(bus->state == BUS_UNSET, -EPERM);
194 assert_return(address, -EINVAL);
195 assert_return(!bus_pid_changed(bus), -ECHILD);
/* Hands already-open input and output file descriptors to an unstarted bus.
 * Both descriptors must be >= 0 (-EINVAL otherwise); -EPERM unless the bus
 * is in state BUS_UNSET, -ECHILD after a PID change. */
207 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
208 assert_return(bus, -EINVAL);
209 assert_return(bus->state == BUS_UNSET, -EPERM);
210 assert_return(input_fd >= 0, -EINVAL);
211 assert_return(output_fd >= 0, -EINVAL);
212 assert_return(!bus_pid_changed(bus), -ECHILD);
214 bus->input_fd = input_fd;
215 bus->output_fd = output_fd;
/* Configures an unstarted bus to talk to a spawned program. path must be
 * non-NULL and argv non-empty (-EINVAL otherwise); -EPERM unless the bus is
 * in state BUS_UNSET, -ECHILD after a PID change. Any previously stored
 * exec_path and exec_argv are freed. */
219 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
222 assert_return(bus, -EINVAL);
223 assert_return(bus->state == BUS_UNSET, -EPERM);
224 assert_return(path, -EINVAL);
225 assert_return(!strv_isempty(argv), -EINVAL);
226 assert_return(!bus_pid_changed(bus), -ECHILD);
238 free(bus->exec_path);
239 strv_free(bus->exec_argv);
/* Stores b, normalized to a boolean, in bus->bus_client. Only allowed in
 * state BUS_UNSET (-EPERM); -EINVAL for a NULL bus, -ECHILD after a PID
 * change. */
247 int sd_bus_set_bus_client(sd_bus *bus, int b) {
248 assert_return(bus, -EINVAL);
249 assert_return(bus->state == BUS_UNSET, -EPERM);
250 assert_return(!bus_pid_changed(bus), -ECHILD);
252 bus->bus_client = !!b;
/* Sets or clears KDBUS_HELLO_ACCEPT_FD in bus->hello_flags according to b.
 * Only allowed in state BUS_UNSET (-EPERM); -EINVAL for a NULL bus,
 * -ECHILD after a PID change. */
256 int sd_bus_negotiate_fds(sd_bus *bus, int b) {
257 assert_return(bus, -EINVAL);
258 assert_return(bus->state == BUS_UNSET, -EPERM);
259 assert_return(!bus_pid_changed(bus), -ECHILD);
261 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
/* Sets or clears KDBUS_HELLO_ATTACH_COMM in bus->hello_flags according to b.
 * Only allowed in state BUS_UNSET (-EPERM); -EINVAL for a NULL bus,
 * -ECHILD after a PID change. */
265 int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
266 assert_return(bus, -EINVAL);
267 assert_return(bus->state == BUS_UNSET, -EPERM);
268 assert_return(!bus_pid_changed(bus), -ECHILD);
270 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
/* Sets or clears KDBUS_HELLO_ATTACH_EXE in bus->hello_flags according to b.
 * Only allowed in state BUS_UNSET (-EPERM); -EINVAL for a NULL bus,
 * -ECHILD after a PID change. */
274 int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
275 assert_return(bus, -EINVAL);
276 assert_return(bus->state == BUS_UNSET, -EPERM);
277 assert_return(!bus_pid_changed(bus), -ECHILD);
279 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
/* Sets or clears KDBUS_HELLO_ATTACH_CMDLINE in bus->hello_flags according
 * to b. Only allowed in state BUS_UNSET (-EPERM); -EINVAL for a NULL bus,
 * -ECHILD after a PID change. */
283 int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
284 assert_return(bus, -EINVAL);
285 assert_return(bus->state == BUS_UNSET, -EPERM);
286 assert_return(!bus_pid_changed(bus), -ECHILD);
288 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
/* Sets or clears KDBUS_HELLO_ATTACH_CGROUP in bus->hello_flags according
 * to b. Only allowed in state BUS_UNSET (-EPERM); -EINVAL for a NULL bus,
 * -ECHILD after a PID change. */
292 int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
293 assert_return(bus, -EINVAL);
294 assert_return(bus->state == BUS_UNSET, -EPERM);
295 assert_return(!bus_pid_changed(bus), -ECHILD);
297 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
/* Sets or clears KDBUS_HELLO_ATTACH_CAPS in bus->hello_flags according to b.
 * Only allowed in state BUS_UNSET (-EPERM); -EINVAL for a NULL bus,
 * -ECHILD after a PID change. */
301 int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
302 assert_return(bus, -EINVAL);
303 assert_return(bus->state == BUS_UNSET, -EPERM);
304 assert_return(!bus_pid_changed(bus), -ECHILD);
306 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
/* Sets or clears KDBUS_HELLO_ATTACH_SECLABEL in bus->hello_flags according
 * to b. Only allowed in state BUS_UNSET (-EPERM); -EINVAL for a NULL bus,
 * -ECHILD after a PID change. */
310 int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
311 assert_return(bus, -EINVAL);
312 assert_return(bus->state == BUS_UNSET, -EPERM);
313 assert_return(!bus_pid_changed(bus), -ECHILD);
315 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
/* Sets or clears KDBUS_HELLO_ATTACH_AUDIT in bus->hello_flags according
 * to b. Only allowed in state BUS_UNSET (-EPERM); -EINVAL for a NULL bus,
 * -ECHILD after a PID change. */
319 int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
320 assert_return(bus, -EINVAL);
321 assert_return(bus->state == BUS_UNSET, -EPERM);
322 assert_return(!bus_pid_changed(bus), -ECHILD);
324 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
/* Switches server mode on or off and records the server ID. A non-null
 * server_id is accepted only together with a true b (-EINVAL otherwise).
 * Only allowed in state BUS_UNSET (-EPERM); -EINVAL for a NULL bus,
 * -ECHILD after a PID change. */
328 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
329 assert_return(bus, -EINVAL);
330 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
331 assert_return(bus->state == BUS_UNSET, -EPERM);
332 assert_return(!bus_pid_changed(bus), -ECHILD);
334 bus->is_server = !!b;
335 bus->server_id = server_id;
/* Stores b, normalized to a boolean, in bus->anonymous_auth. Only allowed
 * in state BUS_UNSET (-EPERM); -EINVAL for a NULL bus, -ECHILD after a PID
 * change. */
339 int sd_bus_set_anonymous(sd_bus *bus, int b) {
340 assert_return(bus, -EINVAL);
341 assert_return(bus->state == BUS_UNSET, -EPERM);
342 assert_return(!bus_pid_changed(bus), -ECHILD);
344 bus->anonymous_auth = !!b;
/* Reply handler registered by bus_send_hello(). Expects the bus to be in
 * state BUS_HELLO, checks the reply's errno, reads one string from it and
 * requires it to be a valid service name starting with ':'. A copy is
 * stored in bus->unique_name and the state moves to BUS_RUNNING. */
348 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
353 assert(bus->state == BUS_HELLO);
356 r = sd_bus_message_get_errno(reply);
362 r = sd_bus_message_read(reply, "s", &s);
366 if (!service_name_is_valid(s) || s[0] != ':')
369 bus->unique_name = strdup(s);
370 if (!bus->unique_name)
373 bus->state = BUS_RUNNING;
/* Sends the initial method call to org.freedesktop.DBus, with
 * hello_callback() as reply handler and the serial saved in
 * bus->hello_serial. The guard at the top singles out the case where the
 * bus is not a bus client or uses the kernel transport. */
378 static int bus_send_hello(sd_bus *bus) {
379 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
384 if (!bus->bus_client || bus->is_kernel)
387 r = sd_bus_message_new_method_call(
389 "org.freedesktop.DBus",
391 "org.freedesktop.DBus",
397 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Moves the bus into its operating state: BUS_HELLO for bus clients on a
 * non-kernel transport (a hello reply is still outstanding), BUS_RUNNING
 * otherwise. */
400 int bus_start_running(sd_bus *bus) {
403 if (bus->bus_client && !bus->is_kernel) {
404 bus->state = BUS_HELLO;
408 bus->state = BUS_RUNNING;
/* Parses one key/value element of a bus address at *p. The text at *p is
 * compared against key with strncmp(); the value is scanned up to ';', ','
 * or the terminating NUL; two hex digit values x and y are combined into
 * one byte ((x << 4) | y) -- presumably for %XX escapes, TODO confirm --
 * and the output buffer is grown with realloc() via a temporary pointer. */
412 static int parse_address_key(const char **p, const char *key, char **value) {
423 if (strncmp(*p, key, l) != 0)
436 while (*a != ';' && *a != ',' && *a != 0) {
454 c = (char) ((x << 4) | y);
461 t = realloc(r, n + 2);
/* Advances *p to the next ',' (or to the end of the string if there is
 * none), skipping over the current address element. */
489 static void skip_address_key(const char **p) {
493 *p += strcspn(*p, ",");
/* Parses the parameters of a "unix:" address up to ';' or end of string,
 * recognizing the keys guid, path and abstract. Exactly one of path and
 * abstract must be present. For path the name is copied into sun_path and
 * may fill it completely; for abstract a leading NUL byte is written and
 * the name follows it, so one byte less is available. sockaddr_size is set
 * to the exact number of bytes used. */
499 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
500 _cleanup_free_ char *path = NULL, *abstract = NULL;
509 while (**p != 0 && **p != ';') {
510 r = parse_address_key(p, "guid", guid);
516 r = parse_address_key(p, "path", &path);
522 r = parse_address_key(p, "abstract", &abstract);
531 if (!path && !abstract)
534 if (path && abstract)
539 if (l > sizeof(b->sockaddr.un.sun_path))
542 b->sockaddr.un.sun_family = AF_UNIX;
543 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
544 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
545 } else if (abstract) {
546 l = strlen(abstract);
547 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
550 b->sockaddr.un.sun_family = AF_UNIX;
551 b->sockaddr.un.sun_path[0] = 0;
552 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
553 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the parameters of a "tcp:" address up to ';' or end of string,
 * recognizing the keys guid, host, port and family. A family of "ipv4" or
 * "ipv6" restricts the lookup to AF_INET or AF_INET6. host and port are
 * resolved with getaddrinfo() (stream sockets, AI_ADDRCONFIG); the first
 * result is copied into b->sockaddr and the result list is freed.
 * -EADDRNOTAVAIL is one of the failure codes. */
559 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
560 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
562 struct addrinfo *result, hints = {
563 .ai_socktype = SOCK_STREAM,
564 .ai_flags = AI_ADDRCONFIG,
572 while (**p != 0 && **p != ';') {
573 r = parse_address_key(p, "guid", guid);
579 r = parse_address_key(p, "host", &host);
585 r = parse_address_key(p, "port", &port);
591 r = parse_address_key(p, "family", &family);
604 if (streq(family, "ipv4"))
605 hints.ai_family = AF_INET;
606 else if (streq(family, "ipv6"))
607 hints.ai_family = AF_INET6;
612 r = getaddrinfo(host, port, &hints, &result);
616 return -EADDRNOTAVAIL;
618 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
619 b->sockaddr_size = result->ai_addrlen;
621 freeaddrinfo(result);
/* Parses the parameters of a "unixexec:" address up to ';' or end of
 * string, recognizing guid, path and numbered "argvN" keys. The index N is
 * read with strtoul(), must be followed by '=' and may not exceed 256. The
 * argv array is grown with realloc() and new slots are zeroed before the
 * value is parsed into slot N. Afterwards the array is checked for holes
 * (slot 0 excepted), and an empty argv[0] is filled with a copy of path. */
626 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
628 unsigned n_argv = 0, j;
637 while (**p != 0 && **p != ';') {
638 r = parse_address_key(p, "guid", guid);
644 r = parse_address_key(p, "path", &path);
650 if (startswith(*p, "argv")) {
654 ul = strtoul(*p + 4, (char**) p, 10);
655 if (errno > 0 || **p != '=' || ul > 256) {
665 x = realloc(argv, sizeof(char*) * (ul + 2));
671 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
677 r = parse_address_key(p, NULL, argv + ul);
692 /* Make sure there are no holes in the array, with the
693 * exception of argv[0] */
694 for (j = 1; j < n_argv; j++)
700 if (argv && argv[0] == NULL) {
701 argv[0] = strdup(path);
713 for (j = 0; j < n_argv; j++)
/* Parses the parameters of a "kernel:" address up to ';' or end of string,
 * recognizing the keys guid and path. */
721 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
722 _cleanup_free_ char *path = NULL;
730 while (**p != 0 && **p != ';') {
731 r = parse_address_key(p, "guid", guid);
737 r = parse_address_key(p, "path", &path);
/* Discards the result of a previous address parse: zeroes sockaddr_size,
 * frees the exec argv vector and resets server_id to SD_ID128_NULL. */
756 static void bus_reset_parsed_address(sd_bus *b) {
760 b->sockaddr_size = 0;
761 strv_free(b->exec_argv);
765 b->server_id = SD_ID128_NULL;
/* Parses the next entry of b->address, starting at b->address_index. The
 * previous parse result is reset first, then the entry is dispatched on its
 * "unix:", "tcp:", "unixexec:" or "kernel:" prefix to the matching parser.
 * A guid, if one was parsed, is converted into b->server_id, and
 * address_index is advanced to where parsing stopped. */
770 static int bus_parse_next_address(sd_bus *b) {
771 _cleanup_free_ char *guid = NULL;
779 if (b->address[b->address_index] == 0)
782 bus_reset_parsed_address(b);
784 a = b->address + b->address_index;
793 if (startswith(a, "unix:")) {
796 r = parse_unix_address(b, &a, &guid);
801 } else if (startswith(a, "tcp:")) {
804 r = parse_tcp_address(b, &a, &guid);
810 } else if (startswith(a, "unixexec:")) {
813 r = parse_exec_address(b, &a, &guid);
819 } else if (startswith(a, "kernel:")) {
822 r = parse_kernel_address(b, &a, &guid);
835 r = sd_id128_from_string(guid, &b->server_id);
840 b->address_index = a - b->address;
/* Tries to connect using the currently parsed address: a socket connect
 * when a socket address is set, an exec'ed program when exec_path is set,
 * or the kernel transport when b->kernel is set. A failure is remembered in
 * last_connect_error as a positive errno value before the next address is
 * parsed. Once no address is left, the last recorded error is returned
 * negated, or -ECONNREFUSED if none was recorded. */
844 static int bus_start_address(sd_bus *b) {
852 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
854 r = bus_socket_connect(b);
858 b->last_connect_error = -r;
860 } else if (b->exec_path) {
862 r = bus_socket_exec(b);
866 b->last_connect_error = -r;
867 } else if (b->kernel) {
869 r = bus_kernel_connect(b);
873 b->last_connect_error = -r;
876 r = bus_parse_next_address(b);
880 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Drops the currently parsed address and retries the connection via
 * bus_start_address(), which moves on to the next configured address. */
884 int bus_next_address(sd_bus *b) {
887 bus_reset_parsed_address(b);
888 return bus_start_address(b);
/* Starts a bus on caller-supplied file descriptors. Both descriptors must
 * be valid. The input fd, and the output fd when it is a different
 * descriptor, are switched to non-blocking mode and marked close-on-exec.
 * The input fd is then inspected with fstat(): a character device is
 * handed to the kernel transport, anything else to the socket transport.
 *
 * The file type test must be applied to st.st_mode as filled in by
 * fstat(); applying S_ISCHR() to the descriptor number itself tests an
 * unrelated small integer and picks the transport incorrectly. */
891 static int bus_start_fd(sd_bus *b) {
896 assert(b->input_fd >= 0);
897 assert(b->output_fd >= 0);
899 r = fd_nonblock(b->input_fd, true);
903 r = fd_cloexec(b->input_fd, true);
907 if (b->input_fd != b->output_fd) {
908 r = fd_nonblock(b->output_fd, true);
912 r = fd_cloexec(b->output_fd, true);
917 if (fstat(b->input_fd, &st) < 0)
920 if (S_ISCHR(st.st_mode))
921 return bus_kernel_take_fd(b);
923 return bus_socket_take_fd(b);
/* Starts a configured bus. Requires state BUS_UNSET (-EPERM), a non-NULL
 * bus (-EINVAL) and an unchanged PID (-ECHILD). The state becomes
 * BUS_OPENING; being server and bus client at once is singled out as a
 * special case. The connection is then started from preset fds if
 * input_fd is valid, otherwise from an address, socket address, exec path
 * or kernel path. Finally the hello message is sent. */
926 int sd_bus_start(sd_bus *bus) {
929 assert_return(bus, -EINVAL);
930 assert_return(bus->state == BUS_UNSET, -EPERM);
931 assert_return(!bus_pid_changed(bus), -ECHILD);
933 bus->state = BUS_OPENING;
935 if (bus->is_server && bus->bus_client)
938 if (bus->input_fd >= 0)
939 r = bus_start_fd(bus);
940 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
941 r = bus_start_address(bus);
948 return bus_send_hello(bus);
/* Opens a connection to the system bus. The address is taken from the
 * DBUS_SYSTEM_BUS_ADDRESS environment variable (read with secure_getenv());
 * the fallback is the AF_UNIX socket /run/dbus/system_bus_socket, with
 * sockaddr_size covering the path without its trailing NUL. The connection
 * is marked as a bus client. Returns -EINVAL when ret is NULL. */
951 int sd_bus_open_system(sd_bus **ret) {
956 assert_return(ret, -EINVAL);
962 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
964 r = sd_bus_set_address(b, e);
968 b->sockaddr.un.sun_family = AF_UNIX;
969 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
970 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
973 b->bus_client = true;
/* Opens a connection to the user bus. The address is taken from
 * DBUS_SESSION_BUS_ADDRESS; the fallback is the socket "bus" inside
 * XDG_RUNTIME_DIR (both read with secure_getenv()). The directory plus the
 * four bytes of "/bus" must fit into sun_path; they are copied without a
 * trailing NUL and sockaddr_size covers exactly those bytes. The
 * connection is marked as a bus client and started.
 * Returns -EINVAL when ret is NULL. */
987 int sd_bus_open_user(sd_bus **ret) {
993 assert_return(ret, -EINVAL);
999 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1001 r = sd_bus_set_address(b, e);
1005 e = secure_getenv("XDG_RUNTIME_DIR");
1012 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1017 b->sockaddr.un.sun_family = AF_UNIX;
1018 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1019 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1022 b->bus_client = true;
1024 r = sd_bus_start(b);
/* Closes a bus connection. An already closed bus and a bus whose PID has
 * changed are special-cased up front. Otherwise the state becomes
 * BUS_CLOSED and the bus is detached from its event loop. The transport is
 * treated differently for kernel and non-kernel buses; the original comment
 * below explains why the fd of a kernel bus stays open. */
1036 void sd_bus_close(sd_bus *bus) {
1039 if (bus->state == BUS_CLOSED)
1041 if (bus_pid_changed(bus))
1044 bus->state = BUS_CLOSED;
1046 sd_bus_detach_event(bus);
1048 if (!bus->is_kernel)
1051 /* We'll leave the fd open in case this is a kernel bus, since
1052 * there might still be memblocks around that reference this
1053 * bus, and they might need to invoke the
1054 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
/* Takes an additional reference on the bus. The counter must be at least 2
 * after the increment, i.e. the caller must already hold a reference. */
1058 sd_bus *sd_bus_ref(sd_bus *bus) {
1062 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drops one reference on the bus; the case where the counter reaches zero
 * or below is handled specially (presumably by freeing the bus -- confirm). */
1067 sd_bus *sd_bus_unref(sd_bus *bus) {
1071 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Reports whether the bus state counts as open according to BUS_IS_OPEN().
 * Returns -EINVAL for a NULL bus and -ECHILD after a PID change. */
1077 int sd_bus_is_open(sd_bus *bus) {
1079 assert_return(bus, -EINVAL);
1080 assert_return(!bus_pid_changed(bus), -ECHILD);
1082 return BUS_IS_OPEN(bus->state);
/* Reports whether values of the given type can be sent on this bus. For
 * SD_BUS_TYPE_UNIX_FD the answer depends on fd passing having been
 * requested in hello_flags and, once the bus is running, on bus->can_fds.
 * For every other type it is simply whether the type is valid.
 * Returns -EINVAL for a NULL bus, -ENOTCONN in state BUS_UNSET and -ECHILD
 * after a PID change. */
1085 int sd_bus_can_send(sd_bus *bus, char type) {
1088 assert_return(bus, -EINVAL);
1089 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1090 assert_return(!bus_pid_changed(bus), -ECHILD);
1092 if (type == SD_BUS_TYPE_UNIX_FD) {
1093 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1096 r = bus_ensure_running(bus);
1100 return bus->can_fds;
1103 return bus_type_is_valid(type);
/* Copies the peer's server ID into *server_id after making sure the bus
 * has reached the running state. Returns -EINVAL for a NULL bus or
 * server_id and -ECHILD after a PID change. */
1106 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1109 assert_return(bus, -EINVAL);
1110 assert_return(server_id, -EINVAL);
1111 assert_return(!bus_pid_changed(bus), -ECHILD);
1113 r = bus_ensure_running(bus);
1117 *server_id = bus->server_id;
/* Seals message m for sending on bus b. A message whose header version is
 * newer than the bus's message_version is singled out; otherwise the
 * message is sealed with the next serial number, which increments b->serial. */
1121 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1124 if (m->header->version > b->message_version)
1130 return bus_message_seal(m, ++b->serial);
/* Writes out queued messages while the write queue is non-empty. The head
 * entry goes through the kernel or the socket writer; the socket writer
 * tracks partial progress in bus->windex. Once the head is fully written
 * (always the case for the kernel transport), its reference is dropped and
 * the remaining entries are shifted down by one with memmove().
 * Must only be called in state BUS_RUNNING or BUS_HELLO. */
1133 static int dispatch_wqueue(sd_bus *bus) {
1137 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1139 while (bus->wqueue_size > 0) {
1142 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1144 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1150 /* Didn't do anything this time */
1152 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1153 /* Fully written. Let's drop the entry from
1156 * This isn't particularly optimized, but
1157 * well, this is supposed to be our worst-case
1158 * buffer only, and the socket buffer is
1159 * supposed to be our primary buffer, and if
1160 * it got full, then all bets are off
1163 sd_bus_message_unref(bus->wqueue[0]);
1164 bus->wqueue_size --;
1165 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetches the next incoming message into *m. A message already waiting in
 * the read queue is taken first and the queue is shifted down by one;
 * otherwise a new message is read from the kernel or socket transport.
 * Must only be called in state BUS_RUNNING or BUS_HELLO. */
1175 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1176 sd_bus_message *z = NULL;
1181 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1183 if (bus->rqueue_size > 0) {
1184 /* Dispatch a queued message */
1186 *m = bus->rqueue[0];
1187 bus->rqueue_size --;
1188 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1192 /* Try to read a new message */
1195 r = bus_kernel_read_message(bus, &z);
1197 r = bus_socket_read_message(bus, &z);
/* Sends message m on the bus and optionally reports its serial in *serial.
 * If the caller does not ask for the serial and the message is not yet
 * sealed, it is flagged as not expecting a reply before sealing. When the
 * bus is running (or in hello state) and nothing is queued, the message is
 * written immediately; a partially written socket message is parked in the
 * preallocated first write-queue slot. Otherwise the message is appended
 * to the write queue, which is bounded by BUS_WQUEUE_MAX.
 * Returns -EINVAL for a NULL bus or message, -ENOTCONN if the bus is not
 * open and -ECHILD after a PID change. */
1213 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1216 assert_return(bus, -EINVAL);
1217 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1218 assert_return(m, -EINVAL);
1219 assert_return(!bus_pid_changed(bus), -ECHILD);
1222 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1229 /* If the serial number isn't kept, then we know that no reply
1231 if (!serial && !m->sealed)
1232 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1234 r = bus_seal_message(bus, m);
1238 /* If this is a reply and no reply was requested, then let's
1239 * suppress this, if we can */
1240 if (m->dont_send && !serial)
1243 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1247 r = bus_kernel_write_message(bus, m);
1249 r = bus_socket_write_message(bus, m, &idx);
1254 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1255 /* Wasn't fully written. So let's remember how
1256 * much was written. Note that the first entry
1257 * of the wqueue array is always allocated so
1258 * that we always can remember how much was
1260 bus->wqueue[0] = sd_bus_message_ref(m);
1261 bus->wqueue_size = 1;
1267 /* Just append it to the queue. */
1269 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1272 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1277 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1281 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC deadline. (uint64_t) -1 is special-cased, and
 * BUS_DEFAULT_TIMEOUT is substituted in one case (presumably for a timeout
 * of 0 -- confirm). */
1286 static usec_t calc_elapse(uint64_t usec) {
1287 if (usec == (uint64_t) -1)
1291 usec = BUS_DEFAULT_TIMEOUT;
1293 return now(CLOCK_MONOTONIC) + usec;
/* Ordering function for the reply callback priority queue. It compares the
 * timeout fields of two reply_callback entries, first distinguishing
 * entries with a timeout (non-zero) from entries without one (zero), then
 * comparing the deadlines themselves. */
1296 static int timeout_compare(const void *a, const void *b) {
1297 const struct reply_callback *x = a, *y = b;
1299 if (x->timeout != 0 && y->timeout == 0)
1302 if (x->timeout == 0 && y->timeout != 0)
1305 if (x->timeout < y->timeout)
1308 if (x->timeout > y->timeout)
/* Sends method call m and registers callback (with userdata) to be invoked
 * for its reply. The message is sealed first so that its serial is known;
 * a reply_callback record keyed by that serial is stored in the
 * reply_callbacks hashmap and, when it has a deadline, in the timeout
 * priority queue. If queueing the timeout or sending the message fails,
 * the registration is cancelled again.
 * Returns -EINVAL for a NULL bus, message or callback, for a message that
 * is not a method call or is flagged as not expecting a reply; -ENOTCONN
 * if the bus is not open; -ECHILD after a PID change. */
1314 int sd_bus_send_with_reply(
1317 sd_bus_message_handler_t callback,
1322 struct reply_callback *c;
1325 assert_return(bus, -EINVAL);
1326 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1327 assert_return(m, -EINVAL);
1328 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1329 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1330 assert_return(callback, -EINVAL);
1331 assert_return(!bus_pid_changed(bus), -ECHILD);
1333 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1337 if (usec != (uint64_t) -1) {
1338 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1343 r = bus_seal_message(bus, m);
1347 c = new0(struct reply_callback, 1);
1351 c->callback = callback;
1352 c->userdata = userdata;
1353 c->serial = BUS_MESSAGE_SERIAL(m);
1354 c->timeout = calc_elapse(usec);
1356 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1362 if (c->timeout != 0) {
1363 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1366 sd_bus_send_with_reply_cancel(bus, c->serial);
1371 r = sd_bus_send(bus, m, serial);
1373 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply callback identified by its serial: the record is
 * removed from the reply_callbacks hashmap and, if it carries a timeout,
 * from the timeout priority queue as well.
 * Returns -EINVAL for a NULL bus or a zero serial, -ECHILD after a PID
 * change. */
1380 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1381 struct reply_callback *c;
1383 assert_return(bus, -EINVAL);
1384 assert_return(serial != 0, -EINVAL);
1385 assert_return(!bus_pid_changed(bus), -ECHILD);
1387 c = hashmap_remove(bus->reply_callbacks, &serial);
1391 if (c->timeout != 0)
1392 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the connection until it reaches BUS_RUNNING. The states BUS_UNSET
 * and BUS_CLOSED are special-cased up front. Otherwise the bus is processed
 * with sd_bus_process() and, while still not running, waited on with
 * sd_bus_wait() without a timeout. */
1398 int bus_ensure_running(sd_bus *bus) {
1403 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1405 if (bus->state == BUS_RUNNING)
1409 r = sd_bus_process(bus, NULL);
1412 if (bus->state == BUS_RUNNING)
1417 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends method call m and blocks until its reply arrives or the timeout
 * elapses. After the bus is running and the message has been sent, incoming
 * messages are read one by one. Room in the read queue is reserved before
 * each read, so that a message which is not the awaited reply can always
 * be queued for later dispatching (the queue is bounded by BUS_RQUEUE_MAX).
 * A message whose reply_serial matches is consumed here, and a method
 * error is copied into *error. Between reads the function polls the bus
 * for the remaining time and flushes the write queue.
 * Returns -EINVAL for a NULL bus or message, a non-method-call message, a
 * message flagged as not expecting a reply, or a dirty error structure;
 * -ENOTCONN if the bus is not open; -ECHILD after a PID change. */
1423 int sd_bus_send_with_reply_and_block(
1427 sd_bus_error *error,
1428 sd_bus_message **reply) {
1435 assert_return(bus, -EINVAL);
1436 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1437 assert_return(m, -EINVAL);
1438 assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1439 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1440 assert_return(!bus_error_is_dirty(error), -EINVAL);
1441 assert_return(!bus_pid_changed(bus), -ECHILD);
1443 r = bus_ensure_running(bus);
1447 r = sd_bus_send(bus, m, &serial);
1451 timeout = calc_elapse(usec);
1455 sd_bus_message *incoming = NULL;
1460 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1463 /* Make sure there's room for queuing this
1464 * locally, before we read the message */
1466 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1475 r = bus_kernel_read_message(bus, &incoming);
1477 r = bus_socket_read_message(bus, &incoming);
1482 if (incoming->reply_serial == serial) {
1483 /* Found a match! */
1485 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1490 sd_bus_message_unref(incoming);
1495 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1498 r = sd_bus_error_copy(error, &incoming->error);
1500 sd_bus_message_unref(incoming);
1504 k = sd_bus_error_get_errno(&incoming->error);
1505 sd_bus_message_unref(incoming);
1509 sd_bus_message_unref(incoming);
1513 /* There's already guaranteed to be room for
1514 * this, so need to resize things here */
1515 bus->rqueue[bus->rqueue_size ++] = incoming;
1518 /* Try to read more, right-away */
1527 n = now(CLOCK_MONOTONIC);
1533 left = (uint64_t) -1;
1535 r = bus_poll(bus, true, left);
1539 r = dispatch_wqueue(bus);
/* Returns the connection's file descriptor. This is only possible when
 * input and output share one descriptor (-EPERM otherwise). Returns
 * -EINVAL for a NULL bus, -ENOTCONN if the bus is not open and -ECHILD
 * after a PID change. */
1545 int sd_bus_get_fd(sd_bus *bus) {
1547 assert_return(bus, -EINVAL);
1548 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1549 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1550 assert_return(!bus_pid_changed(bus), -ECHILD);
1552 return bus->input_fd;
/* Computes the poll events the bus currently wants to wait for, depending
 * on its state. While authenticating, the result depends on whether the
 * authentication code has data to write. While running (or in hello state)
 * it depends on whether the read queue is empty and whether the write
 * queue holds pending messages. Returns -EINVAL for a NULL bus, -ENOTCONN
 * if the bus is not open and -ECHILD after a PID change. */
1555 int sd_bus_get_events(sd_bus *bus) {
1558 assert_return(bus, -EINVAL);
1559 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1560 assert_return(!bus_pid_changed(bus), -ECHILD);
1562 if (bus->state == BUS_OPENING)
1564 else if (bus->state == BUS_AUTHENTICATING) {
1566 if (bus_socket_auth_needs_write(bus))
1571 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1572 if (bus->rqueue_size <= 0)
1574 if (bus->wqueue_size > 0)
/* Stores in *timeout_usec the point in time at which the bus next needs
 * attention. While authenticating this is the authentication timeout. In
 * states other than BUS_RUNNING and BUS_HELLO it is (uint64_t) -1. With
 * messages waiting in the read queue that case is handled separately.
 * Otherwise it is the deadline of the earliest pending reply callback, or
 * (uint64_t) -1 when there is none.
 * Returns -EINVAL for a NULL bus or timeout_usec, -ENOTCONN if the bus is
 * not open and -ECHILD after a PID change. */
1581 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1582 struct reply_callback *c;
1584 assert_return(bus, -EINVAL);
1585 assert_return(timeout_usec, -EINVAL);
1586 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1587 assert_return(!bus_pid_changed(bus), -ECHILD);
1589 if (bus->state == BUS_AUTHENTICATING) {
1590 *timeout_usec = bus->auth_timeout;
1594 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1595 *timeout_usec = (uint64_t) -1;
1599 if (bus->rqueue_size > 0) {
1604 c = prioq_peek(bus->reply_callbacks_prioq);
1606 *timeout_usec = (uint64_t) -1;
1610 *timeout_usec = c->timeout;
/* Handles the earliest pending method call timeout. The head of the
 * timeout queue is inspected against the current monotonic time. For an
 * expired entry a synthetic "Method call timed out" error message
 * (SD_BUS_ERROR_NO_REPLY) is built, the entry is removed from both the
 * priority queue and the reply_callbacks hashmap, and its callback is
 * invoked with the synthetic message. Returns the callback's negative
 * error, or 1 after a callback has run successfully. */
1614 static int process_timeout(sd_bus *bus) {
1615 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1616 struct reply_callback *c;
1622 c = prioq_peek(bus->reply_callbacks_prioq);
1626 n = now(CLOCK_MONOTONIC);
1630 r = bus_message_new_synthetic_error(
1633 &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1638 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1639 hashmap_remove(bus->reply_callbacks, &c->serial);
1641 r = c->callback(bus, m, c->userdata);
1644 return r < 0 ? r : 1;
/* Gatekeeper used while the bus is in state BUS_HELLO. It checks that the
 * message is a method return or method error and that its reply_serial
 * matches bus->hello_serial. The message is not parsed here; see the
 * original comment below. */
1647 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1651 if (bus->state != BUS_HELLO)
1654 /* Let's make sure the first message on the bus is the HELLO
1655 * reply. But note that we don't actually parse the message
1656 * here (we leave that to the usual handling), we just verify
1657 * we don't let any earlier msg through. */
1659 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1660 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1663 if (m->reply_serial != bus->hello_serial)
/* Dispatches a method return or method error to the reply callback
 * registered for its reply_serial. The record is removed from the
 * reply_callbacks hashmap and, if it had a timeout, from the priority
 * queue. The message is rewound to its start before the callback runs. */
1669 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1670 struct reply_callback *c;
1676 if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1677 m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1680 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1684 if (c->timeout != 0)
1685 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1687 r = sd_bus_message_rewind(m, true);
1691 r = c->callback(bus, m, c->userdata);
/* Runs the registered filter callbacks on message m. Because a callback
 * may add or remove filters, the list walk is restarted whenever
 * filter_callbacks_modified gets set. Each filter records the iteration
 * counter so that it runs at most once per processed message. The message
 * is rewound before every callback. */
1697 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1698 struct filter_callback *l;
1705 bus->filter_callbacks_modified = false;
1707 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1709 if (bus->filter_callbacks_modified)
1712 /* Don't run this more than once per iteration */
1713 if (l->last_iteration == bus->iteration_counter)
1716 l->last_iteration = bus->iteration_counter;
1718 r = sd_bus_message_rewind(m, true);
1722 r = l->callback(bus, m, l->userdata);
1728 } while (bus->filter_callbacks_modified);
/* Runs the match callbacks on message m via bus_match_run(), repeating the
 * run for as long as the set of matches was modified during it. */
1733 static int process_match(sd_bus *bus, sd_bus_message *m) {
1740 bus->match_callbacks_modified = false;
1742 r = bus_match_run(bus, &bus->match_callbacks, m);
1746 } while (bus->match_callbacks_modified);
/* Answers method calls on the org.freedesktop.DBus.Peer interface. Other
 * message types and interfaces, and calls flagged as not expecting a
 * reply, are singled out early. "Ping" gets an empty method return;
 * "GetMachineId" gets a method return carrying the machine ID as a string;
 * any other member gets an SD_BUS_ERROR_UNKNOWN_METHOD error. The reply is
 * sent without requesting a serial. */
1751 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1752 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1758 if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1761 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1764 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1767 if (streq_ptr(m->member, "Ping"))
1768 r = sd_bus_message_new_method_return(bus, m, &reply);
1769 else if (streq_ptr(m->member, "GetMachineId")) {
1773 r = sd_id128_get_machine(&id);
1777 r = sd_bus_message_new_method_return(bus, m, &reply);
1781 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1783 r = sd_bus_message_new_method_errorf(
1785 SD_BUS_ERROR_UNKNOWN_METHOD,
1786 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1792 r = sd_bus_send(bus, reply, NULL);
/* Runs one incoming message through the processing stages in order: hello
 * check, reply callbacks, filters, matches, built-in Peer methods and
 * finally object dispatching. The iteration counter is bumped first (the
 * filter stage uses it) and a debug line with sender, path, interface and
 * member is logged. */
1799 static int process_message(sd_bus *bus, sd_bus_message *m) {
1805 bus->iteration_counter++;
1807 log_debug("Got message sender=%s object=%s interface=%s member=%s",
1808 strna(sd_bus_message_get_sender(m)),
1809 strna(sd_bus_message_get_path(m)),
1810 strna(sd_bus_message_get_interface(m)),
1811 strna(sd_bus_message_get_member(m)));
1813 r = process_hello(bus, m);
1817 r = process_reply(bus, m);
1821 r = process_filter(bus, m);
1825 r = process_match(bus, m);
1829 r = process_builtin(bus, m);
1833 return bus_process_object(bus, m);
/* One processing step for a bus in state BUS_RUNNING or BUS_HELLO: handle
 * an expired timeout, flush the write queue, fetch one incoming message and
 * run it through process_message(). The message is rewound afterwards. A
 * method call is answered with an SD_BUS_ERROR_UNKNOWN_OBJECT error naming
 * the object path (presumably only when nothing handled it -- confirm). */
1836 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1837 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1841 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1843 r = process_timeout(bus);
1847 r = dispatch_wqueue(bus);
1851 r = dispatch_rqueue(bus, &m);
1857 r = process_message(bus, m);
1862 r = sd_bus_message_rewind(m, true);
1871 if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
1873 r = sd_bus_reply_method_errorf(
1875 SD_BUS_ERROR_UNKNOWN_OBJECT,
1876 "Unknown object '%s'.", m->path);
/* Drives the connection state machine by one step, dispatching on
 * bus->state: socket opening, authentication, or regular message
 * processing. bus->processing is set around regular processing, and a
 * call made while it is set fails with -EBUSY, so the function cannot be
 * re-entered from a callback. Returns -EINVAL for a NULL bus and -ECHILD
 * after a PID change; the return value convention is described in the
 * original comment below. */
1890 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1891 BUS_DONT_DESTROY(bus);
1894 /* Returns 0 when we didn't do anything. This should cause the
1895 * caller to invoke sd_bus_wait() before returning the next
1896 * time. Returns > 0 when we did something, which possibly
1897 * means *ret is filled in with an unprocessed message. */
1899 assert_return(bus, -EINVAL);
1900 assert_return(!bus_pid_changed(bus), -ECHILD);
1902 /* We don't allow recursively invoking sd_bus_process(). */
1903 assert_return(!bus->processing, -EBUSY);
1905 switch (bus->state) {
1912 r = bus_socket_process_opening(bus);
1919 case BUS_AUTHENTICATING:
1921 r = bus_socket_process_authenticating(bus);
1931 bus->processing = true;
1932 r = process_running(bus, ret);
1933 bus->processing = false;
1938 assert_not_reached("Unknown state");
/* Waits with ppoll() for the events reported by sd_bus_get_events().
 * need_more selects between two modes, described by the original comments
 * below. In the mode that honours timeouts, the absolute deadline from
 * sd_bus_get_timeout() is turned into a relative wait m, clamped at 0.
 * timeout_usec is compared against m when it is not (uint64_t) -1, and an
 * m of (uint64_t) -1 means waiting without a time limit. When input and
 * output use different descriptors, two pollfd entries are used and the
 * events are split into POLLIN and POLLOUT.
 * Returns 1 if ppoll() reported events and 0 otherwise; -ENOTCONN if the
 * bus is not open. */
1941 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1942 struct pollfd p[2] = {};
1945 usec_t m = (usec_t) -1;
1948 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1950 e = sd_bus_get_events(bus);
1955 /* The caller really needs some more data, he doesn't
1956 * care about what's already read, or any timeouts
1961 /* The caller wants to process if there's something to
1962 * process, but doesn't care otherwise */
1964 r = sd_bus_get_timeout(bus, &until);
1969 nw = now(CLOCK_MONOTONIC);
1970 m = until > nw ? until - nw : 0;
1974 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1977 p[0].fd = bus->input_fd;
1978 if (bus->output_fd == bus->input_fd) {
1982 p[0].events = e & POLLIN;
1983 p[1].fd = bus->output_fd;
1984 p[1].events = e & POLLOUT;
1988 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1992 return r > 0 ? 1 : 0;
/* Waits until the bus needs processing or timeout_usec has passed.
 * Messages already sitting in the read queue are special-cased before
 * polling. Returns -EINVAL for a NULL bus, -ENOTCONN if the bus is not
 * open and -ECHILD after a PID change. */
1995 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1997 assert_return(bus, -EINVAL);
1998 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1999 assert_return(!bus_pid_changed(bus), -ECHILD);
2001 if (bus->rqueue_size > 0)
2004 return bus_poll(bus, false, timeout_usec);
/* Blocks until the write queue has been drained. The bus is first brought
 * to the running state; then the queue is dispatched and, while entries
 * remain, the bus is polled without a timeout. Returns -EINVAL for a NULL
 * bus, -ENOTCONN if the bus is not open and -ECHILD after a PID change. */
2007 int sd_bus_flush(sd_bus *bus) {
2010 assert_return(bus, -EINVAL);
2011 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2012 assert_return(!bus_pid_changed(bus), -ECHILD);
2014 r = bus_ensure_running(bus);
2018 if (bus->wqueue_size <= 0)
2022 r = dispatch_wqueue(bus);
2026 if (bus->wqueue_size <= 0)
2029 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback with its userdata. A zero-initialized
 * filter_callback record is allocated and prepended to the filter list;
 * filter_callbacks_modified is set so that a filter walk currently in
 * progress notices the change. Returns -EINVAL for a NULL bus or callback
 * and -ECHILD after a PID change. */
2035 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2036 struct filter_callback *f;
2038 assert_return(bus, -EINVAL);
2039 assert_return(callback, -EINVAL);
2040 assert_return(!bus_pid_changed(bus), -ECHILD);
2042 f = new0(struct filter_callback, 1);
2045 f->callback = callback;
2046 f->userdata = userdata;
2048 bus->filter_callbacks_modified = true;
2049 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
/* Removes the filter registered with exactly this callback and userdata
 * pair: the matching entry is unlinked from the filter list and
 * filter_callbacks_modified is set. Returns -EINVAL for a NULL bus or
 * callback and -ECHILD after a PID change. */
2053 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2054 struct filter_callback *f;
2056 assert_return(bus, -EINVAL);
2057 assert_return(callback, -EINVAL);
2058 assert_return(!bus_pid_changed(bus), -ECHILD);
2060 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2061 if (f->callback == callback && f->userdata == userdata) {
2062 bus->filter_callbacks_modified = true;
2063 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
/* Installs a match rule with a callback. The match string is parsed into
 * components. For bus clients a fresh cookie is allocated and the match is
 * registered through bus_add_match_internal(). The match is then added to
 * the local match tree; if that fails on a bus client, the registration is
 * undone with bus_remove_match_internal(). The parsed components are freed
 * before returning. Returns -EINVAL for a NULL bus or match and -ECHILD
 * after a PID change. */
2072 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2073 struct bus_match_component *components = NULL;
2074 unsigned n_components = 0;
2075 uint64_t cookie = 0;
2078 assert_return(bus, -EINVAL);
2079 assert_return(match, -EINVAL);
2080 assert_return(!bus_pid_changed(bus), -ECHILD);
2082 r = bus_match_parse(match, &components, &n_components);
2086 if (bus->bus_client) {
2087 cookie = ++bus->match_cookie;
2089 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2094 bus->match_callbacks_modified = true;
2095 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2097 if (bus->bus_client)
2098 bus_remove_match_internal(bus, match, cookie);
2102 bus_match_parse_free(components, n_components);
/* Removes a match rule. The match string is parsed, the entry is removed
 * from the local match tree (which yields its cookie) and, for bus
 * clients, also removed through bus_remove_match_internal(). The local
 * removal's error takes precedence in the return value; otherwise the
 * result of the second removal is returned. Returns -EINVAL for a NULL bus
 * or match and -ECHILD after a PID change. */
2106 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2107 struct bus_match_component *components = NULL;
2108 unsigned n_components = 0;
2110 uint64_t cookie = 0;
2112 assert_return(bus, -EINVAL);
2113 assert_return(match, -EINVAL);
2114 assert_return(!bus_pid_changed(bus), -ECHILD);
2116 r = bus_match_parse(match, &components, &n_components);
2120 bus->match_callbacks_modified = true;
2121 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2123 if (bus->bus_client)
2124 q = bus_remove_match_internal(bus, match, cookie);
2126 bus_match_parse_free(components, n_components);
2128 return r < 0 ? r : q;
/* Returns true when the calling process is not the one that created the
 * bus, i.e. when getpid() differs from the PID recorded in original_pid. */
2131 bool bus_pid_changed(sd_bus *bus) {
2134 /* We don't support people creating a bus connection and
2135 * keeping it around over a fork(). Let's complain. */
2137 return bus->original_pid != getpid();
/* I/O event source handler: runs one sd_bus_process() step for the bus
 * that was registered as userdata. The pointer is given its real type,
 * sd_bus, as in prepare_callback() and quit_callback(), so the compiler
 * can type-check its uses. */
2140 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2141 sd_bus *bus = userdata;
2146 r = sd_bus_process(bus, NULL);
/* Timer event source handler: runs one sd_bus_process() step for the bus
 * that was registered as userdata. The pointer is given its real type,
 * sd_bus, as in prepare_callback() and quit_callback(), so the compiler
 * can type-check its uses. */
2153 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2154 sd_bus *bus = userdata;
2159 r = sd_bus_process(bus, NULL);
/* Prepare handler for the bus's event sources. It refreshes the I/O event
 * masks from sd_bus_get_events(): with separate descriptors the input
 * source gets the POLLIN part and the output source the POLLOUT part,
 * otherwise the single source gets the full mask. It then reprograms the
 * timer source from sd_bus_get_timeout() and enables it only when that
 * call returned a positive value. */
2166 static int prepare_callback(sd_event_source *s, void *userdata) {
2167 sd_bus *bus = userdata;
2174 e = sd_bus_get_events(bus);
2178 if (bus->output_fd != bus->input_fd) {
2180 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2184 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2188 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2193 r = sd_bus_get_timeout(bus, &until);
2199 j = sd_event_source_set_time(bus->time_event_source, until);
2204 r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
/* Quit handler registered by sd_bus_attach_event(); it receives the bus as
 * userdata. What it does with the bus is not visible on these lines. */
2211 static int quit_callback(sd_event_source *event, void *userdata) {
2212 sd_bus *bus = userdata;
/* Attaches the bus to an event loop. A reference on the loop is taken and
 * event sources are created at the given priority: an I/O source for the
 * input fd, a second one for the output fd when it is a different
 * descriptor, and a monotonic timer source. The input source also gets
 * prepare_callback() as its prepare handler. A quit source is registered
 * too. On the failure path the bus is detached again.
 * Returns -EINVAL for a NULL bus or event and -EBUSY if the bus is already
 * attached to a loop. */
2221 int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2224 assert_return(bus, -EINVAL);
2225 assert_return(event, -EINVAL);
2226 assert_return(!bus->event, -EBUSY);
2228 assert(!bus->input_io_event_source);
2229 assert(!bus->output_io_event_source);
2230 assert(!bus->time_event_source);
2232 bus->event = sd_event_ref(event);
2234 r = sd_event_add_io(event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2238 r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2242 if (bus->output_fd != bus->input_fd) {
2243 r = sd_event_add_io(event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2247 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2252 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2256 r = sd_event_add_monotonic(event, 0, 0, time_callback, bus, &bus->time_event_source);
2260 r = sd_event_source_set_priority(bus->time_event_source, priority);
2264 r = sd_event_add_quit(event, quit_callback, bus, &bus->quit_event_source);
2271 sd_bus_detach_event(bus);
2275 int sd_bus_detach_event(sd_bus *bus) {
2276 assert_return(bus, -EINVAL);
2277 assert_return(bus->event, -ENXIO);
2279 if (bus->input_io_event_source)
2280 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2282 if (bus->output_io_event_source)
2283 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2285 if (bus->time_event_source)
2286 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2288 if (bus->quit_event_source)
2289 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2292 bus->event = sd_event_unref(bus->event);