1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
38 #include "bus-internal.h"
39 #include "bus-message.h"
41 #include "bus-socket.h"
42 #include "bus-kernel.h"
43 #include "bus-control.h"
/* Forward declaration: bus_poll() is defined near the end of this file but
 * is already called by functions that appear earlier, such as
 * sd_bus_send_with_reply_and_block(). */
45 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Close the bus's input fd and, when it is a valid descriptor distinct from
 * the input fd, the output fd as well; afterwards both fields are reset to
 * -1 so they read as "unset". */
47 static void bus_close_fds(sd_bus *b) {
51 close_nointr_nofail(b->input_fd);
53 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
54 close_nointr_nofail(b->output_fd);
56 b->input_fd = b->output_fd = -1;
/* Tear down a bus object. The steps visible here: unmap the kdbus pool
 * buffer (KDBUS_POOL_SIZE bytes), free exec_argv, close the b->n_fds
 * descriptors held in b->fds, drop one reference on every message still in
 * the read and write queues, free the reply-callback hashmap and its
 * priority queue, unlink every filter callback, drain and free the
 * object-callback hashmap, free the match tree and finally call
 * bus_kernel_flush_memfd(). */
59 static void bus_free(sd_bus *b) {
60 struct filter_callback *f;
61 struct object_callback *c;
69 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
78 strv_free(b->exec_argv);
80 close_many(b->fds, b->n_fds);
83 for (i = 0; i < b->rqueue_size; i++)
84 sd_bus_message_unref(b->rqueue[i]);
87 for (i = 0; i < b->wqueue_size; i++)
88 sd_bus_message_unref(b->wqueue[i]);
91 hashmap_free_free(b->reply_callbacks);
92 prioq_free(b->reply_callbacks_prioq);
94 while ((f = b->filter_callbacks)) {
95 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
99 while ((c = hashmap_steal_first(b->object_callbacks))) {
104 hashmap_free(b->object_callbacks);
105 bus_match_free(&b->match_callbacks);
107 bus_kernel_flush_memfd(b);
/* Create a new bus object. Defaults set here: reference count REFCNT_INIT,
 * both fds -1 (unset), message_version 1, fd negotiation enabled, and
 * original_pid recording the creating process (bus_pid_changed() later
 * compares against it). A one-element write queue is allocated up front. */
112 int sd_bus_new(sd_bus **ret) {
122 r->n_ref = REFCNT_INIT;
123 r->input_fd = r->output_fd = -1;
124 r->message_version = 1;
125 r->negotiate_fds = true;
126 r->original_pid = getpid();
128 /* We guarantee that wqueue always has space for at least one
130 r->wqueue = new(sd_bus_message*, 1);
/* Configure the address string the bus should connect to. Up front the
 * function tests that the bus is still in state BUS_UNSET and consults
 * bus_pid_changed(). */
140 int sd_bus_set_address(sd_bus *bus, const char *address) {
145 if (bus->state != BUS_UNSET)
149 if (bus_pid_changed(bus))
/* Store caller-supplied input and output file descriptors on the bus. The
 * bus state is tested against BUS_UNSET and bus_pid_changed() is consulted
 * before the descriptors are recorded. */
162 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
165 if (bus->state != BUS_UNSET)
171 if (bus_pid_changed(bus))
174 bus->input_fd = input_fd;
175 bus->output_fd = output_fd;
/* Configure an executable (path plus argument vector) for the bus. The
 * function tests the state against BUS_UNSET, checks argv with
 * strv_isempty(), consults bus_pid_changed(), and releases any previously
 * stored exec_path and exec_argv. */
179 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
184 if (bus->state != BUS_UNSET)
188 if (strv_isempty(argv))
190 if (bus_pid_changed(bus))
203 free(bus->exec_path);
204 strv_free(bus->exec_argv);
/* Set the bus_client flag. b is an int treated as a boolean: it is
 * normalised to 0/1 with !! before being stored. The state is tested
 * against BUS_UNSET and bus_pid_changed() is consulted first. */
212 int sd_bus_set_bus_client(sd_bus *bus, int b) {
215 if (bus->state != BUS_UNSET)
217 if (bus_pid_changed(bus))
220 bus->bus_client = !!b;
/* Set the negotiate_fds flag (b normalised to 0/1 with !!). The state is
 * tested against BUS_UNSET and bus_pid_changed() is consulted first.
 * sd_bus_can_send() reads this flag when asked about SD_BUS_TYPE_UNIX_FD. */
224 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
227 if (bus->state != BUS_UNSET)
229 if (bus_pid_changed(bus))
232 bus->negotiate_fds = !!b;
/* Set the is_server flag (b normalised to 0/1 with !!) and record
 * server_id. The combination "b is false but server_id is not
 * SD_ID128_NULL" is tested explicitly, in addition to the BUS_UNSET state
 * test and the bus_pid_changed() check. */
236 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
239 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
241 if (bus->state != BUS_UNSET)
243 if (bus_pid_changed(bus))
246 bus->is_server = !!b;
247 bus->server_id = server_id;
/* Set the anonymous_auth flag (b normalised to 0/1 with !!). The state is
 * tested against BUS_UNSET and bus_pid_changed() is consulted first. */
251 int sd_bus_set_anonymous(sd_bus *bus, int b) {
254 if (bus->state != BUS_UNSET)
256 if (bus_pid_changed(bus))
259 bus->anonymous_auth = !!b;
/* Reply handler installed by bus_send_hello(). It asserts that the bus is
 * in state BUS_HELLO, maps the reply to an errno-style value with
 * bus_message_to_errno(), reads one string ("s") from the reply, requires
 * that string to pass service_name_is_valid() and to start with ':', keeps
 * a strdup()ed copy in bus->unique_name and moves the bus to BUS_RUNNING. */
263 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
268 assert(bus->state == BUS_HELLO);
271 r = bus_message_to_errno(reply);
275 r = sd_bus_message_read(reply, "s", &s);
279 if (!service_name_is_valid(s) || s[0] != ':')
282 bus->unique_name = strdup(s);
283 if (!bus->unique_name)
286 bus->state = BUS_RUNNING;
/* Build a method call addressed to "org.freedesktop.DBus" and send it with
 * hello_callback() as the reply handler; the serial of the call is stored
 * in bus->hello_serial (process_hello() compares incoming replies against
 * it). The condition "not a bus client, or kernel transport" is tested
 * before the message is built. */
291 static int bus_send_hello(sd_bus *bus) {
292 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
297 if (!bus->bus_client || bus->is_kernel)
300 r = sd_bus_message_new_method_call(
302 "org.freedesktop.DBus",
304 "org.freedesktop.DBus",
310 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Pick the state that follows connection setup: a bus client that is not
 * on the kernel transport enters BUS_HELLO; BUS_RUNNING is assigned on the
 * other visible path. */
313 int bus_start_running(sd_bus *bus) {
316 if (bus->bus_client && !bus->is_kernel) {
317 bus->state = BUS_HELLO;
321 bus->state = BUS_RUNNING;
/* Parse one item of an address string at *p. The item's name is compared
 * against key with strncmp(); the value is then scanned up to the next ';',
 * ',' or NUL. Two hex digit values x and y are combined into a single byte
 * ((x << 4) | y), and the output buffer is grown with realloc() as bytes
 * are appended.
 * NOTE(review): presumably the hex pair is a %XX escape and the decoded
 * string is handed back through *value -- confirm against the full body. */
325 static int parse_address_key(const char **p, const char *key, char **value) {
336 if (strncmp(*p, key, l) != 0)
349 while (*a != ';' && *a != ',' && *a != 0) {
367 c = (char) ((x << 4) | y);
374 t = realloc(r, n + 2);
/* Advance *p past the current address item: strcspn() moves it to the next
 * ',' or to the terminating NUL when there is none. */
402 static void skip_address_key(const char **p) {
406 *p += strcspn(*p, ",");
/* Parse the items of a UNIX socket address into b->sockaddr.un.
 * Items are read until NUL or ';'; the keys recognised are "guid", "path"
 * and "abstract". The cases "neither path nor abstract" and "both path and
 * abstract" are each tested for explicitly.
 * For a file system path the length is checked against sizeof(sun_path)
 * and the path is copied with strncpy(). For an abstract name the length
 * is checked against sizeof(sun_path) - 1, sun_path[0] is set to 0 and the
 * name is copied starting at sun_path + 1. In both cases sockaddr_size is
 * offsetof(struct sockaddr_un, sun_path) plus the bytes actually used. */
412 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
413 _cleanup_free_ char *path = NULL, *abstract = NULL;
422 while (**p != 0 && **p != ';') {
423 r = parse_address_key(p, "guid", guid);
429 r = parse_address_key(p, "path", &path);
435 r = parse_address_key(p, "abstract", &abstract);
444 if (!path && !abstract)
447 if (path && abstract)
452 if (l > sizeof(b->sockaddr.un.sun_path))
455 b->sockaddr.un.sun_family = AF_UNIX;
456 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
457 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
458 } else if (abstract) {
459 l = strlen(abstract);
460 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
463 b->sockaddr.un.sun_family = AF_UNIX;
464 b->sockaddr.un.sun_path[0] = 0;
465 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
466 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parse the items of a TCP address and resolve it into b->sockaddr.
 * Items are read until NUL or ';'; the keys recognised are "guid", "host",
 * "port" and "family". A family of "ipv4" selects AF_INET and "ipv6"
 * selects AF_INET6 in the getaddrinfo() hints, which otherwise request
 * SOCK_STREAM with AI_ADDRCONFIG. -EADDRNOTAVAIL is returned on the
 * resolution failure path. The first address of the result list is copied
 * into b->sockaddr (ai_addrlen bytes), its length is stored in
 * sockaddr_size, and the list is released with freeaddrinfo(). */
472 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
473 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
475 struct addrinfo *result, hints = {
476 .ai_socktype = SOCK_STREAM,
477 .ai_flags = AI_ADDRCONFIG,
485 while (**p != 0 && **p != ';') {
486 r = parse_address_key(p, "guid", guid);
492 r = parse_address_key(p, "host", &host);
498 r = parse_address_key(p, "port", &port);
504 r = parse_address_key(p, "family", &family);
517 if (streq(family, "ipv4"))
518 hints.ai_family = AF_INET;
519 else if (streq(family, "ipv6"))
520 hints.ai_family = AF_INET6;
525 r = getaddrinfo(host, port, &hints, &result);
529 return -EADDRNOTAVAIL;
531 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
532 b->sockaddr_size = result->ai_addrlen;
534 freeaddrinfo(result);
/* Parse the items of an exec-style address (executable path plus
 * arguments). Items are read until NUL or ';'; the keys recognised are
 * "guid", "path" and items starting with "argv" followed by a decimal
 * index. The index is parsed with strtoul(); an item is refused when errno
 * is set, the index is not followed by '=', or the index exceeds 256. The
 * argv array is grown with realloc() to hold ul + 2 pointers and the newly
 * added tail is zeroed, after which the value is parsed into argv[ul].
 * After the loop the array is checked for holes (argv[0] excepted); a
 * missing argv[0] is filled with a copy of path. */
539 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
541 unsigned n_argv = 0, j;
550 while (**p != 0 && **p != ';') {
551 r = parse_address_key(p, "guid", guid);
557 r = parse_address_key(p, "path", &path);
563 if (startswith(*p, "argv")) {
567 ul = strtoul(*p + 4, (char**) p, 10);
568 if (errno > 0 || **p != '=' || ul > 256) {
578 x = realloc(argv, sizeof(char*) * (ul + 2));
584 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
590 r = parse_address_key(p, NULL, argv + ul);
605 /* Make sure there are no holes in the array, with the
606 * exception of argv[0] */
607 for (j = 1; j < n_argv; j++)
613 if (argv && argv[0] == NULL) {
614 argv[0] = strdup(path);
626 for (j = 0; j < n_argv; j++)
/* Parse the items of a kernel-bus address. Items are read until NUL or
 * ';'; the keys recognised are "guid" and "path". */
634 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
635 _cleanup_free_ char *path = NULL;
643 while (**p != 0 && **p != ';') {
644 r = parse_address_key(p, "guid", guid);
650 r = parse_address_key(p, "path", &path);
/* Forget the result of a previous address parse: sockaddr_size becomes 0,
 * exec_argv is freed and server_id is reset to SD_ID128_NULL. */
669 static void bus_reset_parsed_address(sd_bus *b) {
673 b->sockaddr_size = 0;
674 strv_free(b->exec_argv);
678 b->server_id = SD_ID128_NULL;
/* Parse the next entry of b->address, starting at b->address_index.
 * The end of the address string (NUL at the current index) is tested
 * first. Previously parsed state is cleared with
 * bus_reset_parsed_address(), then the entry is dispatched on its
 * transport prefix: "unix:", "tcp:", "unixexec:" or "kernel:". A guid
 * collected by the transport parser is converted with
 * sd_id128_from_string() into b->server_id, and address_index is advanced
 * to where parsing stopped. */
683 static int bus_parse_next_address(sd_bus *b) {
684 _cleanup_free_ char *guid = NULL;
692 if (b->address[b->address_index] == 0)
695 bus_reset_parsed_address(b);
697 a = b->address + b->address_index;
706 if (startswith(a, "unix:")) {
709 r = parse_unix_address(b, &a, &guid);
714 } else if (startswith(a, "tcp:")) {
717 r = parse_tcp_address(b, &a, &guid);
723 } else if (startswith(a, "unixexec:")) {
726 r = parse_exec_address(b, &a, &guid);
732 } else if (startswith(a, "kernel:")) {
735 r = parse_kernel_address(b, &a, &guid);
748 r = sd_id128_from_string(guid, &b->server_id);
753 b->address_index = a - b->address;
/* Try to establish a connection from the currently parsed address.
 * Transport selection: a socket connect when the sockaddr family is not
 * AF_UNSPEC, an exec transport when exec_path is set, the kernel transport
 * when b->kernel is set. The negated result of an attempt is remembered in
 * last_connect_error, and bus_parse_next_address() supplies the next
 * candidate. The final return reports the remembered error, or
 * -ECONNREFUSED when none was recorded. */
757 static int bus_start_address(sd_bus *b) {
765 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
767 r = bus_socket_connect(b);
771 b->last_connect_error = -r;
773 } else if (b->exec_path) {
775 r = bus_socket_exec(b);
779 b->last_connect_error = -r;
780 } else if (b->kernel) {
782 r = bus_kernel_connect(b);
786 b->last_connect_error = -r;
789 r = bus_parse_next_address(b);
793 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Discard the currently parsed address and start over with
 * bus_start_address(), whose result is returned. */
797 int bus_next_address(sd_bus *b) {
800 bus_reset_parsed_address(b);
801 return bus_start_address(b);
/* Start a bus on file descriptors that were handed in by the caller.
 * Both descriptors must be valid (asserted). The input fd, and the output
 * fd when it is a different descriptor, are switched to non-blocking mode
 * and marked close-on-exec. The input fd is then examined with fstat(): a
 * character device is handed to the kernel transport
 * (bus_kernel_take_fd()), anything else to the socket transport
 * (bus_socket_take_fd()). */
804 static int bus_start_fd(sd_bus *b) {
809 assert(b->input_fd >= 0);
810 assert(b->output_fd >= 0);
812 r = fd_nonblock(b->input_fd, true);
816 r = fd_cloexec(b->input_fd, true);
820 if (b->input_fd != b->output_fd) {
821 r = fd_nonblock(b->output_fd, true);
825 r = fd_cloexec(b->output_fd, true);
830 if (fstat(b->input_fd, &st) < 0)
/* S_ISCHR() classifies a st_mode value. It must be applied to the mode
 * that fstat() just stored in st, not to the descriptor number itself,
 * otherwise the transport choice depends on the numeric value of the fd. */
833 if (S_ISCHR(st.st_mode))
834 return bus_kernel_take_fd(b);
836 return bus_socket_take_fd(b);
/* Start the connection of a configured bus. The state is tested against
 * BUS_UNSET and bus_pid_changed() is consulted, then the bus enters
 * BUS_OPENING. The combination is_server && bus_client is tested for
 * explicitly. If an input fd has been set the bus is started on the given
 * descriptors (bus_start_fd()); otherwise, if an address, a parsed
 * sockaddr, an exec path or a kernel path is configured, it is started via
 * bus_start_address(). The function finishes by calling bus_send_hello(). */
839 int sd_bus_start(sd_bus *bus) {
844 if (bus->state != BUS_UNSET)
846 if (bus_pid_changed(bus))
849 bus->state = BUS_OPENING;
851 if (bus->is_server && bus->bus_client)
854 if (bus->input_fd >= 0)
855 r = bus_start_fd(bus);
856 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
857 r = bus_start_address(bus);
864 return bus_send_hello(bus);
/* Open a connection to the system bus. The environment variable
 * DBUS_SYSTEM_BUS_ADDRESS is read with secure_getenv() and passed to
 * sd_bus_set_address(); the alternative visible path fills in an AF_UNIX
 * sockaddr for "/run/dbus/system_bus_socket", with sockaddr_size covering
 * the path without its trailing NUL. The bus is marked as a bus client. */
867 int sd_bus_open_system(sd_bus **ret) {
879 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
881 r = sd_bus_set_address(b, e);
885 b->sockaddr.un.sun_family = AF_UNIX;
886 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
887 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
890 b->bus_client = true;
/* Open a connection to the user bus. The environment variable
 * DBUS_SESSION_BUS_ADDRESS is read with secure_getenv() and passed to
 * sd_bus_set_address(); the alternative visible path reads
 * XDG_RUNTIME_DIR and builds an AF_UNIX sockaddr whose path is that
 * directory followed by "/bus". The combined length (l + 4) is checked
 * against sizeof(sun_path) before copying, and sockaddr_size covers exactly
 * those l + 4 bytes. The bus is marked as a bus client. */
904 int sd_bus_open_user(sd_bus **ret) {
917 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
919 r = sd_bus_set_address(b, e);
923 e = secure_getenv("XDG_RUNTIME_DIR");
930 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
935 b->sockaddr.un.sun_family = AF_UNIX;
936 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
937 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
940 b->bus_client = true;
/* Mark the bus as closed. The function tests whether the bus is already
 * BUS_CLOSED and consults bus_pid_changed() before switching the state to
 * BUS_CLOSED. The existing comment below explains why the fd of a kernel
 * bus is left open. */
954 void sd_bus_close(sd_bus *bus) {
957 if (bus->state == BUS_CLOSED)
959 if (bus_pid_changed(bus))
962 bus->state = BUS_CLOSED;
967 /* We'll leave the fd open in case this is a kernel bus, since
968 * there might still be memblocks around that reference this
969 * bus, and they might need to invoke the
970 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
/* Take an additional reference on the bus. The incremented count is
 * asserted to be at least 2, i.e. the caller must already have held a
 * reference. */
974 sd_bus *sd_bus_ref(sd_bus *bus) {
978 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drop one reference on the bus; the decremented count is tested for
 * having reached zero (or below).
 * NOTE(review): presumably bus_free() runs on that path -- confirm. */
983 sd_bus *sd_bus_unref(sd_bus *bus) {
987 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Report whether the bus is in one of the open states, as defined by
 * BUS_IS_OPEN(). bus_pid_changed() is consulted first. */
993 int sd_bus_is_open(sd_bus *bus) {
996 if (bus_pid_changed(bus))
999 return BUS_IS_OPEN(bus->state);
/* Report whether values of the given D-Bus type can be sent on this bus.
 * The state is tested against BUS_UNSET and bus_pid_changed() is
 * consulted. For SD_BUS_TYPE_UNIX_FD the answer depends on the connection:
 * negotiate_fds is tested, bus_ensure_running() is called so the result of
 * the negotiation is known, and bus->can_fds is returned. For every other
 * type the result is bus_type_is_valid(type). */
1002 int sd_bus_can_send(sd_bus *bus, char type) {
1007 if (bus->state == BUS_UNSET)
1009 if (bus_pid_changed(bus))
1012 if (type == SD_BUS_TYPE_UNIX_FD) {
1013 if (!bus->negotiate_fds)
1016 r = bus_ensure_running(bus);
1020 return bus->can_fds;
1023 return bus_type_is_valid(type);
/* Copy the server id of the peer into *server_id. bus_pid_changed() is
 * consulted and bus_ensure_running() is called before bus->server_id is
 * read. */
1026 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1033 if (bus_pid_changed(bus))
1036 r = bus_ensure_running(bus);
1040 *server_id = bus->server_id;
/* Seal message m for sending on bus b. The message's header version is
 * compared against the bus's message_version, then the message is sealed
 * with the next serial number (b->serial is pre-incremented). */
1044 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1047 if (m->header->version > b->message_version)
1053 return bus_message_seal(m, ++b->serial);
/* Write out as much of the write queue as possible. Only valid in states
 * BUS_RUNNING and BUS_HELLO (asserted). While the queue is non-empty its
 * head is written through the kernel or the socket transport; for the
 * socket transport bus->windex tracks how far the head has been written.
 * Once the head is complete (always the case for the kernel transport,
 * or windex has reached BUS_MESSAGE_SIZE()), its reference is dropped and
 * the remaining entries are shifted down with memmove(). */
1056 static int dispatch_wqueue(sd_bus *bus) {
1060 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1062 while (bus->wqueue_size > 0) {
1065 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1067 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1073 /* Didn't do anything this time */
1075 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1076 /* Fully written. Let's drop the entry from
1079 * This isn't particularly optimized, but
1080 * well, this is supposed to be our worst-case
1081 * buffer only, and the socket buffer is
1082 * supposed to be our primary buffer, and if
1083 * it got full, then all bets are off
1086 sd_bus_message_unref(bus->wqueue[0]);
1087 bus->wqueue_size --;
1088 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetch the next incoming message into *m. Only valid in states
 * BUS_RUNNING and BUS_HELLO (asserted). A message already waiting in the
 * read queue is handed out first (the head is removed and the rest shifted
 * down with memmove()); otherwise a new message is read through the kernel
 * or the socket transport. */
1098 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1099 sd_bus_message *z = NULL;
1104 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1106 if (bus->rqueue_size > 0) {
1107 /* Dispatch a queued message */
1109 *m = bus->rqueue[0];
1110 bus->rqueue_size --;
1111 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1115 /* Try to read a new message */
1118 r = bus_kernel_read_message(bus, &z);
1120 r = bus_socket_read_message(bus, &z);
/* Send message m on the bus, writing it immediately when possible and
 * queueing it otherwise.
 *  - The bus must be open and bus_pid_changed() is consulted.
 *  - sd_bus_can_send() is queried for SD_BUS_TYPE_UNIX_FD.
 *  - When the caller passes no serial pointer and the message is not yet
 *    sealed, SD_BUS_MESSAGE_NO_REPLY_EXPECTED is set in the header.
 *  - The message is sealed with bus_seal_message().
 *  - A message flagged dont_send is tested for together with a missing
 *    serial pointer (see the existing comment about suppressing replies).
 *  - In BUS_RUNNING/BUS_HELLO with an empty write queue the message is
 *    written directly; a partially written socket message is stored, with
 *    a new reference, as the single entry of the write queue.
 *  - Otherwise the message is appended to the write queue, which is grown
 *    with realloc(); the queue length is tested against BUS_WQUEUE_MAX.
 *  - *serial receives BUS_MESSAGE_SERIAL(m). */
1136 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1141 if (!BUS_IS_OPEN(bus->state))
1145 if (bus_pid_changed(bus))
1149 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1156 /* If the serial number isn't kept, then we know that no reply
1158 if (!serial && !m->sealed)
1159 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1161 r = bus_seal_message(bus, m);
1165 /* If this is a reply and no reply was requested, then let's
1166 * suppress this, if we can */
1167 if (m->dont_send && !serial)
1170 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1174 r = bus_kernel_write_message(bus, m);
1176 r = bus_socket_write_message(bus, m, &idx);
1181 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1182 /* Wasn't fully written. So let's remember how
1183 * much was written. Note that the first entry
1184 * of the wqueue array is always allocated so
1185 * that we always can remember how much was
1187 bus->wqueue[0] = sd_bus_message_ref(m);
1188 bus->wqueue_size = 1;
1194 /* Just append it to the queue. */
1196 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1199 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1204 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1208 *serial = BUS_MESSAGE_SERIAL(m);
/* Turn a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC deadline (now + usec). (uint64_t) -1 is tested for as a
 * special value, and BUS_DEFAULT_TIMEOUT is substituted on one path.
 * NOTE(review): presumably -1 means "no timeout" and 0 selects the
 * default -- confirm against the full body. */
1213 static usec_t calc_elapse(uint64_t usec) {
1214 if (usec == (uint64_t) -1)
1218 usec = BUS_DEFAULT_TIMEOUT;
1220 return now(CLOCK_MONOTONIC) + usec;
/* Ordering function for the reply-callback priority queue (it is passed to
 * prioq_ensure_allocated() in sd_bus_send_with_reply()). Entries with a
 * zero timeout are distinguished from entries with a non-zero timeout
 * first; two entries are then compared by their timeout value.
 * NOTE(review): the sign returned by each branch is not documented here --
 * confirm against the full body. */
1223 static int timeout_compare(const void *a, const void *b) {
1224 const struct reply_callback *x = a, *y = b;
1226 if (x->timeout != 0 && y->timeout == 0)
1229 if (x->timeout == 0 && y->timeout != 0)
1232 if (x->timeout < y->timeout)
1235 if (x->timeout > y->timeout)
/* Send a method call and register a callback for its reply.
 * The bus must be open, the message must be a method call, the
 * NO_REPLY_EXPECTED flag is tested for, and bus_pid_changed() is
 * consulted. The reply-callback hashmap is allocated on demand; the
 * timeout priority queue is allocated on demand unless usec is
 * (uint64_t) -1. The message is sealed so that its serial is known, and a
 * reply_callback record is filled with the callback, its userdata, the
 * serial and the deadline from calc_elapse(). The record is keyed by
 * serial in the hashmap and, when it has a non-zero deadline, also put
 * into the priority queue. If queueing or sending fails after
 * registration, the registration is undone with
 * sd_bus_send_with_reply_cancel(). */
1241 int sd_bus_send_with_reply(
1244 sd_bus_message_handler_t callback,
1249 struct reply_callback *c;
1254 if (!BUS_IS_OPEN(bus->state))
1260 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1262 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1264 if (bus_pid_changed(bus))
1267 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1271 if (usec != (uint64_t) -1) {
1272 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1277 r = bus_seal_message(bus, m);
1281 c = new0(struct reply_callback, 1);
1285 c->callback = callback;
1286 c->userdata = userdata;
1287 c->serial = BUS_MESSAGE_SERIAL(m);
1288 c->timeout = calc_elapse(usec);
1290 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1296 if (c->timeout != 0) {
1297 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1300 sd_bus_send_with_reply_cancel(bus, c->serial);
1305 r = sd_bus_send(bus, m, serial);
1307 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancel a pending reply callback identified by the serial of its method
 * call. bus_pid_changed() is consulted, the record is removed from the
 * reply-callback hashmap and, when it carries a non-zero deadline, from
 * the timeout priority queue as well. */
1314 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1315 struct reply_callback *c;
1321 if (bus_pid_changed(bus))
1324 c = hashmap_remove(bus->reply_callbacks, &serial);
1328 if (c->timeout != 0)
1329 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drive the connection until it reaches BUS_RUNNING. The states BUS_UNSET
 * and BUS_CLOSED are tested for up front, and an already running bus is
 * recognised immediately. Otherwise the function alternates between
 * sd_bus_process() and, with no timeout ((uint64_t) -1), sd_bus_wait(),
 * re-checking the state after processing. */
1335 int bus_ensure_running(sd_bus *bus) {
1340 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1342 if (bus->state == BUS_RUNNING)
1346 r = sd_bus_process(bus, NULL);
1349 if (bus->state == BUS_RUNNING)
1354 r = sd_bus_wait(bus, (uint64_t) -1);
/* Send a method call and block until its reply arrives.
 * The bus must be open, the message must be a method call, the
 * NO_REPLY_EXPECTED flag and a dirty error structure are tested for, and
 * bus_pid_changed() is consulted. After bus_ensure_running() the call is
 * sent and its serial remembered; the deadline comes from calc_elapse().
 * The loop then reads incoming messages directly from the transport.
 * Before each read the read queue is grown by one slot (bounded by
 * BUS_RQUEUE_MAX) so that an unrelated message can always be queued.
 * A message whose reply_serial matches is the answer: a METHOD_RETURN is
 * handled on its own path, a METHOD_ERROR is copied into *error with
 * sd_bus_error_copy() and converted with bus_error_to_errno(). In each
 * matching case the incoming message reference is dropped. Any other
 * message is appended to the read queue for later dispatch. When nothing
 * more can be read the function polls with bus_poll() and flushes pending
 * writes with dispatch_wqueue(). */
1360 int sd_bus_send_with_reply_and_block(
1364 sd_bus_error *error,
1365 sd_bus_message **reply) {
1374 if (!BUS_IS_OPEN(bus->state))
1378 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1380 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1382 if (bus_error_is_dirty(error))
1384 if (bus_pid_changed(bus))
1387 r = bus_ensure_running(bus);
1391 r = sd_bus_send(bus, m, &serial);
1395 timeout = calc_elapse(usec);
1399 sd_bus_message *incoming = NULL;
1404 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1407 /* Make sure there's room for queuing this
1408 * locally, before we read the message */
1410 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1419 r = bus_kernel_read_message(bus, &incoming);
1421 r = bus_socket_read_message(bus, &incoming);
1426 if (incoming->reply_serial == serial) {
1427 /* Found a match! */
1429 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1434 sd_bus_message_unref(incoming);
1439 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1442 r = sd_bus_error_copy(error, &incoming->error);
1444 sd_bus_message_unref(incoming);
1448 k = bus_error_to_errno(&incoming->error);
1449 sd_bus_message_unref(incoming);
1453 sd_bus_message_unref(incoming);
1457 /* There's already guaranteed to be room for
1458 * this, so need to resize things here */
1459 bus->rqueue[bus->rqueue_size ++] = incoming;
1462 /* Try to read more, right-away */
1471 n = now(CLOCK_MONOTONIC);
1477 left = (uint64_t) -1;
1479 r = bus_poll(bus, true, left);
1483 r = dispatch_wqueue(bus);
/* Return the file descriptor of the connection (bus->input_fd). The bus
 * must be open, bus_pid_changed() is consulted, and the case of distinct
 * input and output descriptors is tested for explicitly, since a single
 * return value cannot describe two descriptors. */
1489 int sd_bus_get_fd(sd_bus *bus) {
1492 if (!BUS_IS_OPEN(bus->state))
1494 if (bus->input_fd != bus->output_fd)
1496 if (bus_pid_changed(bus))
1499 return bus->input_fd;
/* Compute the poll events the bus is currently interested in. The bus must
 * be open and bus_pid_changed() is consulted. The result depends on the
 * state: BUS_OPENING has its own case; in BUS_AUTHENTICATING the answer
 * depends on bus_socket_auth_needs_write(); in BUS_RUNNING/BUS_HELLO it
 * depends on whether the read queue is empty and whether the write queue
 * holds pending messages. */
1502 int sd_bus_get_events(sd_bus *bus) {
1507 if (!BUS_IS_OPEN(bus->state))
1509 if (bus_pid_changed(bus))
1512 if (bus->state == BUS_OPENING)
1514 else if (bus->state == BUS_AUTHENTICATING) {
1516 if (bus_socket_auth_needs_write(bus))
1521 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1522 if (bus->rqueue_size <= 0)
1524 if (bus->wqueue_size > 0)
/* Report in *timeout_usec when the bus next needs attention. The bus must
 * be open and bus_pid_changed() is consulted.
 *  - BUS_AUTHENTICATING: the authentication deadline (auth_timeout).
 *  - Any state other than BUS_RUNNING/BUS_HELLO: (uint64_t) -1.
 *  - Otherwise: the deadline of the entry at the head of the
 *    reply-callback priority queue, or (uint64_t) -1 on the alternative
 *    path (presumably when the queue is empty -- confirm). */
1531 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1532 struct reply_callback *c;
1538 if (!BUS_IS_OPEN(bus->state))
1540 if (bus_pid_changed(bus))
1543 if (bus->state == BUS_AUTHENTICATING) {
1544 *timeout_usec = bus->auth_timeout;
1548 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1549 *timeout_usec = (uint64_t) -1;
1553 c = prioq_peek(bus->reply_callbacks_prioq);
1555 *timeout_usec = (uint64_t) -1;
1559 *timeout_usec = c->timeout;
/* Handle the reply callback at the head of the timeout priority queue.
 * The head entry is peeked and the current CLOCK_MONOTONIC time is read.
 * A synthetic "org.freedesktop.DBus.Error.Timeout" error message is
 * created, the entry is popped from the priority queue (asserted to be the
 * peeked one) and removed from the reply-callback hashmap, and its
 * callback is invoked with the synthetic message. Returns the callback's
 * negative error, or 1 otherwise. */
1563 static int process_timeout(sd_bus *bus) {
1564 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1565 struct reply_callback *c;
1571 c = prioq_peek(bus->reply_callbacks_prioq);
1575 n = now(CLOCK_MONOTONIC);
1579 r = bus_message_new_synthetic_error(
1582 &SD_BUS_ERROR_MAKE("org.freedesktop.DBus.Error.Timeout", "Timed out"),
1587 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1588 hashmap_remove(bus->reply_callbacks, &c->serial);
1590 r = c->callback(bus, m, c->userdata);
1593 return r < 0 ? r : 1;
/* Gatekeeper used while the bus is in state BUS_HELLO: the message type is
 * tested for being a method return or method error, and its reply_serial
 * is compared with bus->hello_serial. The existing comment below states
 * the intent. In any other state the function has nothing to check. */
1596 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1600 if (bus->state != BUS_HELLO)
1603 /* Let's make sure the first message on the bus is the HELLO
1604 * reply. But note that we don't actually parse the message
1605 * here (we leave that to the usual handling), we just verify
1606 * we don't let any earlier msg through. */
1608 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1609 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1612 if (m->reply_serial != bus->hello_serial)
/* Dispatch a method return or method error to the callback registered for
 * it. Other message types are not handled here. The callback record is
 * looked up, and removed, by the message's reply_serial; if it carries a
 * non-zero deadline it is also removed from the timeout priority queue.
 * The message is rewound before the callback is invoked. */
1618 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1619 struct reply_callback *c;
1625 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1626 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1629 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1633 if (c->timeout != 0)
1634 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1636 r = sd_bus_message_rewind(m, true);
1640 r = c->callback(bus, m, c->userdata);
/* Run the registered filter callbacks on message m. Because a callback
 * may add or remove filters, the pass is repeated for as long as
 * filter_callbacks_modified gets set during a pass. Each filter is stamped
 * with the current iteration_counter so it is called at most once per
 * message, even across repeated passes. The message is rewound before
 * each callback. */
1646 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1647 struct filter_callback *l;
1654 bus->filter_callbacks_modified = false;
1656 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1658 if (bus->filter_callbacks_modified)
1661 /* Don't run this more than once per iteration */
1662 if (l->last_iteration == bus->iteration_counter)
1665 l->last_iteration = bus->iteration_counter;
1667 r = sd_bus_message_rewind(m, true);
1671 r = l->callback(bus, m, l->userdata);
1677 } while (bus->filter_callbacks_modified);
/* Run message m through the match tree with bus_match_run(), repeating
 * the run for as long as match_callbacks_modified gets set while it
 * executes. */
1682 static int process_match(sd_bus *bus, sd_bus_message *m) {
1689 bus->match_callbacks_modified = false;
1691 r = bus_match_run(bus, &bus->match_callbacks, m);
1695 } while (bus->match_callbacks_modified);
/* Built-in implementation of the "org.freedesktop.DBus.Peer" interface.
 * Only method calls on that interface are considered, and the
 * NO_REPLY_EXPECTED flag is tested before a reply is built.
 *  - "Ping": an empty method return.
 *  - "GetMachineId": a method return carrying the machine id from
 *    sd_id128_get_machine(), formatted as a string.
 *  - anything else: an "org.freedesktop.DBus.Error.UnknownMethod" error.
 * The reply is sent with sd_bus_send(). */
1700 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1701 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1707 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1710 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1713 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1716 if (streq_ptr(m->member, "Ping"))
1717 r = sd_bus_message_new_method_return(bus, m, &reply);
1718 else if (streq_ptr(m->member, "GetMachineId")) {
1722 r = sd_id128_get_machine(&id);
1726 r = sd_bus_message_new_method_return(bus, m, &reply);
1730 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1732 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1734 sd_bus_error_set(&error,
1735 "org.freedesktop.DBus.Error.UnknownMethod",
1736 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1738 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1744 r = sd_bus_send(bus, reply, NULL);
/* Dispatch a method call to the registered object callbacks. Only method
 * calls are considered, and an empty object-callback table is tested for
 * up front.
 * First the callback registered for the exact object path is tried. Then
 * the parent prefixes of the path are tried, found by repeatedly cutting
 * the path at its last '/'; only callbacks registered as fallbacks
 * qualify. Each callback is stamped with the current iteration_counter so
 * it runs at most once per message, and the message is rewound before
 * each call. Because callbacks may change the table, the search is
 * repeated for as long as object_callbacks_modified gets set.
 * Afterwards, per the existing comment below: unless no handler was found
 * at all, or the call is Introspectable.Introspect, an
 * "org.freedesktop.DBus.Error.UnknownMethod" error reply is sent. */
1751 static int process_object(sd_bus *bus, sd_bus_message *m) {
1752 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1753 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1754 struct object_callback *c;
1762 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1765 if (hashmap_isempty(bus->object_callbacks))
1768 pl = strlen(m->path);
1773 bus->object_callbacks_modified = false;
1775 c = hashmap_get(bus->object_callbacks, m->path);
1776 if (c && c->last_iteration != bus->iteration_counter) {
1778 c->last_iteration = bus->iteration_counter;
1780 r = sd_bus_message_rewind(m, true);
1784 r = c->callback(bus, m, c->userdata);
1791 /* Look for fallback prefixes */
1796 if (bus->object_callbacks_modified)
1799 e = strrchr(p, '/');
1805 c = hashmap_get(bus->object_callbacks, p);
1806 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1808 c->last_iteration = bus->iteration_counter;
1810 r = sd_bus_message_rewind(m, true);
1814 r = c->callback(bus, m, c->userdata);
1822 } while (bus->object_callbacks_modified);
1824 /* We found some handlers but none wanted to take this, then
1825 * return this -- with one exception, we can handle
1826 * introspection minimally ourselves */
1827 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1830 sd_bus_error_set(&error,
1831 "org.freedesktop.DBus.Error.UnknownMethod",
1832 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1834 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1838 r = sd_bus_send(bus, reply, NULL);
/* Minimal built-in handler for
 * org.freedesktop.DBus.Introspectable.Introspect. Other messages are not
 * handled here.
 * The registered object callbacks are scanned and names derived from
 * their paths are collected in a string set; inserting a duplicate
 * (-EEXIST) is tolerated. A callback path is considered when the message
 * path is "/" or when the callback path starts with the message path
 * followed by '/'; a callback path of "/" is tested for separately.
 * The XML document is assembled in memory through open_memstream(): the
 * doctype, an opening <node>, the Peer and Introspectable interface
 * descriptions, one <node name="..."/> element per collected name, and
 * the closing </node>. The resulting string is sent back in a method
 * return. */
1845 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1846 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1847 _cleanup_free_ char *introspection = NULL;
1848 _cleanup_set_free_free_ Set *s = NULL;
1849 _cleanup_fclose_ FILE *f = NULL;
1850 struct object_callback *c;
1859 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1865 s = set_new(string_hash_func, string_compare_func);
1869 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1873 if (streq(c->path, "/"))
1876 if (streq(m->path, "/"))
1879 e = startswith(c->path, m->path);
1880 if (!e || *e != '/')
1892 r = set_consume(s, a);
1893 if (r < 0 && r != -EEXIST)
1897 f = open_memstream(&introspection, &size);
1901 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1902 fputs("<node>\n", f);
1903 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1904 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1906 while ((node = set_steal_first(s))) {
1907 fprintf(f, " <node name=\"%s\"/>\n", node);
1911 fputs("</node>\n", f);
1918 r = sd_bus_message_new_method_return(bus, m, &reply);
1922 r = sd_bus_message_append(reply, "s", introspection);
1926 r = sd_bus_send(bus, reply, NULL);
/* Run one incoming message through the handler chain, in this order:
 * hello check, reply callbacks, filters, matches, built-in Peer interface,
 * object callbacks and finally the built-in introspection handler.
 * iteration_counter is bumped first; filter and object callbacks compare
 * it with their last_iteration stamp to avoid running twice for the same
 * message. */
1933 static int process_message(sd_bus *bus, sd_bus_message *m) {
1939 bus->iteration_counter++;
1941 r = process_hello(bus, m);
1945 r = process_reply(bus, m);
1949 r = process_filter(bus, m);
1953 r = process_match(bus, m);
1957 r = process_builtin(bus, m);
1961 r = process_object(bus, m);
1965 return process_introspect(bus, m);
/* One processing step for a bus in state BUS_RUNNING or BUS_HELLO
 * (asserted). In order: process_timeout(), dispatch_wqueue(),
 * dispatch_rqueue() to fetch one incoming message, then
 * process_message() to run it through the handlers. The message is
 * rewound afterwards. For a method call that reaches the end of this
 * function, an "org.freedesktop.DBus.Error.UnknownObject" error reply
 * naming the object path is built and sent. */
1968 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1969 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1973 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1975 r = process_timeout(bus);
1979 r = dispatch_wqueue(bus);
1983 r = dispatch_rqueue(bus, &m);
1989 r = process_message(bus, m);
1994 r = sd_bus_message_rewind(m, true);
2003 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
2004 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2005 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
2007 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
2009 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
2013 r = sd_bus_send(bus, reply, NULL);
/* Main processing entry point; the return convention is described in the
 * existing comment below. bus_pid_changed() is consulted and re-entrant
 * invocation is detected through bus->processing. The work is selected by
 * state: bus_socket_process_opening() while opening,
 * bus_socket_process_authenticating() in BUS_AUTHENTICATING, and
 * process_running() otherwise, with bus->processing set for the duration
 * of that call. An unknown state is a programming error
 * (assert_not_reached). */
2027 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2030 /* Returns 0 when we didn't do anything. This should cause the
2031 * caller to invoke sd_bus_wait() before returning the next
2032 * time. Returns > 0 when we did something, which possibly
2033 * means *ret is filled in with an unprocessed message. */
2037 if (bus_pid_changed(bus))
2040 /* We don't allow recursively invoking sd_bus_process(). */
2041 if (bus->processing)
2044 switch (bus->state) {
2051 r = bus_socket_process_opening(bus);
2058 case BUS_AUTHENTICATING:
2060 r = bus_socket_process_authenticating(bus);
2070 bus->processing = true;
2071 r = process_running(bus, ret);
2072 bus->processing = false;
2077 assert_not_reached("Unknown state");
/* Wait with ppoll() until the bus has something to do. The bus must be
 * open. The events to wait for come from sd_bus_get_events(). The bus's
 * own deadline from sd_bus_get_timeout() is converted into a relative
 * wait m against CLOCK_MONOTONIC (0 if already past), and the caller's
 * timeout_usec is compared with m when it is not (uint64_t) -1. A wait of
 * (uint64_t) -1 means "no timeout" (NULL timespec). When input and output
 * use different descriptors, POLLIN is polled on the input fd and POLLOUT
 * on the output fd. Returns 1 if ppoll() reported at least one ready
 * descriptor, 0 otherwise. */
2080 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2081 struct pollfd p[2] = {};
2088 if (!BUS_IS_OPEN(bus->state))
2091 e = sd_bus_get_events(bus);
2098 r = sd_bus_get_timeout(bus, &until);
2105 nw = now(CLOCK_MONOTONIC);
2106 m = until > nw ? until - nw : 0;
2109 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2112 p[0].fd = bus->input_fd;
2113 if (bus->output_fd == bus->input_fd) {
2117 p[0].events = e & POLLIN;
2118 p[1].fd = bus->output_fd;
2119 p[1].events = e & POLLOUT;
2123 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2127 return r > 0 ? 1 : 0;
/* Wait up to timeout_usec for the bus to need processing. The bus must be
 * open and bus_pid_changed() is consulted. Messages already waiting in the
 * read queue are tested for before polling; otherwise the result of
 * bus_poll() is returned. */
2130 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2134 if (!BUS_IS_OPEN(bus->state))
2136 if (bus_pid_changed(bus))
2139 if (bus->rqueue_size > 0)
2142 return bus_poll(bus, false, timeout_usec);
/* Flush the write queue. The bus must be open and bus_pid_changed() is
 * consulted. After bus_ensure_running(), the function alternates between
 * dispatch_wqueue() and an unbounded bus_poll(), testing for an empty
 * write queue before and after each write attempt. */
2145 int sd_bus_flush(sd_bus *bus) {
2150 if (!BUS_IS_OPEN(bus->state))
2152 if (bus_pid_changed(bus))
2155 r = bus_ensure_running(bus);
2159 if (bus->wqueue_size <= 0)
2163 r = dispatch_wqueue(bus);
2167 if (bus->wqueue_size <= 0)
2170 r = bus_poll(bus, false, (uint64_t) -1);
/* Register a filter callback. bus_pid_changed() is consulted, a
 * zero-initialised filter_callback record is filled with the callback and
 * its userdata and prepended to the bus's filter list.
 * filter_callbacks_modified is set so that a process_filter() pass in
 * progress notices the change. */
2176 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2177 struct filter_callback *f;
2183 if (bus_pid_changed(bus))
2186 f = new0(struct filter_callback, 1);
2189 f->callback = callback;
2190 f->userdata = userdata;
2192 bus->filter_callbacks_modified = true;
2193 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregister a filter callback. bus_pid_changed() is consulted, then the
 * filter list is searched for an entry whose callback and userdata both
 * match; that entry is unlinked and filter_callbacks_modified is set so
 * that a process_filter() pass in progress notices the change. */
2197 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2198 struct filter_callback *f;
2204 if (bus_pid_changed(bus))
2207 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2208 if (f->callback == callback && f->userdata == userdata) {
2209 bus->filter_callbacks_modified = true;
2210 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Shared implementation of sd_bus_add_object() and sd_bus_add_fallback().
 * bus_pid_changed() is consulted and the object-callback hashmap is
 * allocated on demand. A zero-initialised object_callback record receives
 * a private copy of the path, the callback, its userdata and the fallback
 * flag, and is stored in the hashmap keyed by its path.
 * object_callbacks_modified is set so that a process_object() pass in
 * progress notices the change. */
2219 static int bus_add_object(
2223 sd_bus_message_handler_t callback,
2226 struct object_callback *c;
2235 if (bus_pid_changed(bus))
2238 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2242 c = new0(struct object_callback, 1);
2246 c->path = strdup(path);
2252 c->callback = callback;
2253 c->userdata = userdata;
2254 c->is_fallback = fallback;
2256 bus->object_callbacks_modified = true;
2257 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Shared implementation of sd_bus_remove_object() and
 * sd_bus_remove_fallback(). bus_pid_changed() is consulted and the record
 * registered for path is looked up. Its callback, userdata and fallback
 * flag are all compared with the arguments before the record is removed
 * from the hashmap (the removal is asserted to return that same record).
 * object_callbacks_modified is set so that a process_object() pass in
 * progress notices the change. */
2267 static int bus_remove_object(
2271 sd_bus_message_handler_t callback,
2274 struct object_callback *c;
2282 if (bus_pid_changed(bus))
2285 c = hashmap_get(bus->object_callbacks, path);
2289 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2292 bus->object_callbacks_modified = true;
2293 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Register a callback for an exact object path (fallback flag false);
 * thin wrapper around bus_add_object(). */
2301 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2302 return bus_add_object(bus, false, path, callback, userdata);
/* Unregister a callback for an exact object path (fallback flag false);
 * thin wrapper around bus_remove_object(). */
2305 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2306 return bus_remove_object(bus, false, path, callback, userdata);
/* Register a fallback callback for a path prefix (fallback flag true);
 * thin wrapper around bus_add_object(). */
2309 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2310 return bus_add_object(bus, true, prefix, callback, userdata);
/* Unregister a fallback callback for a path prefix (fallback flag true);
 * thin wrapper around bus_remove_object(). */
2313 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2314 return bus_remove_object(bus, true, prefix, callback, userdata);
/* Install a match rule with an associated callback. bus_pid_changed() is
 * consulted. For a bus client the rule is first registered through
 * bus_add_match_internal(); it is then added to the local match tree with
 * bus_match_add(). match_callbacks_modified is set so that a
 * process_match() pass in progress notices the change. On the failure path
 * of the local add, a bus client's registration is rolled back with
 * bus_remove_match_internal(). */
2317 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2324 if (bus_pid_changed(bus))
2327 if (bus->bus_client) {
2328 r = bus_add_match_internal(bus, match);
2334 bus->match_callbacks_modified = true;
2335 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2338 if (bus->bus_client)
2339 bus_remove_match_internal(bus, match);
/* Remove a match rule and its callback. bus_pid_changed() is consulted.
 * For a bus client the rule is unregistered through
 * bus_remove_match_internal() (result in r); the rule is also removed
 * from the local match tree with bus_match_remove() (result in q).
 * match_callbacks_modified is set so that a process_match() pass in
 * progress notices the change. */
2346 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2353 if (bus_pid_changed(bus))
2356 if (bus->bus_client)
2357 r = bus_remove_match_internal(bus, match);
2360 bus->match_callbacks_modified = true;
2361 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
/* Convenience helper: build a signal message for path/interface/member,
 * append the variadic arguments described by the type string, and send it
 * without asking for a serial. The bus must be open and bus_pid_changed()
 * is consulted. */
2369 int sd_bus_emit_signal(
2372 const char *interface,
2374 const char *types, ...) {
2376 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2382 if (!BUS_IS_OPEN(bus->state))
2384 if (bus_pid_changed(bus))
2387 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2391 va_start(ap, types);
2392 r = bus_message_append_ap(m, types, ap);
2397 return sd_bus_send(bus, m, NULL);
/* Convenience helper: build a method call for
 * destination/path/interface/member, append the variadic arguments
 * described by the type string, and perform it synchronously with
 * sd_bus_send_with_reply_and_block(), passing a timeout argument of 0. The
 * bus must be open and bus_pid_changed() is consulted. The error and
 * reply pointers are passed through to the blocking call. */
2400 int sd_bus_call_method(
2402 const char *destination,
2404 const char *interface,
2406 sd_bus_error *error,
2407 sd_bus_message **reply,
2408 const char *types, ...) {
2410 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2416 if (!BUS_IS_OPEN(bus->state))
2418 if (bus_pid_changed(bus))
2421 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2425 va_start(ap, types);
2426 r = bus_message_append_ap(m, types, ap);
2431 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
/* Convenience helper: reply to a method call with a method return carrying
 * the variadic arguments described by the type string. The call must be a
 * method call, the bus must be open and bus_pid_changed() is consulted.
 * The NO_REPLY_EXPECTED flag of the call is tested before the reply is
 * built. The reply is sent without asking for a serial. */
2434 int sd_bus_reply_method_return(
2436 sd_bus_message *call,
2437 const char *types, ...) {
2439 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2449 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2451 if (!BUS_IS_OPEN(bus->state))
2453 if (bus_pid_changed(bus))
2456 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2459 r = sd_bus_message_new_method_return(bus, call, &m);
2463 va_start(ap, types);
2464 r = bus_message_append_ap(m, types, ap);
2469 return sd_bus_send(bus, m, NULL);
/* Convenience helper: reply to a method call with the error e. The call
 * must be a method call, e is tested with sd_bus_error_is_set(), the bus
 * must be open and bus_pid_changed() is consulted. The NO_REPLY_EXPECTED
 * flag of the call is tested before the reply is built. The reply is
 * sent without asking for a serial. */
2472 int sd_bus_reply_method_error(
2474 sd_bus_message *call,
2475 const sd_bus_error *e) {
2477 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2486 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2488 if (!sd_bus_error_is_set(e))
2490 if (!BUS_IS_OPEN(bus->state))
2492 if (bus_pid_changed(bus))
2495 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2498 r = sd_bus_message_new_method_error(bus, call, e, &m);
2502 return sd_bus_send(bus, m, NULL);
/* Return true when the calling process is not the one that created the
 * bus, i.e. getpid() differs from the original_pid recorded by
 * sd_bus_new(). */
2505 bool bus_pid_changed(sd_bus *bus) {
2508 /* We don't support people creating a bus connection and
2509 * keeping it around over a fork(). Let's complain. */
2511 return bus->original_pid != getpid();