1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
37 #include "bus-internal.h"
38 #include "bus-message.h"
40 #include "bus-socket.h"
41 #include "bus-kernel.h"
42 #include "bus-control.h"
/* Forward declaration: bus_poll() is defined further down in this file
 * but is already called by functions that appear before its definition. */
44 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Closes the file descriptors stored in the bus object and marks both as
 * unset. The output fd gets its own close only when it is valid and is
 * not the same descriptor as the input fd; afterwards both fields are
 * reset to -1. */
46 static void bus_close_fds(sd_bus *b) {
50 close_nointr_nofail(b->input_fd);
52 if (b->output_fd >= 0 && b->output_fd != b->input_fd)
53 close_nointr_nofail(b->output_fd);
55 b->input_fd = b->output_fd = -1;
/* Tears down a bus object and what it references: the exec argument
 * vector, the descriptors in b->fds, every message still sitting in the
 * read queue and the write queue, the reply-callback hashmap and its
 * priority queue, the filter-callback list, the object-callback hashmap,
 * the match callbacks, and finally kernel memfds through
 * bus_kernel_flush_memfd(). */
58 static void bus_free(sd_bus *b) {
59 struct filter_callback *f;
60 struct object_callback *c;
74 strv_free(b->exec_argv);
76 close_many(b->fds, b->n_fds);
79 for (i = 0; i < b->rqueue_size; i++)
80 sd_bus_message_unref(b->rqueue[i]);
83 for (i = 0; i < b->wqueue_size; i++)
84 sd_bus_message_unref(b->wqueue[i]);
87 hashmap_free_free(b->reply_callbacks);
88 prioq_free(b->reply_callbacks_prioq);
90 while ((f = b->filter_callbacks)) {
91 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
95 while ((c = hashmap_steal_first(b->object_callbacks))) {
100 hashmap_free(b->object_callbacks);
101 bus_match_free(&b->match_callbacks);
103 bus_kernel_flush_memfd(b);
/* Creates a new, not yet configured bus object (presumably handed back
 * through ret -- confirm against the full body).
 * Defaults set here: reference count REFCNT_INIT, both fds -1 (unset),
 * message_version 1, fd negotiation enabled, and a write queue allocated
 * with room for a single message pointer. */
108 int sd_bus_new(sd_bus **ret) {
118 r->n_ref = REFCNT_INIT;
119 r->input_fd = r->output_fd = -1;
120 r->message_version = 1;
121 r->negotiate_fds = true;
123 /* We guarantee that wqueue always has space for at least one
125 r->wqueue = new(sd_bus_message*, 1);
/* Public setter for the bus address string.
 * The bus state is compared against BUS_UNSET first; presumably the
 * address may only be configured before the bus has been started --
 * TODO confirm, the branch body is not visible. */
135 int sd_bus_set_address(sd_bus *bus, const char *address) {
140 if (bus->state != BUS_UNSET)
/* Stores caller-supplied input and output descriptors in the bus object.
 * The bus state is checked against BUS_UNSET beforehand (presumably
 * rejecting the call once the bus has left that state -- confirm). */
155 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
158 if (bus->state != BUS_UNSET)
165 bus->input_fd = input_fd;
166 bus->output_fd = output_fd;
/* Configures a program (path plus argument vector) for the bus.
 * Visible steps: the state is checked against BUS_UNSET, an empty argv is
 * special-cased, and the previously stored exec_path and exec_argv are
 * released (presumably before new copies are installed -- confirm). */
170 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
175 if (bus->state != BUS_UNSET)
179 if (strv_isempty(argv))
192 free(bus->exec_path);
193 strv_free(bus->exec_argv);
/* Sets the bus_client flag. Any non-zero b is normalised to true with
 * "!!". The state is checked against BUS_UNSET first. */
201 int sd_bus_set_bus_client(sd_bus *bus, int b) {
204 if (bus->state != BUS_UNSET)
207 bus->bus_client = !!b;
/* Sets the negotiate_fds flag. Any non-zero b is normalised to true with
 * "!!". The state is checked against BUS_UNSET first. */
211 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
214 if (bus->state != BUS_UNSET)
217 bus->negotiate_fds = !!b;
/* Sets the is_server flag and stores the given server ID.
 * The combination "b is false but server_id is not SD_ID128_NULL" is
 * singled out by the first check (presumably rejected as invalid --
 * confirm). The state is also checked against BUS_UNSET. */
221 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
224 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
226 if (bus->state != BUS_UNSET)
229 bus->is_server = !!b;
230 bus->server_id = server_id;
/* Sets the anonymous_auth flag. Any non-zero b is normalised to true with
 * "!!". The state is checked against BUS_UNSET first. */
234 int sd_bus_set_anonymous(sd_bus *bus, int b) {
237 if (bus->state != BUS_UNSET)
240 bus->anonymous_auth = !!b;
/* Reply handler installed by bus_send_hello().
 * Expects the bus to be in BUS_HELLO state. The reply is first converted
 * with bus_message_to_errno(), then a single string is read from it. That
 * string must be a valid service name beginning with ':'; a copy of it is
 * stored as bus->unique_name and the bus moves to BUS_RUNNING. */
244 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
249 assert(bus->state == BUS_HELLO);
252 r = bus_message_to_errno(reply);
256 r = sd_bus_message_read(reply, "s", &s);
260 if (!service_name_is_valid(s) || s[0] != ':')
263 bus->unique_name = strdup(s);
264 if (!bus->unique_name)
267 bus->state = BUS_RUNNING;
/* Builds a method call addressed to "org.freedesktop.DBus" and sends it
 * with hello_callback() as reply handler; the serial of the call is
 * recorded in bus->hello_serial.
 * Buses that are not bus clients, or that are kernel buses, are
 * special-cased by the first check (presumably nothing is sent for them --
 * confirm). */
272 static int bus_send_hello(sd_bus *bus) {
273 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
278 if (!bus->bus_client || bus->is_kernel)
281 r = sd_bus_message_new_method_call(
283 "org.freedesktop.DBus",
285 "org.freedesktop.DBus",
291 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Picks the state the bus enters next: BUS_HELLO for a bus client that is
 * not a kernel bus, BUS_RUNNING otherwise. */
294 int bus_start_running(sd_bus *bus) {
297 if (bus->bus_client && !bus->is_kernel) {
298 bus->state = BUS_HELLO;
302 bus->state = BUS_RUNNING;
/* Parses one "key=value" item of a bus address at *p.
 * The text at *p is compared against key with strncmp(). The value is then
 * scanned up to the next ';', ',' or end of string. Two 4-bit values are
 * combined into a single byte along the way (looks like escape decoding --
 * confirm), and the result buffer is grown with realloc() as characters
 * are appended. */
306 static int parse_address_key(const char **p, const char *key, char **value) {
317 if (strncmp(*p, key, l) != 0)
330 while (*a != ';' && *a != ',' && *a != 0) {
348 c = (char) ((x << 4) | y);
355 t = realloc(r, n + 2);
383 static void skip_address_key(const char **p) {
387 *p += strcspn(*p, ",");
383 static void skip_address_key(const char **p) {
387 *p += strcspn(*p, ",");
/* Parses the parameters of a unix socket address into b->sockaddr.
 * Items are consumed until end of string or ';'. Recognised keys are
 * "guid", "path" and "abstract". The cases "neither path nor abstract"
 * and "both path and abstract" are each checked for explicitly.
 * For a path, the name is copied into sun_path. For an abstract name,
 * sun_path starts with a zero byte followed by the name. In both cases
 * sockaddr_size covers only the bytes actually used. Names too long for
 * sun_path are detected by the length checks. */
393 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
394 _cleanup_free_ char *path = NULL, *abstract = NULL;
403 while (**p != 0 && **p != ';') {
404 r = parse_address_key(p, "guid", guid);
410 r = parse_address_key(p, "path", &path);
416 r = parse_address_key(p, "abstract", &abstract);
425 if (!path && !abstract)
428 if (path && abstract)
433 if (l > sizeof(b->sockaddr.un.sun_path))
436 b->sockaddr.un.sun_family = AF_UNIX;
437 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
438 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
439 } else if (abstract) {
440 l = strlen(abstract);
441 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
444 b->sockaddr.un.sun_family = AF_UNIX;
445 b->sockaddr.un.sun_path[0] = 0;
446 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
447 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the parameters of a TCP address and resolves it.
 * Items are consumed until end of string or ';'. Recognised keys are
 * "guid", "host", "port" and "family". A family of "ipv4" or "ipv6"
 * restricts the lookup to AF_INET or AF_INET6 respectively.
 * Host and port are resolved with getaddrinfo() using SOCK_STREAM and
 * AI_ADDRCONFIG hints. The first result's address is copied into
 * b->sockaddr (with its length in b->sockaddr_size) and the result list
 * is released. -EADDRNOTAVAIL is one of the error returns. */
453 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
454 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
456 struct addrinfo *result, hints = {
457 .ai_socktype = SOCK_STREAM,
458 .ai_flags = AI_ADDRCONFIG,
466 while (**p != 0 && **p != ';') {
467 r = parse_address_key(p, "guid", guid);
473 r = parse_address_key(p, "host", &host);
479 r = parse_address_key(p, "port", &port);
485 r = parse_address_key(p, "family", &family);
498 if (streq(family, "ipv4"))
499 hints.ai_family = AF_INET;
500 else if (streq(family, "ipv6"))
501 hints.ai_family = AF_INET6;
506 r = getaddrinfo(host, port, &hints, &result);
510 return -EADDRNOTAVAIL;
512 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
513 b->sockaddr_size = result->ai_addrlen;
515 freeaddrinfo(result);
/* Parses the parameters of a "unixexec:" address.
 * Items are consumed until end of string or ';'. Recognised keys are
 * "guid", "path" and indexed "argvN" entries. The index N is parsed with
 * strtoul(); the entry is singled out as bad if errno is set, the number
 * is not followed by '=', or N exceeds 256. The argument array is grown
 * to hold index N (new slots zeroed) and the value is parsed directly
 * into slot N.
 * After the loop the array is checked for gaps (argv[0] is exempt), and a
 * missing argv[0] is filled with a copy of path. */
520 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
522 unsigned n_argv = 0, j;
531 while (**p != 0 && **p != ';') {
532 r = parse_address_key(p, "guid", guid);
538 r = parse_address_key(p, "path", &path);
544 if (startswith(*p, "argv")) {
548 ul = strtoul(*p + 4, (char**) p, 10);
549 if (errno > 0 || **p != '=' || ul > 256) {
559 x = realloc(argv, sizeof(char*) * (ul + 2));
565 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
571 r = parse_address_key(p, NULL, argv + ul);
586 /* Make sure there are no holes in the array, with the
587 * exception of argv[0] */
588 for (j = 1; j < n_argv; j++)
594 if (argv && argv[0] == NULL) {
595 argv[0] = strdup(path);
607 for (j = 0; j < n_argv; j++)
/* Parses the parameters of a "kernel:" address. Items are consumed until
 * end of string or ';'; the recognised keys are "guid" and "path". */
615 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
616 _cleanup_free_ char *path = NULL;
624 while (**p != 0 && **p != ';') {
625 r = parse_address_key(p, "guid", guid);
631 r = parse_address_key(p, "path", &path);
/* Discards the results of the previous address parse: the socket address
 * size is zeroed, the exec argument vector is freed and the server ID is
 * reset to SD_ID128_NULL. */
650 static void bus_reset_parsed_address(sd_bus *b) {
654 b->sockaddr_size = 0;
655 strv_free(b->exec_argv);
659 b->server_id = SD_ID128_NULL;
/* Parses the next entry of b->address, starting at b->address_index.
 * Reaching the terminating NUL is checked for first. Previously parsed
 * state is reset, then the entry is dispatched on its transport prefix
 * ("unix:", "tcp:", "unixexec:" or "kernel:") to the matching parser.
 * A guid is converted with sd_id128_from_string() and stored as the
 * expected server ID. Finally address_index is advanced to where parsing
 * stopped. */
664 static int bus_parse_next_address(sd_bus *b) {
665 _cleanup_free_ char *guid = NULL;
673 if (b->address[b->address_index] == 0)
676 bus_reset_parsed_address(b);
678 a = b->address + b->address_index;
687 if (startswith(a, "unix:")) {
690 r = parse_unix_address(b, &a, &guid);
695 } else if (startswith(a, "tcp:")) {
698 r = parse_tcp_address(b, &a, &guid);
704 } else if (startswith(a, "unixexec:")) {
707 r = parse_exec_address(b, &a, &guid);
713 } else if (startswith(a, "kernel:")) {
716 r = parse_kernel_address(b, &a, &guid);
729 r = sd_id128_from_string(guid, &b->server_id);
734 b->address_index = a - b->address;
/* Attempts to connect using the currently parsed address, moving on to
 * the next address entry via bus_parse_next_address().
 * The transport is chosen from what the parse produced: a socket address
 * (family other than AF_UNSPEC) uses bus_socket_connect(), an exec path
 * uses bus_socket_exec(), and a kernel path uses bus_kernel_connect().
 * The error of a failed attempt is remembered, as a positive number, in
 * b->last_connect_error. The final return reports that remembered error
 * negated, or -ECONNREFUSED when none was recorded. */
738 static int bus_start_address(sd_bus *b) {
746 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
748 r = bus_socket_connect(b);
752 b->last_connect_error = -r;
754 } else if (b->exec_path) {
756 r = bus_socket_exec(b);
760 b->last_connect_error = -r;
761 } else if (b->kernel) {
763 r = bus_kernel_connect(b);
767 b->last_connect_error = -r;
770 r = bus_parse_next_address(b);
774 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Drops the currently parsed address and restarts the connection attempt
 * through bus_start_address(). */
778 int bus_next_address(sd_bus *b) {
781 bus_reset_parsed_address(b);
782 return bus_start_address(b);
785 static int bus_start_fd(sd_bus *b) {
790 assert(b->input_fd >= 0);
791 assert(b->output_fd >= 0);
793 r = fd_nonblock(b->input_fd, true);
797 r = fd_cloexec(b->input_fd, true);
801 if (b->input_fd != b->output_fd) {
802 r = fd_nonblock(b->output_fd, true);
806 r = fd_cloexec(b->output_fd, true);
811 if (fstat(b->input_fd, &st) < 0)
814 if (S_ISCHR(b->input_fd))
815 return bus_kernel_take_fd(b);
817 return bus_socket_take_fd(b);
/* Starts a configured bus.
 * The bus must still be in BUS_UNSET state (checked first) and is moved
 * to BUS_OPENING. Being both a server and a bus client is singled out by
 * a check (presumably rejected -- confirm).
 * If an input fd was supplied, the fd path (bus_start_fd) is used. If
 * instead an address string, a socket address, an exec path or a kernel
 * path is configured, the address path (bus_start_address) is used.
 * The final return sends the hello message through bus_send_hello(). */
820 int sd_bus_start(sd_bus *bus) {
825 if (bus->state != BUS_UNSET)
828 bus->state = BUS_OPENING;
830 if (bus->is_server && bus->bus_client)
833 if (bus->input_fd >= 0)
834 r = bus_start_fd(bus);
835 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
836 r = bus_start_address(bus);
843 return bus_send_hello(bus);
/* Opens a connection to the system bus.
 * The address is taken from the DBUS_SYSTEM_BUS_ADDRESS environment
 * variable (read with secure_getenv()). The other visible path uses the
 * fixed unix socket "/run/dbus/system_bus_socket" (presumably when the
 * variable is unset -- confirm). The connection is marked as a bus
 * client. */
846 int sd_bus_open_system(sd_bus **ret) {
858 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
860 r = sd_bus_set_address(b, e);
864 b->sockaddr.un.sun_family = AF_UNIX;
865 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
866 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
869 b->bus_client = true;
/* Opens a connection to the user (session) bus.
 * The address is taken from the DBUS_SESSION_BUS_ADDRESS environment
 * variable. The other visible path builds a unix socket path from
 * XDG_RUNTIME_DIR with "/bus" appended, after checking that the combined
 * length fits into sun_path. Both variables are read with
 * secure_getenv(). The connection is marked as a bus client. */
883 int sd_bus_open_user(sd_bus **ret) {
896 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
898 r = sd_bus_set_address(b, e);
902 e = secure_getenv("XDG_RUNTIME_DIR");
909 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
914 b->sockaddr.un.sun_family = AF_UNIX;
915 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
916 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
919 b->bus_client = true;
933 void sd_bus_close(sd_bus *bus) {
937 if (bus->state != BUS_CLOSED)
940 bus->state = BUS_CLOSED;
945 /* We'll leave the fd open in case this is a kernel bus, since
946 * there might still be memblocks around that reference this
947 * bus, and they might need to invoke the
948 * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
/* Takes an additional reference on the bus. The incremented counter is
 * asserted to be at least 2, i.e. the caller must already have held a
 * reference. */
952 sd_bus *sd_bus_ref(sd_bus *bus) {
956 assert_se(REFCNT_INC(bus->n_ref) >= 2);
/* Drops one reference on the bus. The case where the decremented counter
 * reaches zero (or below) is handled specially -- presumably by freeing
 * the bus; confirm. */
961 sd_bus *sd_bus_unref(sd_bus *bus) {
965 if (REFCNT_DEC(bus->n_ref) <= 0)
/* Reports whether the bus is in one of the states that BUS_IS_OPEN()
 * treats as open. */
971 int sd_bus_is_open(sd_bus *bus) {
975 return BUS_IS_OPEN(bus->state);
/* Tells whether a value of the given type can be sent over this bus.
 * A bus still in BUS_UNSET state is singled out first. For
 * SD_BUS_TYPE_UNIX_FD the answer depends on whether fd negotiation was
 * enabled; if it was, the bus is brought to the running state with
 * bus_ensure_running(). For other types the answer is
 * bus_type_is_valid(type). */
978 int sd_bus_can_send(sd_bus *bus, char type) {
983 if (bus->state == BUS_UNSET)
986 if (type == SD_BUS_TYPE_UNIX_FD) {
987 if (!bus->negotiate_fds)
990 r = bus_ensure_running(bus);
997 return bus_type_is_valid(type);
/* Copies the bus's server ID into *server_id after making sure the bus
 * has reached the running state via bus_ensure_running(). */
1000 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1008 r = bus_ensure_running(bus);
1012 *server_id = bus->server_id;
/* Seals message m for sending on bus b, assigning it the next serial
 * number (b->serial is pre-incremented). A message whose header version
 * is newer than the bus's message_version is singled out beforehand
 * (presumably rejected -- confirm). */
1016 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1019 if (m->header->version > b->message_version)
1025 return bus_message_seal(m, ++b->serial);
/* Writes out queued outgoing messages while the write queue is non-empty.
 * Requires the bus to be in BUS_RUNNING or BUS_HELLO state.
 * The head message is written through bus_kernel_write_message() or
 * bus_socket_write_message(); the socket variant tracks progress in
 * bus->windex. Once the head has been fully written (always assumed on a
 * kernel bus, otherwise when windex reaches the message size) it is
 * unreferenced and the rest of the queue is shifted down by one slot. */
1028 static int dispatch_wqueue(sd_bus *bus) {
1032 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1034 while (bus->wqueue_size > 0) {
1037 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1039 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1045 /* Didn't do anything this time */
1047 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1048 /* Fully written. Let's drop the entry from
1051 * This isn't particularly optimized, but
1052 * well, this is supposed to be our worst-case
1053 * buffer only, and the socket buffer is
1054 * supposed to be our primary buffer, and if
1055 * it got full, then all bets are off
1058 sd_bus_message_unref(bus->wqueue[0]);
1059 bus->wqueue_size --;
1060 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetches the next incoming message into *m.
 * Requires the bus to be in BUS_RUNNING or BUS_HELLO state.
 * A message already waiting in the read queue is taken first: the head is
 * handed out and the remaining entries are shifted down. Otherwise a new
 * message is read from the transport through bus_kernel_read_message() or
 * bus_socket_read_message(). */
1070 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1071 sd_bus_message *z = NULL;
1076 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1078 if (bus->rqueue_size > 0) {
1079 /* Dispatch a queued message */
1081 *m = bus->rqueue[0];
1082 bus->rqueue_size --;
1083 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1087 /* Try to read a new message */
1090 r = bus_kernel_read_message(bus, &z);
1092 r = bus_socket_read_message(bus, &z);
/* Queues or directly writes message m on the bus; the message's serial is
 * stored in *serial (the NULL guard, if any, is not visible here).
 * The bus must be open. sd_bus_can_send() is consulted for
 * SD_BUS_TYPE_UNIX_FD. If the caller does not keep the serial and the
 * message is not sealed yet, it is flagged NO_REPLY_EXPECTED. The message
 * is then sealed. A message marked dont_send is special-cased when no
 * serial was requested.
 * When the bus is running (or in hello state) and nothing is queued, the
 * message is written immediately; a partially written socket message is
 * kept in wqueue[0]. Otherwise the message is appended to the write
 * queue, which is limited to BUS_WQUEUE_MAX entries. */
1108 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1113 if (!BUS_IS_OPEN(bus->state))
1119 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1126 /* If the serial number isn't kept, then we know that no reply
1128 if (!serial && !m->sealed)
1129 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1131 r = bus_seal_message(bus, m);
1135 /* If this is a reply and no reply was requested, then let's
1136 * suppress this, if we can */
1137 if (m->dont_send && !serial)
1140 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1144 r = bus_kernel_write_message(bus, m);
1146 r = bus_socket_write_message(bus, m, &idx);
1151 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1152 /* Wasn't fully written. So let's remember how
1153 * much was written. Note that the first entry
1154 * of the wqueue array is always allocated so
1155 * that we always can remember how much was
1157 bus->wqueue[0] = sd_bus_message_ref(m);
1158 bus->wqueue_size = 1;
1164 /* Just append it to the queue. */
1166 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1169 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1174 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1178 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC deadline. (uint64_t) -1 is special-cased (presumably
 * meaning "no timeout" -- confirm), and BUS_DEFAULT_TIMEOUT is substituted
 * in one case (presumably for a timeout of 0 -- confirm). */
1183 static usec_t calc_elapse(uint64_t usec) {
1184 if (usec == (uint64_t) -1)
1188 usec = BUS_DEFAULT_TIMEOUT;
1190 return now(CLOCK_MONOTONIC) + usec;
/* Comparison function for the reply-callback priority queue. Entries are
 * compared by their timeout field; entries with a timeout of 0 are
 * distinguished from entries with a non-zero timeout before the numeric
 * comparison. */
1193 static int timeout_compare(const void *a, const void *b) {
1194 const struct reply_callback *x = a, *y = b;
1196 if (x->timeout != 0 && y->timeout == 0)
1199 if (x->timeout == 0 && y->timeout != 0)
1202 if (x->timeout < y->timeout)
1205 if (x->timeout > y->timeout)
/* Sends a method call and registers a callback for its reply.
 * The bus must be open. The message type is compared against
 * METHOD_CALL and the NO_REPLY_EXPECTED flag is tested (presumably
 * rejecting anything else -- confirm).
 * The reply-callback hashmap is allocated on demand, and so is the
 * timeout priority queue unless usec is (uint64_t) -1. The message is
 * sealed and a reply_callback record is filled in with the handler, the
 * userdata, the message serial and the deadline from calc_elapse(). The
 * record is stored in the hashmap keyed by serial and, when it has a
 * non-zero timeout, in the priority queue. The message is then sent with
 * sd_bus_send(). On the visible failure paths the registration is undone
 * through sd_bus_send_with_reply_cancel(). */
1211 int sd_bus_send_with_reply(
1214 sd_bus_message_handler_t callback,
1219 struct reply_callback *c;
1224 if (!BUS_IS_OPEN(bus->state))
1230 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1232 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1235 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1239 if (usec != (uint64_t) -1) {
1240 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1245 r = bus_seal_message(bus, m);
1249 c = new0(struct reply_callback, 1);
1253 c->callback = callback;
1254 c->userdata = userdata;
1255 c->serial = BUS_MESSAGE_SERIAL(m);
1256 c->timeout = calc_elapse(usec);
1258 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1264 if (c->timeout != 0) {
1265 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1268 sd_bus_send_with_reply_cancel(bus, c->serial);
1273 r = sd_bus_send(bus, m, serial);
1275 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply callback identified by its serial: the record
 * is removed from the reply-callback hashmap and, if it carries a
 * non-zero timeout, from the timeout priority queue as well. */
1282 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1283 struct reply_callback *c;
1290 c = hashmap_remove(bus->reply_callbacks, &serial);
1294 if (c->timeout != 0)
1295 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the bus until it reaches BUS_RUNNING.
 * BUS_UNSET and BUS_CLOSED are singled out up front, and a bus that is
 * already running needs no work. Otherwise sd_bus_process() is called,
 * the state is re-checked, and sd_bus_wait() is called without a timeout
 * ((uint64_t) -1). */
1301 int bus_ensure_running(sd_bus *bus) {
1306 if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1308 if (bus->state == BUS_RUNNING)
1312 r = sd_bus_process(bus, NULL);
1315 if (bus->state == BUS_RUNNING)
1320 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and waits synchronously for the matching reply.
 * Preconditions visible here: the bus is open, the message is a method
 * call without the NO_REPLY_EXPECTED flag, and the error object is not
 * dirty. The bus is brought to the running state, the message is sent and
 * a deadline is computed from usec.
 * In the wait loop, room for one more read-queue entry is reserved before
 * each read (the queue is limited to BUS_RQUEUE_MAX). An incoming message
 * whose reply_serial equals the sent serial is the answer: a method
 * return and a method error are handled separately, the error being
 * copied into *error and converted with bus_error_to_errno(). Any other
 * message is appended to the read queue for later dispatching.
 * When nothing more can be read, the loop polls via bus_poll() with the
 * time left and flushes pending writes with dispatch_wqueue(). */
1326 int sd_bus_send_with_reply_and_block(
1330 sd_bus_error *error,
1331 sd_bus_message **reply) {
1340 if (!BUS_IS_OPEN(bus->state))
1344 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1346 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1348 if (bus_error_is_dirty(error))
1351 r = bus_ensure_running(bus);
1355 r = sd_bus_send(bus, m, &serial);
1359 timeout = calc_elapse(usec);
1363 sd_bus_message *incoming = NULL;
1368 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1371 /* Make sure there's room for queuing this
1372 * locally, before we read the message */
1374 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1383 r = bus_kernel_read_message(bus, &incoming);
1385 r = bus_socket_read_message(bus, &incoming);
1390 if (incoming->reply_serial == serial) {
1391 /* Found a match! */
1393 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1398 sd_bus_message_unref(incoming);
1403 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1406 r = sd_bus_error_copy(error, &incoming->error);
1408 sd_bus_message_unref(incoming);
1412 k = bus_error_to_errno(&incoming->error);
1413 sd_bus_message_unref(incoming);
1417 sd_bus_message_unref(incoming);
1421 /* There's already guaranteed to be room for
1422 * this, so need to resize things here */
1423 bus->rqueue[bus->rqueue_size ++] = incoming;
1426 /* Try to read more, right-away */
1435 n = now(CLOCK_MONOTONIC);
1441 left = (uint64_t) -1;
1443 r = bus_poll(bus, true, left);
1447 r = dispatch_wqueue(bus);
/* Returns the bus's input file descriptor. The bus must be open, and the
 * case of distinct input and output descriptors is singled out
 * (presumably because a single fd cannot represent both -- confirm). */
1453 int sd_bus_get_fd(sd_bus *bus) {
1456 if (!BUS_IS_OPEN(bus->state))
1458 if (bus->input_fd != bus->output_fd)
1461 return bus->input_fd;
/* Computes the event mask the caller should wait for on the bus fds.
 * The bus must be open. The result depends on the state: BUS_OPENING,
 * BUS_AUTHENTICATING (where bus_socket_auth_needs_write() is consulted),
 * and BUS_RUNNING/BUS_HELLO (where an empty read queue and a non-empty
 * write queue each contribute to the mask). */
1464 int sd_bus_get_events(sd_bus *bus) {
1469 if (!BUS_IS_OPEN(bus->state))
1472 if (bus->state == BUS_OPENING)
1474 else if (bus->state == BUS_AUTHENTICATING) {
1476 if (bus_socket_auth_needs_write(bus))
1481 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1482 if (bus->rqueue_size <= 0)
1484 if (bus->wqueue_size > 0)
/* Reports, through *timeout_usec, the time at which the bus next needs
 * attention. The bus must be open.
 * While authenticating, this is bus->auth_timeout. In states other than
 * BUS_RUNNING/BUS_HELLO it is (uint64_t) -1. Otherwise the head of the
 * reply-callback priority queue is peeked: its timeout is reported, or
 * (uint64_t) -1 in the other branch (presumably when the queue is empty
 * -- confirm). */
1491 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1492 struct reply_callback *c;
1498 if (!BUS_IS_OPEN(bus->state))
1501 if (bus->state == BUS_AUTHENTICATING) {
1502 *timeout_usec = bus->auth_timeout;
1506 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1507 *timeout_usec = (uint64_t) -1;
1511 c = prioq_peek(bus->reply_callbacks_prioq);
1513 *timeout_usec = (uint64_t) -1;
1517 *timeout_usec = c->timeout;
/* Handles the reply callback at the head of the timeout priority queue.
 * The head entry is peeked and the current CLOCK_MONOTONIC time is read
 * (the comparison against the entry's deadline is not visible here). A
 * synthetic "org.freedesktop.DBus.Error.Timeout" error message is built,
 * the entry is popped from the priority queue and removed from the
 * reply-callback hashmap, and its callback is invoked with the synthetic
 * message. Returns the callback's negative result on failure, 1
 * otherwise. */
1521 static int process_timeout(sd_bus *bus) {
1522 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1523 struct reply_callback *c;
1529 c = prioq_peek(bus->reply_callbacks_prioq);
1533 n = now(CLOCK_MONOTONIC);
1537 r = bus_message_new_synthetic_error(
1540 &SD_BUS_ERROR_MAKE("org.freedesktop.DBus.Error.Timeout", "Timed out"),
1545 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1546 hashmap_remove(bus->reply_callbacks, &c->serial);
1548 r = c->callback(bus, m, c->userdata);
1551 return r < 0 ? r : 1;
/* Guards the BUS_HELLO phase. Only relevant while the bus is in that
 * state. The message type is checked to be a method return or method
 * error, and its reply_serial is compared against the serial recorded
 * for the hello call. */
1554 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1558 if (bus->state != BUS_HELLO)
1561 /* Let's make sure the first message on the bus is the HELLO
1562 * reply. But note that we don't actually parse the message
1563 * here (we leave that to the usual handling), we just verify
1564 * we don't let any earlier msg through. */
1566 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1567 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1570 if (m->reply_serial != bus->hello_serial)
/* Dispatches a method return or method error to the callback registered
 * for its reply_serial. The callback record is removed from the
 * reply-callback hashmap and, if it has a non-zero timeout, from the
 * priority queue. The message is rewound to its start before the
 * callback is invoked. */
1576 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1577 struct reply_callback *c;
1583 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1584 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1587 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1591 if (c->timeout != 0)
1592 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1594 r = sd_bus_message_rewind(m, true);
1598 r = c->callback(bus, m, c->userdata);
/* Runs the registered filter callbacks on message m.
 * The list walk is repeated for as long as a callback modified the filter
 * list (tracked by filter_callbacks_modified). last_iteration is compared
 * with the bus's iteration_counter so that a filter is not invoked twice
 * for the same message across such restarts. The message is rewound
 * before each callback. */
1604 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1605 struct filter_callback *l;
1612 bus->filter_callbacks_modified = false;
1614 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1616 if (bus->filter_callbacks_modified)
1619 /* Don't run this more than once per iteration */
1620 if (l->last_iteration == bus->iteration_counter)
1623 l->last_iteration = bus->iteration_counter;
1625 r = sd_bus_message_rewind(m, true);
1629 r = l->callback(bus, m, l->userdata);
1635 } while (bus->filter_callbacks_modified);
/* Runs the match callbacks on message m through bus_match_run(),
 * repeating the run for as long as the set of match callbacks was
 * modified during it. */
1640 static int process_match(sd_bus *bus, sd_bus_message *m) {
1647 bus->match_callbacks_modified = false;
1649 r = bus_match_run(bus, &bus->match_callbacks, m);
1653 } while (bus->match_callbacks_modified);
/* Built-in implementation of the "org.freedesktop.DBus.Peer" interface.
 * Applies only to method calls on that interface; calls flagged
 * NO_REPLY_EXPECTED are special-cased.
 * "Ping" is answered with an empty method return. "GetMachineId" is
 * answered with the machine ID formatted as a string. Any other member
 * yields an "org.freedesktop.DBus.Error.UnknownMethod" error reply. The
 * reply is sent with sd_bus_send(). */
1658 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1659 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1665 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1668 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1671 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1674 if (streq_ptr(m->member, "Ping"))
1675 r = sd_bus_message_new_method_return(bus, m, &reply);
1676 else if (streq_ptr(m->member, "GetMachineId")) {
1680 r = sd_id128_get_machine(&id);
1684 r = sd_bus_message_new_method_return(bus, m, &reply);
1688 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1690 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1692 sd_bus_error_set(&error,
1693 "org.freedesktop.DBus.Error.UnknownMethod",
1694 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1696 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1702 r = sd_bus_send(bus, reply, NULL);
/* Dispatches a method call to registered object callbacks.
 * Only method calls are considered, and only if at least one object
 * callback is registered. The callback registered for exactly m->path is
 * tried first. Then shorter prefixes of the path are tried, obtained by
 * cutting at the last '/', and only callbacks registered as fallbacks
 * are used for them. last_iteration is compared with the bus's
 * iteration_counter so that a callback is not invoked twice for the same
 * message. The whole lookup is repeated while the callback table was
 * modified during dispatch. The message is rewound before each callback.
 * If handlers were found but none took the message, an
 * "org.freedesktop.DBus.Error.UnknownMethod" error reply is sent, except
 * for Introspectable.Introspect calls, which are left for later
 * handling. */
1709 static int process_object(sd_bus *bus, sd_bus_message *m) {
1710 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1711 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1712 struct object_callback *c;
1720 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1723 if (hashmap_isempty(bus->object_callbacks))
1726 pl = strlen(m->path);
1731 bus->object_callbacks_modified = false;
1733 c = hashmap_get(bus->object_callbacks, m->path);
1734 if (c && c->last_iteration != bus->iteration_counter) {
1736 c->last_iteration = bus->iteration_counter;
1738 r = sd_bus_message_rewind(m, true);
1742 r = c->callback(bus, m, c->userdata);
1749 /* Look for fallback prefixes */
1754 if (bus->object_callbacks_modified)
1757 e = strrchr(p, '/');
1763 c = hashmap_get(bus->object_callbacks, p);
1764 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1766 c->last_iteration = bus->iteration_counter;
1768 r = sd_bus_message_rewind(m, true);
1772 r = c->callback(bus, m, c->userdata);
1780 } while (bus->object_callbacks_modified);
1782 /* We found some handlers but none wanted to take this, then
1783 * return this -- with one exception, we can handle
1784 * introspection minimally ourselves */
1785 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1788 sd_bus_error_set(&error,
1789 "org.freedesktop.DBus.Error.UnknownMethod",
1790 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1792 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1796 r = sd_bus_send(bus, reply, NULL);
/* Minimal built-in handler for
 * "org.freedesktop.DBus.Introspectable.Introspect".
 * Walks all registered object callbacks and collects, in a string set,
 * names derived from callback paths that lie below m->path ("/" is
 * treated specially on both sides); duplicates (-EEXIST) are tolerated.
 * The introspection document is assembled in a memory stream: the
 * doctype, an opening <node>, the Peer and Introspectable interface
 * descriptions, one <node name="..."/> element per collected name, and
 * the closing tag. It is returned to the caller as a single string in a
 * method return. */
1803 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1804 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1805 _cleanup_free_ char *introspection = NULL;
1806 _cleanup_set_free_free_ Set *s = NULL;
1807 _cleanup_fclose_ FILE *f = NULL;
1808 struct object_callback *c;
1817 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1823 s = set_new(string_hash_func, string_compare_func);
1827 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1831 if (streq(c->path, "/"))
1834 if (streq(m->path, "/"))
1837 e = startswith(c->path, m->path);
1838 if (!e || *e != '/')
1850 r = set_consume(s, a);
1851 if (r < 0 && r != -EEXIST)
1855 f = open_memstream(&introspection, &size);
1859 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1860 fputs("<node>\n", f);
1861 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1862 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1864 while ((node = set_steal_first(s))) {
1865 fprintf(f, " <node name=\"%s\"/>\n", node);
1869 fputs("</node>\n", f);
1876 r = sd_bus_message_new_method_return(bus, m, &reply);
1880 r = sd_bus_message_append(reply, "s", introspection);
1884 r = sd_bus_send(bus, reply, NULL);
/* Runs one incoming message through the processing stages in a fixed
 * order: hello check, reply callbacks, filters, matches, built-in Peer
 * interface, object callbacks and finally introspection. The bus's
 * iteration_counter is bumped first so the per-callback "already ran for
 * this message" checks start fresh. */
1891 static int process_message(sd_bus *bus, sd_bus_message *m) {
1897 bus->iteration_counter++;
1899 r = process_hello(bus, m);
1903 r = process_reply(bus, m);
1907 r = process_filter(bus, m);
1911 r = process_match(bus, m);
1915 r = process_builtin(bus, m);
1919 r = process_object(bus, m);
1923 return process_introspect(bus, m);
/* One processing step for a bus in BUS_RUNNING or BUS_HELLO state.
 * In order: expired reply timeouts are handled, pending writes are
 * flushed, and one incoming message is fetched and passed through
 * process_message(). The message is rewound afterwards. For a method
 * call, an "org.freedesktop.DBus.Error.UnknownObject" error reply is
 * built and sent (presumably only when nothing handled the call --
 * confirm). */
1926 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1927 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1931 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1933 r = process_timeout(bus);
1937 r = dispatch_wqueue(bus);
1941 r = dispatch_rqueue(bus, &m);
1947 r = process_message(bus, m);
1952 r = sd_bus_message_rewind(m, true);
1961 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1962 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1963 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1965 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1967 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1971 r = sd_bus_send(bus, reply, NULL);
/* Main processing entry point: performs one unit of work appropriate to
 * the current bus state. Socket opening and authentication are delegated
 * to the bus_socket_process_*() helpers. Message processing runs through
 * process_running() with bus->processing set for its duration, which is
 * how recursive invocations are detected. An unknown state is treated as
 * unreachable. */
1985 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1988 /* Returns 0 when we didn't do anything. This should cause the
1989 * caller to invoke sd_bus_wait() before returning the next
1990 * time. Returns > 0 when we did something, which possibly
1991 * means *ret is filled in with an unprocessed message. */
1996 /* We don't allow recursively invoking sd_bus_process(). */
1997 if (bus->processing)
2000 switch (bus->state) {
2007 r = bus_socket_process_opening(bus);
2014 case BUS_AUTHENTICATING:
2016 r = bus_socket_process_authenticating(bus);
2026 bus->processing = true;
2027 r = process_running(bus, ret);
2028 bus->processing = false;
2033 assert_not_reached("Unknown state");
/* Waits with ppoll() until the bus fds become ready or a timeout elapses.
 * The bus must be open. The events to wait for come from
 * sd_bus_get_events(). The bus's own absolute deadline from
 * sd_bus_get_timeout() is turned into a relative wait, clamped at 0 when
 * already past, and the caller's timeout_usec replaces it when it is
 * shorter or when the bus has no deadline. (uint64_t) -1 means no
 * timeout.
 * When input and output descriptors differ, two pollfd entries are used:
 * POLLIN on the input fd and POLLOUT on the output fd.
 * Returns 1 if ppoll() reported ready descriptors, 0 otherwise (error
 * handling is not visible here). */
2036 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2037 struct pollfd p[2] = {};
2044 if (!BUS_IS_OPEN(bus->state))
2047 e = sd_bus_get_events(bus);
2054 r = sd_bus_get_timeout(bus, &until);
2061 nw = now(CLOCK_MONOTONIC);
2062 m = until > nw ? until - nw : 0;
2065 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2068 p[0].fd = bus->input_fd;
2069 if (bus->output_fd == bus->input_fd) {
2073 p[0].events = e & POLLIN;
2074 p[1].fd = bus->output_fd;
2075 p[1].events = e & POLLOUT;
2079 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2083 return r > 0 ? 1 : 0;
/* Waits until there is something to process on the bus or timeout_usec
 * elapses. The bus must be open. A non-empty read queue is special-cased
 * before polling (presumably returning at once because work is already
 * pending -- confirm). Otherwise the wait is delegated to bus_poll(). */
2086 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2090 if (!BUS_IS_OPEN(bus->state))
2092 if (bus->rqueue_size > 0)
2095 return bus_poll(bus, false, timeout_usec);
/* Writes out everything in the write queue. The bus must be open and is
 * first brought to the running state. While messages remain queued,
 * dispatch_wqueue() is called and, if the queue is still not empty,
 * bus_poll() waits without a timeout before trying again. */
2098 int sd_bus_flush(sd_bus *bus) {
2103 if (!BUS_IS_OPEN(bus->state))
2106 r = bus_ensure_running(bus);
2110 if (bus->wqueue_size <= 0)
2114 r = dispatch_wqueue(bus);
2118 if (bus->wqueue_size <= 0)
2121 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback. A zero-initialised record holding the
 * handler and userdata is prepended to the bus's filter list, and
 * filter_callbacks_modified is set so that a dispatch in progress notices
 * the change. */
2127 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2128 struct filter_callback *f;
2135 f = new0(struct filter_callback, 1);
2138 f->callback = callback;
2139 f->userdata = userdata;
2141 bus->filter_callbacks_modified = true;
2142 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregisters a filter callback. The filter list is searched for an entry
 * with the same handler and userdata; that entry is unlinked and
 * filter_callbacks_modified is set. */
2146 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2147 struct filter_callback *f;
2154 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2155 if (f->callback == callback && f->userdata == userdata) {
2156 bus->filter_callbacks_modified = true;
2157 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Shared implementation for registering an object or fallback callback.
 * The object-callback hashmap is allocated on demand. A new record gets
 * its own copy of the path, the handler, the userdata and the fallback
 * flag, and is inserted into the hashmap keyed by that path copy.
 * object_callbacks_modified is set so that a dispatch in progress notices
 * the change. */
2166 static int bus_add_object(
2170 sd_bus_message_handler_t callback,
2173 struct object_callback *c;
2183 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2187 c = new0(struct object_callback, 1);
2191 c->path = strdup(path);
2197 c->callback = callback;
2198 c->userdata = userdata;
2199 c->is_fallback = fallback;
2201 bus->object_callbacks_modified = true;
2202 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Shared implementation for unregistering an object or fallback callback.
 * The record is looked up by path and must match the given handler,
 * userdata and fallback flag; a mismatch is singled out by the check.
 * The record is then removed from the hashmap and
 * object_callbacks_modified is set. */
2212 static int bus_remove_object(
2216 sd_bus_message_handler_t callback,
2219 struct object_callback *c;
2228 c = hashmap_get(bus->object_callbacks, path);
2232 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2235 bus->object_callbacks_modified = true;
2236 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Registers a callback for exactly the given object path (non-fallback
 * variant of bus_add_object()). */
2244 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2245 return bus_add_object(bus, false, path, callback, userdata);
/* Unregisters a callback previously added for exactly the given object
 * path (non-fallback variant of bus_remove_object()). */
2248 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2249 return bus_remove_object(bus, false, path, callback, userdata);
/* Registers a fallback callback for the given path prefix (fallback
 * variant of bus_add_object()). */
2252 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2253 return bus_add_object(bus, true, prefix, callback, userdata);
/* Unregisters a fallback callback previously added for the given path
 * prefix (fallback variant of bus_remove_object()). */
2256 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2257 return bus_remove_object(bus, true, prefix, callback, userdata);
/* Installs a match rule with an associated callback.
 * For a bus client the rule is first registered through
 * bus_add_match_internal(). The callback is then added to the local match
 * tree with bus_match_add(), and match_callbacks_modified is set. On the
 * visible failure path the rule is removed again through
 * bus_remove_match_internal() for bus clients. */
2260 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2268 if (bus->bus_client) {
2269 r = bus_add_match_internal(bus, match);
2275 bus->match_callbacks_modified = true;
2276 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2279 if (bus->bus_client)
2280 bus_remove_match_internal(bus, match);
/* Removes a match rule and its callback. For a bus client the rule is
 * removed through bus_remove_match_internal(). The callback is then
 * removed from the local match tree with bus_match_remove(), and
 * match_callbacks_modified is set. The two results are kept in separate
 * variables (r and q). */
2287 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2295 if (bus->bus_client)
2296 r = bus_remove_match_internal(bus, match);
2299 bus->match_callbacks_modified = true;
2300 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
/* Convenience wrapper that builds a signal message for the given path,
 * interface and member, appends the variadic arguments described by
 * types, and sends it without requesting a serial. */
2308 int sd_bus_emit_signal(
2311 const char *interface,
2313 const char *types, ...) {
2315 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2322 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2326 va_start(ap, types);
2327 r = bus_message_append_ap(m, types, ap);
2332 return sd_bus_send(bus, m, NULL);
/* Convenience wrapper that builds a method call, appends the variadic
 * arguments described by types, and performs it synchronously through
 * sd_bus_send_with_reply_and_block(). A timeout argument of 0 is passed
 * to that call. */
2335 int sd_bus_call_method(
2337 const char *destination,
2339 const char *interface,
2341 sd_bus_error *error,
2342 sd_bus_message **reply,
2343 const char *types, ...) {
2345 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2352 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2356 va_start(ap, types);
2357 r = bus_message_append_ap(m, types, ap);
2362 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
/* Convenience wrapper that answers a method call with a method return
 * carrying the variadic arguments described by types. The call's type is
 * checked to be METHOD_CALL, and calls flagged NO_REPLY_EXPECTED are
 * special-cased (presumably no reply is sent for them -- confirm). */
2365 int sd_bus_reply_method_return(
2367 sd_bus_message *call,
2368 const char *types, ...) {
2370 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2380 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2383 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2386 r = sd_bus_message_new_method_return(bus, call, &m);
2390 va_start(ap, types);
2391 r = bus_message_append_ap(m, types, ap);
2396 return sd_bus_send(bus, m, NULL);
/* Convenience wrapper that answers a method call with the error e. The
 * call's type is checked to be METHOD_CALL and the error is checked to be
 * set. Calls flagged NO_REPLY_EXPECTED are special-cased (presumably no
 * reply is sent for them -- confirm). */
2399 int sd_bus_reply_method_error(
2401 sd_bus_message *call,
2402 const sd_bus_error *e) {
2404 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2413 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2415 if (!sd_bus_error_is_set(e))
2418 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2421 r = sd_bus_message_new_method_error(bus, call, e, &m);
2425 return sd_bus_send(bus, m, NULL);