1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
39 #include "bus-socket.h"
41 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
43 static void bus_free(sd_bus *b) {
44 struct filter_callback *f;
45 struct object_callback *c;
51 close_nointr_nofail(b->fd);
59 strv_free(b->exec_argv);
61 close_many(b->fds, b->n_fds);
64 for (i = 0; i < b->rqueue_size; i++)
65 sd_bus_message_unref(b->rqueue[i]);
68 for (i = 0; i < b->wqueue_size; i++)
69 sd_bus_message_unref(b->wqueue[i]);
72 hashmap_free_free(b->reply_callbacks);
73 prioq_free(b->reply_callbacks_prioq);
75 while ((f = b->filter_callbacks)) {
76 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
80 while ((c = hashmap_steal_first(b->object_callbacks))) {
85 hashmap_free(b->object_callbacks);
90 int sd_bus_new(sd_bus **ret) {
102 r->message_version = 1;
103 r->negotiate_fds = true;
105 /* We guarantee that wqueue always has space for at least one
107 r->wqueue = new(sd_bus_message*, 1);
117 int sd_bus_set_address(sd_bus *bus, const char *address) {
122 if (bus->state != BUS_UNSET)
137 int sd_bus_set_fd(sd_bus *bus, int fd) {
140 if (bus->state != BUS_UNSET)
149 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
154 if (bus->state != BUS_UNSET)
158 if (strv_isempty(argv))
171 free(bus->exec_path);
172 strv_free(bus->exec_argv);
180 int sd_bus_set_bus_client(sd_bus *bus, int b) {
183 if (bus->state != BUS_UNSET)
186 bus->bus_client = !!b;
190 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
193 if (bus->state != BUS_UNSET)
196 bus->negotiate_fds = !!b;
200 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
205 assert(bus->state == BUS_HELLO);
212 r = sd_bus_message_read(reply, "s", &s);
216 if (!service_name_is_valid(s) || s[0] != ':')
219 bus->unique_name = strdup(s);
220 if (!bus->unique_name)
223 bus->state = BUS_RUNNING;
228 static int bus_send_hello(sd_bus *bus) {
229 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
234 if (!bus->bus_client)
237 r = sd_bus_message_new_method_call(
239 "org.freedesktop.DBus",
241 "org.freedesktop.DBus",
247 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
254 int bus_start_running(sd_bus *bus) {
257 if (bus->bus_client) {
258 bus->state = BUS_HELLO;
262 bus->state = BUS_RUNNING;
266 static int parse_address_key(const char **p, const char *key, char **value) {
277 if (strncmp(*p, key, l) != 0)
290 while (*a != ';' && *a != ',' && *a != 0) {
308 c = (char) ((x << 4) | y);
315 t = realloc(r, n + 2);
343 static void skip_address_key(const char **p) {
347 *p += strcspn(*p, ",");
353 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
354 _cleanup_free_ char *path = NULL, *abstract = NULL;
363 while (**p != 0 && **p != ';') {
364 r = parse_address_key(p, "guid", guid);
370 r = parse_address_key(p, "path", &path);
376 r = parse_address_key(p, "abstract", &abstract);
385 if (!path && !abstract)
388 if (path && abstract)
393 if (l > sizeof(b->sockaddr.un.sun_path))
396 b->sockaddr.un.sun_family = AF_UNIX;
397 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
398 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
399 } else if (abstract) {
400 l = strlen(abstract);
401 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
404 b->sockaddr.un.sun_family = AF_UNIX;
405 b->sockaddr.un.sun_path[0] = 0;
406 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
407 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
413 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
414 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
415 struct addrinfo hints, *result;
423 while (**p != 0 && **p != ';') {
424 r = parse_address_key(p, "guid", guid);
430 r = parse_address_key(p, "host", &host);
436 r = parse_address_key(p, "port", &port);
442 r = parse_address_key(p, "family", &family);
455 hints.ai_socktype = SOCK_STREAM;
456 hints.ai_flags = AI_ADDRCONFIG;
459 if (streq(family, "ipv4"))
460 hints.ai_family = AF_INET;
461 else if (streq(family, "ipv6"))
462 hints.ai_family = AF_INET6;
467 r = getaddrinfo(host, port, &hints, &result);
471 return -EADDRNOTAVAIL;
473 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
474 b->sockaddr_size = result->ai_addrlen;
476 freeaddrinfo(result);
481 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
483 unsigned n_argv = 0, j;
492 while (**p != 0 && **p != ';') {
493 r = parse_address_key(p, "guid", guid);
499 r = parse_address_key(p, "path", &path);
505 if (startswith(*p, "argv")) {
509 ul = strtoul(*p + 4, (char**) p, 10);
510 if (errno != 0 || **p != '=' || ul > 256) {
520 x = realloc(argv, sizeof(char*) * (ul + 2));
526 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
532 r = parse_address_key(p, NULL, argv + ul);
545 /* Make sure there are no holes in the array, with the
546 * exception of argv[0] */
547 for (j = 1; j < n_argv; j++)
553 if (argv && argv[0] == NULL) {
554 argv[0] = strdup(path);
566 for (j = 0; j < n_argv; j++)
574 static void bus_reset_parsed_address(sd_bus *b) {
578 b->sockaddr_size = 0;
579 strv_free(b->exec_argv);
583 b->peer = SD_ID128_NULL;
586 static int bus_parse_next_address(sd_bus *b) {
587 _cleanup_free_ char *guid = NULL;
595 if (b->address[b->address_index] == 0)
598 bus_reset_parsed_address(b);
600 a = b->address + b->address_index;
609 if (startswith(a, "unix:")) {
612 r = parse_unix_address(b, &a, &guid);
617 } else if (startswith(a, "tcp:")) {
620 r = parse_tcp_address(b, &a, &guid);
626 } else if (startswith(a, "unixexec:")) {
629 r = parse_exec_address(b, &a, &guid);
643 r = sd_id128_from_string(guid, &b->peer);
648 b->address_index = a - b->address;
652 static int bus_start_address(sd_bus *b) {
659 close_nointr_nofail(b->fd);
663 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
665 r = bus_socket_connect(b);
669 b->last_connect_error = -r;
671 } else if (b->exec_path) {
673 r = bus_socket_exec(b);
677 b->last_connect_error = -r;
680 r = bus_parse_next_address(b);
684 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
688 int bus_next_address(sd_bus *b) {
691 bus_reset_parsed_address(b);
692 return bus_start_address(b);
695 static int bus_start_fd(sd_bus *b) {
700 r = fd_nonblock(b->fd, true);
704 r = fd_cloexec(b->fd, true);
708 return bus_socket_take_fd(b);
711 int sd_bus_start(sd_bus *bus) {
716 if (bus->state != BUS_UNSET)
719 bus->state = BUS_OPENING;
722 r = bus_start_fd(bus);
723 else if (bus->address)
724 r = bus_start_address(bus);
731 return bus_send_hello(bus);
734 int sd_bus_open_system(sd_bus **ret) {
746 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
748 r = sd_bus_set_address(b, e);
752 b->sockaddr.un.sun_family = AF_UNIX;
753 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
754 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
757 b->bus_client = true;
771 int sd_bus_open_user(sd_bus **ret) {
784 e = getenv("DBUS_SESSION_BUS_ADDRESS");
786 r = sd_bus_set_address(b, e);
790 e = getenv("XDG_RUNTIME_DIR");
797 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
802 b->sockaddr.un.sun_family = AF_UNIX;
803 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
804 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
807 b->bus_client = true;
821 void sd_bus_close(sd_bus *bus) {
827 close_nointr_nofail(bus->fd);
831 sd_bus *sd_bus_ref(sd_bus *bus) {
835 assert(bus->n_ref > 0);
841 sd_bus *sd_bus_unref(sd_bus *bus) {
845 assert(bus->n_ref > 0);
854 int sd_bus_is_open(sd_bus *bus) {
858 return bus->state != BUS_UNSET && bus->fd >= 0;
861 int sd_bus_can_send(sd_bus *bus, char type) {
869 if (type == SD_BUS_TYPE_UNIX_FD) {
870 if (!bus->negotiate_fds)
873 r = bus_ensure_running(bus);
880 return bus_type_is_valid(type);
883 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
891 r = bus_ensure_running(bus);
899 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
902 if (m->header->version > b->message_version)
908 return bus_message_seal(m, ++b->serial);
911 static int dispatch_wqueue(sd_bus *bus) {
915 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
920 while (bus->wqueue_size > 0) {
922 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
927 /* Didn't do anything this time */
929 else if (bus->windex >= bus->wqueue[0]->size) {
930 /* Fully written. Let's drop the entry from
933 * This isn't particularly optimized, but
934 * well, this is supposed to be our worst-case
935 * buffer only, and the socket buffer is
936 * supposed to be our primary buffer, and if
937 * it got full, then all bets are off
940 sd_bus_message_unref(bus->wqueue[0]);
942 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
952 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
953 sd_bus_message *z = NULL;
958 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
963 if (bus->rqueue_size > 0) {
964 /* Dispatch a queued message */
968 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
972 /* Try to read a new message */
974 r = bus_socket_read_message(bus, &z);
989 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
994 if (bus->state == BUS_UNSET)
1002 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1009 /* If the serial number isn't kept, then we know that no reply
1011 if (!serial && !m->sealed)
1012 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1014 r = bus_seal_message(bus, m);
1018 /* If this is a reply and no reply was requested, then let's
1019 * suppress this, if we can */
1020 if (m->dont_send && !serial)
1023 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1026 r = bus_socket_write_message(bus, m, &idx);
1030 } else if (idx < m->size) {
1031 /* Wasn't fully written. So let's remember how
1032 * much was written. Note that the first entry
1033 * of the wqueue array is always allocated so
1034 * that we always can remember how much was
1036 bus->wqueue[0] = sd_bus_message_ref(m);
1037 bus->wqueue_size = 1;
1043 /* Just append it to the queue. */
1045 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1048 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1053 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1057 *serial = BUS_MESSAGE_SERIAL(m);
1062 static usec_t calc_elapse(uint64_t usec) {
1063 if (usec == (uint64_t) -1)
1067 usec = BUS_DEFAULT_TIMEOUT;
1069 return now(CLOCK_MONOTONIC) + usec;
1072 static int timeout_compare(const void *a, const void *b) {
1073 const struct reply_callback *x = a, *y = b;
1075 if (x->timeout != 0 && y->timeout == 0)
1078 if (x->timeout == 0 && y->timeout != 0)
1081 if (x->timeout < y->timeout)
1084 if (x->timeout > y->timeout)
1090 int sd_bus_send_with_reply(
1093 sd_message_handler_t callback,
1098 struct reply_callback *c;
1103 if (bus->state == BUS_UNSET)
1111 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1113 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1116 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1120 if (usec != (uint64_t) -1) {
1121 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1126 r = bus_seal_message(bus, m);
1130 c = new(struct reply_callback, 1);
1134 c->callback = callback;
1135 c->userdata = userdata;
1136 c->serial = BUS_MESSAGE_SERIAL(m);
1137 c->timeout = calc_elapse(usec);
1139 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1145 if (c->timeout != 0) {
1146 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1149 sd_bus_send_with_reply_cancel(bus, c->serial);
1154 r = sd_bus_send(bus, m, serial);
1156 sd_bus_send_with_reply_cancel(bus, c->serial);
1163 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1164 struct reply_callback *c;
1171 c = hashmap_remove(bus->reply_callbacks, &serial);
1175 if (c->timeout != 0)
1176 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1182 int bus_ensure_running(sd_bus *bus) {
1189 if (bus->state == BUS_UNSET)
1192 if (bus->state == BUS_RUNNING)
1196 r = sd_bus_process(bus, NULL);
1199 if (bus->state == BUS_RUNNING)
1204 r = sd_bus_wait(bus, (uint64_t) -1);
1210 int sd_bus_send_with_reply_and_block(
1214 sd_bus_error *error,
1215 sd_bus_message **reply) {
1226 if (bus->state == BUS_UNSET)
1230 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1232 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1234 if (bus_error_is_dirty(error))
1237 r = bus_ensure_running(bus);
1241 r = sd_bus_send(bus, m, &serial);
1245 timeout = calc_elapse(usec);
1249 sd_bus_message *incoming = NULL;
1254 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1257 /* Make sure there's room for queuing this
1258 * locally, before we read the message */
1260 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1268 r = bus_socket_read_message(bus, &incoming);
1273 if (incoming->reply_serial == serial) {
1274 /* Found a match! */
1276 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1281 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1284 r = sd_bus_error_copy(error, &incoming->error);
1286 sd_bus_message_unref(incoming);
1290 k = bus_error_to_errno(&incoming->error);
1291 sd_bus_message_unref(incoming);
1295 sd_bus_message_unref(incoming);
1299 /* There's already guaranteed to be room for
1300 * this, so no need to resize things here */
1301 bus->rqueue[bus->rqueue_size ++] = incoming;
1304 /* Try to read more, right-away */
1313 n = now(CLOCK_MONOTONIC);
1319 left = (uint64_t) -1;
1321 r = bus_poll(bus, true, left);
1325 r = dispatch_wqueue(bus);
1331 int sd_bus_get_fd(sd_bus *bus) {
1341 int sd_bus_get_events(sd_bus *bus) {
1346 if (bus->state == BUS_UNSET)
1351 if (bus->state == BUS_OPENING)
1353 else if (bus->state == BUS_AUTHENTICATING) {
1355 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1360 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1361 if (bus->rqueue_size <= 0)
1363 if (bus->wqueue_size > 0)
1370 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1371 struct reply_callback *c;
1377 if (bus->state == BUS_UNSET)
1382 if (bus->state == BUS_AUTHENTICATING) {
1383 *timeout_usec = bus->auth_timeout;
1387 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1390 c = prioq_peek(bus->reply_callbacks_prioq);
1394 *timeout_usec = c->timeout;
1398 static int process_timeout(sd_bus *bus) {
1399 struct reply_callback *c;
1405 c = prioq_peek(bus->reply_callbacks_prioq);
1409 n = now(CLOCK_MONOTONIC);
1413 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1414 hashmap_remove(bus->reply_callbacks, &c->serial);
1416 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1419 return r < 0 ? r : 1;
1422 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1423 struct reply_callback *c;
1429 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1430 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1433 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1437 if (c->timeout != 0)
1438 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1440 r = c->callback(bus, 0, m, c->userdata);
1446 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1447 struct filter_callback *l;
1450 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1451 r = l->callback(bus, 0, m, l->userdata);
1459 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1460 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1466 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1469 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1472 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1475 if (streq_ptr(m->member, "Ping"))
1476 r = sd_bus_message_new_method_return(bus, m, &reply);
1477 else if (streq_ptr(m->member, "GetMachineId")) {
1481 r = sd_id128_get_machine(&id);
1485 r = sd_bus_message_new_method_return(bus, m, &reply);
1489 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1491 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1493 sd_bus_error_set(&error,
1494 "org.freedesktop.DBus.Error.UnknownMethod",
1495 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1497 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1503 r = sd_bus_send(bus, reply, NULL);
1510 static int process_object(sd_bus *bus, sd_bus_message *m) {
1511 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1512 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1513 struct object_callback *c;
1521 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1524 if (hashmap_isempty(bus->object_callbacks))
1527 c = hashmap_get(bus->object_callbacks, m->path);
1529 r = c->callback(bus, 0, m, c->userdata);
1536 /* Look for fallback prefixes */
1537 p = strdupa(m->path);
1541 e = strrchr(p, '/');
1547 c = hashmap_get(bus->object_callbacks, p);
1548 if (c && c->is_fallback) {
1549 r = c->callback(bus, 0, m, c->userdata);
1557 /* We found some handlers but none wanted to take this, then
1558 * return this -- with one exception, we can handle
1559 * introspection minimally ourselves */
1560 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1563 sd_bus_error_set(&error,
1564 "org.freedesktop.DBus.Error.UnknownMethod",
1565 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1567 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1571 r = sd_bus_send(bus, reply, NULL);
1578 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1579 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1580 _cleanup_free_ char *introspection = NULL;
1581 _cleanup_set_free_free_ Set *s = NULL;
1582 _cleanup_fclose_ FILE *f = NULL;
1583 struct object_callback *c;
1592 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1598 s = set_new(string_hash_func, string_compare_func);
1602 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1606 if (streq(c->path, "/"))
1609 if (streq(m->path, "/"))
1612 e = startswith(c->path, m->path);
1613 if (!e || *e != '/')
1634 f = open_memstream(&introspection, &size);
1638 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1639 fputs("<node>\n", f);
1640 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1641 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1643 while ((node = set_steal_first(s))) {
1644 fprintf(f, " <node name=\"%s\"/>\n", node);
1648 fputs("</node>\n", f);
1655 r = sd_bus_message_new_method_return(bus, m, &reply);
1659 r = sd_bus_message_append(reply, "s", introspection);
1663 r = sd_bus_send(bus, reply, NULL);
1670 static int process_message(sd_bus *bus, sd_bus_message *m) {
1676 r = process_reply(bus, m);
1680 r = process_filter(bus, m);
1684 r = process_builtin(bus, m);
1688 r = process_object(bus, m);
1692 return process_introspect(bus, m);
1695 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1696 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1700 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1702 r = process_timeout(bus);
1706 r = dispatch_wqueue(bus);
1710 r = dispatch_rqueue(bus, &m);
1716 r = process_message(bus, m);
1726 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1727 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1728 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1730 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1732 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1736 r = sd_bus_send(bus, reply, NULL);
1750 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1753 /* Returns 0 when we didn't do anything. This should cause the
1754 * caller to invoke sd_bus_wait() before returning the next
1755 * time. Returns > 0 when we did something, which possibly
1756 * means *ret is filled in with an unprocessed message. */
1763 switch (bus->state) {
1769 r = bus_socket_process_opening(bus);
1776 case BUS_AUTHENTICATING:
1778 r = bus_socket_process_authenticating(bus);
1788 return process_running(bus, ret);
1791 assert_not_reached("Unknown state");
1794 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1805 e = sd_bus_get_events(bus);
1812 r = sd_bus_get_timeout(bus, &until);
1819 n = now(CLOCK_MONOTONIC);
1820 m = until > n ? until - n : 0;
1823 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1830 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1834 return r > 0 ? 1 : 0;
1837 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1841 if (bus->state == BUS_UNSET)
1845 if (bus->rqueue_size > 0)
1848 return bus_poll(bus, false, timeout_usec);
1851 int sd_bus_flush(sd_bus *bus) {
1856 if (bus->state == BUS_UNSET)
1861 r = bus_ensure_running(bus);
1865 if (bus->wqueue_size <= 0)
1869 r = dispatch_wqueue(bus);
1873 if (bus->wqueue_size <= 0)
1876 r = bus_poll(bus, false, (uint64_t) -1);
1882 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1883 struct filter_callback *f;
1890 f = new(struct filter_callback, 1);
1893 f->callback = callback;
1894 f->userdata = userdata;
1896 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1900 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1901 struct filter_callback *f;
1908 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1909 if (f->callback == callback && f->userdata == userdata) {
1910 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1919 static int bus_add_object(
1923 sd_message_handler_t callback,
1926 struct object_callback *c;
1936 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1940 c = new(struct object_callback, 1);
1944 c->path = strdup(path);
1950 c->callback = callback;
1951 c->userdata = userdata;
1952 c->is_fallback = fallback;
1954 r = hashmap_put(bus->object_callbacks, c->path, c);
1964 static int bus_remove_object(
1968 sd_message_handler_t callback,
1971 struct object_callback *c;
1980 c = hashmap_get(bus->object_callbacks, path);
1984 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
1987 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
1995 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
1996 return bus_add_object(bus, false, path, callback, userdata);
1999 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2000 return bus_remove_object(bus, false, path, callback, userdata);
2003 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2004 return bus_add_object(bus, true, prefix, callback, userdata);
2007 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2008 return bus_remove_object(bus, true, prefix, callback, userdata);