1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
39 #include "bus-socket.h"
40 #include "bus-kernel.h"
41 #include "bus-control.h"
43 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Tears down a bus object. The visible steps free the exec argv,
 * close the stored fds, drop the reference held on every queued
 * incoming and outgoing message, and release the reply, filter,
 * object and match callback containers. */
45 static void bus_free(sd_bus *b) {
46 struct filter_callback *f;
47 struct object_callback *c;
60 strv_free(b->exec_argv);
62 close_many(b->fds, b->n_fds);
65 for (i = 0; i < b->rqueue_size; i++)
66 sd_bus_message_unref(b->rqueue[i]);
69 for (i = 0; i < b->wqueue_size; i++)
70 sd_bus_message_unref(b->wqueue[i]);
73 hashmap_free_free(b->reply_callbacks);
74 prioq_free(b->reply_callbacks_prioq);
/* Unlink filter callbacks one at a time until the list is empty */
76 while ((f = b->filter_callbacks)) {
77 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
/* Drain the object callback map entry by entry before freeing the map itself */
81 while ((c = hashmap_steal_first(b->object_callbacks))) {
86 hashmap_free(b->object_callbacks);
88 bus_match_free(&b->match_callbacks);
/* Sets up a fresh bus object (presumably handed back through ret --
 * the allocation and the final assignment are not visible here).
 * The visible defaults: both fds unset (-1), message version 1,
 * fd negotiation enabled, and a write queue pre-allocated with room
 * for one message. */
93 int sd_bus_new(sd_bus **ret) {
104 r->input_fd = r->output_fd = -1;
105 r->message_version = 1;
106 r->negotiate_fds = true;
108 /* We guarantee that wqueue always has space for at least one
110 r->wqueue = new(sd_bus_message*, 1);
/* Configures the address string of a bus. The visible code compares
 * the bus state with BUS_UNSET first; presumably the address may only
 * be set before the bus is started (the handling of that case is not
 * visible here -- confirm). */
120 int sd_bus_set_address(sd_bus *bus, const char *address) {
125 if (bus->state != BUS_UNSET)
/* Stores caller-supplied input and output file descriptors on the
 * bus. The bus state is compared with BUS_UNSET beforehand
 * (presumably to refuse changes on a started bus -- confirm). */
140 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
143 if (bus->state != BUS_UNSET)
150 bus->input_fd = input_fd;
151 bus->output_fd = output_fd;
/* Configures a program (path plus argv) for the bus connection.
 * The visible code compares the bus state with BUS_UNSET, tests
 * whether argv is empty, and releases any previously stored
 * exec_path/exec_argv (presumably before storing the new values --
 * the assignments are not visible here). */
155 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
160 if (bus->state != BUS_UNSET)
164 if (strv_isempty(argv))
177 free(bus->exec_path);
178 strv_free(bus->exec_argv);
/* Sets the bus_client flag. Any non-zero b is normalised to true via
 * "!!". The bus state is compared with BUS_UNSET beforehand. */
186 int sd_bus_set_bus_client(sd_bus *bus, int b) {
189 if (bus->state != BUS_UNSET)
192 bus->bus_client = !!b;
/* Sets the negotiate_fds flag (consulted by sd_bus_can_send() for
 * SD_BUS_TYPE_UNIX_FD). Any non-zero b is normalised to true. The bus
 * state is compared with BUS_UNSET beforehand. */
196 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
199 if (bus->state != BUS_UNSET)
202 bus->negotiate_fds = !!b;
/* Sets the is_server flag and the server ID of the bus. The first
 * check singles out a non-null server_id combined with b == 0
 * (presumably rejected as inconsistent -- confirm); the bus state is
 * also compared with BUS_UNSET. */
206 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
209 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
211 if (bus->state != BUS_UNSET)
214 bus->is_server = !!b;
215 bus->server_id = server_id;
/* Sets the anonymous_auth flag. Any non-zero b is normalised to
 * true. The bus state is compared with BUS_UNSET beforehand. */
219 int sd_bus_set_anonymous(sd_bus *bus, int b) {
222 if (bus->state != BUS_UNSET)
225 bus->anonymous_auth = !!b;
/* Reply handler registered by bus_send_hello(). Expects the bus to be
 * in BUS_HELLO state, reads one string from the reply, checks that it
 * is a valid service name beginning with ':', stores a copy of it as
 * the bus's unique name and switches the state to BUS_RUNNING. */
229 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
234 assert(bus->state == BUS_HELLO);
241 r = sd_bus_message_read(reply, "s", &s);
245 if (!service_name_is_valid(s) || s[0] != ':')
248 bus->unique_name = strdup(s);
249 if (!bus->unique_name)
252 bus->state = BUS_RUNNING;
/* Builds a method call addressed to "org.freedesktop.DBus" and sends
 * it with hello_callback() as reply handler, storing the call's
 * serial in bus->hello_serial. The first check tests for a bus that
 * is not a bus client or that is a kernel bus (presumably the call is
 * skipped then -- the branch body is not visible here). */
257 static int bus_send_hello(sd_bus *bus) {
258 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
263 if (!bus->bus_client || bus->is_kernel)
266 r = sd_bus_message_new_method_call(
268 "org.freedesktop.DBus",
270 "org.freedesktop.DBus",
276 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Picks the state the bus enters once it can exchange messages: a
 * bus client goes to BUS_HELLO; the BUS_RUNNING assignment covers the
 * remaining case (the control flow between the two assignments is
 * not visible here). */
279 int bus_start_running(sd_bus *bus) {
282 if (bus->bus_client) {
283 bus->state = BUS_HELLO;
287 bus->state = BUS_RUNNING;
/* Parses one key/value item of an address string at *p. The visible
 * logic compares the text at *p with key, scans the value up to ';',
 * ',' or the end of the string, assembles a byte from two 4-bit
 * values x and y (presumably the two hex digits of an escape
 * sequence -- confirm), and grows the result buffer with realloc().
 * Callers in this file also pass key == NULL (see
 * parse_exec_address()). */
291 static int parse_address_key(const char **p, const char *key, char **value) {
302 if (strncmp(*p, key, l) != 0)
315 while (*a != ';' && *a != ',' && *a != 0) {
333 c = (char) ((x << 4) | y);
340 t = realloc(r, n + 2);
/* Advances *p past the current item: strcspn() moves it to the next
 * ',' or, if there is none, to the terminating NUL. */
368 static void skip_address_key(const char **p) {
372 *p += strcspn(*p, ",");
/* Parses the keys of a unix socket address up to ';' or the end of
 * the string. Recognised keys are "guid", "path" and "abstract". The
 * cases "neither path nor abstract" and "both at once" are tested
 * explicitly. The result is written to b->sockaddr.un and
 * b->sockaddr_size. */
378 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
379 _cleanup_free_ char *path = NULL, *abstract = NULL;
388 while (**p != 0 && **p != ';') {
389 r = parse_address_key(p, "guid", guid);
395 r = parse_address_key(p, "path", &path);
401 r = parse_address_key(p, "abstract", &abstract);
410 if (!path && !abstract)
413 if (path && abstract)
418 if (l > sizeof(b->sockaddr.un.sun_path))
421 b->sockaddr.un.sun_family = AF_UNIX;
422 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
423 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
424 } else if (abstract) {
425 l = strlen(abstract);
/* One byte of sun_path is taken by the leading NUL written below */
426 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
429 b->sockaddr.un.sun_family = AF_UNIX;
430 b->sockaddr.un.sun_path[0] = 0;
431 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
432 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the keys of a TCP address up to ';' or the end of the
 * string. Recognised keys are "guid", "host", "port" and "family".
 * A family of "ipv4"/"ipv6" selects AF_INET/AF_INET6 in the
 * getaddrinfo() hints. The first resolved address is copied into
 * b->sockaddr, its length into b->sockaddr_size, and the result list
 * is freed. -EADDRNOTAVAIL is one of the visible error returns. */
438 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
439 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
441 struct addrinfo *result, hints = {
442 .ai_socktype = SOCK_STREAM,
443 .ai_flags = AI_ADDRCONFIG,
451 while (**p != 0 && **p != ';') {
452 r = parse_address_key(p, "guid", guid);
458 r = parse_address_key(p, "host", &host);
464 r = parse_address_key(p, "port", &port);
470 r = parse_address_key(p, "family", &family);
483 if (streq(family, "ipv4"))
484 hints.ai_family = AF_INET;
485 else if (streq(family, "ipv6"))
486 hints.ai_family = AF_INET6;
491 r = getaddrinfo(host, port, &hints, &result);
495 return -EADDRNOTAVAIL;
497 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
498 b->sockaddr_size = result->ai_addrlen;
500 freeaddrinfo(result);
/* Parses the keys of an exec-style address up to ';' or the end of
 * the string. Recognised keys are "guid", "path" and "argv<N>". For
 * argv keys the index is read with strtoul() and the argv array is
 * grown to ul + 2 slots with the new slots zeroed; the value is then
 * parsed directly into argv[ul]. Afterwards the array is checked for
 * holes, and a missing argv[0] is filled with a copy of path. */
505 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
507 unsigned n_argv = 0, j;
516 while (**p != 0 && **p != ';') {
517 r = parse_address_key(p, "guid", guid);
523 r = parse_address_key(p, "path", &path);
529 if (startswith(*p, "argv")) {
/* The index follows the 4-character "argv" prefix; strtoul()
 * leaves *p at the first character after the number */
533 ul = strtoul(*p + 4, (char**) p, 10);
/* NOTE(review): the errno test is only meaningful if errno was
 * cleared before strtoul(); that reset is not visible in this
 * excerpt -- confirm. Indices above 256 are also tested here. */
534 if (errno > 0 || **p != '=' || ul > 256) {
544 x = realloc(argv, sizeof(char*) * (ul + 2));
550 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
556 r = parse_address_key(p, NULL, argv + ul);
571 /* Make sure there are no holes in the array, with the
572 * exception of argv[0] */
573 for (j = 1; j < n_argv; j++)
579 if (argv && argv[0] == NULL) {
580 argv[0] = strdup(path);
592 for (j = 0; j < n_argv; j++)
/* Parses the keys of a kernel bus address up to ';' or the end of
 * the string. The visible keys are "guid" and "path"; what is done
 * with the parsed path is not visible here. */
600 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
601 _cleanup_free_ char *path = NULL;
609 while (**p != 0 && **p != ';') {
610 r = parse_address_key(p, "guid", guid);
616 r = parse_address_key(p, "path", &path);
/* Discards the result of a previous address parse. The visible steps
 * zero sockaddr_size, free exec_argv and reset server_id to
 * SD_ID128_NULL. */
635 static void bus_reset_parsed_address(sd_bus *b) {
639 b->sockaddr_size = 0;
640 strv_free(b->exec_argv);
644 b->server_id = SD_ID128_NULL;
/* Parses the next entry of b->address, starting at
 * b->address_index. The end of the string is tested first, then the
 * previously parsed state is reset. The entry is dispatched on its
 * "unix:", "tcp:", "unixexec:" or "kernel:" prefix to the matching
 * parser; a parsed guid is converted into b->server_id. Finally
 * address_index is set to where the parser stopped. */
649 static int bus_parse_next_address(sd_bus *b) {
650 _cleanup_free_ char *guid = NULL;
658 if (b->address[b->address_index] == 0)
661 bus_reset_parsed_address(b);
663 a = b->address + b->address_index;
672 if (startswith(a, "unix:")) {
675 r = parse_unix_address(b, &a, &guid);
680 } else if (startswith(a, "tcp:")) {
683 r = parse_tcp_address(b, &a, &guid);
689 } else if (startswith(a, "unixexec:")) {
692 r = parse_exec_address(b, &a, &guid);
698 } else if (startswith(a, "kernel:")) {
701 r = parse_kernel_address(b, &a, &guid);
714 r = sd_id128_from_string(guid, &b->server_id);
/* Remember how far we got, as an offset into b->address */
719 b->address_index = a - b->address;
/* Attempts to connect using the currently parsed address: a socket
 * connect when a socket address family is set, an exec when
 * exec_path is set, a kernel connect when b->kernel is set. Each
 * attempt's result is recorded (negated) in last_connect_error, and
 * the next address is parsed via bus_parse_next_address(). The
 * visible final return reports the last recorded connect error, or
 * -ECONNREFUSED if none was recorded. */
723 static int bus_start_address(sd_bus *b) {
731 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
733 r = bus_socket_connect(b);
737 b->last_connect_error = -r;
739 } else if (b->exec_path) {
741 r = bus_socket_exec(b);
745 b->last_connect_error = -r;
746 } else if (b->kernel) {
748 r = bus_kernel_connect(b);
752 b->last_connect_error = -r;
755 r = bus_parse_next_address(b);
759 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Drops the currently parsed address state and restarts connecting
 * via bus_start_address(), returning its result. */
763 int bus_next_address(sd_bus *b) {
766 bus_reset_parsed_address(b);
767 return bus_start_address(b);
/* Adopts the file descriptors previously stored on the bus (both
 * must be valid, see the asserts). fd_nonblock() and fd_cloexec()
 * are applied to the input fd, and to the output fd as well when it
 * is a different descriptor. The input fd is then fstat()ed: a
 * character device is handed to bus_kernel_take_fd(), anything else
 * to bus_socket_take_fd(). */
770 static int bus_start_fd(sd_bus *b) {
775 assert(b->input_fd >= 0);
776 assert(b->output_fd >= 0);
778 r = fd_nonblock(b->input_fd, true);
782 r = fd_cloexec(b->input_fd, true);
786 if (b->input_fd != b->output_fd) {
787 r = fd_nonblock(b->output_fd, true);
791 r = fd_cloexec(b->output_fd, true);
796 if (fstat(b->input_fd, &st) < 0)
/* S_ISCHR() classifies a file mode, so it must be given the
 * st_mode filled in by fstat() above, not the fd number */
799 if (S_ISCHR(st.st_mode))
800 return bus_kernel_take_fd(b);
802 return bus_socket_take_fd(b);
/* Starts a configured bus. The state is compared with BUS_UNSET and
 * then set to BUS_OPENING; the combination "server and bus client"
 * is tested explicitly. A bus with a valid input fd is started via
 * bus_start_fd(), one with an address, socket address, exec path or
 * kernel path via bus_start_address(). The visible final step sends
 * the hello call via bus_send_hello(). */
805 int sd_bus_start(sd_bus *bus) {
810 if (bus->state != BUS_UNSET)
813 bus->state = BUS_OPENING;
815 if (bus->is_server && bus->bus_client)
818 if (bus->input_fd >= 0)
819 r = bus_start_fd(bus);
820 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
821 r = bus_start_address(bus);
828 return bus_send_hello(bus);
/* Opens a connection to the system bus. The address is taken from
 * the DBUS_SYSTEM_BUS_ADDRESS environment variable; the hard-coded
 * unix socket /run/dbus/system_bus_socket is the visible alternative
 * (the condition choosing between the two is not shown here). The
 * bus is marked as a bus client. */
831 int sd_bus_open_system(sd_bus **ret) {
843 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
845 r = sd_bus_set_address(b, e);
849 b->sockaddr.un.sun_family = AF_UNIX;
850 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
/* sizeof() of the literal counts the trailing NUL, hence the -1 */
851 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
854 b->bus_client = true;
/* Opens a connection to the user (session) bus. The address is taken
 * from the DBUS_SESSION_BUS_ADDRESS environment variable; the visible
 * alternative builds a unix socket path "<XDG_RUNTIME_DIR>/bus" (the
 * condition choosing between the two is not shown here). The bus is
 * marked as a bus client. */
868 int sd_bus_open_user(sd_bus **ret) {
881 e = getenv("DBUS_SESSION_BUS_ADDRESS");
883 r = sd_bus_set_address(b, e);
887 e = getenv("XDG_RUNTIME_DIR");
/* 4 is the length of the "/bus" suffix appended below */
894 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
899 b->sockaddr.un.sun_family = AF_UNIX;
900 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
901 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
904 b->bus_client = true;
/* Closes the bus's file descriptors. The output fd is only closed
 * separately when it differs from the input fd, so a shared
 * descriptor is closed exactly once. Both fields are reset to -1
 * afterwards. */
918 void sd_bus_close(sd_bus *bus) {
922 if (bus->input_fd >= 0)
923 close_nointr_nofail(bus->input_fd);
924 if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
925 close_nointr_nofail(bus->output_fd);
927 bus->input_fd = bus->output_fd = -1;
/* Reference-counting entry point for a bus object; asserts that the
 * current count is positive (the count update itself is not visible
 * here). */
930 sd_bus *sd_bus_ref(sd_bus *bus) {
934 assert(bus->n_ref > 0);
/* Counterpart of sd_bus_ref(); asserts that the current reference
 * count is positive (the count update and any resulting clean-up are
 * not visible here). */
940 sd_bus *sd_bus_unref(sd_bus *bus) {
944 assert(bus->n_ref > 0);
/* Returns non-zero when the bus has left the BUS_UNSET state and has
 * a valid input fd, zero otherwise. */
953 int sd_bus_is_open(sd_bus *bus) {
957 return bus->state != BUS_UNSET && bus->input_fd >= 0;
/* Tells whether a value of the given type can be sent on this bus.
 * A missing output fd is tested first. SD_BUS_TYPE_UNIX_FD gets
 * special treatment: it depends on the negotiate_fds flag and
 * requires the bus to be running (bus_ensure_running()). For other
 * types the visible result is bus_type_is_valid(type). */
960 int sd_bus_can_send(sd_bus *bus, char type) {
965 if (bus->output_fd < 0)
968 if (type == SD_BUS_TYPE_UNIX_FD) {
969 if (!bus->negotiate_fds)
972 r = bus_ensure_running(bus);
979 return bus_type_is_valid(type);
/* Copies the bus's server ID into *server_id, after making sure the
 * bus is running via bus_ensure_running(). */
982 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
990 r = bus_ensure_running(bus);
994 *server_id = bus->server_id;
/* Seals message m for sending on bus b. The message's header version
 * is compared with the bus's message_version first. The bus serial
 * counter is incremented and the new value is passed to
 * bus_message_seal(). */
998 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1001 if (m->header->version > b->message_version)
1007 return bus_message_seal(m, ++b->serial);
/* Writes out queued outgoing messages, always working on the head of
 * the write queue. The head is written with the kernel or the socket
 * writer; bus->windex tracks how much of it the socket writer has
 * sent. Once the head is fully written (always assumed for kernel
 * buses) it is unreferenced and the remaining entries are shifted
 * down by one. Only valid in BUS_RUNNING or BUS_HELLO state. */
1010 static int dispatch_wqueue(sd_bus *bus) {
1014 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1016 if (bus->output_fd < 0)
1019 while (bus->wqueue_size > 0) {
1022 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1024 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1030 /* Didn't do anything this time */
1032 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1033 /* Fully written. Let's drop the entry from
1036 * This isn't particularly optimized, but
1037 * well, this is supposed to be our worst-case
1038 * buffer only, and the socket buffer is
1039 * supposed to be our primary buffer, and if
1040 * it got full, then all bets are off
1043 sd_bus_message_unref(bus->wqueue[0]);
1044 bus->wqueue_size --;
1045 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetches the next incoming message into *m. A message already
 * waiting in the read queue is handed out first (the queue is then
 * shifted down by one); otherwise a new message is read with the
 * kernel or the socket reader. Only valid in BUS_RUNNING or
 * BUS_HELLO state. */
1055 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1056 sd_bus_message *z = NULL;
1061 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1063 if (bus->input_fd < 0)
1066 if (bus->rqueue_size > 0) {
1067 /* Dispatch a queued message */
1069 *m = bus->rqueue[0];
1070 bus->rqueue_size --;
1071 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1075 /* Try to read a new message */
1078 r = bus_kernel_read_message(bus, &z);
1080 r = bus_socket_read_message(bus, &z);
/* Sends message m on the bus, optionally returning its serial
 * through *serial. Visible steps: the bus state and output fd are
 * checked, sd_bus_can_send() is consulted for SD_BUS_TYPE_UNIX_FD, an
 * unsealed message sent without a serial pointer is flagged
 * SD_BUS_MESSAGE_NO_REPLY_EXPECTED, and the message is sealed. When
 * the bus is running (or in hello state) and nothing is queued, the
 * message is written directly; a partially written message is parked
 * in wqueue[0]. Otherwise it is appended to the write queue, which is
 * tested against BUS_WQUEUE_MAX entries. */
1096 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1101 if (bus->state == BUS_UNSET)
1103 if (bus->output_fd < 0)
1109 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1116 /* If the serial number isn't kept, then we know that no reply
1118 if (!serial && !m->sealed)
1119 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1121 r = bus_seal_message(bus, m);
1125 /* If this is a reply and no reply was requested, then let's
1126 * suppress this, if we can */
1127 if (m->dont_send && !serial)
1130 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1134 r = bus_kernel_write_message(bus, m);
1136 r = bus_socket_write_message(bus, m, &idx);
1141 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1142 /* Wasn't fully written. So let's remember how
1143 * much was written. Note that the first entry
1144 * of the wqueue array is always allocated so
1145 * that we always can remember how much was
1147 bus->wqueue[0] = sd_bus_message_ref(m);
1148 bus->wqueue_size = 1;
1154 /* Just append it to the queue. */
1156 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1159 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1164 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1168 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout in usec into an absolute
 * CLOCK_MONOTONIC timestamp. (uint64_t) -1 is tested as a special
 * value first, and BUS_DEFAULT_TIMEOUT is substituted in one case
 * (presumably for usec == 0 -- the condition is not visible here). */
1173 static usec_t calc_elapse(uint64_t usec) {
1174 if (usec == (uint64_t) -1)
1178 usec = BUS_DEFAULT_TIMEOUT;
1180 return now(CLOCK_MONOTONIC) + usec;
/* Comparison function for the reply callback priority queue (see
 * prioq_ensure_allocated() in sd_bus_send_with_reply()). Entries are
 * ordered by their timeout field; the cases where exactly one of the
 * two timeouts is 0 are handled separately before the plain
 * less/greater comparison (the returned values are not visible
 * here). */
1183 static int timeout_compare(const void *a, const void *b) {
1184 const struct reply_callback *x = a, *y = b;
1186 if (x->timeout != 0 && y->timeout == 0)
1189 if (x->timeout == 0 && y->timeout != 0)
1192 if (x->timeout < y->timeout)
1195 if (x->timeout > y->timeout)
/* Sends method call m and registers callback/userdata to be invoked
 * for its reply. The message type and the
 * SD_BUS_MESSAGE_NO_REPLY_EXPECTED flag are checked first. The
 * callback record is keyed by the message serial in
 * bus->reply_callbacks; when it has a non-zero timeout it is also
 * put into the timeout priority queue. If queuing the timeout or
 * sending fails, the registration is undone with
 * sd_bus_send_with_reply_cancel(). */
1201 int sd_bus_send_with_reply(
1204 sd_bus_message_handler_t callback,
1209 struct reply_callback *c;
1214 if (bus->state == BUS_UNSET)
1216 if (bus->output_fd < 0)
1222 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1224 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1227 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
/* The priority queue is only needed when a timeout is in effect */
1231 if (usec != (uint64_t) -1) {
1232 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1237 r = bus_seal_message(bus, m);
1241 c = new0(struct reply_callback, 1);
1245 c->callback = callback;
1246 c->userdata = userdata;
1247 c->serial = BUS_MESSAGE_SERIAL(m);
1248 c->timeout = calc_elapse(usec);
1250 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1256 if (c->timeout != 0) {
1257 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1260 sd_bus_send_with_reply_cancel(bus, c->serial);
1265 r = sd_bus_send(bus, m, serial);
1267 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply callback identified by the serial of its
 * method call: the record is removed from bus->reply_callbacks and,
 * when it carries a non-zero timeout, from the timeout priority
 * queue as well. */
1274 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1275 struct reply_callback *c;
1282 c = hashmap_remove(bus->reply_callbacks, &serial);
1286 if (c->timeout != 0)
1287 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the bus until it reaches BUS_RUNNING state. The visible
 * code checks the input fd and the BUS_UNSET state, tests for
 * BUS_RUNNING, and otherwise calls sd_bus_process() followed by
 * sd_bus_wait() without timeout (presumably in a loop -- the loop
 * construct is not visible here). */
1293 int bus_ensure_running(sd_bus *bus) {
1298 if (bus->input_fd < 0)
1300 if (bus->state == BUS_UNSET)
1303 if (bus->state == BUS_RUNNING)
1307 r = sd_bus_process(bus, NULL);
1310 if (bus->state == BUS_RUNNING)
1315 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends method call m and waits for the matching reply. After the
 * precondition checks the bus is brought to running state, the
 * message is sent, and an absolute deadline is computed with
 * calc_elapse(). Incoming messages are then read directly: one whose
 * reply_serial matches is examined (method return or method error;
 * for an error it is copied into *error and converted with
 * bus_error_to_errno()), any other message is appended to the read
 * queue for later dispatching. Between reads the function polls the
 * bus and flushes the write queue. */
1321 int sd_bus_send_with_reply_and_block(
1325 sd_bus_error *error,
1326 sd_bus_message **reply) {
1335 if (bus->output_fd < 0)
1337 if (bus->state == BUS_UNSET)
1341 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1343 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1345 if (bus_error_is_dirty(error))
1348 r = bus_ensure_running(bus);
1352 r = sd_bus_send(bus, m, &serial);
1356 timeout = calc_elapse(usec);
1360 sd_bus_message *incoming = NULL;
1365 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1368 /* Make sure there's room for queuing this
1369 * locally, before we read the message */
1371 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1380 r = bus_kernel_read_message(bus, &incoming);
1382 r = bus_socket_read_message(bus, &incoming);
1387 if (incoming->reply_serial == serial) {
1388 /* Found a match! */
1390 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1395 sd_bus_message_unref(incoming);
1400 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1403 r = sd_bus_error_copy(error, &incoming->error);
1405 sd_bus_message_unref(incoming);
1409 k = bus_error_to_errno(&incoming->error);
1410 sd_bus_message_unref(incoming);
1414 sd_bus_message_unref(incoming);
1418 /* There's already guaranteed to be room for
1419 * this, so no need to resize things here */
1420 bus->rqueue[bus->rqueue_size ++] = incoming;
1423 /* Try to read more, right-away */
1432 n = now(CLOCK_MONOTONIC);
1438 left = (uint64_t) -1;
1440 r = bus_poll(bus, true, left);
1444 r = dispatch_wqueue(bus);
/* Returns the bus's file descriptor. An unset input fd and differing
 * input/output fds are tested first (presumably both are errors,
 * since a single fd cannot represent the latter -- confirm). */
1450 int sd_bus_get_fd(sd_bus *bus) {
1453 if (bus->input_fd < 0)
1455 if (bus->input_fd != bus->output_fd)
1458 return bus->input_fd;
/* Computes the poll events the bus is currently interested in,
 * depending on its state. While authenticating,
 * bus_socket_auth_needs_write() is consulted. When running (or in
 * hello state) the decision depends on whether the read queue is
 * empty and whether the write queue has entries. The flag values
 * assigned in each branch are not visible here; bus_poll() masks the
 * result with POLLIN and POLLOUT. */
1461 int sd_bus_get_events(sd_bus *bus) {
1466 if (bus->state == BUS_UNSET)
1468 if (bus->input_fd < 0)
1471 if (bus->state == BUS_OPENING)
1473 else if (bus->state == BUS_AUTHENTICATING) {
1475 if (bus_socket_auth_needs_write(bus))
1480 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1481 if (bus->rqueue_size <= 0)
1483 if (bus->wqueue_size > 0)
/* Reports through *timeout_usec the time the bus needs to be woken
 * up at. While authenticating this is bus->auth_timeout. In states
 * other than BUS_RUNNING/BUS_HELLO it is (uint64_t) -1. Otherwise the
 * head of the reply callback priority queue is peeked and its
 * timeout reported, with (uint64_t) -1 as the alternative
 * (presumably when the queue is empty -- the condition is not
 * visible here). */
1490 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1491 struct reply_callback *c;
1497 if (bus->state == BUS_UNSET)
1499 if (bus->input_fd < 0)
1502 if (bus->state == BUS_AUTHENTICATING) {
1503 *timeout_usec = bus->auth_timeout;
1507 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1508 *timeout_usec = (uint64_t) -1;
1512 c = prioq_peek(bus->reply_callbacks_prioq);
1514 *timeout_usec = (uint64_t) -1;
1518 *timeout_usec = c->timeout;
/* Handles an expired reply timeout. The head of the timeout priority
 * queue is peeked and the current CLOCK_MONOTONIC time is read; the
 * entry is then popped, removed from the reply callback map, and its
 * callback is invoked with ETIMEDOUT and no message. Returns the
 * callback's negative error, or 1 otherwise. */
1522 static int process_timeout(sd_bus *bus) {
1523 struct reply_callback *c;
1529 c = prioq_peek(bus->reply_callbacks_prioq);
1533 n = now(CLOCK_MONOTONIC);
1537 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1538 hashmap_remove(bus->reply_callbacks, &c->serial);
1540 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1543 return r < 0 ? r : 1;
/* Gatekeeper applied to incoming messages while the bus is still in
 * BUS_HELLO state: the message type (method return / method error)
 * and the reply serial (bus->hello_serial) are checked. */
1546 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1550 if (bus->state != BUS_HELLO)
1553 /* Let's make sure the first message on the bus is the HELLO
1554 * reply. But note that we don't actually parse the message
1555 * here (we leave that to the usual handling), we just verify
1556 * we don't let any earlier msg through. */
1558 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1559 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1562 if (m->reply_serial != bus->hello_serial)
/* Dispatches a method return or method error to the reply callback
 * registered for its reply serial. The callback record is removed
 * from the map (and from the timeout priority queue when it has a
 * non-zero timeout) before the callback is invoked with the
 * message. */
1568 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1569 struct reply_callback *c;
1575 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1576 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1579 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1583 if (c->timeout != 0)
1584 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1586 r = c->callback(bus, 0, m, c->userdata);
/* Runs the registered filter callbacks on message m. The walk over
 * the list is repeated for as long as a callback modified the list
 * (filter_callbacks_modified). Each entry remembers the iteration
 * counter of its last run, so it is not invoked twice for the same
 * message. */
1592 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1593 struct filter_callback *l;
1600 bus->filter_callbacks_modified = false;
1602 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1604 if (bus->filter_callbacks_modified)
1607 /* Don't run this more than once per iteration */
1608 if (l->last_iteration == bus->iteration_counter)
1611 l->last_iteration = bus->iteration_counter;
1613 r = l->callback(bus, 0, m, l->userdata);
1619 } while (bus->filter_callbacks_modified);
/* Runs the match callbacks on message m via bus_match_run(),
 * repeating for as long as the set of match callbacks was modified
 * during the run (match_callbacks_modified). */
1624 static int process_match(sd_bus *bus, sd_bus_message *m) {
1631 bus->match_callbacks_modified = false;
1633 r = bus_match_run(bus, &bus->match_callbacks, 0, m);
1637 } while (bus->match_callbacks_modified);
/* Built-in implementation of the "org.freedesktop.DBus.Peer"
 * interface for incoming method calls. "Ping" is answered with an
 * empty method return, "GetMachineId" with the machine ID as a
 * string, and any other member with an
 * "org.freedesktop.DBus.Error.UnknownMethod" error. The
 * SD_BUS_MESSAGE_NO_REPLY_EXPECTED flag of the call is tested before
 * a reply is built. */
1642 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1643 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1649 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1652 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1655 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1658 if (streq_ptr(m->member, "Ping"))
1659 r = sd_bus_message_new_method_return(bus, m, &reply);
1660 else if (streq_ptr(m->member, "GetMachineId")) {
1664 r = sd_id128_get_machine(&id);
1668 r = sd_bus_message_new_method_return(bus, m, &reply);
1672 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1674 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1676 sd_bus_error_set(&error,
1677 "org.freedesktop.DBus.Error.UnknownMethod",
1678 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1680 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1686 r = sd_bus_send(bus, reply, NULL);
/* Dispatches an incoming method call to the registered object
 * callbacks. The callback registered for the exact path is tried
 * first, then callbacks registered as fallbacks for prefixes of the
 * path, which are found by cutting the path at its last '/'. The
 * whole lookup is repeated for as long as a callback modified the
 * set of object callbacks; the per-entry iteration counter prevents
 * a callback from running twice for the same message. The visible
 * tail of the function sends an
 * "org.freedesktop.DBus.Error.UnknownMethod" error reply. */
1693 static int process_object(sd_bus *bus, sd_bus_message *m) {
1694 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1695 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1696 struct object_callback *c;
1704 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1707 if (hashmap_isempty(bus->object_callbacks))
1710 pl = strlen(m->path);
1715 bus->object_callbacks_modified = false;
1717 c = hashmap_get(bus->object_callbacks, m->path);
1718 if (c && c->last_iteration != bus->iteration_counter) {
1720 c->last_iteration = bus->iteration_counter;
1722 r = c->callback(bus, 0, m, c->userdata);
1729 /* Look for fallback prefixes */
1734 if (bus->object_callbacks_modified)
1737 e = strrchr(p, '/');
1743 c = hashmap_get(bus->object_callbacks, p);
1744 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1746 c->last_iteration = bus->iteration_counter;
1748 r = c->callback(bus, 0, m, c->userdata);
1756 } while (bus->object_callbacks_modified);
1758 /* We found some handlers but none wanted to take this, then
1759 * return this -- with one exception, we can handle
1760 * introspection minimally ourselves */
1761 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1764 sd_bus_error_set(&error,
1765 "org.freedesktop.DBus.Error.UnknownMethod",
1766 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1768 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1772 r = sd_bus_send(bus, reply, NULL);
/* Minimal built-in handler for
 * "org.freedesktop.DBus.Introspectable.Introspect". The registered
 * object callbacks are walked and their paths compared with the
 * called path, collecting names in the set s (how the child name is
 * derived is not visible here). The XML document is written into a
 * memory stream: doctype, the Peer and Introspectable interface
 * descriptions, and one <node> element per collected name. It is
 * sent back as the single string argument of a method return. */
1779 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1780 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1781 _cleanup_free_ char *introspection = NULL;
1782 _cleanup_set_free_free_ Set *s = NULL;
1783 _cleanup_fclose_ FILE *f = NULL;
1784 struct object_callback *c;
1793 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1799 s = set_new(string_hash_func, string_compare_func);
1803 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1807 if (streq(c->path, "/"))
1810 if (streq(m->path, "/"))
1813 e = startswith(c->path, m->path);
1814 if (!e || *e != '/')
1835 f = open_memstream(&introspection, &size);
1839 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1840 fputs("<node>\n", f);
1841 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1842 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1844 while ((node = set_steal_first(s))) {
1845 fprintf(f, " <node name=\"%s\"/>\n", node);
1849 fputs("</node>\n", f);
1856 r = sd_bus_message_new_method_return(bus, m, &reply);
1860 r = sd_bus_message_append(reply, "s", introspection);
1864 r = sd_bus_send(bus, reply, NULL);
/* Runs one incoming message through the processing stages in a
 * fixed order: hello check, reply callbacks, filters, matches,
 * built-in interfaces, object callbacks and finally the built-in
 * introspection. The iteration counter is bumped first so that the
 * per-callback "already ran for this message" checks in the stages
 * work. */
1871 static int process_message(sd_bus *bus, sd_bus_message *m) {
1877 bus->iteration_counter++;
1879 r = process_hello(bus, m);
1883 r = process_reply(bus, m);
1887 r = process_filter(bus, m);
1891 r = process_match(bus, m);
1895 r = process_builtin(bus, m);
1899 r = process_object(bus, m);
1903 return process_introspect(bus, m);
/* One processing step of a bus in BUS_RUNNING or BUS_HELLO state:
 * expired timeouts are handled, the write queue is flushed, one
 * incoming message is fetched and run through process_message().
 * For a method call the visible tail of the function sends an
 * "org.freedesktop.DBus.Error.UnknownObject" error reply
 * (presumably only when nothing handled the call -- the condition
 * is not visible here). */
1906 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1907 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1911 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1913 r = process_timeout(bus);
1917 r = dispatch_wqueue(bus);
1921 r = dispatch_rqueue(bus, &m);
1927 r = process_message(bus, m);
1937 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1938 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1939 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1941 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1943 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1947 r = sd_bus_send(bus, reply, NULL);
/* Main processing entry point; performs one step of work depending
 * on the bus state. Opening and authenticating are delegated to the
 * socket layer; a running bus is handled by process_running(). The
 * bus->processing flag is set around process_running() and tested
 * on entry, which is how recursive invocation is detected. */
1961 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1964 /* Returns 0 when we didn't do anything. This should cause the
1965 * caller to invoke sd_bus_wait() before returning the next
1966 * time. Returns > 0 when we did something, which possibly
1967 * means *ret is filled in with an unprocessed message. */
1971 if (bus->input_fd < 0)
1974 /* We don't allow recursively invoking sd_bus_process(). */
1975 if (bus->processing)
1978 switch (bus->state) {
1984 r = bus_socket_process_opening(bus);
1991 case BUS_AUTHENTICATING:
1993 r = bus_socket_process_authenticating(bus);
2003 bus->processing = true;
2004 r = process_running(bus, ret);
2005 bus->processing = false;
2010 assert_not_reached("Unknown state");
/* Waits with ppoll() until the bus fds become ready or a timeout
 * elapses. The events come from sd_bus_get_events(). The bus's own
 * absolute timeout from sd_bus_get_timeout() is converted into a
 * relative one (clamped at 0 when already in the past), and the
 * caller's timeout_usec is compared against it, with (uint64_t) -1
 * standing for "no timeout". When input and output fd differ, the
 * input fd is polled for POLLIN and the output fd for POLLOUT in
 * separate pollfd slots. Returns 1 if ppoll() reported ready fds
 * and 0 otherwise on the visible return path. */
2013 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2014 struct pollfd p[2] = {};
2021 if (bus->input_fd < 0)
2024 e = sd_bus_get_events(bus);
2031 r = sd_bus_get_timeout(bus, &until);
2038 nw = now(CLOCK_MONOTONIC);
2039 m = until > nw ? until - nw : 0;
2042 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2045 p[0].fd = bus->input_fd;
2046 if (bus->output_fd == bus->input_fd) {
2050 p[0].events = e & POLLIN;
2051 p[1].fd = bus->output_fd;
2052 p[1].events = e & POLLOUT;
2056 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2060 return r > 0 ? 1 : 0;
/* Waits for activity on the bus for at most timeout_usec by calling
 * bus_poll(). The bus state and input fd are checked first, and a
 * non-empty read queue is tested before polling (presumably
 * returning right away since there is already something to process
 * -- confirm). */
2063 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2067 if (bus->state == BUS_UNSET)
2069 if (bus->input_fd < 0)
2071 if (bus->rqueue_size > 0)
2074 return bus_poll(bus, false, timeout_usec);
/* Writes out everything in the write queue. After making sure the
 * bus is running, the visible code alternates dispatch_wqueue() with
 * bus_poll() without timeout, testing for an empty write queue
 * before and after dispatching (presumably looping until it is
 * empty -- the loop construct is not visible here). */
2077 int sd_bus_flush(sd_bus *bus) {
2082 if (bus->state == BUS_UNSET)
2084 if (bus->output_fd < 0)
2087 r = bus_ensure_running(bus);
2091 if (bus->wqueue_size <= 0)
2095 r = dispatch_wqueue(bus);
2099 if (bus->wqueue_size <= 0)
2102 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback with its userdata. The new entry is
 * prepended to bus->filter_callbacks, and filter_callbacks_modified
 * is set so that a process_filter() run in progress notices the
 * change. */
2108 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2109 struct filter_callback *f;
2116 f = new0(struct filter_callback, 1);
2119 f->callback = callback;
2120 f->userdata = userdata;
2122 bus->filter_callbacks_modified = true;
2123 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregisters a filter: the list is searched for an entry whose
 * callback and userdata both match; that entry is unlinked and
 * filter_callbacks_modified is set so that a process_filter() run
 * in progress notices the change. */
2127 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2128 struct filter_callback *f;
2135 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2136 if (f->callback == callback && f->userdata == userdata) {
2137 bus->filter_callbacks_modified = true;
2138 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Shared implementation of sd_bus_add_object() and
 * sd_bus_add_fallback(). Allocates a callback record holding a
 * private copy of the path, the callback, its userdata and the
 * fallback flag, and inserts it into bus->object_callbacks keyed by
 * that path copy. object_callbacks_modified is set so that a
 * process_object() run in progress notices the change. */
2147 static int bus_add_object(
2151 sd_bus_message_handler_t callback,
2154 struct object_callback *c;
2164 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2168 c = new0(struct object_callback, 1);
2172 c->path = strdup(path);
2178 c->callback = callback;
2179 c->userdata = userdata;
2180 c->is_fallback = fallback;
2182 bus->object_callbacks_modified = true;
2183 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Shared implementation of sd_bus_remove_object() and
 * sd_bus_remove_fallback(). Looks up the record registered for path
 * and compares callback, userdata and fallback flag with the
 * arguments before removing it from bus->object_callbacks.
 * object_callbacks_modified is set so that a process_object() run
 * in progress notices the change. */
2193 static int bus_remove_object(
2197 sd_bus_message_handler_t callback,
2200 struct object_callback *c;
2209 c = hashmap_get(bus->object_callbacks, path);
2213 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2216 bus->object_callbacks_modified = true;
2217 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Registers callback for the exact object path (fallback = false);
 * thin wrapper around bus_add_object(). */
2225 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2226 return bus_add_object(bus, false, path, callback, userdata);
/* Removes a callback registered for an exact object path
 * (fallback = false); thin wrapper around bus_remove_object(). */
2229 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2230 return bus_remove_object(bus, false, path, callback, userdata);
/* Registers callback as a fallback for the given path prefix
 * (fallback = true); thin wrapper around bus_add_object(). */
2233 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2234 return bus_add_object(bus, true, prefix, callback, userdata);
/* Removes a fallback callback registered for the given path prefix
 * (fallback = true); thin wrapper around bus_remove_object(). */
2237 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2238 return bus_remove_object(bus, true, prefix, callback, userdata);
/* Installs a match rule with its callback. For a bus client the
 * rule is first registered via bus_add_match_internal(); it is then
 * added to the local match tree. The visible
 * bus_remove_match_internal() call undoes the first registration
 * (presumably when the local add failed -- the condition is not
 * visible here). */
2241 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2249 if (bus->bus_client) {
2250 r = bus_add_match_internal(bus, match);
2256 bus->match_callbacks_modified = true;
2257 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2260 if (bus->bus_client)
2261 bus_remove_match_internal(bus, match);
/* Removes a match rule: for a bus client via
 * bus_remove_match_internal(), and from the local match tree via
 * bus_match_remove(). The two results are kept in separate
 * variables (r and q); how they are combined into the return value
 * is not visible here. */
2268 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2276 if (bus->bus_client)
2277 r = bus_remove_match_internal(bus, match);
2280 bus->match_callbacks_modified = true;
2281 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
/* Convenience helper: creates a signal message for
 * path/interface/member, appends the variadic arguments described
 * by the type string "types", and sends it without requesting a
 * serial. */
2289 int sd_bus_emit_signal(
2292 const char *interface,
2294 const char *types, ...) {
2296 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2303 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2307 va_start(ap, types);
2308 r = bus_message_append_ap(m, types, ap);
2313 return sd_bus_send(bus, m, NULL);
/* Convenience helper: creates a method call message, appends the
 * variadic arguments described by the type string "types", and
 * performs a blocking call via sd_bus_send_with_reply_and_block()
 * with a timeout argument of 0. The reply is returned through
 * *reply and a received error through *error. */
2316 int sd_bus_call_method(
2318 const char *destination,
2320 const char *interface,
2322 sd_bus_error *error,
2323 sd_bus_message **reply,
2324 const char *types, ...) {
2326 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2333 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2337 va_start(ap, types);
2338 r = bus_message_append_ap(m, types, ap);
2343 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
/* Convenience helper: replies to method call "call" with a method
 * return carrying the variadic arguments described by "types". The
 * call's message type and its SD_BUS_MESSAGE_NO_REPLY_EXPECTED flag
 * are checked before the reply is built. */
2346 int sd_bus_reply_method_return(
2348 sd_bus_message *call,
2349 const char *types, ...) {
2351 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2361 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2364 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2367 r = sd_bus_message_new_method_return(bus, call, &m);
2371 va_start(ap, types);
2372 r = bus_message_append_ap(m, types, ap);
2377 return sd_bus_send(bus, m, NULL);
/* Convenience helper: replies to method call "call" with the error
 * e. The call's message type, whether e is actually set, and the
 * call's SD_BUS_MESSAGE_NO_REPLY_EXPECTED flag are checked before
 * the error reply is built and sent. */
2380 int sd_bus_reply_method_error(
2382 sd_bus_message *call,
2383 const sd_bus_error *e) {
2385 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2394 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2396 if (!sd_bus_error_is_set(e))
2399 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2402 r = sd_bus_message_new_method_error(bus, call, e, &m);
2406 return sd_bus_send(bus, m, NULL);