1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
37 #include "bus-internal.h"
38 #include "bus-message.h"
40 #include "bus-socket.h"
41 #include "bus-kernel.h"
42 #include "bus-control.h"
/* Forward declaration: bus_poll() is defined further down in this file but
 * is already called by code that appears before its definition. */
44 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Releases resources referenced by a bus object. The visible part frees the
 * exec argv vector, closes the descriptors in b->fds, drops the references
 * on all messages left in the read and write queues and tears down the
 * reply, filter, object and match callback containers. */
46 static void bus_free(sd_bus *b) {
47 struct filter_callback *f;
48 struct object_callback *c;
62 strv_free(b->exec_argv);
/* Close the b->n_fds descriptors stored in b->fds */
64 close_many(b->fds, b->n_fds);
/* Drop our reference on every message still waiting in the read queue */
67 for (i = 0; i < b->rqueue_size; i++)
68 sd_bus_message_unref(b->rqueue[i]);
/* ... and on every message still waiting to be written */
71 for (i = 0; i < b->wqueue_size; i++)
72 sd_bus_message_unref(b->wqueue[i]);
/* NOTE(review): hashmap_free_free() presumably also frees the stored
 * reply_callback entries, which would make it safe to free the priority
 * queue that points at the same entries afterwards -- confirm. */
75 hashmap_free_free(b->reply_callbacks);
76 prioq_free(b->reply_callbacks_prioq);
/* Unlink the filter callbacks one by one until the list is empty */
78 while ((f = b->filter_callbacks)) {
79 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
/* Take each object callback out of the map before the map itself is freed */
83 while ((c = hashmap_steal_first(b->object_callbacks))) {
88 hashmap_free(b->object_callbacks);
90 bus_match_free(&b->match_callbacks);
/* Sets up a fresh bus object (local "r"); it is presumably handed back to
 * the caller through *ret, the assignment is not visible in this excerpt.
 * Visible defaults: both fds unset (-1), message version 1, fd negotiation
 * enabled, and a write queue pre-allocated with room for one message. */
95 int sd_bus_new(sd_bus **ret) {
106 r->input_fd = r->output_fd = -1;
107 r->message_version = 1;
108 r->negotiate_fds = true;
110 /* We guarantee that wqueue always has space for at least one
112 r->wqueue = new(sd_bus_message*, 1);
/* Configures the address string of a bus (the assignment itself is not
 * visible in this excerpt). The visible check tests whether the bus has
 * left the BUS_UNSET state; presumably the call is refused in that case. */
122 int sd_bus_set_address(sd_bus *bus, const char *address) {
127 if (bus->state != BUS_UNSET)
/* Stores caller-supplied file descriptors as the input and output ends of
 * the connection. The state check tests for a bus that has left BUS_UNSET;
 * presumably the call is refused in that case. */
142 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
145 if (bus->state != BUS_UNSET)
152 bus->input_fd = input_fd;
153 bus->output_fd = output_fd;
/* Configures a program (path plus argument vector) for the bus. The visible
 * checks test for a bus that has left BUS_UNSET and for an empty argv; what
 * is returned in those cases is not visible here. Any previously configured
 * exec_path and exec_argv are released; presumably copies of the new values
 * are installed afterwards. */
157 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
162 if (bus->state != BUS_UNSET)
166 if (strv_isempty(argv))
179 free(bus->exec_path);
180 strv_free(bus->exec_argv);
/* Sets the bus_client flag of the bus. The state check tests for a bus that
 * has left BUS_UNSET; presumably the call is refused in that case. */
188 int sd_bus_set_bus_client(sd_bus *bus, int b) {
191 if (bus->state != BUS_UNSET)
/* "!!" collapses any non-zero int to exactly 1 before it is stored */
194 bus->bus_client = !!b;
/* Sets the negotiate_fds flag of the bus. The state check tests for a bus
 * that has left BUS_UNSET; presumably the call is refused in that case. */
198 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
201 if (bus->state != BUS_UNSET)
/* "!!" collapses any non-zero int to exactly 1 before it is stored */
204 bus->negotiate_fds = !!b;
/* Sets the is_server flag of the bus and records the server ID that goes
 * with it. The state check tests for a bus that has left BUS_UNSET;
 * presumably the call is refused in that case. */
208 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
/* A non-null server ID combined with b == 0 is singled out here;
 * presumably this combination is refused as contradictory. */
211 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
213 if (bus->state != BUS_UNSET)
216 bus->is_server = !!b;
217 bus->server_id = server_id;
/* Sets the anonymous_auth flag of the bus. The state check tests for a bus
 * that has left BUS_UNSET; presumably the call is refused in that case. */
221 int sd_bus_set_anonymous(sd_bus *bus, int b) {
224 if (bus->state != BUS_UNSET)
/* "!!" collapses any non-zero int to exactly 1 before it is stored */
227 bus->anonymous_auth = !!b;
/* Reply handler registered by bus_send_hello(). It reads one string from
 * the reply, validates it, stores a copy as the bus' unique name and moves
 * the bus from BUS_HELLO to BUS_RUNNING. */
231 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
236 assert(bus->state == BUS_HELLO);
243 r = sd_bus_message_read(reply, "s", &s);
/* The name must be a valid service name and must begin with ':' */
247 if (!service_name_is_valid(s) || s[0] != ':')
250 bus->unique_name = strdup(s);
251 if (!bus->unique_name)
254 bus->state = BUS_RUNNING;
/* Builds a method call addressed to "org.freedesktop.DBus" and sends it
 * with hello_callback() as the reply handler. The serial of the call is
 * recorded in bus->hello_serial, which process_hello() compares against
 * incoming replies. */
259 static int bus_send_hello(sd_bus *bus) {
260 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
/* Only plain bus clients are expected to continue past this check;
 * presumably the function returns early for everything else. */
265 if (!bus->bus_client || bus->is_kernel)
268 r = sd_bus_message_new_method_call(
270 "org.freedesktop.DBus",
272 "org.freedesktop.DBus",
278 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Picks the state a bus enters once the connection is usable: clients of a
 * non-kernel bus go to BUS_HELLO first, the BUS_RUNNING assignment below is
 * presumably the path taken by everything else. */
281 int bus_start_running(sd_bus *bus) {
284 if (bus->bus_client && !bus->is_kernel) {
285 bus->state = BUS_HELLO;
289 bus->state = BUS_RUNNING;
/* Parses one key/value item of a bus address at *p. Only fragments are
 * visible here: the key is matched as a prefix of *p, the value is scanned
 * up to the next ';', ',' or end of string, and the result is accumulated
 * in a buffer that is grown with realloc(). */
293 static int parse_address_key(const char **p, const char *key, char **value) {
/* l is presumably strlen(key) -- its assignment is not visible here */
304 if (strncmp(*p, key, l) != 0)
317 while (*a != ';' && *a != ',' && *a != 0) {
/* Combine two 4-bit values into one byte; presumably x and y are the
 * decoded hex digits of a "%XX" escape sequence. */
335 c = (char) ((x << 4) | y);
/* The realloc() result goes into a temporary so that r stays valid if the
 * allocation fails. */
342 t = realloc(r, n + 2);
/* Skips over the current item of an address string. */
370 static void skip_address_key(const char **p) {
/* strcspn() yields the length of the run without ',', so *p ends up on the
 * next ',' or on the terminating NUL. */
374 *p += strcspn(*p, ",");
/* Parses the keys of a "unix:" address (guid, path, abstract) up to the next
 * ';' or end of string and fills in b->sockaddr and b->sockaddr_size for an
 * AF_UNIX socket. The two checks on path/abstract test for "neither given"
 * and "both given"; presumably both cases are rejected. */
380 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
381 _cleanup_free_ char *path = NULL, *abstract = NULL;
390 while (**p != 0 && **p != ';') {
391 r = parse_address_key(p, "guid", guid);
397 r = parse_address_key(p, "path", &path);
403 r = parse_address_key(p, "abstract", &abstract);
412 if (!path && !abstract)
415 if (path && abstract)
/* l is presumably strlen(path) at this point. A path of exactly
 * sizeof(sun_path) bytes passes this check, in which case the strncpy()
 * below stores it without a terminating NUL; the length is carried
 * explicitly in sockaddr_size instead. */
420 if (l > sizeof(b->sockaddr.un.sun_path))
423 b->sockaddr.un.sun_family = AF_UNIX;
424 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
425 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
426 } else if (abstract) {
427 l = strlen(abstract);
/* One byte of sun_path is reserved for the leading NUL written below */
428 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
431 b->sockaddr.un.sun_family = AF_UNIX;
/* The name is stored after a leading NUL byte in sun_path[0] */
432 b->sockaddr.un.sun_path[0] = 0;
433 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
434 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the keys of a "tcp:" address (guid, host, port, family) up to the
 * next ';' or end of string, resolves host and port with getaddrinfo() and
 * copies the first result into b->sockaddr. */
440 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
441 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
/* Ask the resolver for stream sockets only */
443 struct addrinfo *result, hints = {
444 .ai_socktype = SOCK_STREAM,
445 .ai_flags = AI_ADDRCONFIG,
453 while (**p != 0 && **p != ';') {
454 r = parse_address_key(p, "guid", guid);
460 r = parse_address_key(p, "host", &host);
466 r = parse_address_key(p, "port", &port);
472 r = parse_address_key(p, "family", &family);
/* Restrict the lookup to one address family if "family" was given */
485 if (streq(family, "ipv4"))
486 hints.ai_family = AF_INET;
487 else if (streq(family, "ipv6"))
488 hints.ai_family = AF_INET6;
493 r = getaddrinfo(host, port, &hints, &result);
497 return -EADDRNOTAVAIL;
/* Only the first entry of the result list is used.
 * NOTE(review): no check of ai_addrlen against sizeof(b->sockaddr) is
 * visible in this excerpt -- confirm that the union is large enough for
 * every address getaddrinfo() may return here. */
499 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
500 b->sockaddr_size = result->ai_addrlen;
502 freeaddrinfo(result);
/* Parses the keys of a "unixexec:" address up to the next ';' or end of
 * string: "guid", "path" and numbered "argv<N>" keys, from which an argument
 * vector is assembled. */
507 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
509 unsigned n_argv = 0, j;
518 while (**p != 0 && **p != ';') {
519 r = parse_address_key(p, "guid", guid);
525 r = parse_address_key(p, "path", &path);
531 if (startswith(*p, "argv")) {
/* Parse the decimal index that follows "argv"; strtoul() leaves *p on the
 * first character after the number. */
535 ul = strtoul(*p + 4, (char**) p, 10);
/* The index must be followed by '=' and must not exceed 256 */
536 if (errno > 0 || **p != '=' || ul > 256) {
/* Grow the vector so that slot "ul" plus one further slot exist */
546 x = realloc(argv, sizeof(char*) * (ul + 2));
/* Clear the slots from n_argv up to the new end so that slots which were
 * never assigned read as NULL. */
552 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
/* With a NULL key the value is parsed directly into argv[ul] */
558 r = parse_address_key(p, NULL, argv + ul);
573 /* Make sure there are no holes in the array, with the
574 * exception of argv[0] */
575 for (j = 1; j < n_argv; j++)
/* If no argv0 key was given, argv[0] defaults to a copy of the path */
581 if (argv && argv[0] == NULL) {
582 argv[0] = strdup(path);
/* Presumably the cleanup loop of the failure path; its body is not visible */
594 for (j = 0; j < n_argv; j++)
/* Parses the keys of a "kernel:" address (guid, path) up to the next ';' or
 * end of string. What is done with the parsed path is not visible in this
 * excerpt. */
602 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
603 _cleanup_free_ char *path = NULL;
611 while (**p != 0 && **p != ';') {
612 r = parse_address_key(p, "guid", guid);
618 r = parse_address_key(p, "path", &path);
/* Discards the connection parameters produced by a previous address parse.
 * The visible part resets the socket address size, frees the exec argument
 * vector and clears the server ID. */
637 static void bus_reset_parsed_address(sd_bus *b) {
641 b->sockaddr_size = 0;
642 strv_free(b->exec_argv);
646 b->server_id = SD_ID128_NULL;
/* Parses the next entry of b->address, starting at b->address_index. The
 * entry is dispatched on its transport prefix ("unix:", "tcp:", "unixexec:"
 * or "kernel:") to the matching parser; afterwards the guid, if any, is
 * converted into b->server_id and address_index is advanced. */
651 static int bus_parse_next_address(sd_bus *b) {
652 _cleanup_free_ char *guid = NULL;
/* Nothing left to parse once the index points at the terminating NUL */
660 if (b->address[b->address_index] == 0)
663 bus_reset_parsed_address(b);
665 a = b->address + b->address_index;
674 if (startswith(a, "unix:")) {
677 r = parse_unix_address(b, &a, &guid);
682 } else if (startswith(a, "tcp:")) {
685 r = parse_tcp_address(b, &a, &guid);
691 } else if (startswith(a, "unixexec:")) {
694 r = parse_exec_address(b, &a, &guid);
700 } else if (startswith(a, "kernel:")) {
703 r = parse_kernel_address(b, &a, &guid);
716 r = sd_id128_from_string(guid, &b->server_id);
/* Remember how far into the address string parsing got, so that the next
 * call continues from there. */
721 b->address_index = a - b->address;
/* Tries to establish a connection from the currently parsed parameters: a
 * socket connect if a socket address is set, otherwise an exec if an exec
 * path is set, otherwise a kernel connect. The error code of an attempt is
 * remembered, and bus_parse_next_address() supplies the next candidate. */
725 static int bus_start_address(sd_bus *b) {
733 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
735 r = bus_socket_connect(b);
/* r is negated before it is stored; r is presumably negative here, making
 * last_connect_error a positive errno value. */
739 b->last_connect_error = -r;
741 } else if (b->exec_path) {
743 r = bus_socket_exec(b);
747 b->last_connect_error = -r;
748 } else if (b->kernel) {
750 r = bus_kernel_connect(b);
754 b->last_connect_error = -r;
757 r = bus_parse_next_address(b);
/* Report the last recorded connect error, or -ECONNREFUSED if none was
 * recorded. */
761 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Drops the currently parsed connection parameters and restarts the
 * connection attempt via bus_start_address(). */
765 int bus_next_address(sd_bus *b) {
768 bus_reset_parsed_address(b);
769 return bus_start_address(b);
/* Prepares the configured file descriptors for use: both are switched to
 * non-blocking mode and marked close-on-exec (the output fd only if it
 * differs from the input fd). The fds are then handed to either the kernel
 * or the socket transport. */
772 static int bus_start_fd(sd_bus *b) {
777 assert(b->input_fd >= 0);
778 assert(b->output_fd >= 0);
780 r = fd_nonblock(b->input_fd, true);
784 r = fd_cloexec(b->input_fd, true);
788 if (b->input_fd != b->output_fd) {
789 r = fd_nonblock(b->output_fd, true);
793 r = fd_cloexec(b->output_fd, true);
798 if (fstat(b->input_fd, &st) < 0)
/* NOTE(review): S_ISCHR() is applied to the file descriptor number rather
 * than to st.st_mode obtained by the fstat() above, so the result of the
 * fstat() is never consulted. This looks like a bug; the intended test is
 * presumably S_ISCHR(st.st_mode) -- confirm and fix. */
801 if (S_ISCHR(b->input_fd))
802 return bus_kernel_take_fd(b);
804 return bus_socket_take_fd(b);
/* Starts a bus that is still in BUS_UNSET: moves it to BUS_OPENING, brings
 * up the connection from pre-set fds or from the configured address or
 * connection parameters, and finally issues the hello call through
 * bus_send_hello(). */
807 int sd_bus_start(sd_bus *bus) {
812 if (bus->state != BUS_UNSET)
815 bus->state = BUS_OPENING;
/* NOTE(review): the state has already been changed to BUS_OPENING when this
 * sanity check runs; if the check makes the function return, the bus is
 * left in BUS_OPENING -- confirm whether that is intended. */
817 if (bus->is_server && bus->bus_client)
/* Pre-set file descriptors take precedence over address-based setup */
820 if (bus->input_fd >= 0)
821 r = bus_start_fd(bus);
822 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
823 r = bus_start_address(bus);
830 return bus_send_hello(bus);
/* Sets up a client connection for the system bus. The address is taken from
 * $DBUS_SYSTEM_BUS_ADDRESS; the hard-coded socket path below is presumably
 * the fallback when that variable is not set. */
833 int sd_bus_open_system(sd_bus **ret) {
845 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
847 r = sd_bus_set_address(b, e);
851 b->sockaddr.un.sun_family = AF_UNIX;
852 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
/* sizeof() of the literal includes its NUL byte, hence the "- 1" to get the
 * plain string length. */
853 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
856 b->bus_client = true;
/* Sets up a client connection for the user bus. The address is taken from
 * $DBUS_SESSION_BUS_ADDRESS; the "$XDG_RUNTIME_DIR/bus" socket path built
 * below is presumably the fallback when that variable is not set. */
870 int sd_bus_open_user(sd_bus **ret) {
883 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
885 r = sd_bus_set_address(b, e);
889 e = secure_getenv("XDG_RUNTIME_DIR");
/* l is presumably strlen(e); the directory plus the 4 bytes of "/bus" must
 * fit into sun_path. */
896 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
901 b->sockaddr.un.sun_family = AF_UNIX;
/* Exactly 4 bytes of "/bus" are appended, i.e. no terminating NUL is
 * copied; the length is carried explicitly in sockaddr_size. */
902 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
903 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
906 b->bus_client = true;
/* Closes the connection's file descriptors and marks both as unset. */
920 void sd_bus_close(sd_bus *bus) {
924 if (bus->input_fd >= 0)
925 close_nointr_nofail(bus->input_fd);
/* Input and output may be the same descriptor; close it only once */
926 if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
927 close_nointr_nofail(bus->output_fd);
929 bus->input_fd = bus->output_fd = -1;
/* Reference-counting entry point; presumably increments bus->n_ref and
 * returns the bus (only the sanity assertion is visible in this excerpt). */
932 sd_bus *sd_bus_ref(sd_bus *bus) {
936 assert(bus->n_ref > 0);
/* Reference-counting entry point; presumably decrements bus->n_ref and
 * frees the bus when it drops to zero (only the sanity assertion is visible
 * in this excerpt). */
942 sd_bus *sd_bus_unref(sd_bus *bus) {
946 assert(bus->n_ref > 0);
/* Reports whether the bus is open: it must have left the BUS_UNSET state
 * and must have a valid input fd. */
955 int sd_bus_is_open(sd_bus *bus) {
959 return bus->state != BUS_UNSET && bus->input_fd >= 0;
/* Checks whether values of the given type can be sent over this bus. Unix
 * file descriptors are special-cased: they depend on fd negotiation, and
 * the bus is driven to the running state before the answer is determined.
 * All other types are decided by bus_type_is_valid(). */
962 int sd_bus_can_send(sd_bus *bus, char type) {
967 if (bus->output_fd < 0)
970 if (type == SD_BUS_TYPE_UNIX_FD) {
971 if (!bus->negotiate_fds)
974 r = bus_ensure_running(bus);
981 return bus_type_is_valid(type);
/* Copies the server ID of the bus into *server_id after driving the bus to
 * the running state via bus_ensure_running(). */
984 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
992 r = bus_ensure_running(bus);
996 *server_id = bus->server_id;
/* Seals a message for sending on this bus and assigns it the next serial
 * number. The message's header version is compared against the version the
 * bus uses; presumably a newer message version is refused. */
1000 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1003 if (m->header->version > b->message_version)
/* The serial counter is incremented before it is used */
1009 return bus_message_seal(m, ++b->serial);
/* Writes queued outgoing messages, always working on the head of the write
 * queue (wqueue[0]). On the socket transport bus->windex tracks how much of
 * the head message has been written so far. Once the head message is
 * completely written it is unreferenced and the remaining entries are
 * shifted down by one with memmove(). */
1012 static int dispatch_wqueue(sd_bus *bus) {
1016 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1018 if (bus->output_fd < 0)
1021 while (bus->wqueue_size > 0) {
/* Two write paths exist, one per transport; presumably the choice is made
 * on bus->is_kernel. */
1024 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1026 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1032 /* Didn't do anything this time */
1034 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1035 /* Fully written. Let's drop the entry from
1038 * This isn't particularly optimized, but
1039 * well, this is supposed to be our worst-case
1040 * buffer only, and the socket buffer is
1041 * supposed to be our primary buffer, and if
1042 * it got full, then all bets are off
1045 sd_bus_message_unref(bus->wqueue[0]);
1046 bus->wqueue_size --;
1047 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetches the next incoming message. A message already waiting in the read
 * queue is handed out through *m first; only when the queue is empty is a
 * new message read from the transport. */
1057 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1058 sd_bus_message *z = NULL;
1063 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1065 if (bus->input_fd < 0)
1068 if (bus->rqueue_size > 0) {
1069 /* Dispatch a queued message */
/* Hand out the head entry and shift the rest of the queue down by one; the
 * queue's reference passes to the caller. */
1071 *m = bus->rqueue[0];
1072 bus->rqueue_size --;
1073 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1077 /* Try to read a new message */
/* Two read paths exist, one per transport; presumably the choice is made on
 * bus->is_kernel. */
1080 r = bus_kernel_read_message(bus, &z);
1082 r = bus_socket_read_message(bus, &z);
/* Sends a message on the bus. The message is sealed first, which assigns
 * its serial; the serial is reported through *serial. If the bus is running
 * and the write queue is empty, a direct write is attempted and only a
 * partially written message is queued; otherwise the message is appended to
 * the write queue, which is limited to BUS_WQUEUE_MAX entries. */
1098 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1103 if (bus->state == BUS_UNSET)
1105 if (bus->output_fd < 0)
/* Presumably only relevant for messages that carry file descriptors; the
 * surrounding condition is not visible in this excerpt. */
1111 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1118 /* If the serial number isn't kept, then we know that no reply
1120 if (!serial && !m->sealed)
1121 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1123 r = bus_seal_message(bus, m);
1127 /* If this is a reply and no reply was requested, then let's
1128 * suppress this, if we can */
1129 if (m->dont_send && !serial)
/* Fast path: nothing is queued, so try to write the message right away */
1132 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1136 r = bus_kernel_write_message(bus, m);
1138 r = bus_socket_write_message(bus, m, &idx);
1143 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1144 /* Wasn't fully written. So let's remember how
1145 * much was written. Note that the first entry
1146 * of the wqueue array is always allocated so
1147 * that we always can remember how much was
1149 bus->wqueue[0] = sd_bus_message_ref(m);
1150 bus->wqueue_size = 1;
1156 /* Just append it to the queue. */
1158 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1161 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1166 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1170 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC deadline. (uint64_t) -1 is special-cased; presumably it
 * means "no timeout" and yields 0, which the callers treat as "no deadline".
 * BUS_DEFAULT_TIMEOUT is presumably substituted when usec is 0 -- the
 * conditions around these lines are not visible in this excerpt. */
1175 static usec_t calc_elapse(uint64_t usec) {
1176 if (usec == (uint64_t) -1)
1180 usec = BUS_DEFAULT_TIMEOUT;
1182 return now(CLOCK_MONOTONIC) + usec;
/* Comparison function for the reply callback priority queue. Entries are
 * first separated by whether they have a timeout at all (timeout != 0), and
 * entries that both have one are then compared by deadline. The returned
 * values are not visible in this excerpt. */
1185 static int timeout_compare(const void *a, const void *b) {
1186 const struct reply_callback *x = a, *y = b;
1188 if (x->timeout != 0 && y->timeout == 0)
1191 if (x->timeout == 0 && y->timeout != 0)
1194 if (x->timeout < y->timeout)
1197 if (x->timeout > y->timeout)
/* Sends a method call and registers a callback for its reply. The callback
 * record is stored in bus->reply_callbacks under the message serial and, if
 * it has a deadline, also in the timeout priority queue. The message is
 * sent only after the registration; if queueing or sending fails, the
 * registration is undone through sd_bus_send_with_reply_cancel(). */
1203 int sd_bus_send_with_reply(
1206 sd_bus_message_handler_t callback,
1211 struct reply_callback *c;
1216 if (bus->state == BUS_UNSET)
1218 if (bus->output_fd < 0)
/* The message must be a method call that expects a reply */
1224 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1226 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1229 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
/* The priority queue is only needed when a timeout is in effect */
1233 if (usec != (uint64_t) -1) {
1234 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
/* Sealing assigns the serial that is used as the lookup key below */
1239 r = bus_seal_message(bus, m);
1243 c = new0(struct reply_callback, 1);
1247 c->callback = callback;
1248 c->userdata = userdata;
1249 c->serial = BUS_MESSAGE_SERIAL(m);
1250 c->timeout = calc_elapse(usec);
/* The key points into the record itself (&c->serial) */
1252 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1258 if (c->timeout != 0) {
1259 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1262 sd_bus_send_with_reply_cancel(bus, c->serial);
1267 r = sd_bus_send(bus, m, serial);
1269 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply callback identified by the serial of its method
 * call: the record is removed from the reply callback map and, if it has a
 * deadline, from the timeout priority queue as well. */
1276 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1277 struct reply_callback *c;
1284 c = hashmap_remove(bus->reply_callbacks, &serial);
/* Records with timeout == 0 were never put into the priority queue */
1288 if (c->timeout != 0)
1289 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the bus until it reaches BUS_RUNNING by alternating between
 * sd_bus_process() and sd_bus_wait() with no timeout. The two calls
 * presumably sit inside a loop; the loop construct is not visible in this
 * excerpt. */
1295 int bus_ensure_running(sd_bus *bus) {
1300 if (bus->input_fd < 0)
1302 if (bus->state == BUS_UNSET)
/* Already running: nothing to do */
1305 if (bus->state == BUS_RUNNING)
1309 r = sd_bus_process(bus, NULL);
1312 if (bus->state == BUS_RUNNING)
/* (uint64_t) -1 means wait without a timeout */
1317 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and waits synchronously for its reply. Incoming
 * messages are read directly from the transport: a message whose
 * reply_serial matches the call's serial ends the wait, either as a method
 * return or as a method error that is copied into *error. Any other message
 * is appended to the read queue for later processing. */
1323 int sd_bus_send_with_reply_and_block(
1327 sd_bus_error *error,
1328 sd_bus_message **reply) {
1337 if (bus->output_fd < 0)
1339 if (bus->state == BUS_UNSET)
/* The message must be a method call that expects a reply */
1343 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1345 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1347 if (bus_error_is_dirty(error))
1350 r = bus_ensure_running(bus);
1354 r = sd_bus_send(bus, m, &serial);
/* Absolute deadline for the whole wait */
1358 timeout = calc_elapse(usec);
1362 sd_bus_message *incoming = NULL;
1367 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1370 /* Make sure there's room for queuing this
1371 * locally, before we read the message */
1373 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1382 r = bus_kernel_read_message(bus, &incoming);
1384 r = bus_socket_read_message(bus, &incoming);
1389 if (incoming->reply_serial == serial) {
1390 /* Found a match! */
1392 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1397 sd_bus_message_unref(incoming);
1402 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1405 r = sd_bus_error_copy(error, &incoming->error);
1407 sd_bus_message_unref(incoming);
/* Translate the error into an errno-style code before the message that
 * owns the error is released. */
1411 k = bus_error_to_errno(&incoming->error);
1412 sd_bus_message_unref(incoming);
1416 sd_bus_message_unref(incoming);
1420 /* There's already guaranteed to be room for
1421 * this, so need to resize things here */
1422 bus->rqueue[bus->rqueue_size ++] = incoming;
1425 /* Try to read more, right-away */
1434 n = now(CLOCK_MONOTONIC);
/* (uint64_t) -1 makes bus_poll() wait without a timeout of its own */
1440 left = (uint64_t) -1;
1442 r = bus_poll(bus, true, left);
/* Flush pending outgoing messages while waiting */
1446 r = dispatch_wqueue(bus);
/* Returns the file descriptor of the connection. The second check singles
 * out connections whose input and output fds differ; presumably these are
 * refused because one fd cannot stand for both. */
1452 int sd_bus_get_fd(sd_bus *bus) {
1455 if (bus->input_fd < 0)
1457 if (bus->input_fd != bus->output_fd)
1460 return bus->input_fd;
/* Computes the poll events the bus is currently interested in, depending on
 * its state. bus_poll() masks the result with POLLIN and POLLOUT, so the
 * return value is a poll event mask. The flag assignments themselves are
 * not visible in this excerpt. */
1463 int sd_bus_get_events(sd_bus *bus) {
1468 if (bus->state == BUS_UNSET)
1470 if (bus->input_fd < 0)
1473 if (bus->state == BUS_OPENING)
1475 else if (bus->state == BUS_AUTHENTICATING) {
1477 if (bus_socket_auth_needs_write(bus))
1482 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
/* Presumably: ask for input only while nothing is queued for reading, and
 * for output only while something is queued for writing. */
1483 if (bus->rqueue_size <= 0)
1485 if (bus->wqueue_size > 0)
/* Reports through *timeout_usec the time at which the bus next needs
 * attention. (uint64_t) -1 is stored when there is nothing to wait for;
 * bus_poll() treats that value as "no timeout". */
1492 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1493 struct reply_callback *c;
1499 if (bus->state == BUS_UNSET)
1501 if (bus->input_fd < 0)
/* During authentication the authentication deadline applies */
1504 if (bus->state == BUS_AUTHENTICATING) {
1505 *timeout_usec = bus->auth_timeout;
1509 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1510 *timeout_usec = (uint64_t) -1;
/* Otherwise the entry at the head of the reply callback priority queue
 * determines the timeout. */
1514 c = prioq_peek(bus->reply_callbacks_prioq);
1516 *timeout_usec = (uint64_t) -1;
1520 *timeout_usec = c->timeout;
/* Handles the reply callback at the head of the timeout priority queue: the
 * record is removed from the queue and from the reply callback map, and its
 * callback is invoked with ETIMEDOUT and no message. Presumably this only
 * happens once the deadline has passed; the comparison against the current
 * time is not visible in this excerpt. */
1524 static int process_timeout(sd_bus *bus) {
1525 struct reply_callback *c;
1531 c = prioq_peek(bus->reply_callbacks_prioq);
1535 n = now(CLOCK_MONOTONIC);
/* The popped entry must be the one that was just peeked */
1539 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1540 hashmap_remove(bus->reply_callbacks, &c->serial);
1542 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
/* Pass on a callback failure, otherwise report that work was done */
1545 return r < 0 ? r : 1;
/* Gatekeeper that is relevant only while the bus is in BUS_HELLO. It
 * compares the incoming message's type and reply serial against the pending
 * hello call; the return values for the individual cases are not visible in
 * this excerpt. */
1548 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1552 if (bus->state != BUS_HELLO)
1555 /* Let's make sure the first message on the bus is the HELLO
1556 * reply. But note that we don't actually parse the message
1557 * here (we leave that to the usual handling), we just verify
1558 * we don't let any earlier msg through. */
1560 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1561 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
/* bus->hello_serial was recorded by bus_send_hello() */
1564 if (m->reply_serial != bus->hello_serial)
/* Dispatches a method return or method error to the callback registered
 * for its reply serial. The registration is removed from the reply callback
 * map and the timeout queue before the callback runs. */
1570 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1571 struct reply_callback *c;
1577 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1578 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1581 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
/* Records with timeout == 0 were never put into the priority queue */
1585 if (c->timeout != 0)
1586 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Rewind the message before handing it to the callback */
1588 r = sd_bus_message_rewind(m, true);
1592 r = c->callback(bus, 0, m, c->userdata);
/* Runs the registered filter callbacks on a message. The list walk is
 * wrapped in a loop that repeats for as long as the filter list was
 * modified during the walk; a per-filter iteration stamp keeps a filter
 * from running twice for the same message. */
1598 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1599 struct filter_callback *l;
1606 bus->filter_callbacks_modified = false;
1608 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
/* The list changed under us; presumably the walk is abandoned here and
 * restarted by the enclosing loop. */
1610 if (bus->filter_callbacks_modified)
1613 /* Don't run this more than once per iteration */
1614 if (l->last_iteration == bus->iteration_counter)
1617 l->last_iteration = bus->iteration_counter;
/* Rewind the message before each callback */
1619 r = sd_bus_message_rewind(m, true);
1623 r = l->callback(bus, 0, m, l->userdata);
1629 } while (bus->filter_callbacks_modified);
/* Runs the match callbacks on a message via bus_match_run(), repeating the
 * run for as long as the set of match callbacks was modified during it. */
1634 static int process_match(sd_bus *bus, sd_bus_message *m) {
1641 bus->match_callbacks_modified = false;
1643 r = bus_match_run(bus, &bus->match_callbacks, 0, m);
1647 } while (bus->match_callbacks_modified);
/* Handles method calls on the "org.freedesktop.DBus.Peer" interface: "Ping"
 * gets an empty method return, "GetMachineId" gets a method return with the
 * machine ID as a string, and any other member gets an UnknownMethod error
 * reply. The reply is sent with sd_bus_send(). */
1652 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1653 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1659 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1662 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
/* Presumably no reply is generated when the caller does not expect one */
1665 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1668 if (streq_ptr(m->member, "Ping"))
1669 r = sd_bus_message_new_method_return(bus, m, &reply);
1670 else if (streq_ptr(m->member, "GetMachineId")) {
1674 r = sd_id128_get_machine(&id);
1678 r = sd_bus_message_new_method_return(bus, m, &reply);
1682 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1684 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1686 sd_bus_error_set(&error,
1687 "org.freedesktop.DBus.Error.UnknownMethod",
1688 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1690 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1696 r = sd_bus_send(bus, reply, NULL);
/* Dispatches a method call to the registered object callbacks. The callback
 * registered for the exact object path is tried first; then path prefixes
 * are looked up, and on those only callbacks marked as fallback are used.
 * The whole search repeats for as long as the set of object callbacks was
 * modified during it, and a per-callback iteration stamp keeps a callback
 * from running twice for the same message. After the search, the check on
 * "found" and on Introspect calls decides whether an UnknownMethod error
 * reply is sent. */
1703 static int process_object(sd_bus *bus, sd_bus_message *m) {
1704 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1705 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1706 struct object_callback *c;
1714 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1717 if (hashmap_isempty(bus->object_callbacks))
1720 pl = strlen(m->path);
1725 bus->object_callbacks_modified = false;
/* First try the callback registered for the exact path */
1727 c = hashmap_get(bus->object_callbacks, m->path);
1728 if (c && c->last_iteration != bus->iteration_counter) {
1730 c->last_iteration = bus->iteration_counter;
1732 r = sd_bus_message_rewind(m, true);
1736 r = c->callback(bus, 0, m, c->userdata);
1743 /* Look for fallback prefixes */
1748 if (bus->object_callbacks_modified)
/* Find the last '/' in p; presumably p is a scratch copy of the path that
 * is shortened at that position on every round. */
1751 e = strrchr(p, '/');
1757 c = hashmap_get(bus->object_callbacks, p);
1758 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1760 c->last_iteration = bus->iteration_counter;
1762 r = sd_bus_message_rewind(m, true);
1766 r = c->callback(bus, 0, m, c->userdata);
1774 } while (bus->object_callbacks_modified);
1776 /* We found some handlers but none wanted to take this, then
1777 * return this -- with one exception, we can handle
1778 * introspection minimally ourselves */
1779 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1782 sd_bus_error_set(&error,
1783 "org.freedesktop.DBus.Error.UnknownMethod",
1784 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1786 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1790 r = sd_bus_send(bus, reply, NULL);
/* Minimal built-in handler for "org.freedesktop.DBus.Introspectable"
 * Introspect calls. It collects node names derived from the registered
 * object callback paths into a set, writes an XML document listing the Peer
 * and Introspectable interfaces plus one <node> element per collected name
 * into a memory stream, and sends the result as a string in a method
 * return. */
1797 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1798 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1799 _cleanup_free_ char *introspection = NULL;
1800 _cleanup_set_free_free_ Set *s = NULL;
1801 _cleanup_fclose_ FILE *f = NULL;
1802 struct object_callback *c;
1811 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
/* A set is used so that each child name is listed only once */
1817 s = set_new(string_hash_func, string_compare_func);
1821 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1825 if (streq(c->path, "/"))
1828 if (streq(m->path, "/"))
/* The registered path must start with the requested path, and the
 * remainder must begin with '/'. */
1831 e = startswith(c->path, m->path);
1832 if (!e || *e != '/')
/* A name that is already in the set (-EEXIST) is not treated as an error */
1844 r = set_consume(s, a);
1845 if (r < 0 && r != -EEXIST)
/* The XML is built in memory; open_memstream() makes "introspection" point
 * at the resulting buffer. */
1849 f = open_memstream(&introspection, &size);
1853 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1854 fputs("<node>\n", f);
1855 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1856 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1858 while ((node = set_steal_first(s))) {
1859 fprintf(f, " <node name=\"%s\"/>\n", node);
1863 fputs("</node>\n", f);
1870 r = sd_bus_message_new_method_return(bus, m, &reply);
1874 r = sd_bus_message_append(reply, "s", introspection);
1878 r = sd_bus_send(bus, reply, NULL);
/* Runs one incoming message through the handler chain in a fixed order:
 * hello gatekeeper, reply callbacks, filters, matches, built-in Peer
 * interface, object callbacks and finally built-in introspection. How the
 * result of each stage decides whether the chain continues is not visible
 * in this excerpt. */
1885 static int process_message(sd_bus *bus, sd_bus_message *m) {
/* A new stamp per message; the filter and object handlers compare it
 * against their last_iteration fields. */
1891 bus->iteration_counter++;
1893 r = process_hello(bus, m);
1897 r = process_reply(bus, m);
1901 r = process_filter(bus, m);
1905 r = process_match(bus, m);
1909 r = process_builtin(bus, m);
1913 r = process_object(bus, m);
1917 return process_introspect(bus, m);
/* One processing step for a bus in BUS_RUNNING or BUS_HELLO: expire a reply
 * timeout, flush the write queue, fetch an incoming message and run it
 * through process_message(). For a method call, an UnknownObject error
 * reply is generated below; presumably this path is only reached when no
 * handler took the message and the caller did not ask for it via "ret". */
1920 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1921 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1925 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1927 r = process_timeout(bus);
1931 r = dispatch_wqueue(bus);
1935 r = dispatch_rqueue(bus, &m);
1941 r = process_message(bus, m);
1946 r = sd_bus_message_rewind(m, true);
1955 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1956 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1957 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1959 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1961 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1965 r = sd_bus_send(bus, reply, NULL);
/* Main processing entry point: performs one unit of work appropriate for
 * the current bus state (connection opening, authentication, or regular
 * message processing). The bus->processing flag is set around message
 * processing and is what the recursion check below tests. */
1979 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1982 /* Returns 0 when we didn't do anything. This should cause the
1983 * caller to invoke sd_bus_wait() before returning the next
1984 * time. Returns > 0 when we did something, which possibly
1985 * means *ret is filled in with an unprocessed message. */
1989 if (bus->input_fd < 0)
1992 /* We don't allow recursively invoking sd_bus_process(). */
1993 if (bus->processing)
1996 switch (bus->state) {
2002 r = bus_socket_process_opening(bus);
2009 case BUS_AUTHENTICATING:
2011 r = bus_socket_process_authenticating(bus);
2021 bus->processing = true;
2022 r = process_running(bus, ret);
2023 bus->processing = false;
2028 assert_not_reached("Unknown state");
/* Waits with ppoll() until the bus fds become ready or a timeout expires.
 * The wait is bounded by the bus' own next deadline, taken from
 * sd_bus_get_timeout(), and by the caller-supplied timeout_usec, where
 * (uint64_t) -1 stands for "no limit". Returns 1 if ppoll() reported ready
 * fds and 0 otherwise; the handling of a ppoll() failure is not visible in
 * this excerpt. */
2031 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2032 struct pollfd p[2] = {};
2039 if (bus->input_fd < 0)
2042 e = sd_bus_get_events(bus);
2049 r = sd_bus_get_timeout(bus, &until);
/* Convert the absolute deadline into a relative wait time, clamped at 0 if
 * the deadline has already passed. */
2056 nw = now(CLOCK_MONOTONIC);
2057 m = until > nw ? until - nw : 0;
/* Presumably the caller's timeout replaces m when it is the shorter one */
2060 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2063 p[0].fd = bus->input_fd;
2064 if (bus->output_fd == bus->input_fd) {
/* Separate fds: poll the input fd for reading only and the output fd for
 * writing only. */
2068 p[0].events = e & POLLIN;
2069 p[1].fd = bus->output_fd;
2070 p[1].events = e & POLLOUT;
/* A NULL timespec makes ppoll() wait without a timeout */
2074 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2078 return r > 0 ? 1 : 0;
/* Waits until the bus needs processing or timeout_usec has passed, by
 * delegating to bus_poll(). The read queue is checked first; presumably the
 * function returns right away when messages are already queued. */
2081 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2085 if (bus->state == BUS_UNSET)
2087 if (bus->input_fd < 0)
2089 if (bus->rqueue_size > 0)
2092 return bus_poll(bus, false, timeout_usec);
/* Writes out all queued outgoing messages. After driving the bus to the
 * running state, dispatch_wqueue() and bus_poll() (without timeout) are
 * alternated until the write queue is empty; the loop construct itself is
 * not visible in this excerpt. */
2095 int sd_bus_flush(sd_bus *bus) {
2100 if (bus->state == BUS_UNSET)
2102 if (bus->output_fd < 0)
2105 r = bus_ensure_running(bus);
/* Nothing queued: nothing to flush */
2109 if (bus->wqueue_size <= 0)
2113 r = dispatch_wqueue(bus);
2117 if (bus->wqueue_size <= 0)
2120 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback with its userdata by prepending a new,
 * zero-initialised record to the bus' filter list. */
2126 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2127 struct filter_callback *f;
2134 f = new0(struct filter_callback, 1);
2137 f->callback = callback;
2138 f->userdata = userdata;
/* Tell a process_filter() run that may be in progress that the list has
 * changed. */
2140 bus->filter_callbacks_modified = true;
2141 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregisters a filter: the filter list is searched for a record with the
 * same callback and the same userdata pointer, and that record is unlinked
 * from the list. */
2145 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2146 struct filter_callback *f;
2153 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2154 if (f->callback == callback && f->userdata == userdata) {
/* Tell a process_filter() run that may be in progress that the list has
 * changed. */
2155 bus->filter_callbacks_modified = true;
2156 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Common implementation behind sd_bus_add_object() and
 * sd_bus_add_fallback(): creates an object callback record that holds its
 * own copy of the path and stores it in bus->object_callbacks, keyed by
 * that path. */
2165 static int bus_add_object(
2169 sd_bus_message_handler_t callback,
2172 struct object_callback *c;
2182 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2186 c = new0(struct object_callback, 1);
2190 c->path = strdup(path);
2196 c->callback = callback;
2197 c->userdata = userdata;
2198 c->is_fallback = fallback;
/* Tell a process_object() run that may be in progress that the map has
 * changed. */
2200 bus->object_callbacks_modified = true;
/* The key is the record's own path copy */
2201 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Common implementation behind sd_bus_remove_object() and
 * sd_bus_remove_fallback(): looks up the record registered for the path and
 * removes it from the map. The record's callback, userdata and fallback
 * flag are compared against the arguments first; presumably a mismatch
 * leaves the registration untouched. */
2211 static int bus_remove_object(
2215 sd_bus_message_handler_t callback,
2218 struct object_callback *c;
2227 c = hashmap_get(bus->object_callbacks, path);
2231 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
/* Tell a process_object() run that may be in progress that the map has
 * changed. */
2234 bus->object_callbacks_modified = true;
2235 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Registers a callback for an object path; thin wrapper around
 * bus_add_object() with the fallback flag set to false. */
2243 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2244 return bus_add_object(bus, false, path, callback, userdata);
/* Removes a callback registered with sd_bus_add_object(); thin wrapper
 * around bus_remove_object() with the fallback flag set to false. */
2247 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2248 return bus_remove_object(bus, false, path, callback, userdata);
/* Registers a fallback callback for a path prefix; thin wrapper around
 * bus_add_object() with the fallback flag set to true. */
2251 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2252 return bus_add_object(bus, true, prefix, callback, userdata);
/* Removes a callback registered with sd_bus_add_fallback(); thin wrapper
 * around bus_remove_object() with the fallback flag set to true. */
2255 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2256 return bus_remove_object(bus, true, prefix, callback, userdata);
/* Registers a match callback. For bus clients the match is first installed
 * through bus_add_match_internal(); then the callback is added to the local
 * match tree. The bus_remove_match_internal() call at the end is presumably
 * the rollback for a failed local registration. */
2259 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2267 if (bus->bus_client) {
2268 r = bus_add_match_internal(bus, match);
/* Tell a process_match() run that may be in progress that the match
 * callbacks have changed. */
2274 bus->match_callbacks_modified = true;
2275 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2278 if (bus->bus_client)
2279 bus_remove_match_internal(bus, match);
/* Unregisters a match callback: for bus clients the match is removed
 * through bus_remove_match_internal(), then the callback is removed from
 * the local match tree. The two results are kept in separate variables (r
 * and q); how they are combined is not visible in this excerpt. */
2286 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2294 if (bus->bus_client)
2295 r = bus_remove_match_internal(bus, match);
/* Tell a process_match() run that may be in progress that the match
 * callbacks have changed. */
2298 bus->match_callbacks_modified = true;
2299 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
/* Convenience helper that builds a signal message for the given path,
 * interface and member, appends the variadic arguments as described by the
 * "types" string, and sends the message without asking for its serial. */
2307 int sd_bus_emit_signal(
2310 const char *interface,
2312 const char *types, ...) {
2314 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2321 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2325 va_start(ap, types);
2326 r = bus_message_append_ap(m, types, ap);
2331 return sd_bus_send(bus, m, NULL);
/* Convenience helper that builds a method call for the given destination,
 * path, interface and member, appends the variadic arguments as described
 * by the "types" string, and performs the call synchronously through
 * sd_bus_send_with_reply_and_block(), passing "error" and "reply" on. */
2334 int sd_bus_call_method(
2336 const char *destination,
2338 const char *interface,
2340 sd_bus_error *error,
2341 sd_bus_message **reply,
2342 const char *types, ...) {
2344 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2351 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2355 va_start(ap, types);
2356 r = bus_message_append_ap(m, types, ap);
/* A timeout argument of 0 is passed here */
2361 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
/* Convenience helper that answers a method call with a method return whose
 * body is built from the variadic arguments described by the "types"
 * string. The call's NO_REPLY_EXPECTED flag is checked first; presumably no
 * reply is generated when it is set. */
2364 int sd_bus_reply_method_return(
2366 sd_bus_message *call,
2367 const char *types, ...) {
2369 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
/* Only method calls can be replied to */
2379 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2382 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2385 r = sd_bus_message_new_method_return(bus, call, &m);
2389 va_start(ap, types);
2390 r = bus_message_append_ap(m, types, ap);
2395 return sd_bus_send(bus, m, NULL);
/* Convenience helper that answers a method call with a method error built
 * from the error structure "e". The call's NO_REPLY_EXPECTED flag is
 * checked first; presumably no reply is generated when it is set. */
2398 int sd_bus_reply_method_error(
2400 sd_bus_message *call,
2401 const sd_bus_error *e) {
2403 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
/* Only method calls can be replied to, and the error must actually be set */
2412 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2414 if (!sd_bus_error_is_set(e))
2417 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2420 r = sd_bus_message_new_method_error(bus, call, e, &m);
2424 return sd_bus_send(bus, m, NULL);