1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
39 #include "bus-socket.h"
40 #include "bus-control.h"
42 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
44 static void bus_free(sd_bus *b) {
45 struct filter_callback *f;
46 struct object_callback *c;
52 close_nointr_nofail(b->fd);
60 strv_free(b->exec_argv);
62 close_many(b->fds, b->n_fds);
65 for (i = 0; i < b->rqueue_size; i++)
66 sd_bus_message_unref(b->rqueue[i]);
69 for (i = 0; i < b->wqueue_size; i++)
70 sd_bus_message_unref(b->wqueue[i]);
73 hashmap_free_free(b->reply_callbacks);
74 prioq_free(b->reply_callbacks_prioq);
76 while ((f = b->filter_callbacks)) {
77 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
81 while ((c = hashmap_steal_first(b->object_callbacks))) {
86 hashmap_free(b->object_callbacks);
88 bus_match_free(&b->match_callbacks);
93 int sd_bus_new(sd_bus **ret) {
105 r->message_version = 1;
106 r->negotiate_fds = true;
108 /* We guarantee that wqueue always has space for at least one
110 r->wqueue = new(sd_bus_message*, 1);
120 int sd_bus_set_address(sd_bus *bus, const char *address) {
125 if (bus->state != BUS_UNSET)
140 int sd_bus_set_fd(sd_bus *bus, int fd) {
143 if (bus->state != BUS_UNSET)
152 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
157 if (bus->state != BUS_UNSET)
161 if (strv_isempty(argv))
174 free(bus->exec_path);
175 strv_free(bus->exec_argv);
183 int sd_bus_set_bus_client(sd_bus *bus, int b) {
186 if (bus->state != BUS_UNSET)
189 bus->bus_client = !!b;
193 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
196 if (bus->state != BUS_UNSET)
199 bus->negotiate_fds = !!b;
203 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
206 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
208 if (bus->state != BUS_UNSET)
211 bus->is_server = !!b;
212 bus->server_id = server_id;
216 int sd_bus_set_anonymous(sd_bus *bus, int b) {
219 if (bus->state != BUS_UNSET)
222 bus->anonymous_auth = !!b;
226 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
231 assert(bus->state == BUS_HELLO);
238 r = sd_bus_message_read(reply, "s", &s);
242 if (!service_name_is_valid(s) || s[0] != ':')
245 bus->unique_name = strdup(s);
246 if (!bus->unique_name)
249 bus->state = BUS_RUNNING;
254 static int bus_send_hello(sd_bus *bus) {
255 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
260 if (!bus->bus_client)
263 r = sd_bus_message_new_method_call(
265 "org.freedesktop.DBus",
267 "org.freedesktop.DBus",
273 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
276 int bus_start_running(sd_bus *bus) {
279 if (bus->bus_client) {
280 bus->state = BUS_HELLO;
284 bus->state = BUS_RUNNING;
288 static int parse_address_key(const char **p, const char *key, char **value) {
299 if (strncmp(*p, key, l) != 0)
312 while (*a != ';' && *a != ',' && *a != 0) {
330 c = (char) ((x << 4) | y);
337 t = realloc(r, n + 2);
365 static void skip_address_key(const char **p) {
369 *p += strcspn(*p, ",");
375 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
376 _cleanup_free_ char *path = NULL, *abstract = NULL;
385 while (**p != 0 && **p != ';') {
386 r = parse_address_key(p, "guid", guid);
392 r = parse_address_key(p, "path", &path);
398 r = parse_address_key(p, "abstract", &abstract);
407 if (!path && !abstract)
410 if (path && abstract)
415 if (l > sizeof(b->sockaddr.un.sun_path))
418 b->sockaddr.un.sun_family = AF_UNIX;
419 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
420 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
421 } else if (abstract) {
422 l = strlen(abstract);
423 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
426 b->sockaddr.un.sun_family = AF_UNIX;
427 b->sockaddr.un.sun_path[0] = 0;
428 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
429 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
435 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
436 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
437 struct addrinfo hints, *result;
445 while (**p != 0 && **p != ';') {
446 r = parse_address_key(p, "guid", guid);
452 r = parse_address_key(p, "host", &host);
458 r = parse_address_key(p, "port", &port);
464 r = parse_address_key(p, "family", &family);
477 hints.ai_socktype = SOCK_STREAM;
478 hints.ai_flags = AI_ADDRCONFIG;
481 if (streq(family, "ipv4"))
482 hints.ai_family = AF_INET;
483 else if (streq(family, "ipv6"))
484 hints.ai_family = AF_INET6;
489 r = getaddrinfo(host, port, &hints, &result);
493 return -EADDRNOTAVAIL;
495 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
496 b->sockaddr_size = result->ai_addrlen;
498 freeaddrinfo(result);
503 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
505 unsigned n_argv = 0, j;
514 while (**p != 0 && **p != ';') {
515 r = parse_address_key(p, "guid", guid);
521 r = parse_address_key(p, "path", &path);
527 if (startswith(*p, "argv")) {
531 ul = strtoul(*p + 4, (char**) p, 10);
532 if (errno > 0 || **p != '=' || ul > 256) {
542 x = realloc(argv, sizeof(char*) * (ul + 2));
548 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
554 r = parse_address_key(p, NULL, argv + ul);
569 /* Make sure there are no holes in the array, with the
570 * exception of argv[0] */
571 for (j = 1; j < n_argv; j++)
577 if (argv && argv[0] == NULL) {
578 argv[0] = strdup(path);
590 for (j = 0; j < n_argv; j++)
598 static void bus_reset_parsed_address(sd_bus *b) {
602 b->sockaddr_size = 0;
603 strv_free(b->exec_argv);
607 b->server_id = SD_ID128_NULL;
610 static int bus_parse_next_address(sd_bus *b) {
611 _cleanup_free_ char *guid = NULL;
619 if (b->address[b->address_index] == 0)
622 bus_reset_parsed_address(b);
624 a = b->address + b->address_index;
633 if (startswith(a, "unix:")) {
636 r = parse_unix_address(b, &a, &guid);
641 } else if (startswith(a, "tcp:")) {
644 r = parse_tcp_address(b, &a, &guid);
650 } else if (startswith(a, "unixexec:")) {
653 r = parse_exec_address(b, &a, &guid);
667 r = sd_id128_from_string(guid, &b->server_id);
672 b->address_index = a - b->address;
676 static int bus_start_address(sd_bus *b) {
683 close_nointr_nofail(b->fd);
687 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
689 r = bus_socket_connect(b);
693 b->last_connect_error = -r;
695 } else if (b->exec_path) {
697 r = bus_socket_exec(b);
701 b->last_connect_error = -r;
704 r = bus_parse_next_address(b);
708 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
712 int bus_next_address(sd_bus *b) {
715 bus_reset_parsed_address(b);
716 return bus_start_address(b);
719 static int bus_start_fd(sd_bus *b) {
724 r = fd_nonblock(b->fd, true);
728 r = fd_cloexec(b->fd, true);
732 return bus_socket_take_fd(b);
735 int sd_bus_start(sd_bus *bus) {
740 if (bus->state != BUS_UNSET)
743 bus->state = BUS_OPENING;
745 if (bus->is_server && bus->bus_client)
749 r = bus_start_fd(bus);
750 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path)
751 r = bus_start_address(bus);
758 return bus_send_hello(bus);
761 int sd_bus_open_system(sd_bus **ret) {
773 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
775 r = sd_bus_set_address(b, e);
779 b->sockaddr.un.sun_family = AF_UNIX;
780 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
781 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
784 b->bus_client = true;
798 int sd_bus_open_user(sd_bus **ret) {
811 e = getenv("DBUS_SESSION_BUS_ADDRESS");
813 r = sd_bus_set_address(b, e);
817 e = getenv("XDG_RUNTIME_DIR");
824 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
829 b->sockaddr.un.sun_family = AF_UNIX;
830 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
831 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
834 b->bus_client = true;
848 void sd_bus_close(sd_bus *bus) {
854 close_nointr_nofail(bus->fd);
858 sd_bus *sd_bus_ref(sd_bus *bus) {
862 assert(bus->n_ref > 0);
868 sd_bus *sd_bus_unref(sd_bus *bus) {
872 assert(bus->n_ref > 0);
881 int sd_bus_is_open(sd_bus *bus) {
885 return bus->state != BUS_UNSET && bus->fd >= 0;
888 int sd_bus_can_send(sd_bus *bus, char type) {
896 if (type == SD_BUS_TYPE_UNIX_FD) {
897 if (!bus->negotiate_fds)
900 r = bus_ensure_running(bus);
907 return bus_type_is_valid(type);
910 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
918 r = bus_ensure_running(bus);
922 *server_id = bus->server_id;
926 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
929 if (m->header->version > b->message_version)
935 return bus_message_seal(m, ++b->serial);
938 static int dispatch_wqueue(sd_bus *bus) {
942 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
947 while (bus->wqueue_size > 0) {
949 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
954 /* Didn't do anything this time */
956 else if (bus->windex >= bus->wqueue[0]->size) {
957 /* Fully written. Let's drop the entry from
960 * This isn't particularly optimized, but
961 * well, this is supposed to be our worst-case
962 * buffer only, and the socket buffer is
963 * supposed to be our primary buffer, and if
964 * it got full, then all bets are off
967 sd_bus_message_unref(bus->wqueue[0]);
969 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
979 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
980 sd_bus_message *z = NULL;
985 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
990 if (bus->rqueue_size > 0) {
991 /* Dispatch a queued message */
995 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
999 /* Try to read a new message */
1001 r = bus_socket_read_message(bus, &z);
1016 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1021 if (bus->state == BUS_UNSET)
1029 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1036 /* If the serial number isn't kept, then we know that no reply
1038 if (!serial && !m->sealed)
1039 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1041 r = bus_seal_message(bus, m);
1045 /* If this is a reply and no reply was requested, then let's
1046 * suppress this, if we can */
1047 if (m->dont_send && !serial)
1050 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1053 r = bus_socket_write_message(bus, m, &idx);
1057 } else if (idx < m->size) {
1058 /* Wasn't fully written. So let's remember how
1059 * much was written. Note that the first entry
1060 * of the wqueue array is always allocated so
1061 * that we always can remember how much was
1063 bus->wqueue[0] = sd_bus_message_ref(m);
1064 bus->wqueue_size = 1;
1070 /* Just append it to the queue. */
1072 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1075 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1080 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1084 *serial = BUS_MESSAGE_SERIAL(m);
1089 static usec_t calc_elapse(uint64_t usec) {
1090 if (usec == (uint64_t) -1)
1094 usec = BUS_DEFAULT_TIMEOUT;
1096 return now(CLOCK_MONOTONIC) + usec;
1099 static int timeout_compare(const void *a, const void *b) {
1100 const struct reply_callback *x = a, *y = b;
1102 if (x->timeout != 0 && y->timeout == 0)
1105 if (x->timeout == 0 && y->timeout != 0)
1108 if (x->timeout < y->timeout)
1111 if (x->timeout > y->timeout)
1117 int sd_bus_send_with_reply(
1120 sd_bus_message_handler_t callback,
1125 struct reply_callback *c;
1130 if (bus->state == BUS_UNSET)
1138 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1140 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1143 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1147 if (usec != (uint64_t) -1) {
1148 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1153 r = bus_seal_message(bus, m);
1157 c = new(struct reply_callback, 1);
1161 c->callback = callback;
1162 c->userdata = userdata;
1163 c->serial = BUS_MESSAGE_SERIAL(m);
1164 c->timeout = calc_elapse(usec);
1166 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1172 if (c->timeout != 0) {
1173 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1176 sd_bus_send_with_reply_cancel(bus, c->serial);
1181 r = sd_bus_send(bus, m, serial);
1183 sd_bus_send_with_reply_cancel(bus, c->serial);
1190 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1191 struct reply_callback *c;
1198 c = hashmap_remove(bus->reply_callbacks, &serial);
1202 if (c->timeout != 0)
1203 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1209 int bus_ensure_running(sd_bus *bus) {
1216 if (bus->state == BUS_UNSET)
1219 if (bus->state == BUS_RUNNING)
1223 r = sd_bus_process(bus, NULL);
1226 if (bus->state == BUS_RUNNING)
1231 r = sd_bus_wait(bus, (uint64_t) -1);
1237 int sd_bus_send_with_reply_and_block(
1241 sd_bus_error *error,
1242 sd_bus_message **reply) {
1253 if (bus->state == BUS_UNSET)
1257 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1259 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1261 if (bus_error_is_dirty(error))
1264 r = bus_ensure_running(bus);
1268 r = sd_bus_send(bus, m, &serial);
1272 timeout = calc_elapse(usec);
1276 sd_bus_message *incoming = NULL;
1281 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1284 /* Make sure there's room for queuing this
1285 * locally, before we read the message */
1287 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1295 r = bus_socket_read_message(bus, &incoming);
1300 if (incoming->reply_serial == serial) {
1301 /* Found a match! */
1303 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1308 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1311 r = sd_bus_error_copy(error, &incoming->error);
1313 sd_bus_message_unref(incoming);
1317 k = bus_error_to_errno(&incoming->error);
1318 sd_bus_message_unref(incoming);
1322 sd_bus_message_unref(incoming);
1326 /* There's already guaranteed to be room for
1327 * this, so no need to resize things here */
1328 bus->rqueue[bus->rqueue_size ++] = incoming;
1331 /* Try to read more, right-away */
1340 n = now(CLOCK_MONOTONIC);
1346 left = (uint64_t) -1;
1348 r = bus_poll(bus, true, left);
1352 r = dispatch_wqueue(bus);
1358 int sd_bus_get_fd(sd_bus *bus) {
1368 int sd_bus_get_events(sd_bus *bus) {
1373 if (bus->state == BUS_UNSET)
1378 if (bus->state == BUS_OPENING)
1380 else if (bus->state == BUS_AUTHENTICATING) {
1382 if (bus_socket_auth_needs_write(bus))
1387 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1388 if (bus->rqueue_size <= 0)
1390 if (bus->wqueue_size > 0)
1397 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1398 struct reply_callback *c;
1404 if (bus->state == BUS_UNSET)
1409 if (bus->state == BUS_AUTHENTICATING) {
1410 *timeout_usec = bus->auth_timeout;
1414 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1417 c = prioq_peek(bus->reply_callbacks_prioq);
1421 *timeout_usec = c->timeout;
1425 static int process_timeout(sd_bus *bus) {
1426 struct reply_callback *c;
1432 c = prioq_peek(bus->reply_callbacks_prioq);
1436 n = now(CLOCK_MONOTONIC);
1440 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1441 hashmap_remove(bus->reply_callbacks, &c->serial);
1443 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1446 return r < 0 ? r : 1;
1449 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1453 if (bus->state != BUS_HELLO)
1456 /* Let's make sure the first message on the bus is the HELLO
1457 * reply. But note that we don't actually parse the message
1458 * here (we leave that to the usual handling), we just verify
1459 * we don't let any earlier msg through. */
1461 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1462 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1465 if (m->reply_serial != bus->hello_serial)
1471 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1472 struct reply_callback *c;
1478 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1479 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1482 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1486 if (c->timeout != 0)
1487 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1489 r = c->callback(bus, 0, m, c->userdata);
1495 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1496 struct filter_callback *l;
1502 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1503 r = l->callback(bus, 0, m, l->userdata);
1511 static int process_match(sd_bus *bus, sd_bus_message *m) {
1515 return bus_match_run(bus, &bus->match_callbacks, 0, m);
1518 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1519 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1525 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1528 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1531 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1534 if (streq_ptr(m->member, "Ping"))
1535 r = sd_bus_message_new_method_return(bus, m, &reply);
1536 else if (streq_ptr(m->member, "GetMachineId")) {
1540 r = sd_id128_get_machine(&id);
1544 r = sd_bus_message_new_method_return(bus, m, &reply);
1548 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1550 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1552 sd_bus_error_set(&error,
1553 "org.freedesktop.DBus.Error.UnknownMethod",
1554 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1556 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1562 r = sd_bus_send(bus, reply, NULL);
1569 static int process_object(sd_bus *bus, sd_bus_message *m) {
1570 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1571 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1572 struct object_callback *c;
1580 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1583 if (hashmap_isempty(bus->object_callbacks))
1586 c = hashmap_get(bus->object_callbacks, m->path);
1588 r = c->callback(bus, 0, m, c->userdata);
1595 /* Look for fallback prefixes */
1596 p = strdupa(m->path);
1600 e = strrchr(p, '/');
1606 c = hashmap_get(bus->object_callbacks, p);
1607 if (c && c->is_fallback) {
1608 r = c->callback(bus, 0, m, c->userdata);
1616 /* If we found some handlers but none of them wanted to take
1617 * this message, reply with an error -- with one exception: we
1618 * can handle introspection minimally ourselves */
1619 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1622 sd_bus_error_set(&error,
1623 "org.freedesktop.DBus.Error.UnknownMethod",
1624 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1626 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1630 r = sd_bus_send(bus, reply, NULL);
1637 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1638 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1639 _cleanup_free_ char *introspection = NULL;
1640 _cleanup_set_free_free_ Set *s = NULL;
1641 _cleanup_fclose_ FILE *f = NULL;
1642 struct object_callback *c;
1651 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1657 s = set_new(string_hash_func, string_compare_func);
1661 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1665 if (streq(c->path, "/"))
1668 if (streq(m->path, "/"))
1671 e = startswith(c->path, m->path);
1672 if (!e || *e != '/')
1693 f = open_memstream(&introspection, &size);
1697 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1698 fputs("<node>\n", f);
1699 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1700 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1702 while ((node = set_steal_first(s))) {
1703 fprintf(f, " <node name=\"%s\"/>\n", node);
1707 fputs("</node>\n", f);
1714 r = sd_bus_message_new_method_return(bus, m, &reply);
1718 r = sd_bus_message_append(reply, "s", introspection);
1722 r = sd_bus_send(bus, reply, NULL);
1729 static int process_message(sd_bus *bus, sd_bus_message *m) {
1735 r = process_hello(bus, m);
1739 r = process_reply(bus, m);
1743 r = process_filter(bus, m);
1747 r = process_match(bus, m);
1751 r = process_builtin(bus, m);
1755 r = process_object(bus, m);
1759 return process_introspect(bus, m);
1762 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1763 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1767 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1769 r = process_timeout(bus);
1773 r = dispatch_wqueue(bus);
1777 r = dispatch_rqueue(bus, &m);
1783 r = process_message(bus, m);
1793 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1794 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1795 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1797 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1799 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1803 r = sd_bus_send(bus, reply, NULL);
1817 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1820 /* Returns 0 when we didn't do anything. This should cause the
1821 * caller to invoke sd_bus_wait() before returning the next
1822 * time. Returns > 0 when we did something, which possibly
1823 * means *ret is filled in with an unprocessed message. */
1830 switch (bus->state) {
1836 r = bus_socket_process_opening(bus);
1843 case BUS_AUTHENTICATING:
1845 r = bus_socket_process_authenticating(bus);
1855 return process_running(bus, ret);
1858 assert_not_reached("Unknown state");
1861 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1872 e = sd_bus_get_events(bus);
1879 r = sd_bus_get_timeout(bus, &until);
1886 n = now(CLOCK_MONOTONIC);
1887 m = until > n ? until - n : 0;
1890 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1897 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1901 return r > 0 ? 1 : 0;
1904 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1908 if (bus->state == BUS_UNSET)
1912 if (bus->rqueue_size > 0)
1915 return bus_poll(bus, false, timeout_usec);
1918 int sd_bus_flush(sd_bus *bus) {
1923 if (bus->state == BUS_UNSET)
1928 r = bus_ensure_running(bus);
1932 if (bus->wqueue_size <= 0)
1936 r = dispatch_wqueue(bus);
1940 if (bus->wqueue_size <= 0)
1943 r = bus_poll(bus, false, (uint64_t) -1);
1949 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
1950 struct filter_callback *f;
1957 f = new(struct filter_callback, 1);
1960 f->callback = callback;
1961 f->userdata = userdata;
1963 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1967 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
1968 struct filter_callback *f;
1975 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1976 if (f->callback == callback && f->userdata == userdata) {
1977 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1986 static int bus_add_object(
1990 sd_bus_message_handler_t callback,
1993 struct object_callback *c;
2003 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2007 c = new(struct object_callback, 1);
2011 c->path = strdup(path);
2017 c->callback = callback;
2018 c->userdata = userdata;
2019 c->is_fallback = fallback;
2021 r = hashmap_put(bus->object_callbacks, c->path, c);
2031 static int bus_remove_object(
2035 sd_bus_message_handler_t callback,
2038 struct object_callback *c;
2047 c = hashmap_get(bus->object_callbacks, path);
2051 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2054 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2062 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2063 return bus_add_object(bus, false, path, callback, userdata);
2066 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2067 return bus_remove_object(bus, false, path, callback, userdata);
2070 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2071 return bus_add_object(bus, true, prefix, callback, userdata);
2074 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2075 return bus_remove_object(bus, true, prefix, callback, userdata);
2078 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2086 if (bus->bus_client) {
2087 r = bus_add_match_internal(bus, match);
2093 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2096 if (bus->bus_client)
2097 bus_remove_match_internal(bus, match);
2104 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2112 if (bus->bus_client)
2113 r = bus_remove_match_internal(bus, match);
2116 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);