1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
49 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
51 static void bus_close_fds(sd_bus *b) {
55 close_nointr_nofail(b->input_fd);
57 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
58 close_nointr_nofail(b->output_fd);
60 b->input_fd = b->output_fd = -1;
63 static void bus_node_destroy(sd_bus *b, struct node *n) {
64 struct node_callback *c;
65 struct node_vtable *v;
66 struct node_enumerator *e;
74 bus_node_destroy(b, n->child);
76 while ((c = n->callbacks)) {
77 LIST_REMOVE(callbacks, n->callbacks, c);
81 while ((v = n->vtables)) {
82 LIST_REMOVE(vtables, n->vtables, v);
87 while ((e = n->enumerators)) {
88 LIST_REMOVE(enumerators, n->enumerators, e);
93 LIST_REMOVE(siblings, n->parent->child, n);
95 assert_se(hashmap_remove(b->nodes, n->path) == n);
100 static void bus_free(sd_bus *b) {
101 struct filter_callback *f;
110 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
113 free(b->unique_name);
114 free(b->auth_buffer);
119 strv_free(b->exec_argv);
121 close_many(b->fds, b->n_fds);
124 for (i = 0; i < b->rqueue_size; i++)
125 sd_bus_message_unref(b->rqueue[i]);
128 for (i = 0; i < b->wqueue_size; i++)
129 sd_bus_message_unref(b->wqueue[i]);
132 hashmap_free_free(b->reply_callbacks);
133 prioq_free(b->reply_callbacks_prioq);
135 while ((f = b->filter_callbacks)) {
136 LIST_REMOVE(callbacks, b->filter_callbacks, f);
140 bus_match_free(&b->match_callbacks);
142 hashmap_free_free(b->vtable_methods);
143 hashmap_free_free(b->vtable_properties);
145 while ((n = hashmap_first(b->nodes)))
146 bus_node_destroy(b, n);
148 hashmap_free(b->nodes);
150 bus_kernel_flush_memfd(b);
152 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
157 int sd_bus_new(sd_bus **ret) {
160 assert_return(ret, -EINVAL);
166 r->n_ref = REFCNT_INIT;
167 r->input_fd = r->output_fd = -1;
168 r->message_version = 1;
169 r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
170 r->original_pid = getpid();
172 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
174 /* We guarantee that wqueue always has space for at least one
176 r->wqueue = new(sd_bus_message*, 1);
186 int sd_bus_set_address(sd_bus *bus, const char *address) {
189 assert_return(bus, -EINVAL);
190 assert_return(bus->state == BUS_UNSET, -EPERM);
191 assert_return(address, -EINVAL);
192 assert_return(!bus_pid_changed(bus), -ECHILD);
204 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
205 assert_return(bus, -EINVAL);
206 assert_return(bus->state == BUS_UNSET, -EPERM);
207 assert_return(input_fd >= 0, -EINVAL);
208 assert_return(output_fd >= 0, -EINVAL);
209 assert_return(!bus_pid_changed(bus), -ECHILD);
211 bus->input_fd = input_fd;
212 bus->output_fd = output_fd;
216 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
219 assert_return(bus, -EINVAL);
220 assert_return(bus->state == BUS_UNSET, -EPERM);
221 assert_return(path, -EINVAL);
222 assert_return(!strv_isempty(argv), -EINVAL);
223 assert_return(!bus_pid_changed(bus), -ECHILD);
235 free(bus->exec_path);
236 strv_free(bus->exec_argv);
244 int sd_bus_set_bus_client(sd_bus *bus, int b) {
245 assert_return(bus, -EINVAL);
246 assert_return(bus->state == BUS_UNSET, -EPERM);
247 assert_return(!bus_pid_changed(bus), -ECHILD);
249 bus->bus_client = !!b;
253 int sd_bus_negotiate_fds(sd_bus *bus, int b) {
254 assert_return(bus, -EINVAL);
255 assert_return(bus->state == BUS_UNSET, -EPERM);
256 assert_return(!bus_pid_changed(bus), -ECHILD);
258 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
262 int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
263 assert_return(bus, -EINVAL);
264 assert_return(bus->state == BUS_UNSET, -EPERM);
265 assert_return(!bus_pid_changed(bus), -ECHILD);
267 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
271 int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
272 assert_return(bus, -EINVAL);
273 assert_return(bus->state == BUS_UNSET, -EPERM);
274 assert_return(!bus_pid_changed(bus), -ECHILD);
276 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
280 int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
281 assert_return(bus, -EINVAL);
282 assert_return(bus->state == BUS_UNSET, -EPERM);
283 assert_return(!bus_pid_changed(bus), -ECHILD);
285 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
289 int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
290 assert_return(bus, -EINVAL);
291 assert_return(bus->state == BUS_UNSET, -EPERM);
292 assert_return(!bus_pid_changed(bus), -ECHILD);
294 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
298 int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
299 assert_return(bus, -EINVAL);
300 assert_return(bus->state == BUS_UNSET, -EPERM);
301 assert_return(!bus_pid_changed(bus), -ECHILD);
303 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
307 int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
308 assert_return(bus, -EINVAL);
309 assert_return(bus->state == BUS_UNSET, -EPERM);
310 assert_return(!bus_pid_changed(bus), -ECHILD);
312 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
316 int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
317 assert_return(bus, -EINVAL);
318 assert_return(bus->state == BUS_UNSET, -EPERM);
319 assert_return(!bus_pid_changed(bus), -ECHILD);
321 SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
325 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
326 assert_return(bus, -EINVAL);
327 assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
328 assert_return(bus->state == BUS_UNSET, -EPERM);
329 assert_return(!bus_pid_changed(bus), -ECHILD);
331 bus->is_server = !!b;
332 bus->server_id = server_id;
336 int sd_bus_set_anonymous(sd_bus *bus, int b) {
337 assert_return(bus, -EINVAL);
338 assert_return(bus->state == BUS_UNSET, -EPERM);
339 assert_return(!bus_pid_changed(bus), -ECHILD);
341 bus->anonymous_auth = !!b;
345 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
350 assert(bus->state == BUS_HELLO);
353 r = bus_message_to_errno(reply);
357 r = sd_bus_message_read(reply, "s", &s);
361 if (!service_name_is_valid(s) || s[0] != ':')
364 bus->unique_name = strdup(s);
365 if (!bus->unique_name)
368 bus->state = BUS_RUNNING;
373 static int bus_send_hello(sd_bus *bus) {
374 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
379 if (!bus->bus_client || bus->is_kernel)
382 r = sd_bus_message_new_method_call(
384 "org.freedesktop.DBus",
386 "org.freedesktop.DBus",
392 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
395 int bus_start_running(sd_bus *bus) {
398 if (bus->bus_client && !bus->is_kernel) {
399 bus->state = BUS_HELLO;
403 bus->state = BUS_RUNNING;
407 static int parse_address_key(const char **p, const char *key, char **value) {
418 if (strncmp(*p, key, l) != 0)
431 while (*a != ';' && *a != ',' && *a != 0) {
449 c = (char) ((x << 4) | y);
456 t = realloc(r, n + 2);
484 static void skip_address_key(const char **p) {
488 *p += strcspn(*p, ",");
494 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
495 _cleanup_free_ char *path = NULL, *abstract = NULL;
504 while (**p != 0 && **p != ';') {
505 r = parse_address_key(p, "guid", guid);
511 r = parse_address_key(p, "path", &path);
517 r = parse_address_key(p, "abstract", &abstract);
526 if (!path && !abstract)
529 if (path && abstract)
534 if (l > sizeof(b->sockaddr.un.sun_path))
537 b->sockaddr.un.sun_family = AF_UNIX;
538 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
539 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
540 } else if (abstract) {
541 l = strlen(abstract);
542 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
545 b->sockaddr.un.sun_family = AF_UNIX;
546 b->sockaddr.un.sun_path[0] = 0;
547 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
548 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
554 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
555 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
557 struct addrinfo *result, hints = {
558 .ai_socktype = SOCK_STREAM,
559 .ai_flags = AI_ADDRCONFIG,
567 while (**p != 0 && **p != ';') {
568 r = parse_address_key(p, "guid", guid);
574 r = parse_address_key(p, "host", &host);
580 r = parse_address_key(p, "port", &port);
586 r = parse_address_key(p, "family", &family);
599 if (streq(family, "ipv4"))
600 hints.ai_family = AF_INET;
601 else if (streq(family, "ipv6"))
602 hints.ai_family = AF_INET6;
607 r = getaddrinfo(host, port, &hints, &result);
611 return -EADDRNOTAVAIL;
613 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
614 b->sockaddr_size = result->ai_addrlen;
616 freeaddrinfo(result);
621 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
623 unsigned n_argv = 0, j;
632 while (**p != 0 && **p != ';') {
633 r = parse_address_key(p, "guid", guid);
639 r = parse_address_key(p, "path", &path);
645 if (startswith(*p, "argv")) {
649 ul = strtoul(*p + 4, (char**) p, 10);
650 if (errno > 0 || **p != '=' || ul > 256) {
660 x = realloc(argv, sizeof(char*) * (ul + 2));
666 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
672 r = parse_address_key(p, NULL, argv + ul);
687 /* Make sure there are no holes in the array, with the
688 * exception of argv[0] */
689 for (j = 1; j < n_argv; j++)
695 if (argv && argv[0] == NULL) {
696 argv[0] = strdup(path);
708 for (j = 0; j < n_argv; j++)
716 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
717 _cleanup_free_ char *path = NULL;
725 while (**p != 0 && **p != ';') {
726 r = parse_address_key(p, "guid", guid);
732 r = parse_address_key(p, "path", &path);
751 static void bus_reset_parsed_address(sd_bus *b) {
755 b->sockaddr_size = 0;
756 strv_free(b->exec_argv);
760 b->server_id = SD_ID128_NULL;
765 static int bus_parse_next_address(sd_bus *b) {
766 _cleanup_free_ char *guid = NULL;
774 if (b->address[b->address_index] == 0)
777 bus_reset_parsed_address(b);
779 a = b->address + b->address_index;
788 if (startswith(a, "unix:")) {
791 r = parse_unix_address(b, &a, &guid);
796 } else if (startswith(a, "tcp:")) {
799 r = parse_tcp_address(b, &a, &guid);
805 } else if (startswith(a, "unixexec:")) {
808 r = parse_exec_address(b, &a, &guid);
814 } else if (startswith(a, "kernel:")) {
817 r = parse_kernel_address(b, &a, &guid);
830 r = sd_id128_from_string(guid, &b->server_id);
835 b->address_index = a - b->address;
839 static int bus_start_address(sd_bus *b) {
847 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
849 r = bus_socket_connect(b);
853 b->last_connect_error = -r;
855 } else if (b->exec_path) {
857 r = bus_socket_exec(b);
861 b->last_connect_error = -r;
862 } else if (b->kernel) {
864 r = bus_kernel_connect(b);
868 b->last_connect_error = -r;
871 r = bus_parse_next_address(b);
875 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
879 int bus_next_address(sd_bus *b) {
882 bus_reset_parsed_address(b);
883 return bus_start_address(b);
886 static int bus_start_fd(sd_bus *b) {
891 assert(b->input_fd >= 0);
892 assert(b->output_fd >= 0);
894 r = fd_nonblock(b->input_fd, true);
898 r = fd_cloexec(b->input_fd, true);
902 if (b->input_fd != b->output_fd) {
903 r = fd_nonblock(b->output_fd, true);
907 r = fd_cloexec(b->output_fd, true);
912 if (fstat(b->input_fd, &st) < 0)
915 if (S_ISCHR(b->input_fd))
916 return bus_kernel_take_fd(b);
918 return bus_socket_take_fd(b);
921 int sd_bus_start(sd_bus *bus) {
924 assert_return(bus, -EINVAL);
925 assert_return(bus->state == BUS_UNSET, -EPERM);
926 assert_return(!bus_pid_changed(bus), -ECHILD);
928 bus->state = BUS_OPENING;
930 if (bus->is_server && bus->bus_client)
933 if (bus->input_fd >= 0)
934 r = bus_start_fd(bus);
935 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
936 r = bus_start_address(bus);
943 return bus_send_hello(bus);
946 int sd_bus_open_system(sd_bus **ret) {
951 assert_return(ret, -EINVAL);
957 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
959 r = sd_bus_set_address(b, e);
963 b->sockaddr.un.sun_family = AF_UNIX;
964 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
965 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
968 b->bus_client = true;
982 int sd_bus_open_user(sd_bus **ret) {
988 assert_return(ret, -EINVAL);
994 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
996 r = sd_bus_set_address(b, e);
1000 e = secure_getenv("XDG_RUNTIME_DIR");
1007 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1012 b->sockaddr.un.sun_family = AF_UNIX;
1013 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1014 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1017 b->bus_client = true;
1019 r = sd_bus_start(b);
1031 void sd_bus_close(sd_bus *bus) {
1034 if (bus->state == BUS_CLOSED)
1036 if (bus_pid_changed(bus))
1039 bus->state = BUS_CLOSED;
1041 if (!bus->is_kernel)
1044 /* We'll leave the fd open in case this is a kernel bus, since
1045 * there might still be memblocks around that reference this
1046 * bus, and they might need to invoke the
1047 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1051 sd_bus *sd_bus_ref(sd_bus *bus) {
1055 assert_se(REFCNT_INC(bus->n_ref) >= 2);
1060 sd_bus *sd_bus_unref(sd_bus *bus) {
1064 if (REFCNT_DEC(bus->n_ref) <= 0)
1070 int sd_bus_is_open(sd_bus *bus) {
1072 assert_return(bus, -EINVAL);
1073 assert_return(!bus_pid_changed(bus), -ECHILD);
1075 return BUS_IS_OPEN(bus->state);
1078 int sd_bus_can_send(sd_bus *bus, char type) {
1081 assert_return(bus, -EINVAL);
1082 assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1083 assert_return(!bus_pid_changed(bus), -ECHILD);
1085 if (type == SD_BUS_TYPE_UNIX_FD) {
1086 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1089 r = bus_ensure_running(bus);
1093 return bus->can_fds;
1096 return bus_type_is_valid(type);
1099 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1102 assert_return(bus, -EINVAL);
1103 assert_return(server_id, -EINVAL);
1104 assert_return(!bus_pid_changed(bus), -ECHILD);
1106 r = bus_ensure_running(bus);
1110 *server_id = bus->server_id;
1114 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1117 if (m->header->version > b->message_version)
1123 return bus_message_seal(m, ++b->serial);
1126 static int dispatch_wqueue(sd_bus *bus) {
1130 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1132 while (bus->wqueue_size > 0) {
1135 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1137 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1143 /* Didn't do anything this time */
1145 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1146 /* Fully written. Let's drop the entry from
1149 * This isn't particularly optimized, but
1150 * well, this is supposed to be our worst-case
1151 * buffer only, and the socket buffer is
1152 * supposed to be our primary buffer, and if
1153 * it got full, then all bets are off
1156 sd_bus_message_unref(bus->wqueue[0]);
1157 bus->wqueue_size --;
1158 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1168 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1169 sd_bus_message *z = NULL;
1174 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1176 if (bus->rqueue_size > 0) {
1177 /* Dispatch a queued message */
1179 *m = bus->rqueue[0];
1180 bus->rqueue_size --;
1181 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1185 /* Try to read a new message */
1188 r = bus_kernel_read_message(bus, &z);
1190 r = bus_socket_read_message(bus, &z);
1206 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1209 assert_return(bus, -EINVAL);
1210 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1211 assert_return(m, -EINVAL);
1212 assert_return(!bus_pid_changed(bus), -ECHILD);
1215 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1222 /* If the serial number isn't kept, then we know that no reply
1224 if (!serial && !m->sealed)
1225 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1227 r = bus_seal_message(bus, m);
1231 /* If this is a reply and no reply was requested, then let's
1232 * suppress this, if we can */
1233 if (m->dont_send && !serial)
1236 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1240 r = bus_kernel_write_message(bus, m);
1242 r = bus_socket_write_message(bus, m, &idx);
1247 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1248 /* Wasn't fully written. So let's remember how
1249 * much was written. Note that the first entry
1250 * of the wqueue array is always allocated so
1251 * that we always can remember how much was
1253 bus->wqueue[0] = sd_bus_message_ref(m);
1254 bus->wqueue_size = 1;
1260 /* Just append it to the queue. */
1262 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1265 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1270 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1274 *serial = BUS_MESSAGE_SERIAL(m);
1279 static usec_t calc_elapse(uint64_t usec) {
1280 if (usec == (uint64_t) -1)
1284 usec = BUS_DEFAULT_TIMEOUT;
1286 return now(CLOCK_MONOTONIC) + usec;
1289 static int timeout_compare(const void *a, const void *b) {
1290 const struct reply_callback *x = a, *y = b;
1292 if (x->timeout != 0 && y->timeout == 0)
1295 if (x->timeout == 0 && y->timeout != 0)
1298 if (x->timeout < y->timeout)
1301 if (x->timeout > y->timeout)
1307 int sd_bus_send_with_reply(
1310 sd_bus_message_handler_t callback,
1315 struct reply_callback *c;
1318 assert_return(bus, -EINVAL);
1319 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1320 assert_return(m, -EINVAL);
1321 assert_return(m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL, -EINVAL);
1322 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1323 assert_return(callback, -EINVAL);
1324 assert_return(!bus_pid_changed(bus), -ECHILD);
1326 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1330 if (usec != (uint64_t) -1) {
1331 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1336 r = bus_seal_message(bus, m);
1340 c = new0(struct reply_callback, 1);
1344 c->callback = callback;
1345 c->userdata = userdata;
1346 c->serial = BUS_MESSAGE_SERIAL(m);
1347 c->timeout = calc_elapse(usec);
1349 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1355 if (c->timeout != 0) {
1356 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1359 sd_bus_send_with_reply_cancel(bus, c->serial);
1364 r = sd_bus_send(bus, m, serial);
1366 sd_bus_send_with_reply_cancel(bus, c->serial);
1373 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1374 struct reply_callback *c;
1376 assert_return(bus, -EINVAL);
1377 assert_return(serial != 0, -EINVAL);
1378 assert_return(!bus_pid_changed(bus), -ECHILD);
1380 c = hashmap_remove(bus->reply_callbacks, &serial);
1384 if (c->timeout != 0)
1385 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1391 int bus_ensure_running(sd_bus *bus) {
1396 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1398 if (bus->state == BUS_RUNNING)
1402 r = sd_bus_process(bus, NULL);
1405 if (bus->state == BUS_RUNNING)
1410 r = sd_bus_wait(bus, (uint64_t) -1);
1416 int sd_bus_send_with_reply_and_block(
1420 sd_bus_error *error,
1421 sd_bus_message **reply) {
1428 assert_return(bus, -EINVAL);
1429 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1430 assert_return(m, -EINVAL);
1431 assert_return(m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL, -EINVAL);
1432 assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1433 assert_return(!bus_error_is_dirty(error), -EINVAL);
1434 assert_return(!bus_pid_changed(bus), -ECHILD);
1436 r = bus_ensure_running(bus);
1440 r = sd_bus_send(bus, m, &serial);
1444 timeout = calc_elapse(usec);
1448 sd_bus_message *incoming = NULL;
1453 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1456 /* Make sure there's room for queuing this
1457 * locally, before we read the message */
1459 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1468 r = bus_kernel_read_message(bus, &incoming);
1470 r = bus_socket_read_message(bus, &incoming);
1475 if (incoming->reply_serial == serial) {
1476 /* Found a match! */
1478 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1483 sd_bus_message_unref(incoming);
1488 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1491 r = sd_bus_error_copy(error, &incoming->error);
1493 sd_bus_message_unref(incoming);
1497 k = bus_error_to_errno(&incoming->error);
1498 sd_bus_message_unref(incoming);
1502 sd_bus_message_unref(incoming);
1506 /* There's already guaranteed to be room for
1507 * this, so need to resize things here */
1508 bus->rqueue[bus->rqueue_size ++] = incoming;
1511 /* Try to read more, right-away */
1520 n = now(CLOCK_MONOTONIC);
1526 left = (uint64_t) -1;
1528 r = bus_poll(bus, true, left);
1532 r = dispatch_wqueue(bus);
1538 int sd_bus_get_fd(sd_bus *bus) {
1540 assert_return(bus, -EINVAL);
1541 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1542 assert_return(bus->input_fd == bus->output_fd, -EPERM);
1543 assert_return(!bus_pid_changed(bus), -ECHILD);
1545 return bus->input_fd;
1548 int sd_bus_get_events(sd_bus *bus) {
1551 assert_return(bus, -EINVAL);
1552 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1553 assert_return(!bus_pid_changed(bus), -ECHILD);
1555 if (bus->state == BUS_OPENING)
1557 else if (bus->state == BUS_AUTHENTICATING) {
1559 if (bus_socket_auth_needs_write(bus))
1564 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1565 if (bus->rqueue_size <= 0)
1567 if (bus->wqueue_size > 0)
1574 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1575 struct reply_callback *c;
1577 assert_return(bus, -EINVAL);
1578 assert_return(timeout_usec, -EINVAL);
1579 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1580 assert_return(!bus_pid_changed(bus), -ECHILD);
1582 if (bus->state == BUS_AUTHENTICATING) {
1583 *timeout_usec = bus->auth_timeout;
1587 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1588 *timeout_usec = (uint64_t) -1;
1592 c = prioq_peek(bus->reply_callbacks_prioq);
1594 *timeout_usec = (uint64_t) -1;
1598 *timeout_usec = c->timeout;
1602 static int process_timeout(sd_bus *bus) {
1603 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1604 struct reply_callback *c;
1610 c = prioq_peek(bus->reply_callbacks_prioq);
1614 n = now(CLOCK_MONOTONIC);
1618 r = bus_message_new_synthetic_error(
1621 &SD_BUS_ERROR_MAKE("org.freedesktop.DBus.Error.Timeout", "Timed out"),
1626 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1627 hashmap_remove(bus->reply_callbacks, &c->serial);
1629 r = c->callback(bus, m, c->userdata);
1632 return r < 0 ? r : 1;
1635 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1639 if (bus->state != BUS_HELLO)
1642 /* Let's make sure the first message on the bus is the HELLO
1643 * reply. But note that we don't actually parse the message
1644 * here (we leave that to the usual handling), we just verify
1645 * we don't let any earlier msg through. */
1647 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1648 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1651 if (m->reply_serial != bus->hello_serial)
1657 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1658 struct reply_callback *c;
1664 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1665 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1668 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1672 if (c->timeout != 0)
1673 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1675 r = sd_bus_message_rewind(m, true);
1679 r = c->callback(bus, m, c->userdata);
1685 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1686 struct filter_callback *l;
1693 bus->filter_callbacks_modified = false;
1695 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1697 if (bus->filter_callbacks_modified)
1700 /* Don't run this more than once per iteration */
1701 if (l->last_iteration == bus->iteration_counter)
1704 l->last_iteration = bus->iteration_counter;
1706 r = sd_bus_message_rewind(m, true);
1710 r = l->callback(bus, m, l->userdata);
1716 } while (bus->filter_callbacks_modified);
1721 static int process_match(sd_bus *bus, sd_bus_message *m) {
1728 bus->match_callbacks_modified = false;
1730 r = bus_match_run(bus, &bus->match_callbacks, m);
1734 } while (bus->match_callbacks_modified);
1739 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1740 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1746 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1749 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1752 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1755 if (streq_ptr(m->member, "Ping"))
1756 r = sd_bus_message_new_method_return(bus, m, &reply);
1757 else if (streq_ptr(m->member, "GetMachineId")) {
1761 r = sd_id128_get_machine(&id);
1765 r = sd_bus_message_new_method_return(bus, m, &reply);
1769 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1771 r = sd_bus_message_new_method_errorf(
1773 "org.freedesktop.DBus.Error.UnknownMethod",
1774 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1780 r = sd_bus_send(bus, reply, NULL);
1787 static int process_message(sd_bus *bus, sd_bus_message *m) {
1793 bus->iteration_counter++;
1795 r = process_hello(bus, m);
1799 r = process_reply(bus, m);
1803 r = process_filter(bus, m);
1807 r = process_match(bus, m);
1811 r = process_builtin(bus, m);
1815 return bus_process_object(bus, m);
1818 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1819 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1823 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1825 r = process_timeout(bus);
1829 r = dispatch_wqueue(bus);
1833 r = dispatch_rqueue(bus, &m);
1839 r = process_message(bus, m);
1844 r = sd_bus_message_rewind(m, true);
1853 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1855 r = sd_bus_reply_method_errorf(
1857 "org.freedesktop.DBus.Error.UnknownObject",
1858 "Unknown object '%s'.", m->path);
1872 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1875 /* Returns 0 when we didn't do anything. This should cause the
1876 * caller to invoke sd_bus_wait() before returning the next
1877 * time. Returns > 0 when we did something, which possibly
1878 * means *ret is filled in with an unprocessed message. */
1880 assert_return(bus, -EINVAL);
1881 assert_return(!bus_pid_changed(bus), -ECHILD);
1883 /* We don't allow recursively invoking sd_bus_process(). */
1884 assert_return(!bus->processing, -EBUSY);
1886 switch (bus->state) {
1893 r = bus_socket_process_opening(bus);
1900 case BUS_AUTHENTICATING:
1902 r = bus_socket_process_authenticating(bus);
1912 bus->processing = true;
1913 r = process_running(bus, ret);
1914 bus->processing = false;
1919 assert_not_reached("Unknown state");
1922 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1923 struct pollfd p[2] = {};
1929 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1931 e = sd_bus_get_events(bus);
1938 r = sd_bus_get_timeout(bus, &until);
1945 nw = now(CLOCK_MONOTONIC);
1946 m = until > nw ? until - nw : 0;
1949 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1952 p[0].fd = bus->input_fd;
1953 if (bus->output_fd == bus->input_fd) {
1957 p[0].events = e & POLLIN;
1958 p[1].fd = bus->output_fd;
1959 p[1].events = e & POLLOUT;
1963 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1967 return r > 0 ? 1 : 0;
1970 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1972 assert_return(bus, -EINVAL);
1973 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1974 assert_return(!bus_pid_changed(bus), -ECHILD);
1976 if (bus->rqueue_size > 0)
1979 return bus_poll(bus, false, timeout_usec);
1982 int sd_bus_flush(sd_bus *bus) {
1985 assert_return(bus, -EINVAL);
1986 assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1987 assert_return(!bus_pid_changed(bus), -ECHILD);
1989 r = bus_ensure_running(bus);
1993 if (bus->wqueue_size <= 0)
1997 r = dispatch_wqueue(bus);
2001 if (bus->wqueue_size <= 0)
2004 r = bus_poll(bus, false, (uint64_t) -1);
2010 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2011 struct filter_callback *f;
2013 assert_return(bus, -EINVAL);
2014 assert_return(callback, -EINVAL);
2015 assert_return(!bus_pid_changed(bus), -ECHILD);
2017 f = new0(struct filter_callback, 1);
2020 f->callback = callback;
2021 f->userdata = userdata;
2023 bus->filter_callbacks_modified = true;
2024 LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2028 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2029 struct filter_callback *f;
2031 assert_return(bus, -EINVAL);
2032 assert_return(callback, -EINVAL);
2033 assert_return(!bus_pid_changed(bus), -ECHILD);
2035 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2036 if (f->callback == callback && f->userdata == userdata) {
2037 bus->filter_callbacks_modified = true;
2038 LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2047 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2048 struct bus_match_component *components = NULL;
2049 unsigned n_components = 0;
2050 uint64_t cookie = 0;
2053 assert_return(bus, -EINVAL);
2054 assert_return(match, -EINVAL);
2055 assert_return(!bus_pid_changed(bus), -ECHILD);
2057 r = bus_match_parse(match, &components, &n_components);
2061 if (bus->bus_client) {
2062 cookie = ++bus->match_cookie;
2064 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2069 bus->match_callbacks_modified = true;
2070 r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2072 if (bus->bus_client)
2073 bus_remove_match_internal(bus, match, cookie);
2077 bus_match_parse_free(components, n_components);
2081 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2082 struct bus_match_component *components = NULL;
2083 unsigned n_components = 0;
2085 uint64_t cookie = 0;
2087 assert_return(bus, -EINVAL);
2088 assert_return(match, -EINVAL);
2089 assert_return(!bus_pid_changed(bus), -ECHILD);
2091 r = bus_match_parse(match, &components, &n_components);
2095 bus->match_callbacks_modified = true;
2096 r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2098 if (bus->bus_client)
2099 q = bus_remove_match_internal(bus, match, cookie);
2101 bus_match_parse_free(components, n_components);
2103 return r < 0 ? r : q;
2106 bool bus_pid_changed(sd_bus *bus) {
2109 /* We don't support people creating a bus connection and
2110 * keeping it around over a fork(). Let's complain. */
2112 return bus->original_pid != getpid();