1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
39 #include "bus-socket.h"
40 #include "bus-kernel.h"
41 #include "bus-control.h"
43 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
45 static void bus_free(sd_bus *b) {
46 struct filter_callback *f;
47 struct object_callback *c;
61 strv_free(b->exec_argv);
63 close_many(b->fds, b->n_fds);
66 for (i = 0; i < b->rqueue_size; i++)
67 sd_bus_message_unref(b->rqueue[i]);
70 for (i = 0; i < b->wqueue_size; i++)
71 sd_bus_message_unref(b->wqueue[i]);
74 hashmap_free_free(b->reply_callbacks);
75 prioq_free(b->reply_callbacks_prioq);
77 while ((f = b->filter_callbacks)) {
78 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
82 while ((c = hashmap_steal_first(b->object_callbacks))) {
87 hashmap_free(b->object_callbacks);
89 bus_match_free(&b->match_callbacks);
94 int sd_bus_new(sd_bus **ret) {
105 r->input_fd = r->output_fd = -1;
106 r->message_version = 1;
107 r->negotiate_fds = true;
109 /* We guarantee that wqueue always has space for at least one
111 r->wqueue = new(sd_bus_message*, 1);
121 int sd_bus_set_address(sd_bus *bus, const char *address) {
126 if (bus->state != BUS_UNSET)
141 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
144 if (bus->state != BUS_UNSET)
151 bus->input_fd = input_fd;
152 bus->output_fd = output_fd;
156 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
161 if (bus->state != BUS_UNSET)
165 if (strv_isempty(argv))
178 free(bus->exec_path);
179 strv_free(bus->exec_argv);
187 int sd_bus_set_bus_client(sd_bus *bus, int b) {
190 if (bus->state != BUS_UNSET)
193 bus->bus_client = !!b;
197 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
200 if (bus->state != BUS_UNSET)
203 bus->negotiate_fds = !!b;
207 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
210 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
212 if (bus->state != BUS_UNSET)
215 bus->is_server = !!b;
216 bus->server_id = server_id;
220 int sd_bus_set_anonymous(sd_bus *bus, int b) {
223 if (bus->state != BUS_UNSET)
226 bus->anonymous_auth = !!b;
230 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
235 assert(bus->state == BUS_HELLO);
242 r = sd_bus_message_read(reply, "s", &s);
246 if (!service_name_is_valid(s) || s[0] != ':')
249 bus->unique_name = strdup(s);
250 if (!bus->unique_name)
253 bus->state = BUS_RUNNING;
258 static int bus_send_hello(sd_bus *bus) {
259 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
264 if (!bus->bus_client || bus->is_kernel)
267 r = sd_bus_message_new_method_call(
269 "org.freedesktop.DBus",
271 "org.freedesktop.DBus",
277 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
280 int bus_start_running(sd_bus *bus) {
283 if (bus->bus_client && !bus->is_kernel) {
284 bus->state = BUS_HELLO;
288 bus->state = BUS_RUNNING;
292 static int parse_address_key(const char **p, const char *key, char **value) {
303 if (strncmp(*p, key, l) != 0)
316 while (*a != ';' && *a != ',' && *a != 0) {
334 c = (char) ((x << 4) | y);
341 t = realloc(r, n + 2);
369 static void skip_address_key(const char **p) {
373 *p += strcspn(*p, ",");
379 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
380 _cleanup_free_ char *path = NULL, *abstract = NULL;
389 while (**p != 0 && **p != ';') {
390 r = parse_address_key(p, "guid", guid);
396 r = parse_address_key(p, "path", &path);
402 r = parse_address_key(p, "abstract", &abstract);
411 if (!path && !abstract)
414 if (path && abstract)
419 if (l > sizeof(b->sockaddr.un.sun_path))
422 b->sockaddr.un.sun_family = AF_UNIX;
423 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
424 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
425 } else if (abstract) {
426 l = strlen(abstract);
427 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
430 b->sockaddr.un.sun_family = AF_UNIX;
431 b->sockaddr.un.sun_path[0] = 0;
432 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
433 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
439 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
440 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
442 struct addrinfo *result, hints = {
443 .ai_socktype = SOCK_STREAM,
444 .ai_flags = AI_ADDRCONFIG,
452 while (**p != 0 && **p != ';') {
453 r = parse_address_key(p, "guid", guid);
459 r = parse_address_key(p, "host", &host);
465 r = parse_address_key(p, "port", &port);
471 r = parse_address_key(p, "family", &family);
484 if (streq(family, "ipv4"))
485 hints.ai_family = AF_INET;
486 else if (streq(family, "ipv6"))
487 hints.ai_family = AF_INET6;
492 r = getaddrinfo(host, port, &hints, &result);
496 return -EADDRNOTAVAIL;
498 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
499 b->sockaddr_size = result->ai_addrlen;
501 freeaddrinfo(result);
506 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
508 unsigned n_argv = 0, j;
517 while (**p != 0 && **p != ';') {
518 r = parse_address_key(p, "guid", guid);
524 r = parse_address_key(p, "path", &path);
530 if (startswith(*p, "argv")) {
534 ul = strtoul(*p + 4, (char**) p, 10);
535 if (errno > 0 || **p != '=' || ul > 256) {
545 x = realloc(argv, sizeof(char*) * (ul + 2));
551 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
557 r = parse_address_key(p, NULL, argv + ul);
572 /* Make sure there are no holes in the array, with the
573 * exception of argv[0] */
574 for (j = 1; j < n_argv; j++)
580 if (argv && argv[0] == NULL) {
581 argv[0] = strdup(path);
593 for (j = 0; j < n_argv; j++)
601 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
602 _cleanup_free_ char *path = NULL;
610 while (**p != 0 && **p != ';') {
611 r = parse_address_key(p, "guid", guid);
617 r = parse_address_key(p, "path", &path);
636 static void bus_reset_parsed_address(sd_bus *b) {
640 b->sockaddr_size = 0;
641 strv_free(b->exec_argv);
645 b->server_id = SD_ID128_NULL;
650 static int bus_parse_next_address(sd_bus *b) {
651 _cleanup_free_ char *guid = NULL;
659 if (b->address[b->address_index] == 0)
662 bus_reset_parsed_address(b);
664 a = b->address + b->address_index;
673 if (startswith(a, "unix:")) {
676 r = parse_unix_address(b, &a, &guid);
681 } else if (startswith(a, "tcp:")) {
684 r = parse_tcp_address(b, &a, &guid);
690 } else if (startswith(a, "unixexec:")) {
693 r = parse_exec_address(b, &a, &guid);
699 } else if (startswith(a, "kernel:")) {
702 r = parse_kernel_address(b, &a, &guid);
715 r = sd_id128_from_string(guid, &b->server_id);
720 b->address_index = a - b->address;
724 static int bus_start_address(sd_bus *b) {
732 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
734 r = bus_socket_connect(b);
738 b->last_connect_error = -r;
740 } else if (b->exec_path) {
742 r = bus_socket_exec(b);
746 b->last_connect_error = -r;
747 } else if (b->kernel) {
749 r = bus_kernel_connect(b);
753 b->last_connect_error = -r;
756 r = bus_parse_next_address(b);
760 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
764 int bus_next_address(sd_bus *b) {
767 bus_reset_parsed_address(b);
768 return bus_start_address(b);
771 static int bus_start_fd(sd_bus *b) {
776 assert(b->input_fd >= 0);
777 assert(b->output_fd >= 0);
779 r = fd_nonblock(b->input_fd, true);
783 r = fd_cloexec(b->input_fd, true);
787 if (b->input_fd != b->output_fd) {
788 r = fd_nonblock(b->output_fd, true);
792 r = fd_cloexec(b->output_fd, true);
797 if (fstat(b->input_fd, &st) < 0)
800 if (S_ISCHR(b->input_fd))
801 return bus_kernel_take_fd(b);
803 return bus_socket_take_fd(b);
806 int sd_bus_start(sd_bus *bus) {
811 if (bus->state != BUS_UNSET)
814 bus->state = BUS_OPENING;
816 if (bus->is_server && bus->bus_client)
819 if (bus->input_fd >= 0)
820 r = bus_start_fd(bus);
821 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
822 r = bus_start_address(bus);
829 return bus_send_hello(bus);
832 int sd_bus_open_system(sd_bus **ret) {
844 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
846 r = sd_bus_set_address(b, e);
850 b->sockaddr.un.sun_family = AF_UNIX;
851 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
852 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
855 b->bus_client = true;
869 int sd_bus_open_user(sd_bus **ret) {
882 e = getenv("DBUS_SESSION_BUS_ADDRESS");
884 r = sd_bus_set_address(b, e);
888 e = getenv("XDG_RUNTIME_DIR");
895 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
900 b->sockaddr.un.sun_family = AF_UNIX;
901 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
902 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
905 b->bus_client = true;
919 void sd_bus_close(sd_bus *bus) {
923 if (bus->input_fd >= 0)
924 close_nointr_nofail(bus->input_fd);
925 if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
926 close_nointr_nofail(bus->output_fd);
928 bus->input_fd = bus->output_fd = -1;
931 sd_bus *sd_bus_ref(sd_bus *bus) {
935 assert(bus->n_ref > 0);
941 sd_bus *sd_bus_unref(sd_bus *bus) {
945 assert(bus->n_ref > 0);
954 int sd_bus_is_open(sd_bus *bus) {
958 return bus->state != BUS_UNSET && bus->input_fd >= 0;
961 int sd_bus_can_send(sd_bus *bus, char type) {
966 if (bus->output_fd < 0)
969 if (type == SD_BUS_TYPE_UNIX_FD) {
970 if (!bus->negotiate_fds)
973 r = bus_ensure_running(bus);
980 return bus_type_is_valid(type);
983 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
991 r = bus_ensure_running(bus);
995 *server_id = bus->server_id;
999 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1002 if (m->header->version > b->message_version)
1008 return bus_message_seal(m, ++b->serial);
1011 static int dispatch_wqueue(sd_bus *bus) {
1015 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1017 if (bus->output_fd < 0)
1020 while (bus->wqueue_size > 0) {
1023 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1025 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1031 /* Didn't do anything this time */
1033 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1034 /* Fully written. Let's drop the entry from
1037 * This isn't particularly optimized, but
1038 * well, this is supposed to be our worst-case
1039 * buffer only, and the socket buffer is
1040 * supposed to be our primary buffer, and if
1041 * it got full, then all bets are off
1044 sd_bus_message_unref(bus->wqueue[0]);
1045 bus->wqueue_size --;
1046 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1056 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1057 sd_bus_message *z = NULL;
1062 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1064 if (bus->input_fd < 0)
1067 if (bus->rqueue_size > 0) {
1068 /* Dispatch a queued message */
1070 *m = bus->rqueue[0];
1071 bus->rqueue_size --;
1072 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1076 /* Try to read a new message */
1079 r = bus_kernel_read_message(bus, &z);
1081 r = bus_socket_read_message(bus, &z);
1097 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1102 if (bus->state == BUS_UNSET)
1104 if (bus->output_fd < 0)
1110 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1117 /* If the serial number isn't kept, then we know that no reply
1119 if (!serial && !m->sealed)
1120 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1122 r = bus_seal_message(bus, m);
1126 /* If this is a reply and no reply was requested, then let's
1127 * suppress this, if we can */
1128 if (m->dont_send && !serial)
1131 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1135 r = bus_kernel_write_message(bus, m);
1137 r = bus_socket_write_message(bus, m, &idx);
1142 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1143 /* Wasn't fully written. So let's remember how
1144 * much was written. Note that the first entry
1145 * of the wqueue array is always allocated so
1146 * that we always can remember how much was
1148 bus->wqueue[0] = sd_bus_message_ref(m);
1149 bus->wqueue_size = 1;
1155 /* Just append it to the queue. */
1157 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1160 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1165 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1169 *serial = BUS_MESSAGE_SERIAL(m);
1174 static usec_t calc_elapse(uint64_t usec) {
1175 if (usec == (uint64_t) -1)
1179 usec = BUS_DEFAULT_TIMEOUT;
1181 return now(CLOCK_MONOTONIC) + usec;
1184 static int timeout_compare(const void *a, const void *b) {
1185 const struct reply_callback *x = a, *y = b;
1187 if (x->timeout != 0 && y->timeout == 0)
1190 if (x->timeout == 0 && y->timeout != 0)
1193 if (x->timeout < y->timeout)
1196 if (x->timeout > y->timeout)
1202 int sd_bus_send_with_reply(
1205 sd_bus_message_handler_t callback,
1210 struct reply_callback *c;
1215 if (bus->state == BUS_UNSET)
1217 if (bus->output_fd < 0)
1223 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1225 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1228 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1232 if (usec != (uint64_t) -1) {
1233 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1238 r = bus_seal_message(bus, m);
1242 c = new0(struct reply_callback, 1);
1246 c->callback = callback;
1247 c->userdata = userdata;
1248 c->serial = BUS_MESSAGE_SERIAL(m);
1249 c->timeout = calc_elapse(usec);
1251 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1257 if (c->timeout != 0) {
1258 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1261 sd_bus_send_with_reply_cancel(bus, c->serial);
1266 r = sd_bus_send(bus, m, serial);
1268 sd_bus_send_with_reply_cancel(bus, c->serial);
1275 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1276 struct reply_callback *c;
1283 c = hashmap_remove(bus->reply_callbacks, &serial);
1287 if (c->timeout != 0)
1288 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1294 int bus_ensure_running(sd_bus *bus) {
1299 if (bus->input_fd < 0)
1301 if (bus->state == BUS_UNSET)
1304 if (bus->state == BUS_RUNNING)
1308 r = sd_bus_process(bus, NULL);
1311 if (bus->state == BUS_RUNNING)
1316 r = sd_bus_wait(bus, (uint64_t) -1);
1322 int sd_bus_send_with_reply_and_block(
1326 sd_bus_error *error,
1327 sd_bus_message **reply) {
1336 if (bus->output_fd < 0)
1338 if (bus->state == BUS_UNSET)
1342 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1344 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1346 if (bus_error_is_dirty(error))
1349 r = bus_ensure_running(bus);
1353 r = sd_bus_send(bus, m, &serial);
1357 timeout = calc_elapse(usec);
1361 sd_bus_message *incoming = NULL;
1366 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1369 /* Make sure there's room for queuing this
1370 * locally, before we read the message */
1372 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1381 r = bus_kernel_read_message(bus, &incoming);
1383 r = bus_socket_read_message(bus, &incoming);
1388 if (incoming->reply_serial == serial) {
1389 /* Found a match! */
1391 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1396 sd_bus_message_unref(incoming);
1401 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1404 r = sd_bus_error_copy(error, &incoming->error);
1406 sd_bus_message_unref(incoming);
1410 k = bus_error_to_errno(&incoming->error);
1411 sd_bus_message_unref(incoming);
1415 sd_bus_message_unref(incoming);
1419 /* There's already guaranteed to be room for
1420 * this, so need to resize things here */
1421 bus->rqueue[bus->rqueue_size ++] = incoming;
1424 /* Try to read more, right-away */
1433 n = now(CLOCK_MONOTONIC);
1439 left = (uint64_t) -1;
1441 r = bus_poll(bus, true, left);
1445 r = dispatch_wqueue(bus);
1451 int sd_bus_get_fd(sd_bus *bus) {
1454 if (bus->input_fd < 0)
1456 if (bus->input_fd != bus->output_fd)
1459 return bus->input_fd;
1462 int sd_bus_get_events(sd_bus *bus) {
1467 if (bus->state == BUS_UNSET)
1469 if (bus->input_fd < 0)
1472 if (bus->state == BUS_OPENING)
1474 else if (bus->state == BUS_AUTHENTICATING) {
1476 if (bus_socket_auth_needs_write(bus))
1481 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1482 if (bus->rqueue_size <= 0)
1484 if (bus->wqueue_size > 0)
1491 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1492 struct reply_callback *c;
1498 if (bus->state == BUS_UNSET)
1500 if (bus->input_fd < 0)
1503 if (bus->state == BUS_AUTHENTICATING) {
1504 *timeout_usec = bus->auth_timeout;
1508 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1509 *timeout_usec = (uint64_t) -1;
1513 c = prioq_peek(bus->reply_callbacks_prioq);
1515 *timeout_usec = (uint64_t) -1;
1519 *timeout_usec = c->timeout;
1523 static int process_timeout(sd_bus *bus) {
1524 struct reply_callback *c;
1530 c = prioq_peek(bus->reply_callbacks_prioq);
1534 n = now(CLOCK_MONOTONIC);
1538 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1539 hashmap_remove(bus->reply_callbacks, &c->serial);
1541 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1544 return r < 0 ? r : 1;
1547 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1551 if (bus->state != BUS_HELLO)
1554 /* Let's make sure the first message on the bus is the HELLO
1555 * reply. But note that we don't actually parse the message
1556 * here (we leave that to the usual handling), we just verify
1557 * we don't let any earlier msg through. */
1559 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1560 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1563 if (m->reply_serial != bus->hello_serial)
1569 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1570 struct reply_callback *c;
1576 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1577 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1580 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1584 if (c->timeout != 0)
1585 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1587 r = sd_bus_message_rewind(m, true);
1591 r = c->callback(bus, 0, m, c->userdata);
1597 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1598 struct filter_callback *l;
1605 bus->filter_callbacks_modified = false;
1607 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1609 if (bus->filter_callbacks_modified)
1612 /* Don't run this more than once per iteration */
1613 if (l->last_iteration == bus->iteration_counter)
1616 l->last_iteration = bus->iteration_counter;
1618 r = sd_bus_message_rewind(m, true);
1622 r = l->callback(bus, 0, m, l->userdata);
1628 } while (bus->filter_callbacks_modified);
1633 static int process_match(sd_bus *bus, sd_bus_message *m) {
1640 bus->match_callbacks_modified = false;
1642 r = bus_match_run(bus, &bus->match_callbacks, 0, m);
1646 } while (bus->match_callbacks_modified);
1651 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1652 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1658 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1661 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1664 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1667 if (streq_ptr(m->member, "Ping"))
1668 r = sd_bus_message_new_method_return(bus, m, &reply);
1669 else if (streq_ptr(m->member, "GetMachineId")) {
1673 r = sd_id128_get_machine(&id);
1677 r = sd_bus_message_new_method_return(bus, m, &reply);
1681 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1683 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1685 sd_bus_error_set(&error,
1686 "org.freedesktop.DBus.Error.UnknownMethod",
1687 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1689 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1695 r = sd_bus_send(bus, reply, NULL);
1702 static int process_object(sd_bus *bus, sd_bus_message *m) {
1703 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1704 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1705 struct object_callback *c;
1713 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1716 if (hashmap_isempty(bus->object_callbacks))
1719 pl = strlen(m->path);
1724 bus->object_callbacks_modified = false;
1726 c = hashmap_get(bus->object_callbacks, m->path);
1727 if (c && c->last_iteration != bus->iteration_counter) {
1729 c->last_iteration = bus->iteration_counter;
1731 r = sd_bus_message_rewind(m, true);
1735 r = c->callback(bus, 0, m, c->userdata);
1742 /* Look for fallback prefixes */
1747 if (bus->object_callbacks_modified)
1750 e = strrchr(p, '/');
1756 c = hashmap_get(bus->object_callbacks, p);
1757 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1759 c->last_iteration = bus->iteration_counter;
1761 r = sd_bus_message_rewind(m, true);
1765 r = c->callback(bus, 0, m, c->userdata);
1773 } while (bus->object_callbacks_modified);
1775 /* We found some handlers but none wanted to take this, then
1776 * return this -- with one exception, we can handle
1777 * introspection minimally ourselves */
1778 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1781 sd_bus_error_set(&error,
1782 "org.freedesktop.DBus.Error.UnknownMethod",
1783 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1785 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1789 r = sd_bus_send(bus, reply, NULL);
1796 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1797 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1798 _cleanup_free_ char *introspection = NULL;
1799 _cleanup_set_free_free_ Set *s = NULL;
1800 _cleanup_fclose_ FILE *f = NULL;
1801 struct object_callback *c;
1810 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1816 s = set_new(string_hash_func, string_compare_func);
1820 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1824 if (streq(c->path, "/"))
1827 if (streq(m->path, "/"))
1830 e = startswith(c->path, m->path);
1831 if (!e || *e != '/')
1852 f = open_memstream(&introspection, &size);
1856 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1857 fputs("<node>\n", f);
1858 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1859 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1861 while ((node = set_steal_first(s))) {
1862 fprintf(f, " <node name=\"%s\"/>\n", node);
1866 fputs("</node>\n", f);
1873 r = sd_bus_message_new_method_return(bus, m, &reply);
1877 r = sd_bus_message_append(reply, "s", introspection);
1881 r = sd_bus_send(bus, reply, NULL);
1888 static int process_message(sd_bus *bus, sd_bus_message *m) {
1894 bus->iteration_counter++;
1896 r = process_hello(bus, m);
1900 r = process_reply(bus, m);
1904 r = process_filter(bus, m);
1908 r = process_match(bus, m);
1912 r = process_builtin(bus, m);
1916 r = process_object(bus, m);
1920 return process_introspect(bus, m);
1923 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1924 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1928 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1930 r = process_timeout(bus);
1934 r = dispatch_wqueue(bus);
1938 r = dispatch_rqueue(bus, &m);
1944 r = process_message(bus, m);
1949 r = sd_bus_message_rewind(m, true);
1958 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1959 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1960 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1962 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1964 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1968 r = sd_bus_send(bus, reply, NULL);
1982 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1985 /* Returns 0 when we didn't do anything. This should cause the
1986 * caller to invoke sd_bus_wait() before returning the next
1987 * time. Returns > 0 when we did something, which possibly
1988 * means *ret is filled in with an unprocessed message. */
1992 if (bus->input_fd < 0)
1995 /* We don't allow recursively invoking sd_bus_process(). */
1996 if (bus->processing)
1999 switch (bus->state) {
2005 r = bus_socket_process_opening(bus);
2012 case BUS_AUTHENTICATING:
2014 r = bus_socket_process_authenticating(bus);
2024 bus->processing = true;
2025 r = process_running(bus, ret);
2026 bus->processing = false;
2031 assert_not_reached("Unknown state");
2034 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2035 struct pollfd p[2] = {};
2042 if (bus->input_fd < 0)
2045 e = sd_bus_get_events(bus);
2052 r = sd_bus_get_timeout(bus, &until);
2059 nw = now(CLOCK_MONOTONIC);
2060 m = until > nw ? until - nw : 0;
2063 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2066 p[0].fd = bus->input_fd;
2067 if (bus->output_fd == bus->input_fd) {
2071 p[0].events = e & POLLIN;
2072 p[1].fd = bus->output_fd;
2073 p[1].events = e & POLLOUT;
2077 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2081 return r > 0 ? 1 : 0;
2084 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2088 if (bus->state == BUS_UNSET)
2090 if (bus->input_fd < 0)
2092 if (bus->rqueue_size > 0)
2095 return bus_poll(bus, false, timeout_usec);
2098 int sd_bus_flush(sd_bus *bus) {
2103 if (bus->state == BUS_UNSET)
2105 if (bus->output_fd < 0)
2108 r = bus_ensure_running(bus);
2112 if (bus->wqueue_size <= 0)
2116 r = dispatch_wqueue(bus);
2120 if (bus->wqueue_size <= 0)
2123 r = bus_poll(bus, false, (uint64_t) -1);
2129 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2130 struct filter_callback *f;
2137 f = new0(struct filter_callback, 1);
2140 f->callback = callback;
2141 f->userdata = userdata;
2143 bus->filter_callbacks_modified = true;
2144 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
2148 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2149 struct filter_callback *f;
2156 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2157 if (f->callback == callback && f->userdata == userdata) {
2158 bus->filter_callbacks_modified = true;
2159 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2168 static int bus_add_object(
2172 sd_bus_message_handler_t callback,
2175 struct object_callback *c;
2185 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2189 c = new0(struct object_callback, 1);
2193 c->path = strdup(path);
2199 c->callback = callback;
2200 c->userdata = userdata;
2201 c->is_fallback = fallback;
2203 bus->object_callbacks_modified = true;
2204 r = hashmap_put(bus->object_callbacks, c->path, c);
2214 static int bus_remove_object(
2218 sd_bus_message_handler_t callback,
2221 struct object_callback *c;
2230 c = hashmap_get(bus->object_callbacks, path);
2234 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2237 bus->object_callbacks_modified = true;
2238 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2246 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2247 return bus_add_object(bus, false, path, callback, userdata);
2250 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2251 return bus_remove_object(bus, false, path, callback, userdata);
2254 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2255 return bus_add_object(bus, true, prefix, callback, userdata);
2258 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2259 return bus_remove_object(bus, true, prefix, callback, userdata);
2262 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2270 if (bus->bus_client) {
2271 r = bus_add_match_internal(bus, match);
2277 bus->match_callbacks_modified = true;
2278 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2281 if (bus->bus_client)
2282 bus_remove_match_internal(bus, match);
2289 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2297 if (bus->bus_client)
2298 r = bus_remove_match_internal(bus, match);
2301 bus->match_callbacks_modified = true;
2302 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
2310 int sd_bus_emit_signal(
2313 const char *interface,
2315 const char *types, ...) {
2317 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2324 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2328 va_start(ap, types);
2329 r = bus_message_append_ap(m, types, ap);
2334 return sd_bus_send(bus, m, NULL);
2337 int sd_bus_call_method(
2339 const char *destination,
2341 const char *interface,
2343 sd_bus_error *error,
2344 sd_bus_message **reply,
2345 const char *types, ...) {
2347 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2354 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2358 va_start(ap, types);
2359 r = bus_message_append_ap(m, types, ap);
2364 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
2367 int sd_bus_reply_method_return(
2369 sd_bus_message *call,
2370 const char *types, ...) {
2372 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2382 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2385 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2388 r = sd_bus_message_new_method_return(bus, call, &m);
2392 va_start(ap, types);
2393 r = bus_message_append_ap(m, types, ap);
2398 return sd_bus_send(bus, m, NULL);
2401 int sd_bus_reply_method_error(
2403 sd_bus_message *call,
2404 const sd_bus_error *e) {
2406 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2415 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2417 if (!sd_bus_error_is_set(e))
2420 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2423 r = sd_bus_message_new_method_error(bus, call, e, &m);
2427 return sd_bus_send(bus, m, NULL);