1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
37 #include "bus-internal.h"
38 #include "bus-message.h"
40 #include "bus-socket.h"
41 #include "bus-kernel.h"
42 #include "bus-control.h"
44 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
46 static void bus_free(sd_bus *b) {
47 struct filter_callback *f;
48 struct object_callback *c;
62 strv_free(b->exec_argv);
64 close_many(b->fds, b->n_fds);
67 for (i = 0; i < b->rqueue_size; i++)
68 sd_bus_message_unref(b->rqueue[i]);
71 for (i = 0; i < b->wqueue_size; i++)
72 sd_bus_message_unref(b->wqueue[i]);
75 hashmap_free_free(b->reply_callbacks);
76 prioq_free(b->reply_callbacks_prioq);
78 while ((f = b->filter_callbacks)) {
79 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
83 while ((c = hashmap_steal_first(b->object_callbacks))) {
88 hashmap_free(b->object_callbacks);
89 bus_match_free(&b->match_callbacks);
91 bus_kernel_flush_memfd(b);
96 int sd_bus_new(sd_bus **ret) {
107 r->input_fd = r->output_fd = -1;
108 r->message_version = 1;
109 r->negotiate_fds = true;
111 /* We guarantee that wqueue always has space for at least one
113 r->wqueue = new(sd_bus_message*, 1);
123 int sd_bus_set_address(sd_bus *bus, const char *address) {
128 if (bus->state != BUS_UNSET)
143 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
146 if (bus->state != BUS_UNSET)
153 bus->input_fd = input_fd;
154 bus->output_fd = output_fd;
158 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
163 if (bus->state != BUS_UNSET)
167 if (strv_isempty(argv))
180 free(bus->exec_path);
181 strv_free(bus->exec_argv);
189 int sd_bus_set_bus_client(sd_bus *bus, int b) {
192 if (bus->state != BUS_UNSET)
195 bus->bus_client = !!b;
199 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
202 if (bus->state != BUS_UNSET)
205 bus->negotiate_fds = !!b;
209 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
212 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
214 if (bus->state != BUS_UNSET)
217 bus->is_server = !!b;
218 bus->server_id = server_id;
222 int sd_bus_set_anonymous(sd_bus *bus, int b) {
225 if (bus->state != BUS_UNSET)
228 bus->anonymous_auth = !!b;
232 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
237 assert(bus->state == BUS_HELLO);
244 r = sd_bus_message_read(reply, "s", &s);
248 if (!service_name_is_valid(s) || s[0] != ':')
251 bus->unique_name = strdup(s);
252 if (!bus->unique_name)
255 bus->state = BUS_RUNNING;
260 static int bus_send_hello(sd_bus *bus) {
261 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
266 if (!bus->bus_client || bus->is_kernel)
269 r = sd_bus_message_new_method_call(
271 "org.freedesktop.DBus",
273 "org.freedesktop.DBus",
279 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
282 int bus_start_running(sd_bus *bus) {
285 if (bus->bus_client && !bus->is_kernel) {
286 bus->state = BUS_HELLO;
290 bus->state = BUS_RUNNING;
294 static int parse_address_key(const char **p, const char *key, char **value) {
305 if (strncmp(*p, key, l) != 0)
318 while (*a != ';' && *a != ',' && *a != 0) {
336 c = (char) ((x << 4) | y);
343 t = realloc(r, n + 2);
371 static void skip_address_key(const char **p) {
375 *p += strcspn(*p, ",");
381 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
382 _cleanup_free_ char *path = NULL, *abstract = NULL;
391 while (**p != 0 && **p != ';') {
392 r = parse_address_key(p, "guid", guid);
398 r = parse_address_key(p, "path", &path);
404 r = parse_address_key(p, "abstract", &abstract);
413 if (!path && !abstract)
416 if (path && abstract)
421 if (l > sizeof(b->sockaddr.un.sun_path))
424 b->sockaddr.un.sun_family = AF_UNIX;
425 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
426 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
427 } else if (abstract) {
428 l = strlen(abstract);
429 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
432 b->sockaddr.un.sun_family = AF_UNIX;
433 b->sockaddr.un.sun_path[0] = 0;
434 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
435 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
441 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
442 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
444 struct addrinfo *result, hints = {
445 .ai_socktype = SOCK_STREAM,
446 .ai_flags = AI_ADDRCONFIG,
454 while (**p != 0 && **p != ';') {
455 r = parse_address_key(p, "guid", guid);
461 r = parse_address_key(p, "host", &host);
467 r = parse_address_key(p, "port", &port);
473 r = parse_address_key(p, "family", &family);
486 if (streq(family, "ipv4"))
487 hints.ai_family = AF_INET;
488 else if (streq(family, "ipv6"))
489 hints.ai_family = AF_INET6;
494 r = getaddrinfo(host, port, &hints, &result);
498 return -EADDRNOTAVAIL;
500 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
501 b->sockaddr_size = result->ai_addrlen;
503 freeaddrinfo(result);
508 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
510 unsigned n_argv = 0, j;
519 while (**p != 0 && **p != ';') {
520 r = parse_address_key(p, "guid", guid);
526 r = parse_address_key(p, "path", &path);
532 if (startswith(*p, "argv")) {
536 ul = strtoul(*p + 4, (char**) p, 10);
537 if (errno > 0 || **p != '=' || ul > 256) {
547 x = realloc(argv, sizeof(char*) * (ul + 2));
553 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
559 r = parse_address_key(p, NULL, argv + ul);
574 /* Make sure there are no holes in the array, with the
575 * exception of argv[0] */
576 for (j = 1; j < n_argv; j++)
582 if (argv && argv[0] == NULL) {
583 argv[0] = strdup(path);
595 for (j = 0; j < n_argv; j++)
603 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
604 _cleanup_free_ char *path = NULL;
612 while (**p != 0 && **p != ';') {
613 r = parse_address_key(p, "guid", guid);
619 r = parse_address_key(p, "path", &path);
638 static void bus_reset_parsed_address(sd_bus *b) {
642 b->sockaddr_size = 0;
643 strv_free(b->exec_argv);
647 b->server_id = SD_ID128_NULL;
652 static int bus_parse_next_address(sd_bus *b) {
653 _cleanup_free_ char *guid = NULL;
661 if (b->address[b->address_index] == 0)
664 bus_reset_parsed_address(b);
666 a = b->address + b->address_index;
675 if (startswith(a, "unix:")) {
678 r = parse_unix_address(b, &a, &guid);
683 } else if (startswith(a, "tcp:")) {
686 r = parse_tcp_address(b, &a, &guid);
692 } else if (startswith(a, "unixexec:")) {
695 r = parse_exec_address(b, &a, &guid);
701 } else if (startswith(a, "kernel:")) {
704 r = parse_kernel_address(b, &a, &guid);
717 r = sd_id128_from_string(guid, &b->server_id);
722 b->address_index = a - b->address;
726 static int bus_start_address(sd_bus *b) {
734 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
736 r = bus_socket_connect(b);
740 b->last_connect_error = -r;
742 } else if (b->exec_path) {
744 r = bus_socket_exec(b);
748 b->last_connect_error = -r;
749 } else if (b->kernel) {
751 r = bus_kernel_connect(b);
755 b->last_connect_error = -r;
758 r = bus_parse_next_address(b);
762 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
766 int bus_next_address(sd_bus *b) {
769 bus_reset_parsed_address(b);
770 return bus_start_address(b);
/* Puts the caller-supplied descriptors stored in b->input_fd and
 * b->output_fd to use. Each one is passed through fd_nonblock(..., true)
 * and fd_cloexec(..., true) (the output fd only when it differs from the
 * input fd), then the input fd is fstat()ed to choose the transport:
 * bus_kernel_take_fd() when it is a character device, bus_socket_take_fd()
 * otherwise. The value returned on that path is whatever the chosen
 * take_fd helper returns.
 *
 * NOTE(review): the error handling of the earlier steps is not visible in
 * this excerpt and is therefore not described here. */
773 static int bus_start_fd(sd_bus *b) {
778 assert(b->input_fd >= 0);
779 assert(b->output_fd >= 0);
781 r = fd_nonblock(b->input_fd, true);
785 r = fd_cloexec(b->input_fd, true);
789 if (b->input_fd != b->output_fd) {
790 r = fd_nonblock(b->output_fd, true);
794 r = fd_cloexec(b->output_fd, true);
799 if (fstat(b->input_fd, &st) < 0)
/* S_ISCHR() classifies a file mode, so it must be given the st_mode that
 * the fstat() above filled in. The descriptor number itself (which this
 * check used to receive) carries no file-type information. */
802 if (S_ISCHR(st.st_mode))
803 return bus_kernel_take_fd(b);
805 return bus_socket_take_fd(b);
808 int sd_bus_start(sd_bus *bus) {
813 if (bus->state != BUS_UNSET)
816 bus->state = BUS_OPENING;
818 if (bus->is_server && bus->bus_client)
821 if (bus->input_fd >= 0)
822 r = bus_start_fd(bus);
823 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
824 r = bus_start_address(bus);
831 return bus_send_hello(bus);
834 int sd_bus_open_system(sd_bus **ret) {
846 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
848 r = sd_bus_set_address(b, e);
852 b->sockaddr.un.sun_family = AF_UNIX;
853 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
854 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
857 b->bus_client = true;
871 int sd_bus_open_user(sd_bus **ret) {
884 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
886 r = sd_bus_set_address(b, e);
890 e = secure_getenv("XDG_RUNTIME_DIR");
897 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
902 b->sockaddr.un.sun_family = AF_UNIX;
903 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
904 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
907 b->bus_client = true;
921 void sd_bus_close(sd_bus *bus) {
925 if (bus->input_fd >= 0)
926 close_nointr_nofail(bus->input_fd);
927 if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
928 close_nointr_nofail(bus->output_fd);
930 bus->input_fd = bus->output_fd = -1;
933 sd_bus *sd_bus_ref(sd_bus *bus) {
937 assert(bus->n_ref > 0);
943 sd_bus *sd_bus_unref(sd_bus *bus) {
947 assert(bus->n_ref > 0);
956 int sd_bus_is_open(sd_bus *bus) {
960 return bus->state != BUS_UNSET && bus->input_fd >= 0;
963 int sd_bus_can_send(sd_bus *bus, char type) {
968 if (bus->output_fd < 0)
971 if (type == SD_BUS_TYPE_UNIX_FD) {
972 if (!bus->negotiate_fds)
975 r = bus_ensure_running(bus);
982 return bus_type_is_valid(type);
985 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
993 r = bus_ensure_running(bus);
997 *server_id = bus->server_id;
1001 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1004 if (m->header->version > b->message_version)
1010 return bus_message_seal(m, ++b->serial);
1013 static int dispatch_wqueue(sd_bus *bus) {
1017 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1019 if (bus->output_fd < 0)
1022 while (bus->wqueue_size > 0) {
1025 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1027 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1033 /* Didn't do anything this time */
1035 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1036 /* Fully written. Let's drop the entry from
1039 * This isn't particularly optimized, but
1040 * well, this is supposed to be our worst-case
1041 * buffer only, and the socket buffer is
1042 * supposed to be our primary buffer, and if
1043 * it got full, then all bets are off
1046 sd_bus_message_unref(bus->wqueue[0]);
1047 bus->wqueue_size --;
1048 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1058 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1059 sd_bus_message *z = NULL;
1064 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1066 if (bus->input_fd < 0)
1069 if (bus->rqueue_size > 0) {
1070 /* Dispatch a queued message */
1072 *m = bus->rqueue[0];
1073 bus->rqueue_size --;
1074 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1078 /* Try to read a new message */
1081 r = bus_kernel_read_message(bus, &z);
1083 r = bus_socket_read_message(bus, &z);
1099 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1104 if (bus->state == BUS_UNSET)
1106 if (bus->output_fd < 0)
1112 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1119 /* If the serial number isn't kept, then we know that no reply
1121 if (!serial && !m->sealed)
1122 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1124 r = bus_seal_message(bus, m);
1128 /* If this is a reply and no reply was requested, then let's
1129 * suppress this, if we can */
1130 if (m->dont_send && !serial)
1133 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1137 r = bus_kernel_write_message(bus, m);
1139 r = bus_socket_write_message(bus, m, &idx);
1144 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1145 /* Wasn't fully written. So let's remember how
1146 * much was written. Note that the first entry
1147 * of the wqueue array is always allocated so
1148 * that we always can remember how much was
1150 bus->wqueue[0] = sd_bus_message_ref(m);
1151 bus->wqueue_size = 1;
1157 /* Just append it to the queue. */
1159 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1162 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1167 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1171 *serial = BUS_MESSAGE_SERIAL(m);
1176 static usec_t calc_elapse(uint64_t usec) {
1177 if (usec == (uint64_t) -1)
1181 usec = BUS_DEFAULT_TIMEOUT;
1183 return now(CLOCK_MONOTONIC) + usec;
1186 static int timeout_compare(const void *a, const void *b) {
1187 const struct reply_callback *x = a, *y = b;
1189 if (x->timeout != 0 && y->timeout == 0)
1192 if (x->timeout == 0 && y->timeout != 0)
1195 if (x->timeout < y->timeout)
1198 if (x->timeout > y->timeout)
1204 int sd_bus_send_with_reply(
1207 sd_bus_message_handler_t callback,
1212 struct reply_callback *c;
1217 if (bus->state == BUS_UNSET)
1219 if (bus->output_fd < 0)
1225 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1227 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1230 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1234 if (usec != (uint64_t) -1) {
1235 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1240 r = bus_seal_message(bus, m);
1244 c = new0(struct reply_callback, 1);
1248 c->callback = callback;
1249 c->userdata = userdata;
1250 c->serial = BUS_MESSAGE_SERIAL(m);
1251 c->timeout = calc_elapse(usec);
1253 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1259 if (c->timeout != 0) {
1260 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1263 sd_bus_send_with_reply_cancel(bus, c->serial);
1268 r = sd_bus_send(bus, m, serial);
1270 sd_bus_send_with_reply_cancel(bus, c->serial);
1277 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1278 struct reply_callback *c;
1285 c = hashmap_remove(bus->reply_callbacks, &serial);
1289 if (c->timeout != 0)
1290 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1296 int bus_ensure_running(sd_bus *bus) {
1301 if (bus->input_fd < 0)
1303 if (bus->state == BUS_UNSET)
1306 if (bus->state == BUS_RUNNING)
1310 r = sd_bus_process(bus, NULL);
1313 if (bus->state == BUS_RUNNING)
1318 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends the method call m with sd_bus_send(), remembers the serial that
 * was assigned to it, and then reads incoming messages (through
 * bus_kernel_read_message() or bus_socket_read_message()) looking for the
 * one whose reply_serial equals that serial.
 *
 * - m must be a METHOD_CALL without the NO_REPLY_EXPECTED flag, and error
 *   must not be dirty; these are checked before anything is sent.
 * - A matching METHOD_RETURN or METHOD_ERROR is handled on the spot; for
 *   an error reply the error is copied into *error with
 *   sd_bus_error_copy() and converted with bus_error_to_errno().
 * - Any message that does not match is appended to bus->rqueue; room for
 *   it is reserved with realloc() before the read.
 * - When nothing could be read, the function waits in bus_poll() and then
 *   flushes pending output with dispatch_wqueue().
 *
 * NOTE(review): the loop header, the return statements and the timeout
 * arithmetic are not visible in this excerpt, so return values and the
 * exact timeout behaviour are not documented here. Presumably the read
 * logic runs in a loop until the reply arrives or the deadline computed
 * by calc_elapse(usec) passes -- confirm against the full source. */
1324 int sd_bus_send_with_reply_and_block(
1328 sd_bus_error *error,
1329 sd_bus_message **reply) {
1338 if (bus->output_fd < 0)
1340 if (bus->state == BUS_UNSET)
1344 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1346 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1348 if (bus_error_is_dirty(error))
1351 r = bus_ensure_running(bus);
1355 r = sd_bus_send(bus, m, &serial);
1359 timeout = calc_elapse(usec);
1363 sd_bus_message *incoming = NULL;
1368 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1371 /* Make sure there's room for queuing this
1372 * locally, before we read the message */
1374 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1383 r = bus_kernel_read_message(bus, &incoming);
1385 r = bus_socket_read_message(bus, &incoming);
1390 if (incoming->reply_serial == serial) {
1391 /* Found a match! */
1393 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1398 sd_bus_message_unref(incoming);
1403 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1406 r = sd_bus_error_copy(error, &incoming->error);
1408 sd_bus_message_unref(incoming);
1412 k = bus_error_to_errno(&incoming->error);
1413 sd_bus_message_unref(incoming);
1417 sd_bus_message_unref(incoming);
1421 /* There's already guaranteed to be room for
1422 * this, so no need to resize things here */
1423 bus->rqueue[bus->rqueue_size ++] = incoming;
1426 /* Try to read more, right-away */
1435 n = now(CLOCK_MONOTONIC);
1441 left = (uint64_t) -1;
1443 r = bus_poll(bus, true, left);
1447 r = dispatch_wqueue(bus);
1453 int sd_bus_get_fd(sd_bus *bus) {
1456 if (bus->input_fd < 0)
1458 if (bus->input_fd != bus->output_fd)
1461 return bus->input_fd;
1464 int sd_bus_get_events(sd_bus *bus) {
1469 if (bus->state == BUS_UNSET)
1471 if (bus->input_fd < 0)
1474 if (bus->state == BUS_OPENING)
1476 else if (bus->state == BUS_AUTHENTICATING) {
1478 if (bus_socket_auth_needs_write(bus))
1483 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1484 if (bus->rqueue_size <= 0)
1486 if (bus->wqueue_size > 0)
1493 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1494 struct reply_callback *c;
1500 if (bus->state == BUS_UNSET)
1502 if (bus->input_fd < 0)
1505 if (bus->state == BUS_AUTHENTICATING) {
1506 *timeout_usec = bus->auth_timeout;
1510 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1511 *timeout_usec = (uint64_t) -1;
1515 c = prioq_peek(bus->reply_callbacks_prioq);
1517 *timeout_usec = (uint64_t) -1;
1521 *timeout_usec = c->timeout;
1525 static int process_timeout(sd_bus *bus) {
1526 struct reply_callback *c;
1532 c = prioq_peek(bus->reply_callbacks_prioq);
1536 n = now(CLOCK_MONOTONIC);
1540 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1541 hashmap_remove(bus->reply_callbacks, &c->serial);
1543 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1546 return r < 0 ? r : 1;
1549 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1553 if (bus->state != BUS_HELLO)
1556 /* Let's make sure the first message on the bus is the HELLO
1557 * reply. But note that we don't actually parse the message
1558 * here (we leave that to the usual handling), we just verify
1559 * we don't let any earlier msg through. */
1561 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1562 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1565 if (m->reply_serial != bus->hello_serial)
1571 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1572 struct reply_callback *c;
1578 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1579 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1582 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1586 if (c->timeout != 0)
1587 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1589 r = sd_bus_message_rewind(m, true);
1593 r = c->callback(bus, 0, m, c->userdata);
1599 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1600 struct filter_callback *l;
1607 bus->filter_callbacks_modified = false;
1609 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1611 if (bus->filter_callbacks_modified)
1614 /* Don't run this more than once per iteration */
1615 if (l->last_iteration == bus->iteration_counter)
1618 l->last_iteration = bus->iteration_counter;
1620 r = sd_bus_message_rewind(m, true);
1624 r = l->callback(bus, 0, m, l->userdata);
1630 } while (bus->filter_callbacks_modified);
1635 static int process_match(sd_bus *bus, sd_bus_message *m) {
1642 bus->match_callbacks_modified = false;
1644 r = bus_match_run(bus, &bus->match_callbacks, 0, m);
1648 } while (bus->match_callbacks_modified);
1653 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1654 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1660 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1663 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1666 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1669 if (streq_ptr(m->member, "Ping"))
1670 r = sd_bus_message_new_method_return(bus, m, &reply);
1671 else if (streq_ptr(m->member, "GetMachineId")) {
1675 r = sd_id128_get_machine(&id);
1679 r = sd_bus_message_new_method_return(bus, m, &reply);
1683 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1685 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1687 sd_bus_error_set(&error,
1688 "org.freedesktop.DBus.Error.UnknownMethod",
1689 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1691 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1697 r = sd_bus_send(bus, reply, NULL);
1704 static int process_object(sd_bus *bus, sd_bus_message *m) {
1705 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1706 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1707 struct object_callback *c;
1715 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1718 if (hashmap_isempty(bus->object_callbacks))
1721 pl = strlen(m->path);
1726 bus->object_callbacks_modified = false;
1728 c = hashmap_get(bus->object_callbacks, m->path);
1729 if (c && c->last_iteration != bus->iteration_counter) {
1731 c->last_iteration = bus->iteration_counter;
1733 r = sd_bus_message_rewind(m, true);
1737 r = c->callback(bus, 0, m, c->userdata);
1744 /* Look for fallback prefixes */
1749 if (bus->object_callbacks_modified)
1752 e = strrchr(p, '/');
1758 c = hashmap_get(bus->object_callbacks, p);
1759 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1761 c->last_iteration = bus->iteration_counter;
1763 r = sd_bus_message_rewind(m, true);
1767 r = c->callback(bus, 0, m, c->userdata);
1775 } while (bus->object_callbacks_modified);
1777 /* We found some handlers but none wanted to take this, then
1778 * return this -- with one exception, we can handle
1779 * introspection minimally ourselves */
1780 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1783 sd_bus_error_set(&error,
1784 "org.freedesktop.DBus.Error.UnknownMethod",
1785 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1787 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1791 r = sd_bus_send(bus, reply, NULL);
1798 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1799 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1800 _cleanup_free_ char *introspection = NULL;
1801 _cleanup_set_free_free_ Set *s = NULL;
1802 _cleanup_fclose_ FILE *f = NULL;
1803 struct object_callback *c;
1812 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1818 s = set_new(string_hash_func, string_compare_func);
1822 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1826 if (streq(c->path, "/"))
1829 if (streq(m->path, "/"))
1832 e = startswith(c->path, m->path);
1833 if (!e || *e != '/')
1845 r = set_consume(s, a);
1846 if (r < 0 && r != -EEXIST)
1850 f = open_memstream(&introspection, &size);
1854 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1855 fputs("<node>\n", f);
1856 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1857 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1859 while ((node = set_steal_first(s))) {
1860 fprintf(f, " <node name=\"%s\"/>\n", node);
1864 fputs("</node>\n", f);
1871 r = sd_bus_message_new_method_return(bus, m, &reply);
1875 r = sd_bus_message_append(reply, "s", introspection);
1879 r = sd_bus_send(bus, reply, NULL);
1886 static int process_message(sd_bus *bus, sd_bus_message *m) {
1892 bus->iteration_counter++;
1894 r = process_hello(bus, m);
1898 r = process_reply(bus, m);
1902 r = process_filter(bus, m);
1906 r = process_match(bus, m);
1910 r = process_builtin(bus, m);
1914 r = process_object(bus, m);
1918 return process_introspect(bus, m);
1921 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1922 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1926 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1928 r = process_timeout(bus);
1932 r = dispatch_wqueue(bus);
1936 r = dispatch_rqueue(bus, &m);
1942 r = process_message(bus, m);
1947 r = sd_bus_message_rewind(m, true);
1956 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1957 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1958 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1960 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1962 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1966 r = sd_bus_send(bus, reply, NULL);
1980 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1983 /* Returns 0 when we didn't do anything. This should cause the
1984 * caller to invoke sd_bus_wait() before returning the next
1985 * time. Returns > 0 when we did something, which possibly
1986 * means *ret is filled in with an unprocessed message. */
1990 if (bus->input_fd < 0)
1993 /* We don't allow recursively invoking sd_bus_process(). */
1994 if (bus->processing)
1997 switch (bus->state) {
2003 r = bus_socket_process_opening(bus);
2010 case BUS_AUTHENTICATING:
2012 r = bus_socket_process_authenticating(bus);
2022 bus->processing = true;
2023 r = process_running(bus, ret);
2024 bus->processing = false;
2029 assert_not_reached("Unknown state");
2032 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2033 struct pollfd p[2] = {};
2040 if (bus->input_fd < 0)
2043 e = sd_bus_get_events(bus);
2050 r = sd_bus_get_timeout(bus, &until);
2057 nw = now(CLOCK_MONOTONIC);
2058 m = until > nw ? until - nw : 0;
2061 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2064 p[0].fd = bus->input_fd;
2065 if (bus->output_fd == bus->input_fd) {
2069 p[0].events = e & POLLIN;
2070 p[1].fd = bus->output_fd;
2071 p[1].events = e & POLLOUT;
2075 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2079 return r > 0 ? 1 : 0;
2082 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2086 if (bus->state == BUS_UNSET)
2088 if (bus->input_fd < 0)
2090 if (bus->rqueue_size > 0)
2093 return bus_poll(bus, false, timeout_usec);
2096 int sd_bus_flush(sd_bus *bus) {
2101 if (bus->state == BUS_UNSET)
2103 if (bus->output_fd < 0)
2106 r = bus_ensure_running(bus);
2110 if (bus->wqueue_size <= 0)
2114 r = dispatch_wqueue(bus);
2118 if (bus->wqueue_size <= 0)
2121 r = bus_poll(bus, false, (uint64_t) -1);
2127 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2128 struct filter_callback *f;
2135 f = new0(struct filter_callback, 1);
2138 f->callback = callback;
2139 f->userdata = userdata;
2141 bus->filter_callbacks_modified = true;
2142 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
2146 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2147 struct filter_callback *f;
2154 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2155 if (f->callback == callback && f->userdata == userdata) {
2156 bus->filter_callbacks_modified = true;
2157 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2166 static int bus_add_object(
2170 sd_bus_message_handler_t callback,
2173 struct object_callback *c;
2183 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2187 c = new0(struct object_callback, 1);
2191 c->path = strdup(path);
2197 c->callback = callback;
2198 c->userdata = userdata;
2199 c->is_fallback = fallback;
2201 bus->object_callbacks_modified = true;
2202 r = hashmap_put(bus->object_callbacks, c->path, c);
2212 static int bus_remove_object(
2216 sd_bus_message_handler_t callback,
2219 struct object_callback *c;
2228 c = hashmap_get(bus->object_callbacks, path);
2232 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2235 bus->object_callbacks_modified = true;
2236 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2244 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2245 return bus_add_object(bus, false, path, callback, userdata);
2248 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2249 return bus_remove_object(bus, false, path, callback, userdata);
2252 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2253 return bus_add_object(bus, true, prefix, callback, userdata);
2256 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2257 return bus_remove_object(bus, true, prefix, callback, userdata);
2260 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2268 if (bus->bus_client) {
2269 r = bus_add_match_internal(bus, match);
2275 bus->match_callbacks_modified = true;
2276 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2279 if (bus->bus_client)
2280 bus_remove_match_internal(bus, match);
2287 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2295 if (bus->bus_client)
2296 r = bus_remove_match_internal(bus, match);
2299 bus->match_callbacks_modified = true;
2300 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
2308 int sd_bus_emit_signal(
2311 const char *interface,
2313 const char *types, ...) {
2315 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2322 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2326 va_start(ap, types);
2327 r = bus_message_append_ap(m, types, ap);
2332 return sd_bus_send(bus, m, NULL);
2335 int sd_bus_call_method(
2337 const char *destination,
2339 const char *interface,
2341 sd_bus_error *error,
2342 sd_bus_message **reply,
2343 const char *types, ...) {
2345 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2352 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2356 va_start(ap, types);
2357 r = bus_message_append_ap(m, types, ap);
2362 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
2365 int sd_bus_reply_method_return(
2367 sd_bus_message *call,
2368 const char *types, ...) {
2370 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2380 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2383 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2386 r = sd_bus_message_new_method_return(bus, call, &m);
2390 va_start(ap, types);
2391 r = bus_message_append_ap(m, types, ap);
2396 return sd_bus_send(bus, m, NULL);
2399 int sd_bus_reply_method_error(
2401 sd_bus_message *call,
2402 const sd_bus_error *e) {
2404 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2413 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2415 if (!sd_bus_error_is_set(e))
2418 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2421 r = sd_bus_message_new_method_error(bus, call, e, &m);
2425 return sd_bus_send(bus, m, NULL);