1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
39 #include "bus-socket.h"
40 #include "bus-kernel.h"
41 #include "bus-control.h"
43 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
45 static void bus_free(sd_bus *b) {
46 struct filter_callback *f;
47 struct object_callback *c;
61 strv_free(b->exec_argv);
63 close_many(b->fds, b->n_fds);
66 for (i = 0; i < b->rqueue_size; i++)
67 sd_bus_message_unref(b->rqueue[i]);
70 for (i = 0; i < b->wqueue_size; i++)
71 sd_bus_message_unref(b->wqueue[i]);
74 hashmap_free_free(b->reply_callbacks);
75 prioq_free(b->reply_callbacks_prioq);
77 while ((f = b->filter_callbacks)) {
78 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
82 while ((c = hashmap_steal_first(b->object_callbacks))) {
87 hashmap_free(b->object_callbacks);
89 bus_match_free(&b->match_callbacks);
94 int sd_bus_new(sd_bus **ret) {
105 r->input_fd = r->output_fd = -1;
106 r->message_version = 1;
107 r->negotiate_fds = true;
109 /* We guarantee that wqueue always has space for at least one
111 r->wqueue = new(sd_bus_message*, 1);
121 int sd_bus_set_address(sd_bus *bus, const char *address) {
126 if (bus->state != BUS_UNSET)
141 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
144 if (bus->state != BUS_UNSET)
151 bus->input_fd = input_fd;
152 bus->output_fd = output_fd;
156 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
161 if (bus->state != BUS_UNSET)
165 if (strv_isempty(argv))
178 free(bus->exec_path);
179 strv_free(bus->exec_argv);
187 int sd_bus_set_bus_client(sd_bus *bus, int b) {
190 if (bus->state != BUS_UNSET)
193 bus->bus_client = !!b;
197 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
200 if (bus->state != BUS_UNSET)
203 bus->negotiate_fds = !!b;
207 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
210 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
212 if (bus->state != BUS_UNSET)
215 bus->is_server = !!b;
216 bus->server_id = server_id;
220 int sd_bus_set_anonymous(sd_bus *bus, int b) {
223 if (bus->state != BUS_UNSET)
226 bus->anonymous_auth = !!b;
230 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
235 assert(bus->state == BUS_HELLO);
242 r = sd_bus_message_read(reply, "s", &s);
246 if (!service_name_is_valid(s) || s[0] != ':')
249 bus->unique_name = strdup(s);
250 if (!bus->unique_name)
253 bus->state = BUS_RUNNING;
258 static int bus_send_hello(sd_bus *bus) {
259 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
264 if (!bus->bus_client || bus->is_kernel)
267 r = sd_bus_message_new_method_call(
269 "org.freedesktop.DBus",
271 "org.freedesktop.DBus",
277 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
280 int bus_start_running(sd_bus *bus) {
283 if (bus->bus_client && !bus->is_kernel) {
284 bus->state = BUS_HELLO;
288 bus->state = BUS_RUNNING;
292 static int parse_address_key(const char **p, const char *key, char **value) {
303 if (strncmp(*p, key, l) != 0)
316 while (*a != ';' && *a != ',' && *a != 0) {
334 c = (char) ((x << 4) | y);
341 t = realloc(r, n + 2);
369 static void skip_address_key(const char **p) {
373 *p += strcspn(*p, ",");
379 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
380 _cleanup_free_ char *path = NULL, *abstract = NULL;
389 while (**p != 0 && **p != ';') {
390 r = parse_address_key(p, "guid", guid);
396 r = parse_address_key(p, "path", &path);
402 r = parse_address_key(p, "abstract", &abstract);
411 if (!path && !abstract)
414 if (path && abstract)
419 if (l > sizeof(b->sockaddr.un.sun_path))
422 b->sockaddr.un.sun_family = AF_UNIX;
423 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
424 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
425 } else if (abstract) {
426 l = strlen(abstract);
427 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
430 b->sockaddr.un.sun_family = AF_UNIX;
431 b->sockaddr.un.sun_path[0] = 0;
432 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
433 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
439 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
440 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
442 struct addrinfo *result, hints = {
443 .ai_socktype = SOCK_STREAM,
444 .ai_flags = AI_ADDRCONFIG,
452 while (**p != 0 && **p != ';') {
453 r = parse_address_key(p, "guid", guid);
459 r = parse_address_key(p, "host", &host);
465 r = parse_address_key(p, "port", &port);
471 r = parse_address_key(p, "family", &family);
484 if (streq(family, "ipv4"))
485 hints.ai_family = AF_INET;
486 else if (streq(family, "ipv6"))
487 hints.ai_family = AF_INET6;
492 r = getaddrinfo(host, port, &hints, &result);
496 return -EADDRNOTAVAIL;
498 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
499 b->sockaddr_size = result->ai_addrlen;
501 freeaddrinfo(result);
506 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
508 unsigned n_argv = 0, j;
517 while (**p != 0 && **p != ';') {
518 r = parse_address_key(p, "guid", guid);
524 r = parse_address_key(p, "path", &path);
530 if (startswith(*p, "argv")) {
534 ul = strtoul(*p + 4, (char**) p, 10);
535 if (errno > 0 || **p != '=' || ul > 256) {
545 x = realloc(argv, sizeof(char*) * (ul + 2));
551 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
557 r = parse_address_key(p, NULL, argv + ul);
572 /* Make sure there are no holes in the array, with the
573 * exception of argv[0] */
574 for (j = 1; j < n_argv; j++)
580 if (argv && argv[0] == NULL) {
581 argv[0] = strdup(path);
593 for (j = 0; j < n_argv; j++)
601 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
602 _cleanup_free_ char *path = NULL;
610 while (**p != 0 && **p != ';') {
611 r = parse_address_key(p, "guid", guid);
617 r = parse_address_key(p, "path", &path);
636 static void bus_reset_parsed_address(sd_bus *b) {
640 b->sockaddr_size = 0;
641 strv_free(b->exec_argv);
645 b->server_id = SD_ID128_NULL;
650 static int bus_parse_next_address(sd_bus *b) {
651 _cleanup_free_ char *guid = NULL;
659 if (b->address[b->address_index] == 0)
662 bus_reset_parsed_address(b);
664 a = b->address + b->address_index;
673 if (startswith(a, "unix:")) {
676 r = parse_unix_address(b, &a, &guid);
681 } else if (startswith(a, "tcp:")) {
684 r = parse_tcp_address(b, &a, &guid);
690 } else if (startswith(a, "unixexec:")) {
693 r = parse_exec_address(b, &a, &guid);
699 } else if (startswith(a, "kernel:")) {
702 r = parse_kernel_address(b, &a, &guid);
715 r = sd_id128_from_string(guid, &b->server_id);
720 b->address_index = a - b->address;
724 static int bus_start_address(sd_bus *b) {
732 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
734 r = bus_socket_connect(b);
738 b->last_connect_error = -r;
740 } else if (b->exec_path) {
742 r = bus_socket_exec(b);
746 b->last_connect_error = -r;
747 } else if (b->kernel) {
749 r = bus_kernel_connect(b);
753 b->last_connect_error = -r;
756 r = bus_parse_next_address(b);
760 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
764 int bus_next_address(sd_bus *b) {
767 bus_reset_parsed_address(b);
768 return bus_start_address(b);
771 static int bus_start_fd(sd_bus *b) {
776 assert(b->input_fd >= 0);
777 assert(b->output_fd >= 0);
779 r = fd_nonblock(b->input_fd, true);
783 r = fd_cloexec(b->input_fd, true);
787 if (b->input_fd != b->output_fd) {
788 r = fd_nonblock(b->output_fd, true);
792 r = fd_cloexec(b->output_fd, true);
797 if (fstat(b->input_fd, &st) < 0)
800 if (S_ISCHR(b->input_fd))
801 return bus_kernel_take_fd(b);
803 return bus_socket_take_fd(b);
806 int sd_bus_start(sd_bus *bus) {
811 if (bus->state != BUS_UNSET)
814 bus->state = BUS_OPENING;
816 if (bus->is_server && bus->bus_client)
819 if (bus->input_fd >= 0)
820 r = bus_start_fd(bus);
821 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
822 r = bus_start_address(bus);
829 return bus_send_hello(bus);
832 int sd_bus_open_system(sd_bus **ret) {
844 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
846 r = sd_bus_set_address(b, e);
850 b->sockaddr.un.sun_family = AF_UNIX;
851 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
852 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
855 b->bus_client = true;
869 int sd_bus_open_user(sd_bus **ret) {
882 e = getenv("DBUS_SESSION_BUS_ADDRESS");
884 r = sd_bus_set_address(b, e);
888 e = getenv("XDG_RUNTIME_DIR");
895 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
900 b->sockaddr.un.sun_family = AF_UNIX;
901 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
902 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
905 b->bus_client = true;
919 void sd_bus_close(sd_bus *bus) {
923 if (bus->input_fd >= 0)
924 close_nointr_nofail(bus->input_fd);
925 if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
926 close_nointr_nofail(bus->output_fd);
928 bus->input_fd = bus->output_fd = -1;
931 sd_bus *sd_bus_ref(sd_bus *bus) {
935 assert(bus->n_ref > 0);
941 sd_bus *sd_bus_unref(sd_bus *bus) {
945 assert(bus->n_ref > 0);
954 int sd_bus_is_open(sd_bus *bus) {
958 return bus->state != BUS_UNSET && bus->input_fd >= 0;
961 int sd_bus_can_send(sd_bus *bus, char type) {
966 if (bus->output_fd < 0)
969 if (type == SD_BUS_TYPE_UNIX_FD) {
970 if (!bus->negotiate_fds)
973 r = bus_ensure_running(bus);
980 return bus_type_is_valid(type);
983 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
991 r = bus_ensure_running(bus);
995 *server_id = bus->server_id;
999 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1002 if (m->header->version > b->message_version)
1008 return bus_message_seal(m, ++b->serial);
1011 static int dispatch_wqueue(sd_bus *bus) {
1015 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1017 if (bus->output_fd < 0)
1020 while (bus->wqueue_size > 0) {
1023 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1025 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1031 /* Didn't do anything this time */
1033 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1034 /* Fully written. Let's drop the entry from
1037 * This isn't particularly optimized, but
1038 * well, this is supposed to be our worst-case
1039 * buffer only, and the socket buffer is
1040 * supposed to be our primary buffer, and if
1041 * it got full, then all bets are off
1044 sd_bus_message_unref(bus->wqueue[0]);
1045 bus->wqueue_size --;
1046 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1056 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1057 sd_bus_message *z = NULL;
1062 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1064 if (bus->input_fd < 0)
1067 if (bus->rqueue_size > 0) {
1068 /* Dispatch a queued message */
1070 *m = bus->rqueue[0];
1071 bus->rqueue_size --;
1072 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1076 /* Try to read a new message */
1079 r = bus_kernel_read_message(bus, &z);
1081 r = bus_socket_read_message(bus, &z);
1097 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1102 if (bus->state == BUS_UNSET)
1104 if (bus->output_fd < 0)
1110 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1117 /* If the serial number isn't kept, then we know that no reply
1119 if (!serial && !m->sealed)
1120 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1122 r = bus_seal_message(bus, m);
1126 /* If this is a reply and no reply was requested, then let's
1127 * suppress this, if we can */
1128 if (m->dont_send && !serial)
1131 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1135 r = bus_kernel_write_message(bus, m);
1137 r = bus_socket_write_message(bus, m, &idx);
1142 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1143 /* Wasn't fully written. So let's remember how
1144 * much was written. Note that the first entry
1145 * of the wqueue array is always allocated so
1146 * that we always can remember how much was
1148 bus->wqueue[0] = sd_bus_message_ref(m);
1149 bus->wqueue_size = 1;
1155 /* Just append it to the queue. */
1157 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1160 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1165 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1169 *serial = BUS_MESSAGE_SERIAL(m);
1174 static usec_t calc_elapse(uint64_t usec) {
1175 if (usec == (uint64_t) -1)
1179 usec = BUS_DEFAULT_TIMEOUT;
1181 return now(CLOCK_MONOTONIC) + usec;
1184 static int timeout_compare(const void *a, const void *b) {
1185 const struct reply_callback *x = a, *y = b;
1187 if (x->timeout != 0 && y->timeout == 0)
1190 if (x->timeout == 0 && y->timeout != 0)
1193 if (x->timeout < y->timeout)
1196 if (x->timeout > y->timeout)
1202 int sd_bus_send_with_reply(
1205 sd_bus_message_handler_t callback,
1210 struct reply_callback *c;
1215 if (bus->state == BUS_UNSET)
1217 if (bus->output_fd < 0)
1223 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1225 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1228 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1232 if (usec != (uint64_t) -1) {
1233 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1238 r = bus_seal_message(bus, m);
1242 c = new0(struct reply_callback, 1);
1246 c->callback = callback;
1247 c->userdata = userdata;
1248 c->serial = BUS_MESSAGE_SERIAL(m);
1249 c->timeout = calc_elapse(usec);
1251 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1257 if (c->timeout != 0) {
1258 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1261 sd_bus_send_with_reply_cancel(bus, c->serial);
1266 r = sd_bus_send(bus, m, serial);
1268 sd_bus_send_with_reply_cancel(bus, c->serial);
1275 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1276 struct reply_callback *c;
1283 c = hashmap_remove(bus->reply_callbacks, &serial);
1287 if (c->timeout != 0)
1288 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1294 int bus_ensure_running(sd_bus *bus) {
1299 if (bus->input_fd < 0)
1301 if (bus->state == BUS_UNSET)
1304 if (bus->state == BUS_RUNNING)
1308 r = sd_bus_process(bus, NULL);
1311 if (bus->state == BUS_RUNNING)
1316 r = sd_bus_wait(bus, (uint64_t) -1);
1322 int sd_bus_send_with_reply_and_block(
1326 sd_bus_error *error,
1327 sd_bus_message **reply) {
1336 if (bus->output_fd < 0)
1338 if (bus->state == BUS_UNSET)
1342 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1344 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1346 if (bus_error_is_dirty(error))
1349 r = bus_ensure_running(bus);
1353 r = sd_bus_send(bus, m, &serial);
1357 timeout = calc_elapse(usec);
1361 sd_bus_message *incoming = NULL;
1366 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1369 /* Make sure there's room for queuing this
1370 * locally, before we read the message */
1372 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1381 r = bus_kernel_read_message(bus, &incoming);
1383 r = bus_socket_read_message(bus, &incoming);
1388 if (incoming->reply_serial == serial) {
1389 /* Found a match! */
1391 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1396 sd_bus_message_unref(incoming);
1401 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1404 r = sd_bus_error_copy(error, &incoming->error);
1406 sd_bus_message_unref(incoming);
1410 k = bus_error_to_errno(&incoming->error);
1411 sd_bus_message_unref(incoming);
1415 sd_bus_message_unref(incoming);
1419 /* There's already guaranteed to be room for
1420 * this, so need to resize things here */
1421 bus->rqueue[bus->rqueue_size ++] = incoming;
1424 /* Try to read more, right-away */
1433 n = now(CLOCK_MONOTONIC);
1439 left = (uint64_t) -1;
1441 r = bus_poll(bus, true, left);
1445 r = dispatch_wqueue(bus);
1451 int sd_bus_get_fd(sd_bus *bus) {
1454 if (bus->input_fd < 0)
1456 if (bus->input_fd != bus->output_fd)
1459 return bus->input_fd;
1462 int sd_bus_get_events(sd_bus *bus) {
1467 if (bus->state == BUS_UNSET)
1469 if (bus->input_fd < 0)
1472 if (bus->state == BUS_OPENING)
1474 else if (bus->state == BUS_AUTHENTICATING) {
1476 if (bus_socket_auth_needs_write(bus))
1481 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1482 if (bus->rqueue_size <= 0)
1484 if (bus->wqueue_size > 0)
1491 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1492 struct reply_callback *c;
1498 if (bus->state == BUS_UNSET)
1500 if (bus->input_fd < 0)
1503 if (bus->state == BUS_AUTHENTICATING) {
1504 *timeout_usec = bus->auth_timeout;
1508 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1509 *timeout_usec = (uint64_t) -1;
1513 c = prioq_peek(bus->reply_callbacks_prioq);
1515 *timeout_usec = (uint64_t) -1;
1519 *timeout_usec = c->timeout;
1523 static int process_timeout(sd_bus *bus) {
1524 struct reply_callback *c;
1530 c = prioq_peek(bus->reply_callbacks_prioq);
1534 n = now(CLOCK_MONOTONIC);
1538 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1539 hashmap_remove(bus->reply_callbacks, &c->serial);
1541 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1544 return r < 0 ? r : 1;
1547 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1551 if (bus->state != BUS_HELLO)
1554 /* Let's make sure the first message on the bus is the HELLO
1555 * reply. But note that we don't actually parse the message
1556 * here (we leave that to the usual handling), we just verify
1557 * we don't let any earlier msg through. */
1559 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1560 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1563 if (m->reply_serial != bus->hello_serial)
1569 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1570 struct reply_callback *c;
1576 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1577 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1580 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1584 if (c->timeout != 0)
1585 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1587 r = c->callback(bus, 0, m, c->userdata);
1593 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1594 struct filter_callback *l;
1601 bus->filter_callbacks_modified = false;
1603 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1605 if (bus->filter_callbacks_modified)
1608 /* Don't run this more than once per iteration */
1609 if (l->last_iteration == bus->iteration_counter)
1612 l->last_iteration = bus->iteration_counter;
1614 r = l->callback(bus, 0, m, l->userdata);
1620 } while (bus->filter_callbacks_modified);
1625 static int process_match(sd_bus *bus, sd_bus_message *m) {
1632 bus->match_callbacks_modified = false;
1634 r = bus_match_run(bus, &bus->match_callbacks, 0, m);
1638 } while (bus->match_callbacks_modified);
1643 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1644 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1650 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1653 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1656 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1659 if (streq_ptr(m->member, "Ping"))
1660 r = sd_bus_message_new_method_return(bus, m, &reply);
1661 else if (streq_ptr(m->member, "GetMachineId")) {
1665 r = sd_id128_get_machine(&id);
1669 r = sd_bus_message_new_method_return(bus, m, &reply);
1673 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1675 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1677 sd_bus_error_set(&error,
1678 "org.freedesktop.DBus.Error.UnknownMethod",
1679 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1681 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1687 r = sd_bus_send(bus, reply, NULL);
1694 static int process_object(sd_bus *bus, sd_bus_message *m) {
1695 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1696 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1697 struct object_callback *c;
1705 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1708 if (hashmap_isempty(bus->object_callbacks))
1711 pl = strlen(m->path);
1716 bus->object_callbacks_modified = false;
1718 c = hashmap_get(bus->object_callbacks, m->path);
1719 if (c && c->last_iteration != bus->iteration_counter) {
1721 c->last_iteration = bus->iteration_counter;
1723 r = c->callback(bus, 0, m, c->userdata);
1730 /* Look for fallback prefixes */
1735 if (bus->object_callbacks_modified)
1738 e = strrchr(p, '/');
1744 c = hashmap_get(bus->object_callbacks, p);
1745 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1747 c->last_iteration = bus->iteration_counter;
1749 r = c->callback(bus, 0, m, c->userdata);
1757 } while (bus->object_callbacks_modified);
1759 /* We found some handlers but none wanted to take this, then
1760 * return this -- with one exception, we can handle
1761 * introspection minimally ourselves */
1762 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1765 sd_bus_error_set(&error,
1766 "org.freedesktop.DBus.Error.UnknownMethod",
1767 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1769 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1773 r = sd_bus_send(bus, reply, NULL);
1780 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1781 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1782 _cleanup_free_ char *introspection = NULL;
1783 _cleanup_set_free_free_ Set *s = NULL;
1784 _cleanup_fclose_ FILE *f = NULL;
1785 struct object_callback *c;
1794 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1800 s = set_new(string_hash_func, string_compare_func);
1804 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1808 if (streq(c->path, "/"))
1811 if (streq(m->path, "/"))
1814 e = startswith(c->path, m->path);
1815 if (!e || *e != '/')
1836 f = open_memstream(&introspection, &size);
1840 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1841 fputs("<node>\n", f);
1842 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1843 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1845 while ((node = set_steal_first(s))) {
1846 fprintf(f, " <node name=\"%s\"/>\n", node);
1850 fputs("</node>\n", f);
1857 r = sd_bus_message_new_method_return(bus, m, &reply);
1861 r = sd_bus_message_append(reply, "s", introspection);
1865 r = sd_bus_send(bus, reply, NULL);
1872 static int process_message(sd_bus *bus, sd_bus_message *m) {
1878 bus->iteration_counter++;
1880 r = process_hello(bus, m);
1884 r = process_reply(bus, m);
1888 r = process_filter(bus, m);
1892 r = process_match(bus, m);
1896 r = process_builtin(bus, m);
1900 r = process_object(bus, m);
1904 return process_introspect(bus, m);
1907 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1908 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1912 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1914 r = process_timeout(bus);
1918 r = dispatch_wqueue(bus);
1922 r = dispatch_rqueue(bus, &m);
1928 r = process_message(bus, m);
1938 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1939 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1940 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1942 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1944 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1948 r = sd_bus_send(bus, reply, NULL);
1962 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1965 /* Returns 0 when we didn't do anything. This should cause the
1966 * caller to invoke sd_bus_wait() before returning the next
1967 * time. Returns > 0 when we did something, which possibly
1968 * means *ret is filled in with an unprocessed message. */
1972 if (bus->input_fd < 0)
1975 /* We don't allow recursively invoking sd_bus_process(). */
1976 if (bus->processing)
1979 switch (bus->state) {
1985 r = bus_socket_process_opening(bus);
1992 case BUS_AUTHENTICATING:
1994 r = bus_socket_process_authenticating(bus);
2004 bus->processing = true;
2005 r = process_running(bus, ret);
2006 bus->processing = false;
2011 assert_not_reached("Unknown state");
2014 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2015 struct pollfd p[2] = {};
2022 if (bus->input_fd < 0)
2025 e = sd_bus_get_events(bus);
2032 r = sd_bus_get_timeout(bus, &until);
2039 nw = now(CLOCK_MONOTONIC);
2040 m = until > nw ? until - nw : 0;
2043 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2046 p[0].fd = bus->input_fd;
2047 if (bus->output_fd == bus->input_fd) {
2051 p[0].events = e & POLLIN;
2052 p[1].fd = bus->output_fd;
2053 p[1].events = e & POLLOUT;
2057 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2061 return r > 0 ? 1 : 0;
2064 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2068 if (bus->state == BUS_UNSET)
2070 if (bus->input_fd < 0)
2072 if (bus->rqueue_size > 0)
2075 return bus_poll(bus, false, timeout_usec);
2078 int sd_bus_flush(sd_bus *bus) {
2083 if (bus->state == BUS_UNSET)
2085 if (bus->output_fd < 0)
2088 r = bus_ensure_running(bus);
2092 if (bus->wqueue_size <= 0)
2096 r = dispatch_wqueue(bus);
2100 if (bus->wqueue_size <= 0)
2103 r = bus_poll(bus, false, (uint64_t) -1);
2109 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2110 struct filter_callback *f;
2117 f = new0(struct filter_callback, 1);
2120 f->callback = callback;
2121 f->userdata = userdata;
2123 bus->filter_callbacks_modified = true;
2124 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
2128 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2129 struct filter_callback *f;
2136 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2137 if (f->callback == callback && f->userdata == userdata) {
2138 bus->filter_callbacks_modified = true;
2139 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2148 static int bus_add_object(
2152 sd_bus_message_handler_t callback,
2155 struct object_callback *c;
2165 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2169 c = new0(struct object_callback, 1);
2173 c->path = strdup(path);
2179 c->callback = callback;
2180 c->userdata = userdata;
2181 c->is_fallback = fallback;
2183 bus->object_callbacks_modified = true;
2184 r = hashmap_put(bus->object_callbacks, c->path, c);
2194 static int bus_remove_object(
2198 sd_bus_message_handler_t callback,
2201 struct object_callback *c;
2210 c = hashmap_get(bus->object_callbacks, path);
2214 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2217 bus->object_callbacks_modified = true;
2218 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2226 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2227 return bus_add_object(bus, false, path, callback, userdata);
2230 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2231 return bus_remove_object(bus, false, path, callback, userdata);
2234 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2235 return bus_add_object(bus, true, prefix, callback, userdata);
2238 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2239 return bus_remove_object(bus, true, prefix, callback, userdata);
2242 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2250 if (bus->bus_client) {
2251 r = bus_add_match_internal(bus, match);
2257 bus->match_callbacks_modified = true;
2258 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2261 if (bus->bus_client)
2262 bus_remove_match_internal(bus, match);
2269 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2277 if (bus->bus_client)
2278 r = bus_remove_match_internal(bus, match);
2281 bus->match_callbacks_modified = true;
2282 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
2290 int sd_bus_emit_signal(
2293 const char *interface,
2295 const char *types, ...) {
2297 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2304 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2308 va_start(ap, types);
2309 r = bus_message_append_ap(m, types, ap);
2314 return sd_bus_send(bus, m, NULL);
2317 int sd_bus_call_method(
2319 const char *destination,
2321 const char *interface,
2323 sd_bus_error *error,
2324 sd_bus_message **reply,
2325 const char *types, ...) {
2327 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2334 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2338 va_start(ap, types);
2339 r = bus_message_append_ap(m, types, ap);
2344 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
2347 int sd_bus_reply_method_return(
2349 sd_bus_message *call,
2350 const char *types, ...) {
2352 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2362 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2365 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2368 r = sd_bus_message_new_method_return(bus, call, &m);
2372 va_start(ap, types);
2373 r = bus_message_append_ap(m, types, ap);
2378 return sd_bus_send(bus, m, NULL);
2381 int sd_bus_reply_method_error(
2383 sd_bus_message *call,
2384 const sd_bus_error *e) {
2386 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2395 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2397 if (!sd_bus_error_is_set(e))
2400 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2403 r = sd_bus_message_new_method_error(bus, call, e, &m);
2407 return sd_bus_send(bus, m, NULL);