1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
39 #include "bus-socket.h"
41 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
43 static void bus_free(sd_bus *b) {
44 struct filter_callback *f;
45 struct object_callback *c;
51 close_nointr_nofail(b->fd);
59 strv_free(b->exec_argv);
61 close_many(b->fds, b->n_fds);
64 for (i = 0; i < b->rqueue_size; i++)
65 sd_bus_message_unref(b->rqueue[i]);
68 for (i = 0; i < b->wqueue_size; i++)
69 sd_bus_message_unref(b->wqueue[i]);
72 hashmap_free_free(b->reply_callbacks);
73 prioq_free(b->reply_callbacks_prioq);
75 while ((f = b->filter_callbacks)) {
76 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
80 while ((c = hashmap_steal_first(b->object_callbacks))) {
85 hashmap_free(b->object_callbacks);
90 int sd_bus_new(sd_bus **ret) {
102 r->message_version = 1;
103 r->negotiate_fds = true;
105 /* We guarantee that wqueue always has space for at least one
107 r->wqueue = new(sd_bus_message*, 1);
117 int sd_bus_set_address(sd_bus *bus, const char *address) {
122 if (bus->state != BUS_UNSET)
137 int sd_bus_set_fd(sd_bus *bus, int fd) {
140 if (bus->state != BUS_UNSET)
149 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
154 if (bus->state != BUS_UNSET)
158 if (strv_isempty(argv))
171 free(bus->exec_path);
172 strv_free(bus->exec_argv);
180 int sd_bus_set_bus_client(sd_bus *bus, int b) {
183 if (bus->state != BUS_UNSET)
186 bus->bus_client = !!b;
190 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
193 if (bus->state != BUS_UNSET)
196 bus->negotiate_fds = !!b;
200 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
205 assert(bus->state == BUS_HELLO);
212 r = sd_bus_message_read(reply, "s", &s);
216 if (!service_name_is_valid(s) || s[0] != ':')
219 bus->unique_name = strdup(s);
220 if (!bus->unique_name)
223 bus->state = BUS_RUNNING;
228 static int bus_send_hello(sd_bus *bus) {
229 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
234 if (!bus->bus_client)
237 r = sd_bus_message_new_method_call(
239 "org.freedesktop.DBus",
241 "org.freedesktop.DBus",
247 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
254 int bus_start_running(sd_bus *bus) {
257 if (bus->bus_client) {
258 bus->state = BUS_HELLO;
262 bus->state = BUS_RUNNING;
266 static int parse_address_key(const char **p, const char *key, char **value) {
277 if (strncmp(*p, key, l) != 0)
290 while (*a != ';' && *a != ',' && *a != 0) {
308 c = (char) ((x << 4) | y);
315 t = realloc(r, n + 2);
343 static void skip_address_key(const char **p) {
347 *p += strcspn(*p, ",");
353 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
354 _cleanup_free_ char *path = NULL, *abstract = NULL;
363 while (**p != 0 && **p != ';') {
364 r = parse_address_key(p, "guid", guid);
370 r = parse_address_key(p, "path", &path);
376 r = parse_address_key(p, "abstract", &abstract);
385 if (!path && !abstract)
388 if (path && abstract)
393 if (l > sizeof(b->sockaddr.un.sun_path))
396 b->sockaddr.un.sun_family = AF_UNIX;
397 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
398 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
399 } else if (abstract) {
400 l = strlen(abstract);
401 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
404 b->sockaddr.un.sun_family = AF_UNIX;
405 b->sockaddr.un.sun_path[0] = 0;
406 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
407 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
413 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
414 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
415 struct addrinfo hints, *result;
423 while (**p != 0 && **p != ';') {
424 r = parse_address_key(p, "guid", guid);
430 r = parse_address_key(p, "host", &host);
436 r = parse_address_key(p, "port", &port);
442 r = parse_address_key(p, "family", &family);
455 hints.ai_socktype = SOCK_STREAM;
456 hints.ai_flags = AI_ADDRCONFIG;
459 if (streq(family, "ipv4"))
460 hints.ai_family = AF_INET;
461 else if (streq(family, "ipv6"))
462 hints.ai_family = AF_INET6;
467 r = getaddrinfo(host, port, &hints, &result);
471 return -EADDRNOTAVAIL;
473 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
474 b->sockaddr_size = result->ai_addrlen;
476 freeaddrinfo(result);
481 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
483 unsigned n_argv = 0, j;
492 while (**p != 0 && **p != ';') {
493 r = parse_address_key(p, "guid", guid);
499 r = parse_address_key(p, "path", &path);
505 if (startswith(*p, "argv")) {
509 ul = strtoul(*p + 4, (char**) p, 10);
510 if (errno != 0 || **p != '=' || ul > 256) {
520 x = realloc(argv, sizeof(char*) * (ul + 2));
526 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
532 r = parse_address_key(p, NULL, argv + ul);
547 /* Make sure there are no holes in the array, with the
548 * exception of argv[0] */
549 for (j = 1; j < n_argv; j++)
555 if (argv && argv[0] == NULL) {
556 argv[0] = strdup(path);
568 for (j = 0; j < n_argv; j++)
576 static void bus_reset_parsed_address(sd_bus *b) {
580 b->sockaddr_size = 0;
581 strv_free(b->exec_argv);
585 b->peer = SD_ID128_NULL;
588 static int bus_parse_next_address(sd_bus *b) {
589 _cleanup_free_ char *guid = NULL;
597 if (b->address[b->address_index] == 0)
600 bus_reset_parsed_address(b);
602 a = b->address + b->address_index;
611 if (startswith(a, "unix:")) {
614 r = parse_unix_address(b, &a, &guid);
619 } else if (startswith(a, "tcp:")) {
622 r = parse_tcp_address(b, &a, &guid);
628 } else if (startswith(a, "unixexec:")) {
631 r = parse_exec_address(b, &a, &guid);
645 r = sd_id128_from_string(guid, &b->peer);
650 b->address_index = a - b->address;
654 static int bus_start_address(sd_bus *b) {
661 close_nointr_nofail(b->fd);
665 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
667 r = bus_socket_connect(b);
671 b->last_connect_error = -r;
673 } else if (b->exec_path) {
675 r = bus_socket_exec(b);
679 b->last_connect_error = -r;
682 r = bus_parse_next_address(b);
686 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
690 int bus_next_address(sd_bus *b) {
693 bus_reset_parsed_address(b);
694 return bus_start_address(b);
697 static int bus_start_fd(sd_bus *b) {
702 r = fd_nonblock(b->fd, true);
706 r = fd_cloexec(b->fd, true);
710 return bus_socket_take_fd(b);
713 int sd_bus_start(sd_bus *bus) {
718 if (bus->state != BUS_UNSET)
721 bus->state = BUS_OPENING;
724 r = bus_start_fd(bus);
725 else if (bus->address)
726 r = bus_start_address(bus);
733 return bus_send_hello(bus);
736 int sd_bus_open_system(sd_bus **ret) {
748 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
750 r = sd_bus_set_address(b, e);
754 b->sockaddr.un.sun_family = AF_UNIX;
755 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
756 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
759 b->bus_client = true;
773 int sd_bus_open_user(sd_bus **ret) {
786 e = getenv("DBUS_SESSION_BUS_ADDRESS");
788 r = sd_bus_set_address(b, e);
792 e = getenv("XDG_RUNTIME_DIR");
799 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
804 b->sockaddr.un.sun_family = AF_UNIX;
805 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
806 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
809 b->bus_client = true;
823 void sd_bus_close(sd_bus *bus) {
829 close_nointr_nofail(bus->fd);
833 sd_bus *sd_bus_ref(sd_bus *bus) {
837 assert(bus->n_ref > 0);
843 sd_bus *sd_bus_unref(sd_bus *bus) {
847 assert(bus->n_ref > 0);
856 int sd_bus_is_open(sd_bus *bus) {
860 return bus->state != BUS_UNSET && bus->fd >= 0;
863 int sd_bus_can_send(sd_bus *bus, char type) {
871 if (type == SD_BUS_TYPE_UNIX_FD) {
872 if (!bus->negotiate_fds)
875 r = bus_ensure_running(bus);
882 return bus_type_is_valid(type);
885 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
893 r = bus_ensure_running(bus);
901 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
904 if (m->header->version > b->message_version)
910 return bus_message_seal(m, ++b->serial);
913 static int dispatch_wqueue(sd_bus *bus) {
917 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
922 while (bus->wqueue_size > 0) {
924 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
929 /* Didn't do anything this time */
931 else if (bus->windex >= bus->wqueue[0]->size) {
932 /* Fully written. Let's drop the entry from
935 * This isn't particularly optimized, but
936 * well, this is supposed to be our worst-case
937 * buffer only, and the socket buffer is
938 * supposed to be our primary buffer, and if
939 * it got full, then all bets are off
942 sd_bus_message_unref(bus->wqueue[0]);
944 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
954 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
955 sd_bus_message *z = NULL;
960 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
965 if (bus->rqueue_size > 0) {
966 /* Dispatch a queued message */
970 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
974 /* Try to read a new message */
976 r = bus_socket_read_message(bus, &z);
991 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
996 if (bus->state == BUS_UNSET)
1004 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1011 /* If the serial number isn't kept, then we know that no reply
1013 if (!serial && !m->sealed)
1014 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1016 r = bus_seal_message(bus, m);
1020 /* If this is a reply and no reply was requested, then let's
1021 * suppress this, if we can */
1022 if (m->dont_send && !serial)
1025 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1028 r = bus_socket_write_message(bus, m, &idx);
1032 } else if (idx < m->size) {
1033 /* Wasn't fully written. So let's remember how
1034 * much was written. Note that the first entry
1035 * of the wqueue array is always allocated so
1036 * that we always can remember how much was
1038 bus->wqueue[0] = sd_bus_message_ref(m);
1039 bus->wqueue_size = 1;
1045 /* Just append it to the queue. */
1047 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1050 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1055 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1059 *serial = BUS_MESSAGE_SERIAL(m);
1064 static usec_t calc_elapse(uint64_t usec) {
1065 if (usec == (uint64_t) -1)
1069 usec = BUS_DEFAULT_TIMEOUT;
1071 return now(CLOCK_MONOTONIC) + usec;
1074 static int timeout_compare(const void *a, const void *b) {
1075 const struct reply_callback *x = a, *y = b;
1077 if (x->timeout != 0 && y->timeout == 0)
1080 if (x->timeout == 0 && y->timeout != 0)
1083 if (x->timeout < y->timeout)
1086 if (x->timeout > y->timeout)
1092 int sd_bus_send_with_reply(
1095 sd_message_handler_t callback,
1100 struct reply_callback *c;
1105 if (bus->state == BUS_UNSET)
1113 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1115 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1118 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1122 if (usec != (uint64_t) -1) {
1123 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1128 r = bus_seal_message(bus, m);
1132 c = new(struct reply_callback, 1);
1136 c->callback = callback;
1137 c->userdata = userdata;
1138 c->serial = BUS_MESSAGE_SERIAL(m);
1139 c->timeout = calc_elapse(usec);
1141 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1147 if (c->timeout != 0) {
1148 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1151 sd_bus_send_with_reply_cancel(bus, c->serial);
1156 r = sd_bus_send(bus, m, serial);
1158 sd_bus_send_with_reply_cancel(bus, c->serial);
1165 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1166 struct reply_callback *c;
1173 c = hashmap_remove(bus->reply_callbacks, &serial);
1177 if (c->timeout != 0)
1178 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1184 int bus_ensure_running(sd_bus *bus) {
1191 if (bus->state == BUS_UNSET)
1194 if (bus->state == BUS_RUNNING)
1198 r = sd_bus_process(bus, NULL);
1201 if (bus->state == BUS_RUNNING)
1206 r = sd_bus_wait(bus, (uint64_t) -1);
1212 int sd_bus_send_with_reply_and_block(
1216 sd_bus_error *error,
1217 sd_bus_message **reply) {
1228 if (bus->state == BUS_UNSET)
1232 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1234 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1236 if (bus_error_is_dirty(error))
1239 r = bus_ensure_running(bus);
1243 r = sd_bus_send(bus, m, &serial);
1247 timeout = calc_elapse(usec);
1251 sd_bus_message *incoming = NULL;
1256 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1259 /* Make sure there's room for queuing this
1260 * locally, before we read the message */
1262 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1270 r = bus_socket_read_message(bus, &incoming);
1275 if (incoming->reply_serial == serial) {
1276 /* Found a match! */
1278 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1283 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1286 r = sd_bus_error_copy(error, &incoming->error);
1288 sd_bus_message_unref(incoming);
1292 k = bus_error_to_errno(&incoming->error);
1293 sd_bus_message_unref(incoming);
1297 sd_bus_message_unref(incoming);
1301 /* There's already guaranteed to be room for
1302 * this, so no need to resize things here */
1303 bus->rqueue[bus->rqueue_size ++] = incoming;
1306 /* Try to read more, right-away */
1315 n = now(CLOCK_MONOTONIC);
1321 left = (uint64_t) -1;
1323 r = bus_poll(bus, true, left);
1327 r = dispatch_wqueue(bus);
1333 int sd_bus_get_fd(sd_bus *bus) {
1343 int sd_bus_get_events(sd_bus *bus) {
1348 if (bus->state == BUS_UNSET)
1353 if (bus->state == BUS_OPENING)
1355 else if (bus->state == BUS_AUTHENTICATING) {
1357 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1362 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1363 if (bus->rqueue_size <= 0)
1365 if (bus->wqueue_size > 0)
1372 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1373 struct reply_callback *c;
1379 if (bus->state == BUS_UNSET)
1384 if (bus->state == BUS_AUTHENTICATING) {
1385 *timeout_usec = bus->auth_timeout;
1389 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1392 c = prioq_peek(bus->reply_callbacks_prioq);
1396 *timeout_usec = c->timeout;
1400 static int process_timeout(sd_bus *bus) {
1401 struct reply_callback *c;
1407 c = prioq_peek(bus->reply_callbacks_prioq);
1411 n = now(CLOCK_MONOTONIC);
1415 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1416 hashmap_remove(bus->reply_callbacks, &c->serial);
1418 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1421 return r < 0 ? r : 1;
1424 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1425 struct reply_callback *c;
1431 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1432 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1435 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1439 if (c->timeout != 0)
1440 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1442 r = c->callback(bus, 0, m, c->userdata);
1448 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1449 struct filter_callback *l;
1452 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1453 r = l->callback(bus, 0, m, l->userdata);
1461 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1462 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1468 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1471 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1474 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1477 if (streq_ptr(m->member, "Ping"))
1478 r = sd_bus_message_new_method_return(bus, m, &reply);
1479 else if (streq_ptr(m->member, "GetMachineId")) {
1483 r = sd_id128_get_machine(&id);
1487 r = sd_bus_message_new_method_return(bus, m, &reply);
1491 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1493 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1495 sd_bus_error_set(&error,
1496 "org.freedesktop.DBus.Error.UnknownMethod",
1497 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1499 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1505 r = sd_bus_send(bus, reply, NULL);
1512 static int process_object(sd_bus *bus, sd_bus_message *m) {
1513 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1514 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1515 struct object_callback *c;
1523 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1526 if (hashmap_isempty(bus->object_callbacks))
1529 c = hashmap_get(bus->object_callbacks, m->path);
1531 r = c->callback(bus, 0, m, c->userdata);
1538 /* Look for fallback prefixes */
1539 p = strdupa(m->path);
1543 e = strrchr(p, '/');
1549 c = hashmap_get(bus->object_callbacks, p);
1550 if (c && c->is_fallback) {
1551 r = c->callback(bus, 0, m, c->userdata);
1559 /* If we found some handlers but none of them wanted to take
1560 * this message, reply with an UnknownMethod error -- with one exception:
1561 * introspection requests, which we can handle minimally ourselves */
1562 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1565 sd_bus_error_set(&error,
1566 "org.freedesktop.DBus.Error.UnknownMethod",
1567 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1569 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1573 r = sd_bus_send(bus, reply, NULL);
1580 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1581 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1582 _cleanup_free_ char *introspection = NULL;
1583 _cleanup_set_free_free_ Set *s = NULL;
1584 _cleanup_fclose_ FILE *f = NULL;
1585 struct object_callback *c;
1594 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1600 s = set_new(string_hash_func, string_compare_func);
1604 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1608 if (streq(c->path, "/"))
1611 if (streq(m->path, "/"))
1614 e = startswith(c->path, m->path);
1615 if (!e || *e != '/')
1636 f = open_memstream(&introspection, &size);
1640 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1641 fputs("<node>\n", f);
1642 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1643 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1645 while ((node = set_steal_first(s))) {
1646 fprintf(f, " <node name=\"%s\"/>\n", node);
1650 fputs("</node>\n", f);
1657 r = sd_bus_message_new_method_return(bus, m, &reply);
1661 r = sd_bus_message_append(reply, "s", introspection);
1665 r = sd_bus_send(bus, reply, NULL);
1672 static int process_message(sd_bus *bus, sd_bus_message *m) {
1678 r = process_reply(bus, m);
1682 r = process_filter(bus, m);
1686 r = process_builtin(bus, m);
1690 r = process_object(bus, m);
1694 return process_introspect(bus, m);
1697 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1698 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1702 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1704 r = process_timeout(bus);
1708 r = dispatch_wqueue(bus);
1712 r = dispatch_rqueue(bus, &m);
1718 r = process_message(bus, m);
1728 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1729 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1730 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1732 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1734 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1738 r = sd_bus_send(bus, reply, NULL);
1752 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1755 /* Returns 0 when we didn't do anything. This should cause the
1756 * caller to invoke sd_bus_wait() before calling this function the next
1757 * time. Returns > 0 when we did something, which possibly
1758 * means *ret is filled in with an unprocessed message. */
1765 switch (bus->state) {
1771 r = bus_socket_process_opening(bus);
1778 case BUS_AUTHENTICATING:
1780 r = bus_socket_process_authenticating(bus);
1790 return process_running(bus, ret);
1793 assert_not_reached("Unknown state");
1796 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1807 e = sd_bus_get_events(bus);
1814 r = sd_bus_get_timeout(bus, &until);
1821 n = now(CLOCK_MONOTONIC);
1822 m = until > n ? until - n : 0;
1825 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1832 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1836 return r > 0 ? 1 : 0;
1839 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1843 if (bus->state == BUS_UNSET)
1847 if (bus->rqueue_size > 0)
1850 return bus_poll(bus, false, timeout_usec);
1853 int sd_bus_flush(sd_bus *bus) {
1858 if (bus->state == BUS_UNSET)
1863 r = bus_ensure_running(bus);
1867 if (bus->wqueue_size <= 0)
1871 r = dispatch_wqueue(bus);
1875 if (bus->wqueue_size <= 0)
1878 r = bus_poll(bus, false, (uint64_t) -1);
1884 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1885 struct filter_callback *f;
1892 f = new(struct filter_callback, 1);
1895 f->callback = callback;
1896 f->userdata = userdata;
1898 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1902 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1903 struct filter_callback *f;
1910 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1911 if (f->callback == callback && f->userdata == userdata) {
1912 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1921 static int bus_add_object(
1925 sd_message_handler_t callback,
1928 struct object_callback *c;
1938 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1942 c = new(struct object_callback, 1);
1946 c->path = strdup(path);
1952 c->callback = callback;
1953 c->userdata = userdata;
1954 c->is_fallback = fallback;
1956 r = hashmap_put(bus->object_callbacks, c->path, c);
1966 static int bus_remove_object(
1970 sd_message_handler_t callback,
1973 struct object_callback *c;
1982 c = hashmap_get(bus->object_callbacks, path);
1986 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
1989 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
1997 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
1998 return bus_add_object(bus, false, path, callback, userdata);
2001 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2002 return bus_remove_object(bus, false, path, callback, userdata);
2005 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2006 return bus_add_object(bus, true, prefix, callback, userdata);
2009 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2010 return bus_remove_object(bus, true, prefix, callback, userdata);