1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
37 #include "bus-internal.h"
38 #include "bus-message.h"
40 #include "bus-socket.h"
41 #include "bus-kernel.h"
42 #include "bus-control.h"
44 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
46 static void bus_free(sd_bus *b) {
47 struct filter_callback *f;
48 struct object_callback *c;
62 strv_free(b->exec_argv);
64 close_many(b->fds, b->n_fds);
67 for (i = 0; i < b->rqueue_size; i++)
68 sd_bus_message_unref(b->rqueue[i]);
71 for (i = 0; i < b->wqueue_size; i++)
72 sd_bus_message_unref(b->wqueue[i]);
75 hashmap_free_free(b->reply_callbacks);
76 prioq_free(b->reply_callbacks_prioq);
78 while ((f = b->filter_callbacks)) {
79 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
83 while ((c = hashmap_steal_first(b->object_callbacks))) {
88 hashmap_free(b->object_callbacks);
90 bus_match_free(&b->match_callbacks);
95 int sd_bus_new(sd_bus **ret) {
106 r->input_fd = r->output_fd = -1;
107 r->message_version = 1;
108 r->negotiate_fds = true;
110 /* We guarantee that wqueue always has space for at least one
112 r->wqueue = new(sd_bus_message*, 1);
122 int sd_bus_set_address(sd_bus *bus, const char *address) {
127 if (bus->state != BUS_UNSET)
142 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
145 if (bus->state != BUS_UNSET)
152 bus->input_fd = input_fd;
153 bus->output_fd = output_fd;
157 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
162 if (bus->state != BUS_UNSET)
166 if (strv_isempty(argv))
179 free(bus->exec_path);
180 strv_free(bus->exec_argv);
188 int sd_bus_set_bus_client(sd_bus *bus, int b) {
191 if (bus->state != BUS_UNSET)
194 bus->bus_client = !!b;
198 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
201 if (bus->state != BUS_UNSET)
204 bus->negotiate_fds = !!b;
208 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
211 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
213 if (bus->state != BUS_UNSET)
216 bus->is_server = !!b;
217 bus->server_id = server_id;
221 int sd_bus_set_anonymous(sd_bus *bus, int b) {
224 if (bus->state != BUS_UNSET)
227 bus->anonymous_auth = !!b;
231 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
236 assert(bus->state == BUS_HELLO);
243 r = sd_bus_message_read(reply, "s", &s);
247 if (!service_name_is_valid(s) || s[0] != ':')
250 bus->unique_name = strdup(s);
251 if (!bus->unique_name)
254 bus->state = BUS_RUNNING;
259 static int bus_send_hello(sd_bus *bus) {
260 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
265 if (!bus->bus_client || bus->is_kernel)
268 r = sd_bus_message_new_method_call(
270 "org.freedesktop.DBus",
272 "org.freedesktop.DBus",
278 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
281 int bus_start_running(sd_bus *bus) {
284 if (bus->bus_client && !bus->is_kernel) {
285 bus->state = BUS_HELLO;
289 bus->state = BUS_RUNNING;
293 static int parse_address_key(const char **p, const char *key, char **value) {
304 if (strncmp(*p, key, l) != 0)
317 while (*a != ';' && *a != ',' && *a != 0) {
335 c = (char) ((x << 4) | y);
342 t = realloc(r, n + 2);
370 static void skip_address_key(const char **p) {
374 *p += strcspn(*p, ",");
380 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
381 _cleanup_free_ char *path = NULL, *abstract = NULL;
390 while (**p != 0 && **p != ';') {
391 r = parse_address_key(p, "guid", guid);
397 r = parse_address_key(p, "path", &path);
403 r = parse_address_key(p, "abstract", &abstract);
412 if (!path && !abstract)
415 if (path && abstract)
420 if (l > sizeof(b->sockaddr.un.sun_path))
423 b->sockaddr.un.sun_family = AF_UNIX;
424 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
425 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
426 } else if (abstract) {
427 l = strlen(abstract);
428 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
431 b->sockaddr.un.sun_family = AF_UNIX;
432 b->sockaddr.un.sun_path[0] = 0;
433 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
434 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
440 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
441 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
443 struct addrinfo *result, hints = {
444 .ai_socktype = SOCK_STREAM,
445 .ai_flags = AI_ADDRCONFIG,
453 while (**p != 0 && **p != ';') {
454 r = parse_address_key(p, "guid", guid);
460 r = parse_address_key(p, "host", &host);
466 r = parse_address_key(p, "port", &port);
472 r = parse_address_key(p, "family", &family);
485 if (streq(family, "ipv4"))
486 hints.ai_family = AF_INET;
487 else if (streq(family, "ipv6"))
488 hints.ai_family = AF_INET6;
493 r = getaddrinfo(host, port, &hints, &result);
497 return -EADDRNOTAVAIL;
499 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
500 b->sockaddr_size = result->ai_addrlen;
502 freeaddrinfo(result);
507 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
509 unsigned n_argv = 0, j;
518 while (**p != 0 && **p != ';') {
519 r = parse_address_key(p, "guid", guid);
525 r = parse_address_key(p, "path", &path);
531 if (startswith(*p, "argv")) {
535 ul = strtoul(*p + 4, (char**) p, 10);
536 if (errno > 0 || **p != '=' || ul > 256) {
546 x = realloc(argv, sizeof(char*) * (ul + 2));
552 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
558 r = parse_address_key(p, NULL, argv + ul);
573 /* Make sure there are no holes in the array, with the
574 * exception of argv[0] */
575 for (j = 1; j < n_argv; j++)
581 if (argv && argv[0] == NULL) {
582 argv[0] = strdup(path);
594 for (j = 0; j < n_argv; j++)
602 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
603 _cleanup_free_ char *path = NULL;
611 while (**p != 0 && **p != ';') {
612 r = parse_address_key(p, "guid", guid);
618 r = parse_address_key(p, "path", &path);
637 static void bus_reset_parsed_address(sd_bus *b) {
641 b->sockaddr_size = 0;
642 strv_free(b->exec_argv);
646 b->server_id = SD_ID128_NULL;
651 static int bus_parse_next_address(sd_bus *b) {
652 _cleanup_free_ char *guid = NULL;
660 if (b->address[b->address_index] == 0)
663 bus_reset_parsed_address(b);
665 a = b->address + b->address_index;
674 if (startswith(a, "unix:")) {
677 r = parse_unix_address(b, &a, &guid);
682 } else if (startswith(a, "tcp:")) {
685 r = parse_tcp_address(b, &a, &guid);
691 } else if (startswith(a, "unixexec:")) {
694 r = parse_exec_address(b, &a, &guid);
700 } else if (startswith(a, "kernel:")) {
703 r = parse_kernel_address(b, &a, &guid);
716 r = sd_id128_from_string(guid, &b->server_id);
721 b->address_index = a - b->address;
725 static int bus_start_address(sd_bus *b) {
733 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
735 r = bus_socket_connect(b);
739 b->last_connect_error = -r;
741 } else if (b->exec_path) {
743 r = bus_socket_exec(b);
747 b->last_connect_error = -r;
748 } else if (b->kernel) {
750 r = bus_kernel_connect(b);
754 b->last_connect_error = -r;
757 r = bus_parse_next_address(b);
761 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
765 int bus_next_address(sd_bus *b) {
768 bus_reset_parsed_address(b);
769 return bus_start_address(b);
772 static int bus_start_fd(sd_bus *b) {
777 assert(b->input_fd >= 0);
778 assert(b->output_fd >= 0);
780 r = fd_nonblock(b->input_fd, true);
784 r = fd_cloexec(b->input_fd, true);
788 if (b->input_fd != b->output_fd) {
789 r = fd_nonblock(b->output_fd, true);
793 r = fd_cloexec(b->output_fd, true);
798 if (fstat(b->input_fd, &st) < 0)
801 if (S_ISCHR(b->input_fd))
802 return bus_kernel_take_fd(b);
804 return bus_socket_take_fd(b);
807 int sd_bus_start(sd_bus *bus) {
812 if (bus->state != BUS_UNSET)
815 bus->state = BUS_OPENING;
817 if (bus->is_server && bus->bus_client)
820 if (bus->input_fd >= 0)
821 r = bus_start_fd(bus);
822 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
823 r = bus_start_address(bus);
830 return bus_send_hello(bus);
833 int sd_bus_open_system(sd_bus **ret) {
845 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
847 r = sd_bus_set_address(b, e);
851 b->sockaddr.un.sun_family = AF_UNIX;
852 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
853 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
856 b->bus_client = true;
870 int sd_bus_open_user(sd_bus **ret) {
883 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
885 r = sd_bus_set_address(b, e);
889 e = secure_getenv("XDG_RUNTIME_DIR");
896 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
901 b->sockaddr.un.sun_family = AF_UNIX;
902 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
903 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
906 b->bus_client = true;
920 void sd_bus_close(sd_bus *bus) {
924 if (bus->input_fd >= 0)
925 close_nointr_nofail(bus->input_fd);
926 if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
927 close_nointr_nofail(bus->output_fd);
929 bus->input_fd = bus->output_fd = -1;
932 sd_bus *sd_bus_ref(sd_bus *bus) {
936 assert(bus->n_ref > 0);
942 sd_bus *sd_bus_unref(sd_bus *bus) {
946 assert(bus->n_ref > 0);
955 int sd_bus_is_open(sd_bus *bus) {
959 return bus->state != BUS_UNSET && bus->input_fd >= 0;
962 int sd_bus_can_send(sd_bus *bus, char type) {
967 if (bus->output_fd < 0)
970 if (type == SD_BUS_TYPE_UNIX_FD) {
971 if (!bus->negotiate_fds)
974 r = bus_ensure_running(bus);
981 return bus_type_is_valid(type);
984 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
992 r = bus_ensure_running(bus);
996 *server_id = bus->server_id;
1000 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1003 if (m->header->version > b->message_version)
1009 return bus_message_seal(m, ++b->serial);
1012 static int dispatch_wqueue(sd_bus *bus) {
1016 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1018 if (bus->output_fd < 0)
1021 while (bus->wqueue_size > 0) {
1024 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1026 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1032 /* Didn't do anything this time */
1034 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1035 /* Fully written. Let's drop the entry from
1038 * This isn't particularly optimized, but
1039 * well, this is supposed to be our worst-case
1040 * buffer only, and the socket buffer is
1041 * supposed to be our primary buffer, and if
1042 * it got full, then all bets are off
1045 sd_bus_message_unref(bus->wqueue[0]);
1046 bus->wqueue_size --;
1047 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1057 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1058 sd_bus_message *z = NULL;
1063 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1065 if (bus->input_fd < 0)
1068 if (bus->rqueue_size > 0) {
1069 /* Dispatch a queued message */
1071 *m = bus->rqueue[0];
1072 bus->rqueue_size --;
1073 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1077 /* Try to read a new message */
1080 r = bus_kernel_read_message(bus, &z);
1082 r = bus_socket_read_message(bus, &z);
1098 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1103 if (bus->state == BUS_UNSET)
1105 if (bus->output_fd < 0)
1111 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1118 /* If the serial number isn't kept, then we know that no reply
1120 if (!serial && !m->sealed)
1121 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1123 r = bus_seal_message(bus, m);
1127 /* If this is a reply and no reply was requested, then let's
1128 * suppress this, if we can */
1129 if (m->dont_send && !serial)
1132 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1136 r = bus_kernel_write_message(bus, m);
1138 r = bus_socket_write_message(bus, m, &idx);
1143 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1144 /* Wasn't fully written. So let's remember how
1145 * much was written. Note that the first entry
1146 * of the wqueue array is always allocated so
1147 * that we always can remember how much was
1149 bus->wqueue[0] = sd_bus_message_ref(m);
1150 bus->wqueue_size = 1;
1156 /* Just append it to the queue. */
1158 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1161 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1166 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1170 *serial = BUS_MESSAGE_SERIAL(m);
1175 static usec_t calc_elapse(uint64_t usec) {
1176 if (usec == (uint64_t) -1)
1180 usec = BUS_DEFAULT_TIMEOUT;
1182 return now(CLOCK_MONOTONIC) + usec;
1185 static int timeout_compare(const void *a, const void *b) {
1186 const struct reply_callback *x = a, *y = b;
1188 if (x->timeout != 0 && y->timeout == 0)
1191 if (x->timeout == 0 && y->timeout != 0)
1194 if (x->timeout < y->timeout)
1197 if (x->timeout > y->timeout)
1203 int sd_bus_send_with_reply(
1206 sd_bus_message_handler_t callback,
1211 struct reply_callback *c;
1216 if (bus->state == BUS_UNSET)
1218 if (bus->output_fd < 0)
1224 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1226 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1229 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1233 if (usec != (uint64_t) -1) {
1234 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1239 r = bus_seal_message(bus, m);
1243 c = new0(struct reply_callback, 1);
1247 c->callback = callback;
1248 c->userdata = userdata;
1249 c->serial = BUS_MESSAGE_SERIAL(m);
1250 c->timeout = calc_elapse(usec);
1252 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1258 if (c->timeout != 0) {
1259 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1262 sd_bus_send_with_reply_cancel(bus, c->serial);
1267 r = sd_bus_send(bus, m, serial);
1269 sd_bus_send_with_reply_cancel(bus, c->serial);
1276 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1277 struct reply_callback *c;
1284 c = hashmap_remove(bus->reply_callbacks, &serial);
1288 if (c->timeout != 0)
1289 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1295 int bus_ensure_running(sd_bus *bus) {
1300 if (bus->input_fd < 0)
1302 if (bus->state == BUS_UNSET)
1305 if (bus->state == BUS_RUNNING)
1309 r = sd_bus_process(bus, NULL);
1312 if (bus->state == BUS_RUNNING)
1317 r = sd_bus_wait(bus, (uint64_t) -1);
1323 int sd_bus_send_with_reply_and_block(
1327 sd_bus_error *error,
1328 sd_bus_message **reply) {
1337 if (bus->output_fd < 0)
1339 if (bus->state == BUS_UNSET)
1343 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1345 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1347 if (bus_error_is_dirty(error))
1350 r = bus_ensure_running(bus);
1354 r = sd_bus_send(bus, m, &serial);
1358 timeout = calc_elapse(usec);
1362 sd_bus_message *incoming = NULL;
1367 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1370 /* Make sure there's room for queuing this
1371 * locally, before we read the message */
1373 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1382 r = bus_kernel_read_message(bus, &incoming);
1384 r = bus_socket_read_message(bus, &incoming);
1389 if (incoming->reply_serial == serial) {
1390 /* Found a match! */
1392 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1397 sd_bus_message_unref(incoming);
1402 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1405 r = sd_bus_error_copy(error, &incoming->error);
1407 sd_bus_message_unref(incoming);
1411 k = bus_error_to_errno(&incoming->error);
1412 sd_bus_message_unref(incoming);
1416 sd_bus_message_unref(incoming);
1420 /* There's already guaranteed to be room for
1421 * this, so need to resize things here */
1422 bus->rqueue[bus->rqueue_size ++] = incoming;
1425 /* Try to read more, right-away */
1434 n = now(CLOCK_MONOTONIC);
1440 left = (uint64_t) -1;
1442 r = bus_poll(bus, true, left);
1446 r = dispatch_wqueue(bus);
1452 int sd_bus_get_fd(sd_bus *bus) {
1455 if (bus->input_fd < 0)
1457 if (bus->input_fd != bus->output_fd)
1460 return bus->input_fd;
1463 int sd_bus_get_events(sd_bus *bus) {
1468 if (bus->state == BUS_UNSET)
1470 if (bus->input_fd < 0)
1473 if (bus->state == BUS_OPENING)
1475 else if (bus->state == BUS_AUTHENTICATING) {
1477 if (bus_socket_auth_needs_write(bus))
1482 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1483 if (bus->rqueue_size <= 0)
1485 if (bus->wqueue_size > 0)
1492 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1493 struct reply_callback *c;
1499 if (bus->state == BUS_UNSET)
1501 if (bus->input_fd < 0)
1504 if (bus->state == BUS_AUTHENTICATING) {
1505 *timeout_usec = bus->auth_timeout;
1509 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1510 *timeout_usec = (uint64_t) -1;
1514 c = prioq_peek(bus->reply_callbacks_prioq);
1516 *timeout_usec = (uint64_t) -1;
1520 *timeout_usec = c->timeout;
1524 static int process_timeout(sd_bus *bus) {
1525 struct reply_callback *c;
1531 c = prioq_peek(bus->reply_callbacks_prioq);
1535 n = now(CLOCK_MONOTONIC);
1539 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1540 hashmap_remove(bus->reply_callbacks, &c->serial);
1542 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1545 return r < 0 ? r : 1;
1548 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1552 if (bus->state != BUS_HELLO)
1555 /* Let's make sure the first message on the bus is the HELLO
1556 * reply. But note that we don't actually parse the message
1557 * here (we leave that to the usual handling), we just verify
1558 * we don't let any earlier msg through. */
1560 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1561 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1564 if (m->reply_serial != bus->hello_serial)
1570 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1571 struct reply_callback *c;
1577 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1578 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1581 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1585 if (c->timeout != 0)
1586 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1588 r = sd_bus_message_rewind(m, true);
1592 r = c->callback(bus, 0, m, c->userdata);
1598 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1599 struct filter_callback *l;
1606 bus->filter_callbacks_modified = false;
1608 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1610 if (bus->filter_callbacks_modified)
1613 /* Don't run this more than once per iteration */
1614 if (l->last_iteration == bus->iteration_counter)
1617 l->last_iteration = bus->iteration_counter;
1619 r = sd_bus_message_rewind(m, true);
1623 r = l->callback(bus, 0, m, l->userdata);
1629 } while (bus->filter_callbacks_modified);
1634 static int process_match(sd_bus *bus, sd_bus_message *m) {
1641 bus->match_callbacks_modified = false;
1643 r = bus_match_run(bus, &bus->match_callbacks, 0, m);
1647 } while (bus->match_callbacks_modified);
1652 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1653 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1659 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1662 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1665 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1668 if (streq_ptr(m->member, "Ping"))
1669 r = sd_bus_message_new_method_return(bus, m, &reply);
1670 else if (streq_ptr(m->member, "GetMachineId")) {
1674 r = sd_id128_get_machine(&id);
1678 r = sd_bus_message_new_method_return(bus, m, &reply);
1682 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1684 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1686 sd_bus_error_set(&error,
1687 "org.freedesktop.DBus.Error.UnknownMethod",
1688 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1690 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1696 r = sd_bus_send(bus, reply, NULL);
1703 static int process_object(sd_bus *bus, sd_bus_message *m) {
1704 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1705 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1706 struct object_callback *c;
1714 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1717 if (hashmap_isempty(bus->object_callbacks))
1720 pl = strlen(m->path);
1725 bus->object_callbacks_modified = false;
1727 c = hashmap_get(bus->object_callbacks, m->path);
1728 if (c && c->last_iteration != bus->iteration_counter) {
1730 c->last_iteration = bus->iteration_counter;
1732 r = sd_bus_message_rewind(m, true);
1736 r = c->callback(bus, 0, m, c->userdata);
1743 /* Look for fallback prefixes */
1748 if (bus->object_callbacks_modified)
1751 e = strrchr(p, '/');
1757 c = hashmap_get(bus->object_callbacks, p);
1758 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1760 c->last_iteration = bus->iteration_counter;
1762 r = sd_bus_message_rewind(m, true);
1766 r = c->callback(bus, 0, m, c->userdata);
1774 } while (bus->object_callbacks_modified);
1776 /* We found some handlers but none wanted to take this, then
1777 * return this -- with one exception, we can handle
1778 * introspection minimally ourselves */
1779 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1782 sd_bus_error_set(&error,
1783 "org.freedesktop.DBus.Error.UnknownMethod",
1784 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1786 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1790 r = sd_bus_send(bus, reply, NULL);
1797 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1798 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1799 _cleanup_free_ char *introspection = NULL;
1800 _cleanup_set_free_free_ Set *s = NULL;
1801 _cleanup_fclose_ FILE *f = NULL;
1802 struct object_callback *c;
1811 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1817 s = set_new(string_hash_func, string_compare_func);
1821 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1825 if (streq(c->path, "/"))
1828 if (streq(m->path, "/"))
1831 e = startswith(c->path, m->path);
1832 if (!e || *e != '/')
1853 f = open_memstream(&introspection, &size);
1857 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1858 fputs("<node>\n", f);
1859 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1860 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1862 while ((node = set_steal_first(s))) {
1863 fprintf(f, " <node name=\"%s\"/>\n", node);
1867 fputs("</node>\n", f);
1874 r = sd_bus_message_new_method_return(bus, m, &reply);
1878 r = sd_bus_message_append(reply, "s", introspection);
1882 r = sd_bus_send(bus, reply, NULL);
1889 static int process_message(sd_bus *bus, sd_bus_message *m) {
1895 bus->iteration_counter++;
1897 r = process_hello(bus, m);
1901 r = process_reply(bus, m);
1905 r = process_filter(bus, m);
1909 r = process_match(bus, m);
1913 r = process_builtin(bus, m);
1917 r = process_object(bus, m);
1921 return process_introspect(bus, m);
1924 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1925 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1929 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1931 r = process_timeout(bus);
1935 r = dispatch_wqueue(bus);
1939 r = dispatch_rqueue(bus, &m);
1945 r = process_message(bus, m);
1950 r = sd_bus_message_rewind(m, true);
1959 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1960 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1961 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1963 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1965 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1969 r = sd_bus_send(bus, reply, NULL);
1983 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1986 /* Returns 0 when we didn't do anything. This should cause the
1987 * caller to invoke sd_bus_wait() before returning the next
1988 * time. Returns > 0 when we did something, which possibly
1989 * means *ret is filled in with an unprocessed message. */
1993 if (bus->input_fd < 0)
1996 /* We don't allow recursively invoking sd_bus_process(). */
1997 if (bus->processing)
2000 switch (bus->state) {
2006 r = bus_socket_process_opening(bus);
2013 case BUS_AUTHENTICATING:
2015 r = bus_socket_process_authenticating(bus);
2025 bus->processing = true;
2026 r = process_running(bus, ret);
2027 bus->processing = false;
2032 assert_not_reached("Unknown state");
2035 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2036 struct pollfd p[2] = {};
2043 if (bus->input_fd < 0)
2046 e = sd_bus_get_events(bus);
2053 r = sd_bus_get_timeout(bus, &until);
2060 nw = now(CLOCK_MONOTONIC);
2061 m = until > nw ? until - nw : 0;
2064 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2067 p[0].fd = bus->input_fd;
2068 if (bus->output_fd == bus->input_fd) {
2072 p[0].events = e & POLLIN;
2073 p[1].fd = bus->output_fd;
2074 p[1].events = e & POLLOUT;
2078 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2082 return r > 0 ? 1 : 0;
2085 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2089 if (bus->state == BUS_UNSET)
2091 if (bus->input_fd < 0)
2093 if (bus->rqueue_size > 0)
2096 return bus_poll(bus, false, timeout_usec);
2099 int sd_bus_flush(sd_bus *bus) {
2104 if (bus->state == BUS_UNSET)
2106 if (bus->output_fd < 0)
2109 r = bus_ensure_running(bus);
2113 if (bus->wqueue_size <= 0)
2117 r = dispatch_wqueue(bus);
2121 if (bus->wqueue_size <= 0)
2124 r = bus_poll(bus, false, (uint64_t) -1);
2130 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2131 struct filter_callback *f;
2138 f = new0(struct filter_callback, 1);
2141 f->callback = callback;
2142 f->userdata = userdata;
2144 bus->filter_callbacks_modified = true;
2145 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
2149 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2150 struct filter_callback *f;
2157 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2158 if (f->callback == callback && f->userdata == userdata) {
2159 bus->filter_callbacks_modified = true;
2160 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2169 static int bus_add_object(
2173 sd_bus_message_handler_t callback,
2176 struct object_callback *c;
2186 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2190 c = new0(struct object_callback, 1);
2194 c->path = strdup(path);
2200 c->callback = callback;
2201 c->userdata = userdata;
2202 c->is_fallback = fallback;
2204 bus->object_callbacks_modified = true;
2205 r = hashmap_put(bus->object_callbacks, c->path, c);
2215 static int bus_remove_object(
2219 sd_bus_message_handler_t callback,
2222 struct object_callback *c;
2231 c = hashmap_get(bus->object_callbacks, path);
2235 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2238 bus->object_callbacks_modified = true;
2239 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2247 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2248 return bus_add_object(bus, false, path, callback, userdata);
2251 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2252 return bus_remove_object(bus, false, path, callback, userdata);
2255 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2256 return bus_add_object(bus, true, prefix, callback, userdata);
2259 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2260 return bus_remove_object(bus, true, prefix, callback, userdata);
2263 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2271 if (bus->bus_client) {
2272 r = bus_add_match_internal(bus, match);
2278 bus->match_callbacks_modified = true;
2279 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2282 if (bus->bus_client)
2283 bus_remove_match_internal(bus, match);
2290 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2298 if (bus->bus_client)
2299 r = bus_remove_match_internal(bus, match);
2302 bus->match_callbacks_modified = true;
2303 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
2311 int sd_bus_emit_signal(
2314 const char *interface,
2316 const char *types, ...) {
2318 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2325 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2329 va_start(ap, types);
2330 r = bus_message_append_ap(m, types, ap);
2335 return sd_bus_send(bus, m, NULL);
2338 int sd_bus_call_method(
2340 const char *destination,
2342 const char *interface,
2344 sd_bus_error *error,
2345 sd_bus_message **reply,
2346 const char *types, ...) {
2348 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2355 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2359 va_start(ap, types);
2360 r = bus_message_append_ap(m, types, ap);
2365 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
2368 int sd_bus_reply_method_return(
2370 sd_bus_message *call,
2371 const char *types, ...) {
2373 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2383 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2386 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2389 r = sd_bus_message_new_method_return(bus, call, &m);
2393 va_start(ap, types);
2394 r = bus_message_append_ap(m, types, ap);
2399 return sd_bus_send(bus, m, NULL);
2402 int sd_bus_reply_method_error(
2404 sd_bus_message *call,
2405 const sd_bus_error *e) {
2407 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2416 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2418 if (!sd_bus_error_is_set(e))
2421 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2424 r = sd_bus_message_new_method_error(bus, call, e, &m);
2428 return sd_bus_send(bus, m, NULL);