1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
39 #include "bus-internal.h"
40 #include "bus-message.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
46 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Closes the file descriptors of a bus and marks both directions as unset
 * (-1) afterwards. */
48 static void bus_close_fds(sd_bus *b) {
52 close_nointr_nofail(b->input_fd);
/* Input and output may share one descriptor; only close the output side
 * when it is a distinct, valid fd. */
54 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
55 close_nointr_nofail(b->output_fd);
57 b->input_fd = b->output_fd = -1;
/* Tears down a bus object. The steps visible here unmap the kdbus pool
 * buffer (KDBUS_POOL_SIZE bytes), free the exec argv, close the b->n_fds
 * descriptors held in b->fds, drop the reference held on every message in
 * the read queue (rqueue) and write queue (wqueue), release the
 * reply-callback hashmap and its priority queue, unlink all filter
 * callbacks, drain and free the object-callback hashmap, free the match
 * callbacks, call bus_kernel_flush_memfd() and finally destroy the memfd
 * cache mutex (which must succeed). */
60 static void bus_free(sd_bus *b) {
61 struct filter_callback *f;
62 struct object_callback *c;
70 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
79 strv_free(b->exec_argv);
81 close_many(b->fds, b->n_fds);
84 for (i = 0; i < b->rqueue_size; i++)
85 sd_bus_message_unref(b->rqueue[i]);
88 for (i = 0; i < b->wqueue_size; i++)
89 sd_bus_message_unref(b->wqueue[i]);
92 hashmap_free_free(b->reply_callbacks);
93 prioq_free(b->reply_callbacks_prioq);
95 while ((f = b->filter_callbacks)) {
96 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
100 while ((c = hashmap_steal_first(b->object_callbacks))) {
105 hashmap_free(b->object_callbacks);
106 bus_match_free(&b->match_callbacks);
108 bus_kernel_flush_memfd(b);
110 assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
/* Sets up a new bus object with its defaults: reference count REFCNT_INIT,
 * no input/output fd (-1), message version 1, fd negotiation enabled, the
 * PID of the creating process recorded in original_pid, an initialized memfd
 * cache mutex, and a write queue pre-allocated with room for one message
 * pointer. NOTE(review): presumably the object is handed back through *ret;
 * that assignment is not visible here. */
115 int sd_bus_new(sd_bus **ret) {
125 r->n_ref = REFCNT_INIT;
126 r->input_fd = r->output_fd = -1;
127 r->message_version = 1;
128 r->negotiate_fds = true;
129 r->original_pid = getpid();
131 assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
133 /* We guarantee that wqueue always has space for at least one
135 r->wqueue = new(sd_bus_message*, 1);
/* Configures the address of a bus. The checks visible here test whether the
 * bus is still in the BUS_UNSET state and whether the process has changed
 * since the bus was created (bus_pid_changed()). NOTE(review): how `address`
 * is stored is not visible here. */
145 int sd_bus_set_address(sd_bus *bus, const char *address) {
150 if (bus->state != BUS_UNSET)
154 if (bus_pid_changed(bus))
/* Stores caller-supplied file descriptors in the bus: input_fd and output_fd
 * are copied into the fields of the same name. The BUS_UNSET state and
 * bus_pid_changed() are checked first. */
167 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
170 if (bus->state != BUS_UNSET)
176 if (bus_pid_changed(bus))
179 bus->input_fd = input_fd;
180 bus->output_fd = output_fd;
/* Configures an executable (path plus argument vector) for the bus. The
 * visible code checks the BUS_UNSET state, whether argv is empty
 * (strv_isempty()) and bus_pid_changed(), and frees the previously stored
 * exec_path and exec_argv. NOTE(review): presumably they are then replaced by
 * copies of path/argv; that part is not visible here. */
184 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
189 if (bus->state != BUS_UNSET)
193 if (strv_isempty(argv))
195 if (bus_pid_changed(bus))
208 free(bus->exec_path);
209 strv_free(bus->exec_argv);
/* Stores the bus_client flag; any non-zero b is normalized to true. The
 * BUS_UNSET state and bus_pid_changed() are checked first. */
217 int sd_bus_set_bus_client(sd_bus *bus, int b) {
220 if (bus->state != BUS_UNSET)
222 if (bus_pid_changed(bus))
225 bus->bus_client = !!b;
/* Stores the negotiate_fds flag; any non-zero b is normalized to true. The
 * BUS_UNSET state and bus_pid_changed() are checked first. */
229 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
232 if (bus->state != BUS_UNSET)
234 if (bus_pid_changed(bus))
237 bus->negotiate_fds = !!b;
/* Stores the is_server flag (b normalized to a boolean) together with the
 * server ID. The first check detects a non-null server_id being passed while
 * b is false; the BUS_UNSET state and bus_pid_changed() are checked as
 * well. */
241 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
244 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
246 if (bus->state != BUS_UNSET)
248 if (bus_pid_changed(bus))
251 bus->is_server = !!b;
252 bus->server_id = server_id;
/* Stores the anonymous_auth flag; any non-zero b is normalized to true. The
 * BUS_UNSET state and bus_pid_changed() are checked first. */
256 int sd_bus_set_anonymous(sd_bus *bus, int b) {
259 if (bus->state != BUS_UNSET)
261 if (bus_pid_changed(bus))
264 bus->anonymous_auth = !!b;
/* Reply handler registered by bus_send_hello(). It must only run while the
 * bus is in BUS_HELLO. The reply is converted to an errno value
 * (bus_message_to_errno()), its single string argument is read and checked
 * to be a valid service name starting with ':', a copy is stored in
 * bus->unique_name, and the bus state becomes BUS_RUNNING. */
268 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
273 assert(bus->state == BUS_HELLO);
276 r = bus_message_to_errno(reply);
280 r = sd_bus_message_read(reply, "s", &s);
284 if (!service_name_is_valid(s) || s[0] != ':')
287 bus->unique_name = strdup(s);
288 if (!bus->unique_name)
291 bus->state = BUS_RUNNING;
/* Builds a method call addressed to "org.freedesktop.DBus" and sends it with
 * hello_callback() as the reply handler, recording the serial of the call in
 * bus->hello_serial. The check at the top covers buses that are not bus
 * clients and kernel buses. */
296 static int bus_send_hello(sd_bus *bus) {
297 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
302 if (!bus->bus_client || bus->is_kernel)
305 r = sd_bus_message_new_method_call(
307 "org.freedesktop.DBus",
309 "org.freedesktop.DBus",
315 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Selects the state that follows connection setup: a bus client on a
 * non-kernel transport enters BUS_HELLO; the other visible assignment sets
 * BUS_RUNNING. */
318 int bus_start_running(sd_bus *bus) {
321 if (bus->bus_client && !bus->is_kernel) {
322 bus->state = BUS_HELLO;
326 bus->state = BUS_RUNNING;
/* Parses one item of a bus address string at *p. The text at *p is compared
 * against key (first l characters), and the value is scanned up to the next
 * ';', ',' or NUL. The (x << 4) | y expression assembles a byte from two
 * 4-bit values (NOTE(review): presumably a decoded %XX escape; the lines
 * producing x and y are not visible here). The output buffer is grown with
 * realloc() as characters are added. */
330 static int parse_address_key(const char **p, const char *key, char **value) {
341 if (strncmp(*p, key, l) != 0)
354 while (*a != ';' && *a != ',' && *a != 0) {
372 c = (char) ((x << 4) | y);
379 t = realloc(r, n + 2);
/* Advances *p to the next ',' in the address string, or to the terminating
 * NUL if there is none. */
407 static void skip_address_key(const char **p) {
411 *p += strcspn(*p, ",");
/* Parses the parameters of a unix socket address up to the next ';' or the
 * end of the string. Recognized keys: "guid" (stored via *guid), "path" and
 * "abstract". The cases of neither and of both path/abstract being given are
 * checked. The result is written to b->sockaddr.un as an AF_UNIX address
 * and b->sockaddr_size is set to the header size plus the name length (plus
 * one for the leading byte of an abstract name). */
417 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
418 _cleanup_free_ char *path = NULL, *abstract = NULL;
427 while (**p != 0 && **p != ';') {
428 r = parse_address_key(p, "guid", guid);
434 r = parse_address_key(p, "path", &path);
440 r = parse_address_key(p, "abstract", &abstract);
449 if (!path && !abstract)
452 if (path && abstract)
457 if (l > sizeof(b->sockaddr.un.sun_path))
460 b->sockaddr.un.sun_family = AF_UNIX;
461 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
462 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
463 } else if (abstract) {
464 l = strlen(abstract);
/* One byte of sun_path is reserved for the leading NUL written below. */
465 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
468 b->sockaddr.un.sun_family = AF_UNIX;
469 b->sockaddr.un.sun_path[0] = 0;
470 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
471 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the parameters of a TCP address up to the next ';' or the end of
 * the string. Recognized keys: "guid", "host", "port" and "family"; a family
 * of "ipv4" selects AF_INET and "ipv6" selects AF_INET6. Host and port are
 * resolved with getaddrinfo() using SOCK_STREAM and AI_ADDRCONFIG hints; the
 * first result is copied into b->sockaddr (with its length in
 * b->sockaddr_size) and the result list is freed. -EADDRNOTAVAIL is one of
 * the error returns (NOTE(review): presumably for a failed lookup; the
 * condition is not visible here). */
477 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
478 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
480 struct addrinfo *result, hints = {
481 .ai_socktype = SOCK_STREAM,
482 .ai_flags = AI_ADDRCONFIG,
490 while (**p != 0 && **p != ';') {
491 r = parse_address_key(p, "guid", guid);
497 r = parse_address_key(p, "host", &host);
503 r = parse_address_key(p, "port", &port);
509 r = parse_address_key(p, "family", &family);
522 if (streq(family, "ipv4"))
523 hints.ai_family = AF_INET;
524 else if (streq(family, "ipv6"))
525 hints.ai_family = AF_INET6;
530 r = getaddrinfo(host, port, &hints, &result);
534 return -EADDRNOTAVAIL;
536 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
537 b->sockaddr_size = result->ai_addrlen;
539 freeaddrinfo(result);
/* Parses the parameters of an exec-style address up to the next ';' or the
 * end of the string. Recognized keys: "guid", "path" and indexed "argvN"
 * entries. For "argvN" the index is parsed with strtoul(); the argv array is
 * grown to hold index + 2 pointers, newly added slots are zeroed, and the
 * value is parsed straight into argv[index]. Afterwards the array is checked
 * for holes (argv[0] excepted), and a missing argv[0] is filled with a copy
 * of path. */
544 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
546 unsigned n_argv = 0, j;
555 while (**p != 0 && **p != ';') {
556 r = parse_address_key(p, "guid", guid);
562 r = parse_address_key(p, "path", &path);
568 if (startswith(*p, "argv")) {
572 ul = strtoul(*p + 4, (char**) p, 10);
/* Conditions tested: conversion error, index not followed by '=', or an
 * index above 256. */
573 if (errno > 0 || **p != '=' || ul > 256) {
583 x = realloc(argv, sizeof(char*) * (ul + 2));
589 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
595 r = parse_address_key(p, NULL, argv + ul);
610 /* Make sure there are no holes in the array, with the
611 * exception of argv[0] */
612 for (j = 1; j < n_argv; j++)
618 if (argv && argv[0] == NULL) {
619 argv[0] = strdup(path);
631 for (j = 0; j < n_argv; j++)
/* Parses the parameters of a kernel bus address up to the next ';' or the
 * end of the string. Recognized keys: "guid" and "path". */
639 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
640 _cleanup_free_ char *path = NULL;
648 while (**p != 0 && **p != ';') {
649 r = parse_address_key(p, "guid", guid);
655 r = parse_address_key(p, "path", &path);
/* Discards the result of a previous address parse: the socket address size
 * is reset to 0, the exec argv is freed and the server ID is cleared to
 * SD_ID128_NULL. */
674 static void bus_reset_parsed_address(sd_bus *b) {
678 b->sockaddr_size = 0;
679 strv_free(b->exec_argv);
683 b->server_id = SD_ID128_NULL;
/* Parses the next entry of b->address, starting at b->address_index. A NUL
 * at that position (end of the address string) is checked for first. Any
 * previously parsed address is reset, then the entry is dispatched on its
 * prefix: "unix:", "tcp:", "unixexec:" or "kernel:". A guid collected by the
 * parser is converted into b->server_id, and address_index is updated to the
 * position where parsing stopped. */
688 static int bus_parse_next_address(sd_bus *b) {
689 _cleanup_free_ char *guid = NULL;
697 if (b->address[b->address_index] == 0)
700 bus_reset_parsed_address(b);
702 a = b->address + b->address_index;
711 if (startswith(a, "unix:")) {
714 r = parse_unix_address(b, &a, &guid);
719 } else if (startswith(a, "tcp:")) {
722 r = parse_tcp_address(b, &a, &guid);
728 } else if (startswith(a, "unixexec:")) {
731 r = parse_exec_address(b, &a, &guid);
737 } else if (startswith(a, "kernel:")) {
740 r = parse_kernel_address(b, &a, &guid);
753 r = sd_id128_from_string(guid, &b->server_id);
758 b->address_index = a - b->address;
/* Tries to establish a connection from the currently parsed address: a
 * socket connect when a socket address family is set, an exec transport
 * when exec_path is set, or a kernel connect when b->kernel is set. The
 * result of an attempt is remembered as a positive errno in
 * last_connect_error, and further addresses are obtained through
 * bus_parse_next_address(). The final return yields the negated last
 * connect error, or -ECONNREFUSED when none was recorded. */
762 static int bus_start_address(sd_bus *b) {
770 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
772 r = bus_socket_connect(b);
776 b->last_connect_error = -r;
778 } else if (b->exec_path) {
780 r = bus_socket_exec(b);
784 b->last_connect_error = -r;
785 } else if (b->kernel) {
787 r = bus_kernel_connect(b);
791 b->last_connect_error = -r;
794 r = bus_parse_next_address(b);
798 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Drops the currently parsed address and restarts connection setup via
 * bus_start_address(), returning its result. */
802 int bus_next_address(sd_bus *b) {
805 bus_reset_parsed_address(b);
806 return bus_start_address(b);
809 static int bus_start_fd(sd_bus *b) {
814 assert(b->input_fd >= 0);
815 assert(b->output_fd >= 0);
817 r = fd_nonblock(b->input_fd, true);
821 r = fd_cloexec(b->input_fd, true);
825 if (b->input_fd != b->output_fd) {
826 r = fd_nonblock(b->output_fd, true);
830 r = fd_cloexec(b->output_fd, true);
835 if (fstat(b->input_fd, &st) < 0)
838 if (S_ISCHR(b->input_fd))
839 return bus_kernel_take_fd(b);
841 return bus_socket_take_fd(b);
/* Starts a configured bus. The BUS_UNSET state and bus_pid_changed() are
 * checked, then the state becomes BUS_OPENING. The combination of is_server
 * and bus_client is checked as well. A bus with a preset input fd is started
 * through bus_start_fd(); one with an address string, a socket address, an
 * exec path or a kernel path goes through bus_start_address(). The final
 * step returns the result of bus_send_hello(). */
844 int sd_bus_start(sd_bus *bus) {
849 if (bus->state != BUS_UNSET)
851 if (bus_pid_changed(bus))
854 bus->state = BUS_OPENING;
856 if (bus->is_server && bus->bus_client)
859 if (bus->input_fd >= 0)
860 r = bus_start_fd(bus);
861 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
862 r = bus_start_address(bus);
869 return bus_send_hello(bus);
/* Opens a connection to the system bus as a bus client. The address is taken
 * from the DBUS_SYSTEM_BUS_ADDRESS environment variable (read with
 * secure_getenv()); the code also sets up the AF_UNIX socket path
 * "/run/dbus/system_bus_socket" (NOTE(review): presumably as the fallback
 * when the variable is unset; the branch structure is not visible here). */
872 int sd_bus_open_system(sd_bus **ret) {
884 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
886 r = sd_bus_set_address(b, e);
890 b->sockaddr.un.sun_family = AF_UNIX;
891 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
892 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
895 b->bus_client = true;
/* Opens a connection to the user (session) bus as a bus client. The address
 * is taken from DBUS_SESSION_BUS_ADDRESS; the code also builds an AF_UNIX
 * socket path from XDG_RUNTIME_DIR with "/bus" appended, after checking that
 * the combined length fits into sun_path (NOTE(review): presumably the
 * fallback when the address variable is unset; the branch structure is not
 * visible here). Both variables are read with secure_getenv(). */
909 int sd_bus_open_user(sd_bus **ret) {
922 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
924 r = sd_bus_set_address(b, e);
928 e = secure_getenv("XDG_RUNTIME_DIR");
935 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
940 b->sockaddr.un.sun_family = AF_UNIX;
941 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
942 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
945 b->bus_client = true;
/* Marks the bus as BUS_CLOSED. An already closed bus and a changed process
 * (bus_pid_changed()) are checked for first. */
959 void sd_bus_close(sd_bus *bus) {
962 if (bus->state == BUS_CLOSED)
964 if (bus_pid_changed(bus))
967 bus->state = BUS_CLOSED;
972 /* We'll leave the fd open in case this is a kernel bus, since
973 * there might still be memblocks around that reference this
974 * bus, and they might need to invoke the
975 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
/* Takes an additional reference on the bus. The caller must already hold
 * one, so the incremented count is asserted to be at least 2. */
979 sd_bus *sd_bus_ref(sd_bus *bus) {
983 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drops one reference on the bus and tests whether the count has reached
 * zero or below (NOTE(review): presumably bus_free() is called in that case;
 * the call is not visible here). */
988 sd_bus *sd_bus_unref(sd_bus *bus) {
992 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Returns whether the bus state counts as open according to BUS_IS_OPEN().
 * bus_pid_changed() is checked first. */
998 int sd_bus_is_open(sd_bus *bus) {
1001 if (bus_pid_changed(bus))
1004 return BUS_IS_OPEN(bus->state);
/* Reports whether values of the given type can be sent over this bus. For
 * SD_BUS_TYPE_UNIX_FD the negotiate_fds setting is consulted, the bus is
 * driven to the running state with bus_ensure_running(), and bus->can_fds is
 * returned. Every other type is judged by bus_type_is_valid(). The BUS_UNSET
 * state and bus_pid_changed() are checked first. */
1007 int sd_bus_can_send(sd_bus *bus, char type) {
1012 if (bus->state == BUS_UNSET)
1014 if (bus_pid_changed(bus))
1017 if (type == SD_BUS_TYPE_UNIX_FD) {
1018 if (!bus->negotiate_fds)
1021 r = bus_ensure_running(bus);
1025 return bus->can_fds;
1028 return bus_type_is_valid(type);
/* Copies the server ID of the bus into *server_id, after driving the bus to
 * the running state with bus_ensure_running(). bus_pid_changed() is checked
 * first. */
1031 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1038 if (bus_pid_changed(bus))
1041 r = bus_ensure_running(bus);
1045 *server_id = bus->server_id;
/* Seals m with the next serial number of the bus (b->serial is incremented
 * before use) and returns the result of bus_message_seal(). The message's
 * header version is compared against the bus' message_version first. */
1049 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1052 if (m->header->version > b->message_version)
1058 return bus_message_seal(m, ++b->serial);
/* Writes out queued outgoing messages; only valid in the BUS_RUNNING and
 * BUS_HELLO states. While the write queue is non-empty the head message is
 * written through the kernel or the socket transport (the latter tracks
 * progress in bus->windex). Once the head counts as fully written (always
 * for a kernel bus, or when windex has reached the message size) its
 * reference is dropped and the remaining entries are shifted down by one. */
1061 static int dispatch_wqueue(sd_bus *bus) {
1065 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1067 while (bus->wqueue_size > 0) {
1070 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1072 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1078 /* Didn't do anything this time */
1080 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1081 /* Fully written. Let's drop the entry from
1084 * This isn't particularly optimized, but
1085 * well, this is supposed to be our worst-case
1086 * buffer only, and the socket buffer is
1087 * supposed to be our primary buffer, and if
1088 * it got full, then all bets are off
1091 sd_bus_message_unref(bus->wqueue[0]);
1092 bus->wqueue_size --;
1093 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Obtains the next incoming message; only valid in the BUS_RUNNING and
 * BUS_HELLO states. If the read queue holds messages, its head is handed
 * out through *m and the remaining entries are shifted down by one.
 * Otherwise a new message is read from the kernel or the socket
 * transport. */
1103 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1104 sd_bus_message *z = NULL;
1109 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1111 if (bus->rqueue_size > 0) {
1112 /* Dispatch a queued message */
1114 *m = bus->rqueue[0];
1115 bus->rqueue_size --;
1116 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1120 /* Try to read a new message */
1123 r = bus_kernel_read_message(bus, &z);
1125 r = bus_socket_read_message(bus, &z);
/* Sends message m on the bus and, when serial is non-NULL, reports the
 * message's serial number through it. An unsealed message sent without a
 * serial pointer gets SD_BUS_MESSAGE_NO_REPLY_EXPECTED set before it is
 * sealed. If the bus is in BUS_RUNNING or BUS_HELLO and nothing is queued, a
 * direct write is attempted through the kernel or socket transport; a
 * partially written socket message is parked in wqueue[0]. In the other
 * case the message is appended to the write queue, which is grown with
 * realloc() and limited by BUS_WQUEUE_MAX. The open state, bus_pid_changed()
 * and sd_bus_can_send() for fds are checked on the way. */
1141 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1146 if (!BUS_IS_OPEN(bus->state))
1150 if (bus_pid_changed(bus))
1154 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1161 /* If the serial number isn't kept, then we know that no reply
1163 if (!serial && !m->sealed)
1164 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1166 r = bus_seal_message(bus, m);
1170 /* If this is a reply and no reply was requested, then let's
1171 * suppress this, if we can */
1172 if (m->dont_send && !serial)
1175 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1179 r = bus_kernel_write_message(bus, m);
1181 r = bus_socket_write_message(bus, m, &idx);
1186 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1187 /* Wasn't fully written. So let's remember how
1188 * much was written. Note that the first entry
1189 * of the wqueue array is always allocated so
1190 * that we always can remember how much was
1192 bus->wqueue[0] = sd_bus_message_ref(m);
1193 bus->wqueue_size = 1;
1199 /* Just append it to the queue. */
1201 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1204 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1209 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1213 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC deadline (now + usec). (uint64_t) -1 is special-cased, and
 * BUS_DEFAULT_TIMEOUT is substituted on one path (NOTE(review): presumably
 * when usec is 0; the condition is not visible here). */
1218 static usec_t calc_elapse(uint64_t usec) {
1219 if (usec == (uint64_t) -1)
1223 usec = BUS_DEFAULT_TIMEOUT;
1225 return now(CLOCK_MONOTONIC) + usec;
/* Comparison function over struct reply_callback, used for the
 * reply-callback priority queue. Entries are first distinguished by whether
 * they carry a timeout at all (timeout != 0) and then ordered by the timeout
 * value. */
1228 static int timeout_compare(const void *a, const void *b) {
1229 const struct reply_callback *x = a, *y = b;
1231 if (x->timeout != 0 && y->timeout == 0)
1234 if (x->timeout == 0 && y->timeout != 0)
1237 if (x->timeout < y->timeout)
1240 if (x->timeout > y->timeout)
/* Sends a method call and registers callback/userdata to be invoked for its
 * reply. The message type must be a method call, and the
 * SD_BUS_MESSAGE_NO_REPLY_EXPECTED flag is checked for. The message is
 * sealed, a struct reply_callback is allocated and stored in
 * bus->reply_callbacks under the message serial, and, if it has a deadline
 * (calc_elapse(usec) != 0), also in the timeout priority queue. If queueing
 * in the priority queue or the subsequent sd_bus_send() runs into trouble,
 * the registration is undone through sd_bus_send_with_reply_cancel(). */
1246 int sd_bus_send_with_reply(
1249 sd_bus_message_handler_t callback,
1254 struct reply_callback *c;
1259 if (!BUS_IS_OPEN(bus->state))
1265 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1267 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1269 if (bus_pid_changed(bus))
1272 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1276 if (usec != (uint64_t) -1) {
1277 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1282 r = bus_seal_message(bus, m);
1286 c = new0(struct reply_callback, 1);
1290 c->callback = callback;
1291 c->userdata = userdata;
1292 c->serial = BUS_MESSAGE_SERIAL(m);
1293 c->timeout = calc_elapse(usec);
1295 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1301 if (c->timeout != 0) {
1302 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1305 sd_bus_send_with_reply_cancel(bus, c->serial);
1310 r = sd_bus_send(bus, m, serial);
1312 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply callback identified by the serial of its method
 * call: the entry is removed from bus->reply_callbacks and, if it carries a
 * timeout, from the timeout priority queue too. bus_pid_changed() is checked
 * first. */
1319 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1320 struct reply_callback *c;
1326 if (bus_pid_changed(bus))
1329 c = hashmap_remove(bus->reply_callbacks, &serial);
1333 if (c->timeout != 0)
1334 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the bus until it reaches BUS_RUNNING, using sd_bus_process() and,
 * in between, sd_bus_wait() without a timeout. The BUS_UNSET and BUS_CLOSED
 * states are checked up front, and a bus that is already running needs no
 * work. */
1340 int bus_ensure_running(sd_bus *bus) {
1345 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1347 if (bus->state == BUS_RUNNING)
1351 r = sd_bus_process(bus, NULL);
1354 if (bus->state == BUS_RUNNING)
1359 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and waits synchronously for its reply. After the
 * preliminary checks (open state, message type, NO_REPLY_EXPECTED flag,
 * clean error object, bus_pid_changed()) the bus is brought to the running
 * state, the message is sent and a deadline is computed with calc_elapse().
 * Incoming messages are then read directly from the transport: one whose
 * reply_serial matches the call is handled as the result (method return or
 * method error, the latter copied into *error and mapped to an errno),
 * while unrelated messages are appended to the read queue, for which room is
 * reserved before reading and which is limited by BUS_RQUEUE_MAX. When there
 * is nothing to read the function polls with the remaining time and flushes
 * the write queue. */
1365 int sd_bus_send_with_reply_and_block(
1369 sd_bus_error *error,
1370 sd_bus_message **reply) {
1379 if (!BUS_IS_OPEN(bus->state))
1383 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1385 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1387 if (bus_error_is_dirty(error))
1389 if (bus_pid_changed(bus))
1392 r = bus_ensure_running(bus);
1396 r = sd_bus_send(bus, m, &serial);
1400 timeout = calc_elapse(usec);
1404 sd_bus_message *incoming = NULL;
1409 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1412 /* Make sure there's room for queuing this
1413 * locally, before we read the message */
1415 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1424 r = bus_kernel_read_message(bus, &incoming);
1426 r = bus_socket_read_message(bus, &incoming);
1431 if (incoming->reply_serial == serial) {
1432 /* Found a match! */
1434 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1439 sd_bus_message_unref(incoming);
1444 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1447 r = sd_bus_error_copy(error, &incoming->error);
1449 sd_bus_message_unref(incoming);
1453 k = bus_error_to_errno(&incoming->error);
1454 sd_bus_message_unref(incoming);
1458 sd_bus_message_unref(incoming);
1462 /* There's already guaranteed to be room for
1463 * this, so no need to resize things here */
1464 bus->rqueue[bus->rqueue_size ++] = incoming;
1467 /* Try to read more, right-away */
1476 n = now(CLOCK_MONOTONIC);
1482 left = (uint64_t) -1;
1484 r = bus_poll(bus, true, left);
1488 r = dispatch_wqueue(bus);
/* Returns the file descriptor of the bus (its input fd). The open state, the
 * case of distinct input and output fds, and bus_pid_changed() are checked
 * first. */
1494 int sd_bus_get_fd(sd_bus *bus) {
1497 if (!BUS_IS_OPEN(bus->state))
1499 if (bus->input_fd != bus->output_fd)
1501 if (bus_pid_changed(bus))
1504 return bus->input_fd;
/* Computes the poll events the bus is currently interested in, depending on
 * its state: BUS_OPENING, BUS_AUTHENTICATING (where
 * bus_socket_auth_needs_write() is consulted), or BUS_RUNNING/BUS_HELLO
 * (where an empty read queue and a non-empty write queue are the deciding
 * conditions). The open state and bus_pid_changed() are checked first. */
1507 int sd_bus_get_events(sd_bus *bus) {
1512 if (!BUS_IS_OPEN(bus->state))
1514 if (bus_pid_changed(bus))
1517 if (bus->state == BUS_OPENING)
1519 else if (bus->state == BUS_AUTHENTICATING) {
1521 if (bus_socket_auth_needs_write(bus))
1526 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1527 if (bus->rqueue_size <= 0)
1529 if (bus->wqueue_size > 0)
/* Reports through *timeout_usec the time at which the bus next needs
 * attention. While authenticating this is bus->auth_timeout. Outside of
 * BUS_RUNNING/BUS_HELLO it is (uint64_t) -1. Otherwise the head of the
 * reply-callback priority queue is peeked: its timeout is reported, and
 * (uint64_t) -1 is also written on that path (NOTE(review): presumably when
 * the queue is empty; the condition is not visible here). */
1536 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1537 struct reply_callback *c;
1543 if (!BUS_IS_OPEN(bus->state))
1545 if (bus_pid_changed(bus))
1548 if (bus->state == BUS_AUTHENTICATING) {
1549 *timeout_usec = bus->auth_timeout;
1553 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1554 *timeout_usec = (uint64_t) -1;
1558 c = prioq_peek(bus->reply_callbacks_prioq);
1560 *timeout_usec = (uint64_t) -1;
1564 *timeout_usec = c->timeout;
/* Handles the reply callback at the head of the timeout priority queue: the
 * current CLOCK_MONOTONIC time is obtained, a synthetic
 * "org.freedesktop.DBus.Error.Timeout" error message is created, the entry
 * is removed from the priority queue and the reply hashmap, and its callback
 * is invoked with the synthetic message. Returns the callback's negative
 * result, or 1 otherwise. */
1568 static int process_timeout(sd_bus *bus) {
1569 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1570 struct reply_callback *c;
1576 c = prioq_peek(bus->reply_callbacks_prioq);
1580 n = now(CLOCK_MONOTONIC);
1584 r = bus_message_new_synthetic_error(
1587 &SD_BUS_ERROR_MAKE("org.freedesktop.DBus.Error.Timeout", "Timed out"),
1592 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1593 hashmap_remove(bus->reply_callbacks, &c->serial);
1595 r = c->callback(bus, m, c->userdata);
1598 return r < 0 ? r : 1;
/* Gatekeeper used while the bus is in BUS_HELLO: checks that message m is a
 * method return or method error whose reply_serial matches the recorded
 * hello_serial. In any other state the check does not apply. */
1601 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1605 if (bus->state != BUS_HELLO)
1608 /* Let's make sure the first message on the bus is the HELLO
1609 * reply. But note that we don't actually parse the message
1610 * here (we leave that to the usual handling), we just verify
1611 * we don't let any earlier msg through. */
1613 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1614 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1617 if (m->reply_serial != bus->hello_serial)
/* Dispatches a method return or method error to the reply callback
 * registered for its reply_serial. The callback entry is removed from the
 * hashmap (and from the timeout priority queue if it has a timeout), the
 * message is rewound, and the callback is invoked. */
1623 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1624 struct reply_callback *c;
1630 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1631 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1634 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1638 if (c->timeout != 0)
1639 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1641 r = sd_bus_message_rewind(m, true);
1645 r = c->callback(bus, m, c->userdata);
/* Runs the registered filter callbacks on message m. The list walk is
 * repeated for as long as a callback modifies the filter list
 * (filter_callbacks_modified); the per-entry last_iteration stamp, compared
 * with bus->iteration_counter, keeps a filter from running twice for the
 * same message. The message is rewound before each callback. */
1651 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1652 struct filter_callback *l;
1659 bus->filter_callbacks_modified = false;
1661 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1663 if (bus->filter_callbacks_modified)
1666 /* Don't run this more than once per iteration */
1667 if (l->last_iteration == bus->iteration_counter)
1670 l->last_iteration = bus->iteration_counter;
1672 r = sd_bus_message_rewind(m, true);
1676 r = l->callback(bus, m, l->userdata);
1682 } while (bus->filter_callbacks_modified);
/* Runs the match callbacks on message m via bus_match_run(), repeating the
 * run for as long as the set of match callbacks was modified during it. */
1687 static int process_match(sd_bus *bus, sd_bus_message *m) {
1694 bus->match_callbacks_modified = false;
1696 r = bus_match_run(bus, &bus->match_callbacks, m);
1700 } while (bus->match_callbacks_modified);
/* Built-in handling of method calls on the "org.freedesktop.DBus.Peer"
 * interface. "Ping" gets an empty method return, "GetMachineId" a method
 * return carrying the machine ID as a string, and any other member an
 * "org.freedesktop.DBus.Error.UnknownMethod" error. The reply is sent with
 * sd_bus_send(). Messages that are not method calls, target another
 * interface, or carry SD_BUS_MESSAGE_NO_REPLY_EXPECTED are checked for
 * first. */
1705 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1706 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1712 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1715 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1718 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1721 if (streq_ptr(m->member, "Ping"))
1722 r = sd_bus_message_new_method_return(bus, m, &reply);
1723 else if (streq_ptr(m->member, "GetMachineId")) {
1727 r = sd_id128_get_machine(&id);
1731 r = sd_bus_message_new_method_return(bus, m, &reply);
1735 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1737 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1739 sd_bus_error_set(&error,
1740 "org.freedesktop.DBus.Error.UnknownMethod",
1741 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1743 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1749 r = sd_bus_send(bus, reply, NULL);
/* Dispatches a method call to registered object callbacks. The callback
 * registered for exactly m->path is tried first; then parent prefixes of the
 * path (found by cutting at the last '/') are looked up and only callbacks
 * flagged is_fallback are used. A callback runs at most once per message
 * (last_iteration vs. bus->iteration_counter), and the whole lookup is
 * repeated if the callback set was modified meanwhile. If handlers were
 * found but none took the call, and the call is not the Introspect method,
 * an "org.freedesktop.DBus.Error.UnknownMethod" error reply is sent. */
1756 static int process_object(sd_bus *bus, sd_bus_message *m) {
1757 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1758 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1759 struct object_callback *c;
1767 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1770 if (hashmap_isempty(bus->object_callbacks))
1773 pl = strlen(m->path);
1778 bus->object_callbacks_modified = false;
1780 c = hashmap_get(bus->object_callbacks, m->path);
1781 if (c && c->last_iteration != bus->iteration_counter) {
1783 c->last_iteration = bus->iteration_counter;
1785 r = sd_bus_message_rewind(m, true);
1789 r = c->callback(bus, m, c->userdata);
1796 /* Look for fallback prefixes */
1801 if (bus->object_callbacks_modified)
1804 e = strrchr(p, '/');
1810 c = hashmap_get(bus->object_callbacks, p);
1811 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1813 c->last_iteration = bus->iteration_counter;
1815 r = sd_bus_message_rewind(m, true);
1819 r = c->callback(bus, m, c->userdata);
1827 } while (bus->object_callbacks_modified);
1829 /* We found some handlers but none wanted to take this, then
1830 * return this -- with one exception, we can handle
1831 * introspection minimally ourselves */
1832 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1835 sd_bus_error_set(&error,
1836 "org.freedesktop.DBus.Error.UnknownMethod",
1837 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1839 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1843 r = sd_bus_send(bus, reply, NULL);
/* Minimal built-in handler for
 * org.freedesktop.DBus.Introspectable.Introspect. Child node names are
 * collected into a string set from the paths of the registered object
 * callbacks that lie below m->path (duplicates reported as -EEXIST are
 * tolerated). The introspection document is written to a memory stream:
 * doctype, the opening node element, the Peer and Introspectable interface
 * descriptions, one node element per collected child, and the closing tag.
 * It is sent back as the string argument of a method return. */
1850 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1851 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1852 _cleanup_free_ char *introspection = NULL;
1853 _cleanup_set_free_free_ Set *s = NULL;
1854 _cleanup_fclose_ FILE *f = NULL;
1855 struct object_callback *c;
1864 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1870 s = set_new(string_hash_func, string_compare_func);
1874 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1878 if (streq(c->path, "/"))
1881 if (streq(m->path, "/"))
1884 e = startswith(c->path, m->path);
1885 if (!e || *e != '/')
1897 r = set_consume(s, a);
1898 if (r < 0 && r != -EEXIST)
1902 f = open_memstream(&introspection, &size);
1906 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1907 fputs("<node>\n", f);
1908 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1909 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1911 while ((node = set_steal_first(s))) {
1912 fprintf(f, " <node name=\"%s\"/>\n", node);
1916 fputs("</node>\n", f);
1923 r = sd_bus_message_new_method_return(bus, m, &reply);
1927 r = sd_bus_message_append(reply, "s", introspection);
1931 r = sd_bus_send(bus, reply, NULL);
/* Runs one incoming message through the processing stages in order: hello
 * check, reply callbacks, filters, matches, built-in Peer interface, object
 * callbacks and finally introspection. bus->iteration_counter is advanced
 * once per message; the filter and object stages compare against it to
 * avoid running a callback twice. */
1938 static int process_message(sd_bus *bus, sd_bus_message *m) {
1944 bus->iteration_counter++;
1946 r = process_hello(bus, m);
1950 r = process_reply(bus, m);
1954 r = process_filter(bus, m);
1958 r = process_match(bus, m);
1962 r = process_builtin(bus, m);
1966 r = process_object(bus, m);
1970 return process_introspect(bus, m);
/* One processing step for a bus in BUS_RUNNING or BUS_HELLO: handle an
 * expired reply timeout, flush the write queue, fetch the next incoming
 * message and run it through process_message(). The message is rewound
 * afterwards. For a method call an
 * "org.freedesktop.DBus.Error.UnknownObject" error reply is generated and
 * sent (NOTE(review): presumably only when nothing handled the call and the
 * caller did not ask for the message via ret; those conditions are not
 * visible here). */
1973 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1974 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1978 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1980 r = process_timeout(bus);
1984 r = dispatch_wqueue(bus);
1988 r = dispatch_rqueue(bus, &m);
1994 r = process_message(bus, m);
1999 r = sd_bus_message_rewind(m, true);
2008 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
2009 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2010 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
2012 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
2014 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
2018 r = sd_bus_send(bus, reply, NULL);
/* Performs one step of bus processing according to the current state:
 * connection opening (bus_socket_process_opening()), authentication
 * (bus_socket_process_authenticating()) or regular operation
 * (process_running()). The bus->processing flag is set around
 * process_running() and checked on entry, so the function cannot be
 * re-entered from a callback. An unknown state is a programming error. */
2032 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2035 /* Returns 0 when we didn't do anything. This should cause the
2036 * caller to invoke sd_bus_wait() before returning the next
2037 * time. Returns > 0 when we did something, which possibly
2038 * means *ret is filled in with an unprocessed message. */
2042 if (bus_pid_changed(bus))
2045 /* We don't allow recursively invoking sd_bus_process(). */
2046 if (bus->processing)
2049 switch (bus->state) {
2056 r = bus_socket_process_opening(bus);
2063 case BUS_AUTHENTICATING:
2065 r = bus_socket_process_authenticating(bus);
2075 bus->processing = true;
2076 r = process_running(bus, ret);
2077 bus->processing = false;
2082 assert_not_reached("Unknown state");
/* Waits for I/O on the bus file descriptors using ppoll(). The events to
 * wait for come from sd_bus_get_events() and the deadline from
 * sd_bus_get_timeout(); the deadline is turned into a relative wait that is
 * 0 when it has already passed. A caller-supplied timeout_usec is compared
 * against that wait as well. If input and output share one fd a single
 * pollfd entry is used; otherwise the input fd is polled for POLLIN and the
 * output fd for POLLOUT. A wait of (uint64_t) -1 means no timeout. The last
 * visible return yields 1 if ppoll() reported ready descriptors and 0 if
 * not. */
2085 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2086 struct pollfd p[2] = {};
2093 if (!BUS_IS_OPEN(bus->state))
2096 e = sd_bus_get_events(bus);
2103 r = sd_bus_get_timeout(bus, &until);
2110 nw = now(CLOCK_MONOTONIC);
2111 m = until > nw ? until - nw : 0;
2114 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2117 p[0].fd = bus->input_fd;
2118 if (bus->output_fd == bus->input_fd) {
2122 p[0].events = e & POLLIN;
2123 p[1].fd = bus->output_fd;
2124 p[1].events = e & POLLOUT;
2128 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2132 return r > 0 ? 1 : 0;
/* Waits for bus activity through bus_poll() for at most timeout_usec. The
 * open state, bus_pid_changed() and a non-empty read queue (messages already
 * waiting to be processed) are checked first. */
2135 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2139 if (!BUS_IS_OPEN(bus->state))
2141 if (bus_pid_changed(bus))
2144 if (bus->rqueue_size > 0)
2147 return bus_poll(bus, false, timeout_usec);
/* Flushes the write queue: after bringing the bus to the running state,
 * dispatch_wqueue() and bus_poll() (without timeout) are used, with the
 * queue size tested before and after dispatching. The open state and
 * bus_pid_changed() are checked first. */
2150 int sd_bus_flush(sd_bus *bus) {
2155 if (!BUS_IS_OPEN(bus->state))
2157 if (bus_pid_changed(bus))
2160 r = bus_ensure_running(bus);
2164 if (bus->wqueue_size <= 0)
2168 r = dispatch_wqueue(bus);
2172 if (bus->wqueue_size <= 0)
2175 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback with its userdata. A new struct
 * filter_callback is allocated, filled in and prepended to
 * bus->filter_callbacks; filter_callbacks_modified is raised so that a
 * running process_filter() notices the change. */
2181 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2182 struct filter_callback *f;
2188 if (bus_pid_changed(bus))
2191 f = new0(struct filter_callback, 1);
2194 f->callback = callback;
2195 f->userdata = userdata;
2197 bus->filter_callbacks_modified = true;
2198 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregisters a filter: the list is searched for an entry whose callback and
 * userdata both match, which is then unlinked from bus->filter_callbacks
 * with filter_callbacks_modified raised. */
2202 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2203 struct filter_callback *f;
2209 if (bus_pid_changed(bus))
2212 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2213 if (f->callback == callback && f->userdata == userdata) {
2214 bus->filter_callbacks_modified = true;
2215 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Shared implementation behind sd_bus_add_object() and
 * sd_bus_add_fallback(). The object-callback hashmap is created on demand, a
 * struct object_callback is allocated with a private copy of path, the
 * callback, userdata and the fallback flag, and the entry is inserted under
 * its path. object_callbacks_modified is raised before the insertion. */
2224 static int bus_add_object(
2228 sd_bus_message_handler_t callback,
2231 struct object_callback *c;
2240 if (bus_pid_changed(bus))
2243 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2247 c = new0(struct object_callback, 1);
2251 c->path = strdup(path);
2257 c->callback = callback;
2258 c->userdata = userdata;
2259 c->is_fallback = fallback;
2261 bus->object_callbacks_modified = true;
2262 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Shared implementation behind sd_bus_remove_object() and
 * sd_bus_remove_fallback(). The entry registered for path is looked up and
 * compared against callback, userdata and the fallback flag; a matching
 * entry is removed from the hashmap with object_callbacks_modified
 * raised. */
2272 static int bus_remove_object(
2276 sd_bus_message_handler_t callback,
2279 struct object_callback *c;
2287 if (bus_pid_changed(bus))
2290 c = hashmap_get(bus->object_callbacks, path);
2294 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2297 bus->object_callbacks_modified = true;
2298 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Registers callback for the object at path (non-fallback registration);
 * thin wrapper around bus_add_object(). */
2306 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2307 return bus_add_object(bus, false, path, callback, userdata);
/* Removes a non-fallback object registration; thin wrapper around
 * bus_remove_object(). */
2310 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2311 return bus_remove_object(bus, false, path, callback, userdata);
/* Registers callback as a fallback handler for the given path prefix; thin
 * wrapper around bus_add_object() with the fallback flag set. */
2314 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2315 return bus_add_object(bus, true, prefix, callback, userdata);
/* Removes a fallback registration for the given path prefix; thin wrapper
 * around bus_remove_object() with the fallback flag set. */
2318 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2319 return bus_remove_object(bus, true, prefix, callback, userdata);
/* Installs a match with its callback. For a bus client the match is first
 * registered through bus_add_match_internal(); it is then added to the
 * local match callbacks with match_callbacks_modified raised. The code also
 * calls bus_remove_match_internal() for a bus client after the local add
 * (NOTE(review): presumably to roll back when the local add failed; the
 * condition is not visible here). */
2322 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2329 if (bus_pid_changed(bus))
2332 if (bus->bus_client) {
2333 r = bus_add_match_internal(bus, match);
2339 bus->match_callbacks_modified = true;
2340 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2343 if (bus->bus_client)
2344 bus_remove_match_internal(bus, match);
/* Removes a match: for a bus client it is unregistered through
 * bus_remove_match_internal(), and it is removed from the local match
 * callbacks with match_callbacks_modified raised. The two results are kept
 * separately in r and q. */
2351 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2358 if (bus_pid_changed(bus))
2361 if (bus->bus_client)
2362 r = bus_remove_match_internal(bus, match);
2365 bus->match_callbacks_modified = true;
2366 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
/* Convenience call that creates a signal message for
 * path/interface/member, appends the variadic arguments described by types,
 * and sends it without asking for the serial. The open state and
 * bus_pid_changed() are checked first. */
2374 int sd_bus_emit_signal(
2377 const char *interface,
2379 const char *types, ...) {
2381 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2387 if (!BUS_IS_OPEN(bus->state))
2389 if (bus_pid_changed(bus))
2392 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2396 va_start(ap, types);
2397 r = bus_message_append_ap(m, types, ap);
2402 return sd_bus_send(bus, m, NULL);
/* Convenience call that creates a method call for
 * destination/path/interface/member, appends the variadic arguments
 * described by types, and sends it via sd_bus_send_with_reply_and_block()
 * with a timeout argument of 0, passing error and reply through. The open
 * state and bus_pid_changed() are checked first. */
2405 int sd_bus_call_method(
2407 const char *destination,
2409 const char *interface,
2411 sd_bus_error *error,
2412 sd_bus_message **reply,
2413 const char *types, ...) {
2415 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2421 if (!BUS_IS_OPEN(bus->state))
2423 if (bus_pid_changed(bus))
2426 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2430 va_start(ap, types);
2431 r = bus_message_append_ap(m, types, ap);
2436 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
/* Convenience call that answers the method call `call` with a method return
 * carrying the variadic arguments described by types. `call` must be a
 * method call; the open state, bus_pid_changed() and the call's
 * SD_BUS_MESSAGE_NO_REPLY_EXPECTED flag are checked before the reply is
 * built and sent. */
2439 int sd_bus_reply_method_return(
2441 sd_bus_message *call,
2442 const char *types, ...) {
2444 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2454 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2456 if (!BUS_IS_OPEN(bus->state))
2458 if (bus_pid_changed(bus))
2461 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2464 r = sd_bus_message_new_method_return(bus, call, &m);
2468 va_start(ap, types);
2469 r = bus_message_append_ap(m, types, ap);
2474 return sd_bus_send(bus, m, NULL);
/* Convenience call that answers the method call `call` with a method error
 * built from e. `call` must be a method call and e must be set; the open
 * state, bus_pid_changed() and the call's SD_BUS_MESSAGE_NO_REPLY_EXPECTED
 * flag are checked before the error reply is built and sent. */
2477 int sd_bus_reply_method_error(
2479 sd_bus_message *call,
2480 const sd_bus_error *e) {
2482 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2491 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2493 if (!sd_bus_error_is_set(e))
2495 if (!BUS_IS_OPEN(bus->state))
2497 if (bus_pid_changed(bus))
2500 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2503 r = sd_bus_message_new_method_error(bus, call, e, &m);
2507 return sd_bus_send(bus, m, NULL);
/* Returns true when the calling process is not the one that created the bus,
 * i.e. the PID recorded in original_pid differs from getpid(). */
2510 bool bus_pid_changed(sd_bus *bus) {
2513 /* We don't support people creating a bus connection and
2514 * keeping it around over a fork(). Let's complain. */
2516 return bus->original_pid != getpid();