1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
39 #include "bus-socket.h"
41 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
43 static void bus_free(sd_bus *b) {
44 struct filter_callback *f;
45 struct object_callback *c;
51 close_nointr_nofail(b->fd);
59 strv_free(b->exec_argv);
61 close_many(b->fds, b->n_fds);
64 for (i = 0; i < b->rqueue_size; i++)
65 sd_bus_message_unref(b->rqueue[i]);
68 for (i = 0; i < b->wqueue_size; i++)
69 sd_bus_message_unref(b->wqueue[i]);
72 hashmap_free_free(b->reply_callbacks);
73 prioq_free(b->reply_callbacks_prioq);
75 while ((f = b->filter_callbacks)) {
76 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
80 while ((c = hashmap_steal_first(b->object_callbacks))) {
85 hashmap_free(b->object_callbacks);
90 int sd_bus_new(sd_bus **ret) {
102 r->message_version = 1;
103 r->negotiate_fds = true;
105 /* We guarantee that wqueue always has space for at least one
107 r->wqueue = new(sd_bus_message*, 1);
117 int sd_bus_set_address(sd_bus *bus, const char *address) {
122 if (bus->state != BUS_UNSET)
137 int sd_bus_set_fd(sd_bus *bus, int fd) {
140 if (bus->state != BUS_UNSET)
149 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
154 if (bus->state != BUS_UNSET)
158 if (strv_isempty(argv))
171 free(bus->exec_path);
172 strv_free(bus->exec_argv);
180 int sd_bus_set_bus_client(sd_bus *bus, int b) {
183 if (bus->state != BUS_UNSET)
186 bus->bus_client = !!b;
190 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
193 if (bus->state != BUS_UNSET)
196 bus->negotiate_fds = !!b;
200 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server) {
203 if (!b && !sd_id128_equal(server, SD_ID128_NULL))
205 if (bus->state != BUS_UNSET)
208 bus->is_server = !!b;
213 int sd_bus_set_anonymous(sd_bus *bus, int b) {
216 if (bus->state != BUS_UNSET)
219 bus->anonymous_auth = !!b;
223 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
228 assert(bus->state == BUS_HELLO);
235 r = sd_bus_message_read(reply, "s", &s);
239 if (!service_name_is_valid(s) || s[0] != ':')
242 bus->unique_name = strdup(s);
243 if (!bus->unique_name)
246 bus->state = BUS_RUNNING;
251 static int bus_send_hello(sd_bus *bus) {
252 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
257 if (!bus->bus_client)
260 r = sd_bus_message_new_method_call(
262 "org.freedesktop.DBus",
264 "org.freedesktop.DBus",
270 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
273 int bus_start_running(sd_bus *bus) {
276 if (bus->bus_client) {
277 bus->state = BUS_HELLO;
281 bus->state = BUS_RUNNING;
285 static int parse_address_key(const char **p, const char *key, char **value) {
296 if (strncmp(*p, key, l) != 0)
309 while (*a != ';' && *a != ',' && *a != 0) {
327 c = (char) ((x << 4) | y);
334 t = realloc(r, n + 2);
362 static void skip_address_key(const char **p) {
366 *p += strcspn(*p, ",");
372 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
373 _cleanup_free_ char *path = NULL, *abstract = NULL;
382 while (**p != 0 && **p != ';') {
383 r = parse_address_key(p, "guid", guid);
389 r = parse_address_key(p, "path", &path);
395 r = parse_address_key(p, "abstract", &abstract);
404 if (!path && !abstract)
407 if (path && abstract)
412 if (l > sizeof(b->sockaddr.un.sun_path))
415 b->sockaddr.un.sun_family = AF_UNIX;
416 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
417 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
418 } else if (abstract) {
419 l = strlen(abstract);
420 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
423 b->sockaddr.un.sun_family = AF_UNIX;
424 b->sockaddr.un.sun_path[0] = 0;
425 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
426 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
432 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
433 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
434 struct addrinfo hints, *result;
442 while (**p != 0 && **p != ';') {
443 r = parse_address_key(p, "guid", guid);
449 r = parse_address_key(p, "host", &host);
455 r = parse_address_key(p, "port", &port);
461 r = parse_address_key(p, "family", &family);
474 hints.ai_socktype = SOCK_STREAM;
475 hints.ai_flags = AI_ADDRCONFIG;
478 if (streq(family, "ipv4"))
479 hints.ai_family = AF_INET;
480 else if (streq(family, "ipv6"))
481 hints.ai_family = AF_INET6;
486 r = getaddrinfo(host, port, &hints, &result);
490 return -EADDRNOTAVAIL;
492 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
493 b->sockaddr_size = result->ai_addrlen;
495 freeaddrinfo(result);
500 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
502 unsigned n_argv = 0, j;
511 while (**p != 0 && **p != ';') {
512 r = parse_address_key(p, "guid", guid);
518 r = parse_address_key(p, "path", &path);
524 if (startswith(*p, "argv")) {
528 ul = strtoul(*p + 4, (char**) p, 10);
529 if (errno > 0 || **p != '=' || ul > 256) {
539 x = realloc(argv, sizeof(char*) * (ul + 2));
545 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
551 r = parse_address_key(p, NULL, argv + ul);
566 /* Make sure there are no holes in the array, with the
567 * exception of argv[0] */
568 for (j = 1; j < n_argv; j++)
574 if (argv && argv[0] == NULL) {
575 argv[0] = strdup(path);
587 for (j = 0; j < n_argv; j++)
595 static void bus_reset_parsed_address(sd_bus *b) {
599 b->sockaddr_size = 0;
600 strv_free(b->exec_argv);
604 b->peer = SD_ID128_NULL;
607 static int bus_parse_next_address(sd_bus *b) {
608 _cleanup_free_ char *guid = NULL;
616 if (b->address[b->address_index] == 0)
619 bus_reset_parsed_address(b);
621 a = b->address + b->address_index;
630 if (startswith(a, "unix:")) {
633 r = parse_unix_address(b, &a, &guid);
638 } else if (startswith(a, "tcp:")) {
641 r = parse_tcp_address(b, &a, &guid);
647 } else if (startswith(a, "unixexec:")) {
650 r = parse_exec_address(b, &a, &guid);
664 r = sd_id128_from_string(guid, &b->peer);
669 b->address_index = a - b->address;
673 static int bus_start_address(sd_bus *b) {
680 close_nointr_nofail(b->fd);
684 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
686 r = bus_socket_connect(b);
690 b->last_connect_error = -r;
692 } else if (b->exec_path) {
694 r = bus_socket_exec(b);
698 b->last_connect_error = -r;
701 r = bus_parse_next_address(b);
705 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
709 int bus_next_address(sd_bus *b) {
712 bus_reset_parsed_address(b);
713 return bus_start_address(b);
716 static int bus_start_fd(sd_bus *b) {
721 r = fd_nonblock(b->fd, true);
725 r = fd_cloexec(b->fd, true);
729 return bus_socket_take_fd(b);
732 int sd_bus_start(sd_bus *bus) {
737 if (bus->state != BUS_UNSET)
740 bus->state = BUS_OPENING;
742 if (bus->is_server && bus->bus_client)
746 r = bus_start_fd(bus);
747 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path)
748 r = bus_start_address(bus);
755 return bus_send_hello(bus);
758 int sd_bus_open_system(sd_bus **ret) {
770 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
772 r = sd_bus_set_address(b, e);
776 b->sockaddr.un.sun_family = AF_UNIX;
777 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
778 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
781 b->bus_client = true;
795 int sd_bus_open_user(sd_bus **ret) {
808 e = getenv("DBUS_SESSION_BUS_ADDRESS");
810 r = sd_bus_set_address(b, e);
814 e = getenv("XDG_RUNTIME_DIR");
821 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
826 b->sockaddr.un.sun_family = AF_UNIX;
827 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
828 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
831 b->bus_client = true;
845 void sd_bus_close(sd_bus *bus) {
851 close_nointr_nofail(bus->fd);
855 sd_bus *sd_bus_ref(sd_bus *bus) {
859 assert(bus->n_ref > 0);
865 sd_bus *sd_bus_unref(sd_bus *bus) {
869 assert(bus->n_ref > 0);
878 int sd_bus_is_open(sd_bus *bus) {
882 return bus->state != BUS_UNSET && bus->fd >= 0;
885 int sd_bus_can_send(sd_bus *bus, char type) {
893 if (type == SD_BUS_TYPE_UNIX_FD) {
894 if (!bus->negotiate_fds)
897 r = bus_ensure_running(bus);
904 return bus_type_is_valid(type);
907 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
915 r = bus_ensure_running(bus);
923 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
926 if (m->header->version > b->message_version)
932 return bus_message_seal(m, ++b->serial);
935 static int dispatch_wqueue(sd_bus *bus) {
939 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
944 while (bus->wqueue_size > 0) {
946 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
951 /* Didn't do anything this time */
953 else if (bus->windex >= bus->wqueue[0]->size) {
954 /* Fully written. Let's drop the entry from
957 * This isn't particularly optimized, but
958 * well, this is supposed to be our worst-case
959 * buffer only, and the socket buffer is
960 * supposed to be our primary buffer, and if
961 * it got full, then all bets are off
964 sd_bus_message_unref(bus->wqueue[0]);
966 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
976 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
977 sd_bus_message *z = NULL;
982 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
987 if (bus->rqueue_size > 0) {
988 /* Dispatch a queued message */
992 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
996 /* Try to read a new message */
998 r = bus_socket_read_message(bus, &z);
1013 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1018 if (bus->state == BUS_UNSET)
1026 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1033 /* If the serial number isn't kept, then we know that no reply
1035 if (!serial && !m->sealed)
1036 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1038 r = bus_seal_message(bus, m);
1042 /* If this is a reply and no reply was requested, then let's
1043 * suppress this, if we can */
1044 if (m->dont_send && !serial)
1047 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1050 r = bus_socket_write_message(bus, m, &idx);
1054 } else if (idx < m->size) {
1055 /* Wasn't fully written. So let's remember how
1056 * much was written. Note that the first entry
1057 * of the wqueue array is always allocated so
1058 * that we always can remember how much was
1060 bus->wqueue[0] = sd_bus_message_ref(m);
1061 bus->wqueue_size = 1;
1067 /* Just append it to the queue. */
1069 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1072 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1077 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1081 *serial = BUS_MESSAGE_SERIAL(m);
1086 static usec_t calc_elapse(uint64_t usec) {
1087 if (usec == (uint64_t) -1)
1091 usec = BUS_DEFAULT_TIMEOUT;
1093 return now(CLOCK_MONOTONIC) + usec;
1096 static int timeout_compare(const void *a, const void *b) {
1097 const struct reply_callback *x = a, *y = b;
1099 if (x->timeout != 0 && y->timeout == 0)
1102 if (x->timeout == 0 && y->timeout != 0)
1105 if (x->timeout < y->timeout)
1108 if (x->timeout > y->timeout)
1114 int sd_bus_send_with_reply(
1117 sd_message_handler_t callback,
1122 struct reply_callback *c;
1127 if (bus->state == BUS_UNSET)
1135 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1137 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1140 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1144 if (usec != (uint64_t) -1) {
1145 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1150 r = bus_seal_message(bus, m);
1154 c = new(struct reply_callback, 1);
1158 c->callback = callback;
1159 c->userdata = userdata;
1160 c->serial = BUS_MESSAGE_SERIAL(m);
1161 c->timeout = calc_elapse(usec);
1163 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1169 if (c->timeout != 0) {
1170 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1173 sd_bus_send_with_reply_cancel(bus, c->serial);
1178 r = sd_bus_send(bus, m, serial);
1180 sd_bus_send_with_reply_cancel(bus, c->serial);
1187 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1188 struct reply_callback *c;
1195 c = hashmap_remove(bus->reply_callbacks, &serial);
1199 if (c->timeout != 0)
1200 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1206 int bus_ensure_running(sd_bus *bus) {
1213 if (bus->state == BUS_UNSET)
1216 if (bus->state == BUS_RUNNING)
1220 r = sd_bus_process(bus, NULL);
1223 if (bus->state == BUS_RUNNING)
1228 r = sd_bus_wait(bus, (uint64_t) -1);
1234 int sd_bus_send_with_reply_and_block(
1238 sd_bus_error *error,
1239 sd_bus_message **reply) {
1250 if (bus->state == BUS_UNSET)
1254 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1256 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1258 if (bus_error_is_dirty(error))
1261 r = bus_ensure_running(bus);
1265 r = sd_bus_send(bus, m, &serial);
1269 timeout = calc_elapse(usec);
1273 sd_bus_message *incoming = NULL;
1278 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1281 /* Make sure there's room for queuing this
1282 * locally, before we read the message */
1284 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1292 r = bus_socket_read_message(bus, &incoming);
1297 if (incoming->reply_serial == serial) {
1298 /* Found a match! */
1300 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1305 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1308 r = sd_bus_error_copy(error, &incoming->error);
1310 sd_bus_message_unref(incoming);
1314 k = bus_error_to_errno(&incoming->error);
1315 sd_bus_message_unref(incoming);
1319 sd_bus_message_unref(incoming);
1323 /* There's already guaranteed to be room for
1324 * this, so need to resize things here */
1325 bus->rqueue[bus->rqueue_size ++] = incoming;
1328 /* Try to read more, right-away */
1337 n = now(CLOCK_MONOTONIC);
1343 left = (uint64_t) -1;
1345 r = bus_poll(bus, true, left);
1349 r = dispatch_wqueue(bus);
1355 int sd_bus_get_fd(sd_bus *bus) {
1365 int sd_bus_get_events(sd_bus *bus) {
1370 if (bus->state == BUS_UNSET)
1375 if (bus->state == BUS_OPENING)
1377 else if (bus->state == BUS_AUTHENTICATING) {
1379 if (bus_socket_auth_needs_write(bus))
1384 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1385 if (bus->rqueue_size <= 0)
1387 if (bus->wqueue_size > 0)
1394 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1395 struct reply_callback *c;
1401 if (bus->state == BUS_UNSET)
1406 if (bus->state == BUS_AUTHENTICATING) {
1407 *timeout_usec = bus->auth_timeout;
1411 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1414 c = prioq_peek(bus->reply_callbacks_prioq);
1418 *timeout_usec = c->timeout;
1422 static int process_timeout(sd_bus *bus) {
1423 struct reply_callback *c;
1429 c = prioq_peek(bus->reply_callbacks_prioq);
1433 n = now(CLOCK_MONOTONIC);
1437 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1438 hashmap_remove(bus->reply_callbacks, &c->serial);
1440 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1443 return r < 0 ? r : 1;
1446 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1450 if (bus->state != BUS_HELLO)
1453 /* Let's make sure the first message on the bus is the HELLO
1454 * reply. But note that we don't actually parse the message
1455 * here (we leave that to the usual handling), we just verify
1456 * we don't let any earlier msg through. */
1458 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1459 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1462 if (m->reply_serial != bus->hello_serial)
1468 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1469 struct reply_callback *c;
1475 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1476 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1479 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1483 if (c->timeout != 0)
1484 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1486 r = c->callback(bus, 0, m, c->userdata);
1492 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1493 struct filter_callback *l;
1496 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1497 r = l->callback(bus, 0, m, l->userdata);
1505 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1506 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1512 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1515 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1518 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1521 if (streq_ptr(m->member, "Ping"))
1522 r = sd_bus_message_new_method_return(bus, m, &reply);
1523 else if (streq_ptr(m->member, "GetMachineId")) {
1527 r = sd_id128_get_machine(&id);
1531 r = sd_bus_message_new_method_return(bus, m, &reply);
1535 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1537 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1539 sd_bus_error_set(&error,
1540 "org.freedesktop.DBus.Error.UnknownMethod",
1541 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1543 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1549 r = sd_bus_send(bus, reply, NULL);
1556 static int process_object(sd_bus *bus, sd_bus_message *m) {
1557 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1558 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1559 struct object_callback *c;
1567 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1570 if (hashmap_isempty(bus->object_callbacks))
1573 c = hashmap_get(bus->object_callbacks, m->path);
1575 r = c->callback(bus, 0, m, c->userdata);
1582 /* Look for fallback prefixes */
1583 p = strdupa(m->path);
1587 e = strrchr(p, '/');
1593 c = hashmap_get(bus->object_callbacks, p);
1594 if (c && c->is_fallback) {
1595 r = c->callback(bus, 0, m, c->userdata);
1603 /* We found some handlers but none wanted to take this, then
1604 * return this -- with one exception, we can handle
1605 * introspection minimally ourselves */
1606 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1609 sd_bus_error_set(&error,
1610 "org.freedesktop.DBus.Error.UnknownMethod",
1611 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1613 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1617 r = sd_bus_send(bus, reply, NULL);
1624 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1625 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1626 _cleanup_free_ char *introspection = NULL;
1627 _cleanup_set_free_free_ Set *s = NULL;
1628 _cleanup_fclose_ FILE *f = NULL;
1629 struct object_callback *c;
1638 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1644 s = set_new(string_hash_func, string_compare_func);
1648 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1652 if (streq(c->path, "/"))
1655 if (streq(m->path, "/"))
1658 e = startswith(c->path, m->path);
1659 if (!e || *e != '/')
1680 f = open_memstream(&introspection, &size);
1684 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1685 fputs("<node>\n", f);
1686 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1687 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1689 while ((node = set_steal_first(s))) {
1690 fprintf(f, " <node name=\"%s\"/>\n", node);
1694 fputs("</node>\n", f);
1701 r = sd_bus_message_new_method_return(bus, m, &reply);
1705 r = sd_bus_message_append(reply, "s", introspection);
1709 r = sd_bus_send(bus, reply, NULL);
1716 static int process_message(sd_bus *bus, sd_bus_message *m) {
1722 r = process_hello(bus, m);
1726 r = process_reply(bus, m);
1730 r = process_filter(bus, m);
1734 r = process_builtin(bus, m);
1738 r = process_object(bus, m);
1742 return process_introspect(bus, m);
1745 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1746 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1750 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1752 r = process_timeout(bus);
1756 r = dispatch_wqueue(bus);
1760 r = dispatch_rqueue(bus, &m);
1766 r = process_message(bus, m);
1776 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1777 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1778 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1780 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1782 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1786 r = sd_bus_send(bus, reply, NULL);
1800 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1803 /* Returns 0 when we didn't do anything. This should cause the
1804 * caller to invoke sd_bus_wait() before returning the next
1805 * time. Returns > 0 when we did something, which possibly
1806 * means *ret is filled in with an unprocessed message. */
1813 switch (bus->state) {
1819 r = bus_socket_process_opening(bus);
1826 case BUS_AUTHENTICATING:
1828 r = bus_socket_process_authenticating(bus);
1838 return process_running(bus, ret);
1841 assert_not_reached("Unknown state");
1844 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1855 e = sd_bus_get_events(bus);
1862 r = sd_bus_get_timeout(bus, &until);
1869 n = now(CLOCK_MONOTONIC);
1870 m = until > n ? until - n : 0;
1873 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1880 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1884 return r > 0 ? 1 : 0;
1887 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1891 if (bus->state == BUS_UNSET)
1895 if (bus->rqueue_size > 0)
1898 return bus_poll(bus, false, timeout_usec);
1901 int sd_bus_flush(sd_bus *bus) {
1906 if (bus->state == BUS_UNSET)
1911 r = bus_ensure_running(bus);
1915 if (bus->wqueue_size <= 0)
1919 r = dispatch_wqueue(bus);
1923 if (bus->wqueue_size <= 0)
1926 r = bus_poll(bus, false, (uint64_t) -1);
1932 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1933 struct filter_callback *f;
1940 f = new(struct filter_callback, 1);
1943 f->callback = callback;
1944 f->userdata = userdata;
1946 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1950 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1951 struct filter_callback *f;
1958 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1959 if (f->callback == callback && f->userdata == userdata) {
1960 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1969 static int bus_add_object(
1973 sd_message_handler_t callback,
1976 struct object_callback *c;
1986 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1990 c = new(struct object_callback, 1);
1994 c->path = strdup(path);
2000 c->callback = callback;
2001 c->userdata = userdata;
2002 c->is_fallback = fallback;
2004 r = hashmap_put(bus->object_callbacks, c->path, c);
2014 static int bus_remove_object(
2018 sd_message_handler_t callback,
2021 struct object_callback *c;
2030 c = hashmap_get(bus->object_callbacks, path);
2034 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2037 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2045 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2046 return bus_add_object(bus, false, path, callback, userdata);
2049 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2050 return bus_remove_object(bus, false, path, callback, userdata);
2053 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2054 return bus_add_object(bus, true, prefix, callback, userdata);
2057 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2058 return bus_remove_object(bus, true, prefix, callback, userdata);