1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
39 #include "bus-socket.h"
40 #include "bus-control.h"
42 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
44 static void bus_free(sd_bus *b) {
45 struct filter_callback *f;
46 struct object_callback *c;
59 strv_free(b->exec_argv);
61 close_many(b->fds, b->n_fds);
64 for (i = 0; i < b->rqueue_size; i++)
65 sd_bus_message_unref(b->rqueue[i]);
68 for (i = 0; i < b->wqueue_size; i++)
69 sd_bus_message_unref(b->wqueue[i]);
72 hashmap_free_free(b->reply_callbacks);
73 prioq_free(b->reply_callbacks_prioq);
75 while ((f = b->filter_callbacks)) {
76 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
80 while ((c = hashmap_steal_first(b->object_callbacks))) {
85 hashmap_free(b->object_callbacks);
87 bus_match_free(&b->match_callbacks);
92 int sd_bus_new(sd_bus **ret) {
103 r->input_fd = r->output_fd = -1;
104 r->message_version = 1;
105 r->negotiate_fds = true;
107 /* We guarantee that wqueue always has space for at least one
109 r->wqueue = new(sd_bus_message*, 1);
119 int sd_bus_set_address(sd_bus *bus, const char *address) {
124 if (bus->state != BUS_UNSET)
139 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
142 if (bus->state != BUS_UNSET)
149 bus->input_fd = input_fd;
150 bus->output_fd = output_fd;
154 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
159 if (bus->state != BUS_UNSET)
163 if (strv_isempty(argv))
176 free(bus->exec_path);
177 strv_free(bus->exec_argv);
185 int sd_bus_set_bus_client(sd_bus *bus, int b) {
188 if (bus->state != BUS_UNSET)
191 bus->bus_client = !!b;
195 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
198 if (bus->state != BUS_UNSET)
201 bus->negotiate_fds = !!b;
205 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
208 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
210 if (bus->state != BUS_UNSET)
213 bus->is_server = !!b;
214 bus->server_id = server_id;
218 int sd_bus_set_anonymous(sd_bus *bus, int b) {
221 if (bus->state != BUS_UNSET)
224 bus->anonymous_auth = !!b;
228 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
233 assert(bus->state == BUS_HELLO);
240 r = sd_bus_message_read(reply, "s", &s);
244 if (!service_name_is_valid(s) || s[0] != ':')
247 bus->unique_name = strdup(s);
248 if (!bus->unique_name)
251 bus->state = BUS_RUNNING;
256 static int bus_send_hello(sd_bus *bus) {
257 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
262 if (!bus->bus_client)
265 r = sd_bus_message_new_method_call(
267 "org.freedesktop.DBus",
269 "org.freedesktop.DBus",
275 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
278 int bus_start_running(sd_bus *bus) {
281 if (bus->bus_client) {
282 bus->state = BUS_HELLO;
286 bus->state = BUS_RUNNING;
290 static int parse_address_key(const char **p, const char *key, char **value) {
301 if (strncmp(*p, key, l) != 0)
314 while (*a != ';' && *a != ',' && *a != 0) {
332 c = (char) ((x << 4) | y);
339 t = realloc(r, n + 2);
367 static void skip_address_key(const char **p) {
371 *p += strcspn(*p, ",");
377 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
378 _cleanup_free_ char *path = NULL, *abstract = NULL;
387 while (**p != 0 && **p != ';') {
388 r = parse_address_key(p, "guid", guid);
394 r = parse_address_key(p, "path", &path);
400 r = parse_address_key(p, "abstract", &abstract);
409 if (!path && !abstract)
412 if (path && abstract)
417 if (l > sizeof(b->sockaddr.un.sun_path))
420 b->sockaddr.un.sun_family = AF_UNIX;
421 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
422 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
423 } else if (abstract) {
424 l = strlen(abstract);
425 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
428 b->sockaddr.un.sun_family = AF_UNIX;
429 b->sockaddr.un.sun_path[0] = 0;
430 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
431 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
437 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
438 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
439 struct addrinfo hints, *result;
447 while (**p != 0 && **p != ';') {
448 r = parse_address_key(p, "guid", guid);
454 r = parse_address_key(p, "host", &host);
460 r = parse_address_key(p, "port", &port);
466 r = parse_address_key(p, "family", &family);
479 hints.ai_socktype = SOCK_STREAM;
480 hints.ai_flags = AI_ADDRCONFIG;
483 if (streq(family, "ipv4"))
484 hints.ai_family = AF_INET;
485 else if (streq(family, "ipv6"))
486 hints.ai_family = AF_INET6;
491 r = getaddrinfo(host, port, &hints, &result);
495 return -EADDRNOTAVAIL;
497 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
498 b->sockaddr_size = result->ai_addrlen;
500 freeaddrinfo(result);
505 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
507 unsigned n_argv = 0, j;
516 while (**p != 0 && **p != ';') {
517 r = parse_address_key(p, "guid", guid);
523 r = parse_address_key(p, "path", &path);
529 if (startswith(*p, "argv")) {
533 ul = strtoul(*p + 4, (char**) p, 10);
534 if (errno > 0 || **p != '=' || ul > 256) {
544 x = realloc(argv, sizeof(char*) * (ul + 2));
550 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
556 r = parse_address_key(p, NULL, argv + ul);
571 /* Make sure there are no holes in the array, with the
572 * exception of argv[0] */
573 for (j = 1; j < n_argv; j++)
579 if (argv && argv[0] == NULL) {
580 argv[0] = strdup(path);
592 for (j = 0; j < n_argv; j++)
600 static void bus_reset_parsed_address(sd_bus *b) {
604 b->sockaddr_size = 0;
605 strv_free(b->exec_argv);
609 b->server_id = SD_ID128_NULL;
612 static int bus_parse_next_address(sd_bus *b) {
613 _cleanup_free_ char *guid = NULL;
621 if (b->address[b->address_index] == 0)
624 bus_reset_parsed_address(b);
626 a = b->address + b->address_index;
635 if (startswith(a, "unix:")) {
638 r = parse_unix_address(b, &a, &guid);
643 } else if (startswith(a, "tcp:")) {
646 r = parse_tcp_address(b, &a, &guid);
652 } else if (startswith(a, "unixexec:")) {
655 r = parse_exec_address(b, &a, &guid);
669 r = sd_id128_from_string(guid, &b->server_id);
674 b->address_index = a - b->address;
678 static int bus_start_address(sd_bus *b) {
686 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
688 r = bus_socket_connect(b);
692 b->last_connect_error = -r;
694 } else if (b->exec_path) {
696 r = bus_socket_exec(b);
700 b->last_connect_error = -r;
703 r = bus_parse_next_address(b);
707 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
711 int bus_next_address(sd_bus *b) {
714 bus_reset_parsed_address(b);
715 return bus_start_address(b);
718 static int bus_start_fd(sd_bus *b) {
722 assert(b->input_fd >= 0);
723 assert(b->output_fd >= 0);
725 r = fd_nonblock(b->input_fd, true);
729 r = fd_cloexec(b->input_fd, true);
733 if (b->input_fd != b->output_fd) {
734 r = fd_nonblock(b->output_fd, true);
738 r = fd_cloexec(b->output_fd, true);
743 return bus_socket_take_fd(b);
746 int sd_bus_start(sd_bus *bus) {
751 if (bus->state != BUS_UNSET)
754 bus->state = BUS_OPENING;
756 if (bus->is_server && bus->bus_client)
759 if (bus->input_fd >= 0)
760 r = bus_start_fd(bus);
761 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path)
762 r = bus_start_address(bus);
769 return bus_send_hello(bus);
772 int sd_bus_open_system(sd_bus **ret) {
784 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
786 r = sd_bus_set_address(b, e);
790 b->sockaddr.un.sun_family = AF_UNIX;
791 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
792 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
795 b->bus_client = true;
809 int sd_bus_open_user(sd_bus **ret) {
822 e = getenv("DBUS_SESSION_BUS_ADDRESS");
824 r = sd_bus_set_address(b, e);
828 e = getenv("XDG_RUNTIME_DIR");
835 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
840 b->sockaddr.un.sun_family = AF_UNIX;
841 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
842 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
845 b->bus_client = true;
859 void sd_bus_close(sd_bus *bus) {
863 if (bus->input_fd >= 0)
864 close_nointr_nofail(bus->input_fd);
865 if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
866 close_nointr_nofail(bus->output_fd);
868 bus->input_fd = bus->output_fd = -1;
871 sd_bus *sd_bus_ref(sd_bus *bus) {
875 assert(bus->n_ref > 0);
881 sd_bus *sd_bus_unref(sd_bus *bus) {
885 assert(bus->n_ref > 0);
894 int sd_bus_is_open(sd_bus *bus) {
898 return bus->state != BUS_UNSET && bus->input_fd >= 0;
901 int sd_bus_can_send(sd_bus *bus, char type) {
906 if (bus->output_fd < 0)
909 if (type == SD_BUS_TYPE_UNIX_FD) {
910 if (!bus->negotiate_fds)
913 r = bus_ensure_running(bus);
920 return bus_type_is_valid(type);
923 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
931 r = bus_ensure_running(bus);
935 *server_id = bus->server_id;
939 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
942 if (m->header->version > b->message_version)
948 return bus_message_seal(m, ++b->serial);
951 static int dispatch_wqueue(sd_bus *bus) {
955 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
957 if (bus->output_fd < 0)
960 while (bus->wqueue_size > 0) {
962 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
967 /* Didn't do anything this time */
969 else if (bus->windex >= bus->wqueue[0]->size) {
970 /* Fully written. Let's drop the entry from
973 * This isn't particularly optimized, but
974 * well, this is supposed to be our worst-case
975 * buffer only, and the socket buffer is
976 * supposed to be our primary buffer, and if
977 * it got full, then all bets are off
980 sd_bus_message_unref(bus->wqueue[0]);
982 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
992 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
993 sd_bus_message *z = NULL;
998 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1000 if (bus->input_fd < 0)
1003 if (bus->rqueue_size > 0) {
1004 /* Dispatch a queued message */
1006 *m = bus->rqueue[0];
1007 bus->rqueue_size --;
1008 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1012 /* Try to read a new message */
1014 r = bus_socket_read_message(bus, &z);
1029 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1034 if (bus->state == BUS_UNSET)
1036 if (bus->output_fd < 0)
1042 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1049 /* If the serial number isn't kept, then we know that no reply
1051 if (!serial && !m->sealed)
1052 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1054 r = bus_seal_message(bus, m);
1058 /* If this is a reply and no reply was requested, then let's
1059 * suppress this, if we can */
1060 if (m->dont_send && !serial)
1063 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1066 r = bus_socket_write_message(bus, m, &idx);
1070 } else if (idx < m->size) {
1071 /* Wasn't fully written. So let's remember how
1072 * much was written. Note that the first entry
1073 * of the wqueue array is always allocated so
1074 * that we always can remember how much was
1076 bus->wqueue[0] = sd_bus_message_ref(m);
1077 bus->wqueue_size = 1;
1083 /* Just append it to the queue. */
1085 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1088 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1093 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1097 *serial = BUS_MESSAGE_SERIAL(m);
1102 static usec_t calc_elapse(uint64_t usec) {
1103 if (usec == (uint64_t) -1)
1107 usec = BUS_DEFAULT_TIMEOUT;
1109 return now(CLOCK_MONOTONIC) + usec;
1112 static int timeout_compare(const void *a, const void *b) {
1113 const struct reply_callback *x = a, *y = b;
1115 if (x->timeout != 0 && y->timeout == 0)
1118 if (x->timeout == 0 && y->timeout != 0)
1121 if (x->timeout < y->timeout)
1124 if (x->timeout > y->timeout)
1130 int sd_bus_send_with_reply(
1133 sd_bus_message_handler_t callback,
1138 struct reply_callback *c;
1143 if (bus->state == BUS_UNSET)
1145 if (bus->output_fd < 0)
1151 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1153 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1156 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1160 if (usec != (uint64_t) -1) {
1161 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1166 r = bus_seal_message(bus, m);
1170 c = new(struct reply_callback, 1);
1174 c->callback = callback;
1175 c->userdata = userdata;
1176 c->serial = BUS_MESSAGE_SERIAL(m);
1177 c->timeout = calc_elapse(usec);
1179 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1185 if (c->timeout != 0) {
1186 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1189 sd_bus_send_with_reply_cancel(bus, c->serial);
1194 r = sd_bus_send(bus, m, serial);
1196 sd_bus_send_with_reply_cancel(bus, c->serial);
1203 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1204 struct reply_callback *c;
1211 c = hashmap_remove(bus->reply_callbacks, &serial);
1215 if (c->timeout != 0)
1216 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1222 int bus_ensure_running(sd_bus *bus) {
1227 if (bus->input_fd < 0)
1229 if (bus->state == BUS_UNSET)
1232 if (bus->state == BUS_RUNNING)
1236 r = sd_bus_process(bus, NULL);
1239 if (bus->state == BUS_RUNNING)
1244 r = sd_bus_wait(bus, (uint64_t) -1);
1250 int sd_bus_send_with_reply_and_block(
1254 sd_bus_error *error,
1255 sd_bus_message **reply) {
1264 if (bus->output_fd < 0)
1266 if (bus->state == BUS_UNSET)
1270 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1272 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1274 if (bus_error_is_dirty(error))
1277 r = bus_ensure_running(bus);
1281 r = sd_bus_send(bus, m, &serial);
1285 timeout = calc_elapse(usec);
1289 sd_bus_message *incoming = NULL;
1294 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1297 /* Make sure there's room for queuing this
1298 * locally, before we read the message */
1300 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1308 r = bus_socket_read_message(bus, &incoming);
1313 if (incoming->reply_serial == serial) {
1314 /* Found a match! */
1316 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1321 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1324 r = sd_bus_error_copy(error, &incoming->error);
1326 sd_bus_message_unref(incoming);
1330 k = bus_error_to_errno(&incoming->error);
1331 sd_bus_message_unref(incoming);
1335 sd_bus_message_unref(incoming);
1339 /* There's already guaranteed to be room for
1340 * this, so need to resize things here */
1341 bus->rqueue[bus->rqueue_size ++] = incoming;
1344 /* Try to read more, right-away */
1353 n = now(CLOCK_MONOTONIC);
1359 left = (uint64_t) -1;
1361 r = bus_poll(bus, true, left);
1365 r = dispatch_wqueue(bus);
1371 int sd_bus_get_fd(sd_bus *bus) {
1374 if (bus->input_fd < 0)
1376 if (bus->input_fd != bus->output_fd)
1379 return bus->input_fd;
1382 int sd_bus_get_events(sd_bus *bus) {
1387 if (bus->state == BUS_UNSET)
1389 if (bus->input_fd < 0)
1392 if (bus->state == BUS_OPENING)
1394 else if (bus->state == BUS_AUTHENTICATING) {
1396 if (bus_socket_auth_needs_write(bus))
1401 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1402 if (bus->rqueue_size <= 0)
1404 if (bus->wqueue_size > 0)
1411 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1412 struct reply_callback *c;
1418 if (bus->state == BUS_UNSET)
1420 if (bus->input_fd < 0)
1423 if (bus->state == BUS_AUTHENTICATING) {
1424 *timeout_usec = bus->auth_timeout;
1428 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1431 c = prioq_peek(bus->reply_callbacks_prioq);
1435 *timeout_usec = c->timeout;
1439 static int process_timeout(sd_bus *bus) {
1440 struct reply_callback *c;
1446 c = prioq_peek(bus->reply_callbacks_prioq);
1450 n = now(CLOCK_MONOTONIC);
1454 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1455 hashmap_remove(bus->reply_callbacks, &c->serial);
1457 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1460 return r < 0 ? r : 1;
1463 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1467 if (bus->state != BUS_HELLO)
1470 /* Let's make sure the first message on the bus is the HELLO
1471 * reply. But note that we don't actually parse the message
1472 * here (we leave that to the usual handling), we just verify
1473 * we don't let any earlier msg through. */
1475 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1476 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1479 if (m->reply_serial != bus->hello_serial)
1485 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1486 struct reply_callback *c;
1492 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1493 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1496 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1500 if (c->timeout != 0)
1501 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1503 r = c->callback(bus, 0, m, c->userdata);
1509 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1510 struct filter_callback *l;
1516 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1517 r = l->callback(bus, 0, m, l->userdata);
1525 static int process_match(sd_bus *bus, sd_bus_message *m) {
1529 return bus_match_run(bus, &bus->match_callbacks, 0, m);
1532 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1533 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1539 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1542 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1545 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1548 if (streq_ptr(m->member, "Ping"))
1549 r = sd_bus_message_new_method_return(bus, m, &reply);
1550 else if (streq_ptr(m->member, "GetMachineId")) {
1554 r = sd_id128_get_machine(&id);
1558 r = sd_bus_message_new_method_return(bus, m, &reply);
1562 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1564 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1566 sd_bus_error_set(&error,
1567 "org.freedesktop.DBus.Error.UnknownMethod",
1568 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1570 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1576 r = sd_bus_send(bus, reply, NULL);
1583 static int process_object(sd_bus *bus, sd_bus_message *m) {
1584 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1585 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1586 struct object_callback *c;
1594 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1597 if (hashmap_isempty(bus->object_callbacks))
1600 c = hashmap_get(bus->object_callbacks, m->path);
1602 r = c->callback(bus, 0, m, c->userdata);
1609 /* Look for fallback prefixes */
1610 p = strdupa(m->path);
1614 e = strrchr(p, '/');
1620 c = hashmap_get(bus->object_callbacks, p);
1621 if (c && c->is_fallback) {
1622 r = c->callback(bus, 0, m, c->userdata);
1630 /* We found some handlers but none wanted to take this, then
1631 * return this -- with one exception, we can handle
1632 * introspection minimally ourselves */
1633 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1636 sd_bus_error_set(&error,
1637 "org.freedesktop.DBus.Error.UnknownMethod",
1638 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1640 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1644 r = sd_bus_send(bus, reply, NULL);
1651 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1652 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1653 _cleanup_free_ char *introspection = NULL;
1654 _cleanup_set_free_free_ Set *s = NULL;
1655 _cleanup_fclose_ FILE *f = NULL;
1656 struct object_callback *c;
1665 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1671 s = set_new(string_hash_func, string_compare_func);
1675 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1679 if (streq(c->path, "/"))
1682 if (streq(m->path, "/"))
1685 e = startswith(c->path, m->path);
1686 if (!e || *e != '/')
1707 f = open_memstream(&introspection, &size);
1711 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1712 fputs("<node>\n", f);
1713 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1714 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1716 while ((node = set_steal_first(s))) {
1717 fprintf(f, " <node name=\"%s\"/>\n", node);
1721 fputs("</node>\n", f);
1728 r = sd_bus_message_new_method_return(bus, m, &reply);
1732 r = sd_bus_message_append(reply, "s", introspection);
1736 r = sd_bus_send(bus, reply, NULL);
1743 static int process_message(sd_bus *bus, sd_bus_message *m) {
1749 r = process_hello(bus, m);
1753 r = process_reply(bus, m);
1757 r = process_filter(bus, m);
1761 r = process_match(bus, m);
1765 r = process_builtin(bus, m);
1769 r = process_object(bus, m);
1773 return process_introspect(bus, m);
1776 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1777 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1781 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1783 r = process_timeout(bus);
1787 r = dispatch_wqueue(bus);
1791 r = dispatch_rqueue(bus, &m);
1797 r = process_message(bus, m);
1807 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1808 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1809 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1811 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1813 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1817 r = sd_bus_send(bus, reply, NULL);
1831 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1834 /* Returns 0 when we didn't do anything. This should cause the
1835 * caller to invoke sd_bus_wait() before returning the next
1836 * time. Returns > 0 when we did something, which possibly
1837 * means *ret is filled in with an unprocessed message. */
1841 if (bus->input_fd < 0)
1844 switch (bus->state) {
1850 r = bus_socket_process_opening(bus);
1857 case BUS_AUTHENTICATING:
1859 r = bus_socket_process_authenticating(bus);
1869 return process_running(bus, ret);
1872 assert_not_reached("Unknown state");
1875 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1883 if (bus->input_fd < 0)
1886 e = sd_bus_get_events(bus);
1893 r = sd_bus_get_timeout(bus, &until);
1900 nw = now(CLOCK_MONOTONIC);
1901 m = until > nw ? until - nw : 0;
1904 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1908 p[0].fd = bus->input_fd;
1910 if (bus->output_fd == bus->input_fd) {
1914 p[0].events = e & POLLIN;
1915 p[1].fd = bus->output_fd;
1916 p[1].events = e & POLLOUT;
1920 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1924 return r > 0 ? 1 : 0;
1927 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1931 if (bus->state == BUS_UNSET)
1933 if (bus->input_fd < 0)
1935 if (bus->rqueue_size > 0)
1938 return bus_poll(bus, false, timeout_usec);
1941 int sd_bus_flush(sd_bus *bus) {
1946 if (bus->state == BUS_UNSET)
1948 if (bus->output_fd < 0)
1951 r = bus_ensure_running(bus);
1955 if (bus->wqueue_size <= 0)
1959 r = dispatch_wqueue(bus);
1963 if (bus->wqueue_size <= 0)
1966 r = bus_poll(bus, false, (uint64_t) -1);
1972 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
1973 struct filter_callback *f;
1980 f = new(struct filter_callback, 1);
1983 f->callback = callback;
1984 f->userdata = userdata;
1986 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1990 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
1991 struct filter_callback *f;
1998 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1999 if (f->callback == callback && f->userdata == userdata) {
2000 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2009 static int bus_add_object(
2013 sd_bus_message_handler_t callback,
2016 struct object_callback *c;
2026 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2030 c = new(struct object_callback, 1);
2034 c->path = strdup(path);
2040 c->callback = callback;
2041 c->userdata = userdata;
2042 c->is_fallback = fallback;
2044 r = hashmap_put(bus->object_callbacks, c->path, c);
2054 static int bus_remove_object(
2058 sd_bus_message_handler_t callback,
2061 struct object_callback *c;
2070 c = hashmap_get(bus->object_callbacks, path);
2074 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2077 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2085 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2086 return bus_add_object(bus, false, path, callback, userdata);
2089 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2090 return bus_remove_object(bus, false, path, callback, userdata);
2093 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2094 return bus_add_object(bus, true, prefix, callback, userdata);
2097 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2098 return bus_remove_object(bus, true, prefix, callback, userdata);
2101 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2109 if (bus->bus_client) {
2110 r = bus_add_match_internal(bus, match);
2116 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2119 if (bus->bus_client)
2120 bus_remove_match_internal(bus, match);
2127 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2135 if (bus->bus_client)
2136 r = bus_remove_match_internal(bus, match);
2139 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);