1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
37 #include "bus-internal.h"
38 #include "bus-message.h"
40 #include "bus-socket.h"
41 #include "bus-kernel.h"
42 #include "bus-control.h"
44 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Tears down a bus object. The visible steps release the exec argument
 * vector, close the descriptors held in b->fds, drop the references on
 * every message still sitting in the read and write queues, free the
 * reply-callback hashmap and its priority queue, drain the filter and
 * object callback containers, free the match tree and finally call
 * bus_kernel_flush_memfd(). */
46 static void bus_free(sd_bus *b) {
47 struct filter_callback *f;
48 struct object_callback *c;
62 strv_free(b->exec_argv);
64 close_many(b->fds, b->n_fds);
67 for (i = 0; i < b->rqueue_size; i++)
68 sd_bus_message_unref(b->rqueue[i]);
71 for (i = 0; i < b->wqueue_size; i++)
72 sd_bus_message_unref(b->wqueue[i]);
75 hashmap_free_free(b->reply_callbacks);
76 prioq_free(b->reply_callbacks_prioq);
/* Unlink filter callbacks one by one until the list head is NULL */
78 while ((f = b->filter_callbacks)) {
79 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
/* Pull object callbacks out of the hashmap before the map itself is freed */
83 while ((c = hashmap_steal_first(b->object_callbacks))) {
88 hashmap_free(b->object_callbacks);
89 bus_match_free(&b->match_callbacks);
91 bus_kernel_flush_memfd(b);
/* Allocates a new bus object and hands it back through *ret. The visible
 * defaults are: both descriptors set to -1, message version 1, file
 * descriptor negotiation enabled, and a write queue pre-allocated with
 * room for a single message pointer. */
96 int sd_bus_new(sd_bus **ret) {
107 r->input_fd = r->output_fd = -1;
108 r->message_version = 1;
109 r->negotiate_fds = true;
111 /* We guarantee that wqueue always has space for at least one
113 r->wqueue = new(sd_bus_message*, 1);
/* Configures the address string of a bus. The visible guard tests whether
 * the bus has already left the BUS_UNSET state.
 * NOTE(review): presumably the call is refused in that case -- the branch
 * body is not visible here, confirm. */
123 int sd_bus_set_address(sd_bus *bus, const char *address) {
128 if (bus->state != BUS_UNSET)
/* Stores caller-supplied descriptors as the input and output ends of the
 * bus. The state is compared against BUS_UNSET first.
 * NOTE(review): presumably only allowed before the bus is started -- the
 * branch body is not visible here, confirm. */
143 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
146 if (bus->state != BUS_UNSET)
153 bus->input_fd = input_fd;
154 bus->output_fd = output_fd;
/* Configures a program (path plus argument vector) for the bus. The state
 * is compared against BUS_UNSET and argv is tested for emptiness; the
 * previously stored exec_path and exec_argv are released before being
 * replaced. */
158 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
163 if (bus->state != BUS_UNSET)
167 if (strv_isempty(argv))
180 free(bus->exec_path);
181 strv_free(bus->exec_argv);
/* Sets the bus_client flag from b, normalized to 0/1 with "!!". The state
 * is compared against BUS_UNSET first. */
189 int sd_bus_set_bus_client(sd_bus *bus, int b) {
192 if (bus->state != BUS_UNSET)
195 bus->bus_client = !!b;
/* Sets the negotiate_fds flag from b, normalized to 0/1 with "!!". The
 * state is compared against BUS_UNSET first. */
199 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
202 if (bus->state != BUS_UNSET)
205 bus->negotiate_fds = !!b;
/* Sets the is_server flag (b normalized to 0/1) and records server_id.
 * Two conditions are tested beforehand: a non-null server_id combined
 * with b == 0, and a bus state other than BUS_UNSET. */
209 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
212 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
214 if (bus->state != BUS_UNSET)
217 bus->is_server = !!b;
218 bus->server_id = server_id;
/* Sets the anonymous_auth flag from b, normalized to 0/1 with "!!". The
 * state is compared against BUS_UNSET first. */
222 int sd_bus_set_anonymous(sd_bus *bus, int b) {
225 if (bus->state != BUS_UNSET)
228 bus->anonymous_auth = !!b;
/* Reply handler registered by bus_send_hello(). It asserts that the bus
 * is in BUS_HELLO state, maps the reply to an errno-style code, reads one
 * string from the reply, checks that it is a valid service name starting
 * with ':', stores a copy in bus->unique_name and switches the bus to
 * BUS_RUNNING. */
232 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
237 assert(bus->state == BUS_HELLO);
240 r = bus_message_to_errno(reply);
244 r = sd_bus_message_read(reply, "s", &s);
248 if (!service_name_is_valid(s) || s[0] != ':')
251 bus->unique_name = strdup(s);
252 if (!bus->unique_name)
255 bus->state = BUS_RUNNING;
/* Builds a method call addressed to "org.freedesktop.DBus" and sends it
 * with hello_callback() as reply handler; the serial of the call is kept
 * in bus->hello_serial. A guard tests for buses that are not bus clients
 * or that are kernel buses before the message is created. */
260 static int bus_send_hello(sd_bus *bus) {
261 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
266 if (!bus->bus_client || bus->is_kernel)
269 r = sd_bus_message_new_method_call(
271 "org.freedesktop.DBus",
273 "org.freedesktop.DBus",
279 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Moves the bus into its post-connection state: BUS_HELLO for a
 * non-kernel bus client, BUS_RUNNING otherwise (per the visible
 * assignments). */
282 int bus_start_running(sd_bus *bus) {
285 if (bus->bus_client && !bus->is_kernel) {
286 bus->state = BUS_HELLO;
290 bus->state = BUS_RUNNING;
/* Parses one key's value from the address string at *p into *value. The
 * start of *p is compared against key over l characters, then the input
 * is consumed until ';', ',' or the end of the string. A character can be
 * assembled from two 4-bit halves x and y, and the result buffer is grown
 * with realloc().
 * NOTE(review): x and y look like decoded hex digits of a %XX escape --
 * the decoding lines are not visible here, confirm. */
294 static int parse_address_key(const char **p, const char *key, char **value) {
305 if (strncmp(*p, key, l) != 0)
318 while (*a != ';' && *a != ',' && *a != 0) {
336 c = (char) ((x << 4) | y);
343 t = realloc(r, n + 2);
/* Advances *p to the next ',' in the address string, or to its end if
 * there is none (strcspn() semantics). */
371 static void skip_address_key(const char **p) {
375 *p += strcspn(*p, ",");
/* Parses the key list of a "unix:" address. Keys "guid", "path" and
 * "abstract" are recognized, scanning stops at ';' or the end of the
 * string. The cases "neither path nor abstract" and "both" are tested
 * explicitly. The result is written into b->sockaddr.un:
 *  - path:     copied to sun_path, size = offsetof(sun_path) + length
 *  - abstract: sun_path[0] is a NUL byte, the name follows at sun_path+1,
 *              size = offsetof(sun_path) + 1 + length
 * Lengths are compared against the capacity of sun_path before copying. */
381 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
382 _cleanup_free_ char *path = NULL, *abstract = NULL;
391 while (**p != 0 && **p != ';') {
392 r = parse_address_key(p, "guid", guid);
398 r = parse_address_key(p, "path", &path);
404 r = parse_address_key(p, "abstract", &abstract);
413 if (!path && !abstract)
416 if (path && abstract)
421 if (l > sizeof(b->sockaddr.un.sun_path))
424 b->sockaddr.un.sun_family = AF_UNIX;
/* sockaddr_size carries the exact length, so the copy may fill sun_path completely */
425 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
426 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
427 } else if (abstract) {
428 l = strlen(abstract);
429 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
432 b->sockaddr.un.sun_family = AF_UNIX;
433 b->sockaddr.un.sun_path[0] = 0;
434 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
435 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the key list of a "tcp:" address. Keys "guid", "host", "port"
 * and "family" are recognized. A family of "ipv4" or "ipv6" selects
 * AF_INET or AF_INET6 in the resolver hints, which otherwise request
 * SOCK_STREAM with AI_ADDRCONFIG. The address in the first getaddrinfo()
 * result is copied into b->sockaddr and the result list is freed.
 * -EADDRNOTAVAIL is one of the visible error returns. */
441 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
442 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
444 struct addrinfo *result, hints = {
445 .ai_socktype = SOCK_STREAM,
446 .ai_flags = AI_ADDRCONFIG,
454 while (**p != 0 && **p != ';') {
455 r = parse_address_key(p, "guid", guid);
461 r = parse_address_key(p, "host", &host);
467 r = parse_address_key(p, "port", &port);
473 r = parse_address_key(p, "family", &family);
486 if (streq(family, "ipv4"))
487 hints.ai_family = AF_INET;
488 else if (streq(family, "ipv6"))
489 hints.ai_family = AF_INET6;
494 r = getaddrinfo(host, port, &hints, &result);
498 return -EADDRNOTAVAIL;
500 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
501 b->sockaddr_size = result->ai_addrlen;
503 freeaddrinfo(result);
/* Parses the key list of a "unixexec:" address. Besides "guid" and "path"
 * it accepts indexed keys of the form "argv<N>=...": N is read with
 * strtoul() and values above 256 are rejected. The argument array is
 * grown to hold index N plus a terminator, newly added slots are zeroed,
 * and the value is parsed straight into argv[N]. Afterwards the array is
 * checked for holes, and a missing argv[0] is filled with a copy of
 * path. */
508 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
510 unsigned n_argv = 0, j;
519 while (**p != 0 && **p != ';') {
520 r = parse_address_key(p, "guid", guid);
526 r = parse_address_key(p, "path", &path);
532 if (startswith(*p, "argv")) {
/* Skip the 4 characters of "argv"; strtoul() moves *p past the digits */
536 ul = strtoul(*p + 4, (char**) p, 10);
537 if (errno > 0 || **p != '=' || ul > 256) {
547 x = realloc(argv, sizeof(char*) * (ul + 2));
553 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
559 r = parse_address_key(p, NULL, argv + ul);
574 /* Make sure there are no holes in the array, with the
575 * exception of argv[0] */
576 for (j = 1; j < n_argv; j++)
582 if (argv && argv[0] == NULL) {
583 argv[0] = strdup(path);
595 for (j = 0; j < n_argv; j++)
/* Parses the key list of a "kernel:" address. Keys "guid" and "path" are
 * recognized; scanning stops at ';' or the end of the string. */
603 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
604 _cleanup_free_ char *path = NULL;
612 while (**p != 0 && **p != ';') {
613 r = parse_address_key(p, "guid", guid);
619 r = parse_address_key(p, "path", &path);
/* Discards the result of a previous address parse: the socket address
 * size is zeroed, the exec argument vector is freed and the server id is
 * reset to SD_ID128_NULL. */
638 static void bus_reset_parsed_address(sd_bus *b) {
642 b->sockaddr_size = 0;
643 strv_free(b->exec_argv);
647 b->server_id = SD_ID128_NULL;
/* Parses the next entry of b->address, starting at b->address_index. The
 * end of the string is tested first, then the previously parsed state is
 * reset. The entry is dispatched by prefix ("unix:", "tcp:", "unixexec:",
 * "kernel:") to the matching parser. A guid is converted into
 * b->server_id, and address_index is advanced to where parsing stopped. */
652 static int bus_parse_next_address(sd_bus *b) {
653 _cleanup_free_ char *guid = NULL;
661 if (b->address[b->address_index] == 0)
664 bus_reset_parsed_address(b);
666 a = b->address + b->address_index;
675 if (startswith(a, "unix:")) {
678 r = parse_unix_address(b, &a, &guid);
683 } else if (startswith(a, "tcp:")) {
686 r = parse_tcp_address(b, &a, &guid);
692 } else if (startswith(a, "unixexec:")) {
695 r = parse_exec_address(b, &a, &guid);
701 } else if (startswith(a, "kernel:")) {
704 r = parse_kernel_address(b, &a, &guid);
717 r = sd_id128_from_string(guid, &b->server_id);
/* Remember how far we got, as an offset into b->address */
722 b->address_index = a - b->address;
/* Attempts to connect using the currently parsed address: a socket
 * connect if a socket family is set, otherwise executing a program if
 * exec_path is set, otherwise a kernel connect if b->kernel is set. The
 * negated return value of the attempt is stored in last_connect_error,
 * and bus_parse_next_address() supplies the next candidate. The final
 * return reports the stored error, or -ECONNREFUSED when none was
 * recorded. */
726 static int bus_start_address(sd_bus *b) {
734 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
736 r = bus_socket_connect(b);
740 b->last_connect_error = -r;
742 } else if (b->exec_path) {
744 r = bus_socket_exec(b);
748 b->last_connect_error = -r;
749 } else if (b->kernel) {
751 r = bus_kernel_connect(b);
755 b->last_connect_error = -r;
758 r = bus_parse_next_address(b);
762 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Drops the currently parsed address and restarts connection setup via
 * bus_start_address(). */
766 int bus_next_address(sd_bus *b) {
769 bus_reset_parsed_address(b);
770 return bus_start_address(b);
/* Takes over descriptors that were supplied by the caller. Both ends are
 * switched to non-blocking mode and marked close-on-exec (the output end
 * only if it differs from the input end). The input descriptor is then
 * inspected with fstat(): a character device is handed to
 * bus_kernel_take_fd(), anything else to bus_socket_take_fd(). */
773 static int bus_start_fd(sd_bus *b) {
778 assert(b->input_fd >= 0);
779 assert(b->output_fd >= 0);
781 r = fd_nonblock(b->input_fd, true);
785 r = fd_cloexec(b->input_fd, true);
789 if (b->input_fd != b->output_fd) {
790 r = fd_nonblock(b->output_fd, true);
794 r = fd_cloexec(b->output_fd, true);
799 if (fstat(b->input_fd, &st) < 0)
/* S_ISCHR() must be applied to the file mode returned by fstat(), not to
 * the descriptor number itself */
802 if (S_ISCHR(st.st_mode))
803 return bus_kernel_take_fd(b);
805 return bus_socket_take_fd(b);
/* Starts a configured bus. The state is compared against BUS_UNSET and
 * then set to BUS_OPENING; the combination "server and bus client" is
 * tested explicitly. Connection setup goes through bus_start_fd() when an
 * input descriptor is present, or through bus_start_address() when an
 * address, socket address, exec path or kernel path is configured. The
 * visible final step is bus_send_hello(). */
808 int sd_bus_start(sd_bus *bus) {
813 if (bus->state != BUS_UNSET)
816 bus->state = BUS_OPENING;
818 if (bus->is_server && bus->bus_client)
821 if (bus->input_fd >= 0)
822 r = bus_start_fd(bus);
823 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
824 r = bus_start_address(bus);
831 return bus_send_hello(bus);
/* Opens a connection to the system bus and returns it through *ret. The
 * environment variable DBUS_SYSTEM_BUS_ADDRESS is read with
 * secure_getenv() and passed to sd_bus_set_address(); the UNIX socket
 * /run/dbus/system_bus_socket is the other visible address source. The
 * connection is marked as a bus client.
 * NOTE(review): the socket path looks like the fallback for an unset
 * variable -- the branch condition is not visible here, confirm. */
834 int sd_bus_open_system(sd_bus **ret) {
846 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
848 r = sd_bus_set_address(b, e);
852 b->sockaddr.un.sun_family = AF_UNIX;
853 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
854 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
857 b->bus_client = true;
/* Opens a connection to the user bus and returns it through *ret. The
 * environment variable DBUS_SESSION_BUS_ADDRESS is read with
 * secure_getenv() and passed to sd_bus_set_address(). The other visible
 * address source is a UNIX socket path built from XDG_RUNTIME_DIR with
 * "/bus" appended, after checking that the combined length fits into
 * sun_path. The connection is marked as a bus client. */
871 int sd_bus_open_user(sd_bus **ret) {
884 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
886 r = sd_bus_set_address(b, e);
890 e = secure_getenv("XDG_RUNTIME_DIR");
897 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
902 b->sockaddr.un.sun_family = AF_UNIX;
/* mempcpy() returns the end of the copied directory, where "/bus" is appended */
903 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
904 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
907 b->bus_client = true;
/* Closes the descriptors of the bus. The output descriptor is only closed
 * separately when it differs from the input descriptor, so a shared
 * descriptor is closed exactly once. Both fields are reset to -1
 * afterwards. */
921 void sd_bus_close(sd_bus *bus) {
925 if (bus->input_fd >= 0)
926 close_nointr_nofail(bus->input_fd);
927 if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
928 close_nointr_nofail(bus->output_fd);
930 bus->input_fd = bus->output_fd = -1;
/* Reference-count entry point for a bus object; asserts that the counter
 * is positive on entry.
 * NOTE(review): presumably increments n_ref and returns bus -- those
 * lines are not visible here, confirm. */
933 sd_bus *sd_bus_ref(sd_bus *bus) {
937 assert(bus->n_ref > 0);
/* Reference-count entry point for a bus object; asserts that the counter
 * is positive on entry.
 * NOTE(review): presumably decrements n_ref and frees the bus at zero --
 * those lines are not visible here, confirm. */
943 sd_bus *sd_bus_unref(sd_bus *bus) {
947 assert(bus->n_ref > 0);
/* Returns non-zero if the bus has left the BUS_UNSET state and still owns
 * a valid input descriptor. */
956 int sd_bus_is_open(sd_bus *bus) {
960 return bus->state != BUS_UNSET && bus->input_fd >= 0;
/* Reports whether a value of the given D-Bus type can be sent on this
 * bus. SD_BUS_TYPE_UNIX_FD is special-cased: the negotiate_fds flag is
 * tested and bus_ensure_running() is called. For other types the answer
 * is bus_type_is_valid(type). An invalid output descriptor is tested up
 * front. */
963 int sd_bus_can_send(sd_bus *bus, char type) {
968 if (bus->output_fd < 0)
971 if (type == SD_BUS_TYPE_UNIX_FD) {
972 if (!bus->negotiate_fds)
975 r = bus_ensure_running(bus);
982 return bus_type_is_valid(type);
/* Calls bus_ensure_running() and then copies the server id of the bus
 * into *server_id. */
985 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
993 r = bus_ensure_running(bus);
997 *server_id = bus->server_id;
/* Seals message m for sending on bus b. The message's header version is
 * compared against the version the bus speaks, and the bus serial counter
 * is pre-incremented to supply the serial passed to bus_message_seal(). */
1001 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1004 if (m->header->version > b->message_version)
1010 return bus_message_seal(m, ++b->serial);
/* Writes out queued outgoing messages while the write queue is non-empty.
 * The head of the queue is passed to the kernel or the socket writer; the
 * socket writer tracks progress in bus->windex. Once the head message is
 * fully written (always assumed for kernel buses, otherwise when windex
 * has reached the message size) it is unreferenced and the remaining
 * entries are shifted down by one with memmove(). Requires state
 * BUS_RUNNING or BUS_HELLO. */
1013 static int dispatch_wqueue(sd_bus *bus) {
1017 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1019 if (bus->output_fd < 0)
1022 while (bus->wqueue_size > 0) {
1025 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1027 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1033 /* Didn't do anything this time */
1035 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1036 /* Fully written. Let's drop the entry from
1039 * This isn't particularly optimized, but
1040 * well, this is supposed to be our worst-case
1041 * buffer only, and the socket buffer is
1042 * supposed to be our primary buffer, and if
1043 * it got full, then all bets are off
1046 sd_bus_message_unref(bus->wqueue[0]);
1047 bus->wqueue_size --;
1048 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetches the next incoming message. A message already waiting in the
 * read queue is handed out through *m and the remaining entries are
 * shifted down by one. Otherwise a new message is read from the kernel or
 * the socket into z. Requires state BUS_RUNNING or BUS_HELLO. */
1058 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1059 sd_bus_message *z = NULL;
1064 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1066 if (bus->input_fd < 0)
1069 if (bus->rqueue_size > 0) {
1070 /* Dispatch a queued message */
1072 *m = bus->rqueue[0];
1073 bus->rqueue_size --;
1074 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1078 /* Try to read a new message */
1081 r = bus_kernel_read_message(bus, &z);
1083 r = bus_socket_read_message(bus, &z);
/* Queues or writes message m on the bus and reports its serial through
 * *serial. Visible behaviour:
 *  - the state and the output descriptor are validated, and
 *    sd_bus_can_send() is consulted for SD_BUS_TYPE_UNIX_FD
 *  - an unsealed message sent without a serial pointer gets the
 *    NO_REPLY_EXPECTED flag before it is sealed
 *  - m->dont_send combined with a NULL serial is tested after sealing
 *  - with an empty write queue in state BUS_RUNNING or BUS_HELLO the
 *    message is written directly; a partially written socket message is
 *    kept in slot 0 of the write queue
 *  - otherwise the write queue is grown by one entry (bounded by
 *    BUS_WQUEUE_MAX) and a reference to the message is appended */
1099 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1104 if (bus->state == BUS_UNSET)
1106 if (bus->output_fd < 0)
1112 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1119 /* If the serial number isn't kept, then we know that no reply
1121 if (!serial && !m->sealed)
1122 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1124 r = bus_seal_message(bus, m);
1128 /* If this is a reply and no reply was requested, then let's
1129 * suppress this, if we can */
1130 if (m->dont_send && !serial)
1133 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1137 r = bus_kernel_write_message(bus, m);
1139 r = bus_socket_write_message(bus, m, &idx);
1144 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1145 /* Wasn't fully written. So let's remember how
1146 * much was written. Note that the first entry
1147 * of the wqueue array is always allocated so
1148 * that we always can remember how much was
1150 bus->wqueue[0] = sd_bus_message_ref(m);
1151 bus->wqueue_size = 1;
1157 /* Just append it to the queue. */
1159 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1162 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1167 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1171 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout into an absolute CLOCK_MONOTONIC deadline
 * (now + usec). (uint64_t) -1 is tested as a special value, and
 * BUS_DEFAULT_TIMEOUT can be substituted for usec.
 * NOTE(review): the results of those two special cases are not visible
 * here -- presumably -1 means "no timeout" and 0 selects the default;
 * confirm. */
1176 static usec_t calc_elapse(uint64_t usec) {
1177 if (usec == (uint64_t) -1)
1181 usec = BUS_DEFAULT_TIMEOUT;
1183 return now(CLOCK_MONOTONIC) + usec;
/* Comparison function for the reply-callback priority queue. Entries are
 * first distinguished by whether their timeout is zero or not, then
 * ordered by the timeout value.
 * NOTE(review): the returned values are not visible here; presumably
 * entries with a non-zero timeout sort first and earlier deadlines sort
 * before later ones -- confirm. */
1186 static int timeout_compare(const void *a, const void *b) {
1187 const struct reply_callback *x = a, *y = b;
1189 if (x->timeout != 0 && y->timeout == 0)
1192 if (x->timeout == 0 && y->timeout != 0)
1195 if (x->timeout < y->timeout)
1198 if (x->timeout > y->timeout)
/* Sends a method call and registers a callback for its reply. The message
 * type and the NO_REPLY_EXPECTED flag are checked, the reply-callback
 * hashmap is allocated on demand, and so is the priority queue unless
 * usec is (uint64_t) -1. After sealing the message a reply_callback
 * record is created carrying callback, userdata, the message serial and
 * the deadline from calc_elapse(). The record is stored in the hashmap
 * keyed by serial and, if it has a deadline, in the priority queue. If
 * queueing or sending fails, the registration is undone with
 * sd_bus_send_with_reply_cancel(). */
1204 int sd_bus_send_with_reply(
1207 sd_bus_message_handler_t callback,
1212 struct reply_callback *c;
1217 if (bus->state == BUS_UNSET)
1219 if (bus->output_fd < 0)
1225 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1227 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1230 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1234 if (usec != (uint64_t) -1) {
1235 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1240 r = bus_seal_message(bus, m);
1244 c = new0(struct reply_callback, 1);
1248 c->callback = callback;
1249 c->userdata = userdata;
1250 c->serial = BUS_MESSAGE_SERIAL(m);
1251 c->timeout = calc_elapse(usec);
1253 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1259 if (c->timeout != 0) {
1260 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1263 sd_bus_send_with_reply_cancel(bus, c->serial);
1268 r = sd_bus_send(bus, m, serial);
1270 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply callback: the record registered under serial is
 * removed from the hashmap and, if it carries a deadline, from the
 * timeout priority queue as well. */
1277 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1278 struct reply_callback *c;
1285 c = hashmap_remove(bus->reply_callbacks, &serial);
1289 if (c->timeout != 0)
1290 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the connection until it reaches BUS_RUNNING: alternates between
 * sd_bus_process() and an unbounded sd_bus_wait(), re-checking the state
 * after processing. A missing input descriptor and the BUS_UNSET state
 * are tested up front. */
1296 int bus_ensure_running(sd_bus *bus) {
1301 if (bus->input_fd < 0)
1303 if (bus->state == BUS_UNSET)
1306 if (bus->state == BUS_RUNNING)
1310 r = sd_bus_process(bus, NULL);
1313 if (bus->state == BUS_RUNNING)
1318 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and waits synchronously for its reply. After
 * validating bus, message and error argument, the bus is brought into
 * running state, the message is sent and a deadline is computed with
 * calc_elapse(). The read path then:
 *  - makes room for one more entry in the read queue (bounded by
 *    BUS_RQUEUE_MAX) before reading an incoming message
 *  - handles a message whose reply_serial matches the call according to
 *    its type (method return, method error, anything else)
 *  - appends every unrelated message to the read queue for later
 *    dispatching
 *  - polls via bus_poll() and flushes the write queue via
 *    dispatch_wqueue() */
1324 int sd_bus_send_with_reply_and_block(
1328 sd_bus_error *error,
1329 sd_bus_message **reply) {
1338 if (bus->output_fd < 0)
1340 if (bus->state == BUS_UNSET)
1344 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1346 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1348 if (bus_error_is_dirty(error))
1351 r = bus_ensure_running(bus);
1355 r = sd_bus_send(bus, m, &serial);
1359 timeout = calc_elapse(usec);
1363 sd_bus_message *incoming = NULL;
1368 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1371 /* Make sure there's room for queuing this
1372 * locally, before we read the message */
1374 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1383 r = bus_kernel_read_message(bus, &incoming);
1385 r = bus_socket_read_message(bus, &incoming);
1390 if (incoming->reply_serial == serial) {
1391 /* Found a match! */
1393 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1398 sd_bus_message_unref(incoming);
1403 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1406 r = sd_bus_error_copy(error, &incoming->error);
1408 sd_bus_message_unref(incoming);
1412 k = bus_error_to_errno(&incoming->error);
1413 sd_bus_message_unref(incoming);
1417 sd_bus_message_unref(incoming);
1421 /* There's already guaranteed to be room for
1422 * this, so no need to resize things here */
1423 bus->rqueue[bus->rqueue_size ++] = incoming;
1426 /* Try to read more, right-away */
1435 n = now(CLOCK_MONOTONIC);
1441 left = (uint64_t) -1;
1443 r = bus_poll(bus, true, left);
1447 r = dispatch_wqueue(bus);
/* Returns the input descriptor of the bus. An invalid input descriptor,
 * and input and output descriptors that differ, are both tested
 * beforehand. */
1453 int sd_bus_get_fd(sd_bus *bus) {
1456 if (bus->input_fd < 0)
1458 if (bus->input_fd != bus->output_fd)
1461 return bus->input_fd;
/* Computes the poll events the bus is currently interested in, depending
 * on its state: BUS_OPENING, BUS_AUTHENTICATING (consulting
 * bus_socket_auth_needs_write()), and BUS_RUNNING/BUS_HELLO, where the
 * fill levels of the read and write queues are examined. */
1464 int sd_bus_get_events(sd_bus *bus) {
1469 if (bus->state == BUS_UNSET)
1471 if (bus->input_fd < 0)
1474 if (bus->state == BUS_OPENING)
1476 else if (bus->state == BUS_AUTHENTICATING) {
1478 if (bus_socket_auth_needs_write(bus))
1483 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1484 if (bus->rqueue_size <= 0)
1486 if (bus->wqueue_size > 0)
/* Reports through *timeout_usec when the bus next needs attention:
 *  - while authenticating: bus->auth_timeout
 *  - in any state other than BUS_RUNNING/BUS_HELLO: (uint64_t) -1
 *  - otherwise: the timeout of the entry at the head of the
 *    reply-callback priority queue; (uint64_t) -1 is the other value
 *    assigned on this path */
1493 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1494 struct reply_callback *c;
1500 if (bus->state == BUS_UNSET)
1502 if (bus->input_fd < 0)
1505 if (bus->state == BUS_AUTHENTICATING) {
1506 *timeout_usec = bus->auth_timeout;
1510 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1511 *timeout_usec = (uint64_t) -1;
1515 c = prioq_peek(bus->reply_callbacks_prioq);
1517 *timeout_usec = (uint64_t) -1;
1521 *timeout_usec = c->timeout;
/* Handles the reply callback at the head of the priority queue: a
 * synthetic "org.freedesktop.DBus.Error.Timeout" error message is
 * created, the entry is removed from the priority queue and the hashmap,
 * and its callback is invoked with the synthetic message. Returns the
 * callback's negative error, or 1 otherwise. */
1525 static int process_timeout(sd_bus *bus) {
1526 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1527 struct reply_callback *c;
1533 c = prioq_peek(bus->reply_callbacks_prioq);
1537 n = now(CLOCK_MONOTONIC);
1541 r = bus_message_new_synthetic_error(
1544 &SD_BUS_ERROR_MAKE("org.freedesktop.DBus.Error.Timeout", "Timed out"),
/* The entry peeked above must still be the head of the queue */
1549 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1550 hashmap_remove(bus->reply_callbacks, &c->serial);
1552 r = c->callback(bus, m, c->userdata);
1555 return r < 0 ? r : 1;
/* Gatekeeper used while the bus is in BUS_HELLO state: checks that the
 * message is a method return or method error and that its reply serial
 * matches the serial of the Hello call. The bus state is tested first. */
1558 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1562 if (bus->state != BUS_HELLO)
1565 /* Let's make sure the first message on the bus is the HELLO
1566 * reply. But note that we don't actually parse the message
1567 * here (we leave that to the usual handling), we just verify
1568 * we don't let any earlier msg through. */
1570 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1571 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1574 if (m->reply_serial != bus->hello_serial)
/* Dispatches a method return or method error to the callback registered
 * for its reply serial. The callback record is removed from the hashmap
 * (and from the priority queue if it has a deadline), the message is
 * rewound to its beginning and the callback is invoked. */
1580 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1581 struct reply_callback *c;
1587 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1588 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1591 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1595 if (c->timeout != 0)
1596 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1598 r = sd_bus_message_rewind(m, true);
1602 r = c->callback(bus, m, c->userdata);
/* Runs the registered filter callbacks on message m. The list is walked
 * again whenever a callback modified it (filter_callbacks_modified), and
 * each filter records the iteration counter so that it is invoked at most
 * once per message. The message is rewound before every callback
 * invocation. */
1608 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1609 struct filter_callback *l;
1616 bus->filter_callbacks_modified = false;
1618 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1620 if (bus->filter_callbacks_modified)
1623 /* Don't run this more than once per iteration */
1624 if (l->last_iteration == bus->iteration_counter)
1627 l->last_iteration = bus->iteration_counter;
1629 r = sd_bus_message_rewind(m, true);
1633 r = l->callback(bus, m, l->userdata);
1639 } while (bus->filter_callbacks_modified);
/* Runs the match tree on message m via bus_match_run(), repeating the run
 * for as long as the set of match callbacks was modified during it. */
1644 static int process_match(sd_bus *bus, sd_bus_message *m) {
1651 bus->match_callbacks_modified = false;
1653 r = bus_match_run(bus, &bus->match_callbacks, m);
1657 } while (bus->match_callbacks_modified);
/* Implements the org.freedesktop.DBus.Peer interface. Only method calls
 * on that interface are considered, and the NO_REPLY_EXPECTED flag is
 * tested. "Ping" gets an empty method return, "GetMachineId" a method
 * return carrying the machine id as a string, and any other member an
 * org.freedesktop.DBus.Error.UnknownMethod error. The reply is sent with
 * sd_bus_send(). */
1662 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1663 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1669 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1672 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1675 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1678 if (streq_ptr(m->member, "Ping"))
1679 r = sd_bus_message_new_method_return(bus, m, &reply);
1680 else if (streq_ptr(m->member, "GetMachineId")) {
1684 r = sd_id128_get_machine(&id);
1688 r = sd_bus_message_new_method_return(bus, m, &reply);
1692 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1694 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1696 sd_bus_error_set(&error,
1697 "org.freedesktop.DBus.Error.UnknownMethod",
1698 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1700 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1706 r = sd_bus_send(bus, reply, NULL);
/* Dispatches a method call to registered object callbacks. The callback
 * registered for the exact path is tried first; then the path is
 * shortened at each '/' (found with strrchr()) and callbacks registered
 * as fallback for the resulting prefix are tried. Every callback is
 * invoked at most once per message (tracked through last_iteration), and
 * the lookup is repeated if the callback set was modified meanwhile. If
 * handlers were found, and the call is not an Introspect call, an
 * org.freedesktop.DBus.Error.UnknownMethod error reply is sent. */
1713 static int process_object(sd_bus *bus, sd_bus_message *m) {
1714 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1715 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1716 struct object_callback *c;
1724 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1727 if (hashmap_isempty(bus->object_callbacks))
1730 pl = strlen(m->path);
1735 bus->object_callbacks_modified = false;
1737 c = hashmap_get(bus->object_callbacks, m->path);
1738 if (c && c->last_iteration != bus->iteration_counter) {
1740 c->last_iteration = bus->iteration_counter;
1742 r = sd_bus_message_rewind(m, true);
1746 r = c->callback(bus, m, c->userdata);
1753 /* Look for fallback prefixes */
1758 if (bus->object_callbacks_modified)
1761 e = strrchr(p, '/');
1767 c = hashmap_get(bus->object_callbacks, p);
1768 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1770 c->last_iteration = bus->iteration_counter;
1772 r = sd_bus_message_rewind(m, true);
1776 r = c->callback(bus, m, c->userdata);
1784 } while (bus->object_callbacks_modified);
1786 /* We found some handlers but none wanted to take this, then
1787 * return this -- with one exception, we can handle
1788 * introspection minimally ourselves */
1789 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1792 sd_bus_error_set(&error,
1793 "org.freedesktop.DBus.Error.UnknownMethod",
1794 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1796 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1800 r = sd_bus_send(bus, reply, NULL);
/* Minimal built-in handler for
 * org.freedesktop.DBus.Introspectable.Introspect. Names derived from the
 * registered object paths are collected in a set: the root path "/" is
 * special-cased on both sides, and otherwise the registered path must
 * start with the message path followed by '/'. An XML document is then
 * written into a memory stream -- doctype, the Peer and Introspectable
 * interface descriptions and one <node name="..."/> element per collected
 * name -- and returned as a string in a method return. */
1807 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1808 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1809 _cleanup_free_ char *introspection = NULL;
1810 _cleanup_set_free_free_ Set *s = NULL;
1811 _cleanup_fclose_ FILE *f = NULL;
1812 struct object_callback *c;
1821 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1827 s = set_new(string_hash_func, string_compare_func);
1831 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1835 if (streq(c->path, "/"))
1838 if (streq(m->path, "/"))
1841 e = startswith(c->path, m->path);
1842 if (!e || *e != '/')
1854 r = set_consume(s, a);
/* An already present entry (-EEXIST) is not treated as an error */
1855 if (r < 0 && r != -EEXIST)
1859 f = open_memstream(&introspection, &size);
1863 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1864 fputs("<node>\n", f);
1865 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1866 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1868 while ((node = set_steal_first(s))) {
1869 fprintf(f, " <node name=\"%s\"/>\n", node);
1873 fputs("</node>\n", f);
1880 r = sd_bus_message_new_method_return(bus, m, &reply);
1884 r = sd_bus_message_append(reply, "s", introspection);
1888 r = sd_bus_send(bus, reply, NULL);
/* Runs one incoming message through the processing stages in fixed order:
 * hello check, reply callbacks, filters, matches, built-in interfaces,
 * object callbacks and finally introspection. The iteration counter is
 * bumped first; the filter and object stages use it to invoke each
 * callback at most once per message. */
1895 static int process_message(sd_bus *bus, sd_bus_message *m) {
1901 bus->iteration_counter++;
1903 r = process_hello(bus, m);
1907 r = process_reply(bus, m);
1911 r = process_filter(bus, m);
1915 r = process_match(bus, m);
1919 r = process_builtin(bus, m);
1923 r = process_object(bus, m);
1927 return process_introspect(bus, m);
/* One processing step for a bus in state BUS_RUNNING or BUS_HELLO:
 * handles reply timeouts, flushes the write queue, fetches an incoming
 * message and runs it through process_message(). The message is rewound
 * afterwards, and for a method call an
 * org.freedesktop.DBus.Error.UnknownObject error reply is generated and
 * sent. */
1930 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1931 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1935 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1937 r = process_timeout(bus);
1941 r = dispatch_wqueue(bus);
1945 r = dispatch_rqueue(bus, &m);
1951 r = process_message(bus, m);
1956 r = sd_bus_message_rewind(m, true);
1965 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1966 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1967 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1969 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1971 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1975 r = sd_bus_send(bus, reply, NULL);
/* Main processing entry point; dispatches on the bus state. Opening and
 * authenticating are delegated to the socket layer, and the running state
 * is handled by process_running() with the bus->processing flag set for
 * the duration of the call so that recursive invocations can be
 * detected. */
1989 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1992 /* Returns 0 when we didn't do anything. This should cause the
1993 * caller to invoke sd_bus_wait() before returning the next
1994 * time. Returns > 0 when we did something, which possibly
1995 * means *ret is filled in with an unprocessed message. */
1999 if (bus->input_fd < 0)
2002 /* We don't allow recursively invoking sd_bus_process(). */
2003 if (bus->processing)
2006 switch (bus->state) {
2012 r = bus_socket_process_opening(bus);
2019 case BUS_AUTHENTICATING:
2021 r = bus_socket_process_authenticating(bus);
2031 bus->processing = true;
2032 r = process_running(bus, ret);
2033 bus->processing = false;
2038 assert_not_reached("Unknown state");
/* Waits with ppoll() until the bus descriptors become ready or a timeout
 * elapses. The events come from sd_bus_get_events(). The absolute
 * deadline from sd_bus_get_timeout() is converted into a relative value
 * (clamped at 0) and compared against the caller's timeout_usec;
 * (uint64_t) -1 means "no timeout" and makes ppoll() block indefinitely.
 * When input and output descriptors differ, the input one is polled for
 * POLLIN and the output one for POLLOUT. Returns 1 if ppoll() reported
 * ready descriptors, 0 otherwise. */
2041 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2042 struct pollfd p[2] = {};
2049 if (bus->input_fd < 0)
2052 e = sd_bus_get_events(bus);
2059 r = sd_bus_get_timeout(bus, &until);
2066 nw = now(CLOCK_MONOTONIC);
2067 m = until > nw ? until - nw : 0;
2070 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2073 p[0].fd = bus->input_fd;
2074 if (bus->output_fd == bus->input_fd) {
2078 p[0].events = e & POLLIN;
2079 p[1].fd = bus->output_fd;
2080 p[1].events = e & POLLOUT;
2084 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2088 return r > 0 ? 1 : 0;
/* Waits for bus activity for at most timeout_usec by delegating to
 * bus_poll(). The BUS_UNSET state, a missing input descriptor and a
 * non-empty read queue are tested before polling. */
2091 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2095 if (bus->state == BUS_UNSET)
2097 if (bus->input_fd < 0)
2099 if (bus->rqueue_size > 0)
2102 return bus_poll(bus, false, timeout_usec);
/* Flushes the write queue of the bus: after bringing the bus into running
 * state it alternates between dispatch_wqueue() and an unbounded
 * bus_poll(), checking for an empty queue before and after each
 * dispatch. */
2105 int sd_bus_flush(sd_bus *bus) {
2110 if (bus->state == BUS_UNSET)
2112 if (bus->output_fd < 0)
2115 r = bus_ensure_running(bus);
2119 if (bus->wqueue_size <= 0)
2123 r = dispatch_wqueue(bus);
2127 if (bus->wqueue_size <= 0)
2130 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback: a new zero-initialized record is filled
 * with callback and userdata and prepended to the bus's filter list. The
 * "modified" flag is raised so that process_filter() rescans the list. */
2136 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2137 struct filter_callback *f;
2144 f = new0(struct filter_callback, 1);
2147 f->callback = callback;
2148 f->userdata = userdata;
2150 bus->filter_callbacks_modified = true;
2151 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregisters a filter callback: the filter list is searched for an entry
 * whose callback and userdata both match, which is then unlinked. The
 * "modified" flag is raised so that process_filter() rescans the list. */
2155 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2156 struct filter_callback *f;
2163 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2164 if (f->callback == callback && f->userdata == userdata) {
2165 bus->filter_callbacks_modified = true;
2166 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Shared implementation of sd_bus_add_object() and sd_bus_add_fallback().
 * The object-callback hashmap is allocated on demand, and a new record is
 * created holding a private copy of the path together with callback,
 * userdata and the fallback flag. The record is inserted into the hashmap
 * keyed by its own path copy, and the "modified" flag is raised for
 * process_object(). */
2175 static int bus_add_object(
2179 sd_bus_message_handler_t callback,
2182 struct object_callback *c;
2192 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2196 c = new0(struct object_callback, 1);
2200 c->path = strdup(path);
2206 c->callback = callback;
2207 c->userdata = userdata;
2208 c->is_fallback = fallback;
2210 bus->object_callbacks_modified = true;
2211 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Shared implementation of sd_bus_remove_object() and
 * sd_bus_remove_fallback(). The record registered for path is looked up,
 * and callback, userdata and the fallback flag are all compared against
 * the caller's arguments before it is removed from the hashmap. The
 * "modified" flag is raised for process_object(). */
2221 static int bus_remove_object(
2225 sd_bus_message_handler_t callback,
2228 struct object_callback *c;
2237 c = hashmap_get(bus->object_callbacks, path);
2241 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2244 bus->object_callbacks_modified = true;
2245 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Registers callback for the given object path; forwards to
 * bus_add_object() with the fallback flag cleared. */
2253 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2254 return bus_add_object(bus, false, path, callback, userdata);
/* Unregisters callback for the given object path; forwards to
 * bus_remove_object() with the fallback flag cleared. */
2257 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2258 return bus_remove_object(bus, false, path, callback, userdata);
/* Registers callback as fallback handler for the given path prefix;
 * forwards to bus_add_object() with the fallback flag set. */
2261 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2262 return bus_add_object(bus, true, prefix, callback, userdata);
/* Unregisters a fallback handler for the given path prefix; forwards to
 * bus_remove_object() with the fallback flag set. */
2265 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2266 return bus_remove_object(bus, true, prefix, callback, userdata);
/* Installs a match rule with a callback. On a bus client connection the
 * match is first registered through bus_add_match_internal(); then the
 * callback is added to the local match tree. bus_remove_match_internal()
 * is called again on a path following the local insertion.
 * NOTE(review): presumably this is the rollback for a failed local
 * insertion -- the condition is not visible here, confirm. */
2269 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2277 if (bus->bus_client) {
2278 r = bus_add_match_internal(bus, match);
2284 bus->match_callbacks_modified = true;
2285 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2288 if (bus->bus_client)
2289 bus_remove_match_internal(bus, match);
/* Removes a match rule: on a bus client connection the match is dropped
 * through bus_remove_match_internal() (result in r), and the callback is
 * removed from the local match tree (result in q). The "modified" flag is
 * raised for process_match(). */
2296 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2304 if (bus->bus_client)
2305 r = bus_remove_match_internal(bus, match);
2308 bus->match_callbacks_modified = true;
2309 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
/* Convenience helper that creates a signal message for path, interface
 * and member, appends the variadic arguments described by the types
 * string and sends the message without asking for its serial. */
2317 int sd_bus_emit_signal(
2320 const char *interface,
2322 const char *types, ...) {
2324 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2331 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2335 va_start(ap, types);
2336 r = bus_message_append_ap(m, types, ap);
2341 return sd_bus_send(bus, m, NULL);
/* Convenience helper that creates a method call for destination, path,
 * interface and member, appends the variadic arguments described by the
 * types string and performs the call synchronously through
 * sd_bus_send_with_reply_and_block(), passing 0 as the timeout
 * argument. */
2344 int sd_bus_call_method(
2346 const char *destination,
2348 const char *interface,
2350 sd_bus_error *error,
2351 sd_bus_message **reply,
2352 const char *types, ...) {
2354 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2361 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2365 va_start(ap, types);
2366 r = bus_message_append_ap(m, types, ap);
2371 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
/* Convenience helper that answers a method call with a method return. The
 * call must be of type method call, and its NO_REPLY_EXPECTED flag is
 * tested. The reply is created from the call, the variadic arguments
 * described by the types string are appended, and the reply is sent. */
2374 int sd_bus_reply_method_return(
2376 sd_bus_message *call,
2377 const char *types, ...) {
2379 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2389 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2392 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2395 r = sd_bus_message_new_method_return(bus, call, &m);
2399 va_start(ap, types);
2400 r = bus_message_append_ap(m, types, ap);
2405 return sd_bus_send(bus, m, NULL);
/* Convenience helper that answers a method call with the error e. The
 * call must be of type method call, e is tested with
 * sd_bus_error_is_set(), and the call's NO_REPLY_EXPECTED flag is tested.
 * The error reply is created from the call and sent. */
2408 int sd_bus_reply_method_error(
2410 sd_bus_message *call,
2411 const sd_bus_error *e) {
2413 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2422 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2424 if (!sd_bus_error_is_set(e))
2427 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2430 r = sd_bus_message_new_method_error(bus, call, e, &m);
2434 return sd_bus_send(bus, m, NULL);