1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
37 #include "bus-internal.h"
38 #include "bus-message.h"
40 #include "bus-socket.h"
41 #include "bus-kernel.h"
42 #include "bus-control.h"
44 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
46 static void bus_free(sd_bus *b) {
47 struct filter_callback *f;
48 struct object_callback *c;
62 strv_free(b->exec_argv);
64 close_many(b->fds, b->n_fds);
67 for (i = 0; i < b->rqueue_size; i++)
68 sd_bus_message_unref(b->rqueue[i]);
71 for (i = 0; i < b->wqueue_size; i++)
72 sd_bus_message_unref(b->wqueue[i]);
75 hashmap_free_free(b->reply_callbacks);
76 prioq_free(b->reply_callbacks_prioq);
78 while ((f = b->filter_callbacks)) {
79 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
83 while ((c = hashmap_steal_first(b->object_callbacks))) {
88 hashmap_free(b->object_callbacks);
89 bus_match_free(&b->match_callbacks);
91 bus_kernel_flush_memfd(b);
96 int sd_bus_new(sd_bus **ret) {
106 r->n_ref = REFCNT_INIT;
107 r->input_fd = r->output_fd = -1;
108 r->message_version = 1;
109 r->negotiate_fds = true;
111 /* We guarantee that wqueue always has space for at least one
113 r->wqueue = new(sd_bus_message*, 1);
123 int sd_bus_set_address(sd_bus *bus, const char *address) {
128 if (bus->state != BUS_UNSET)
143 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
146 if (bus->state != BUS_UNSET)
153 bus->input_fd = input_fd;
154 bus->output_fd = output_fd;
158 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
163 if (bus->state != BUS_UNSET)
167 if (strv_isempty(argv))
180 free(bus->exec_path);
181 strv_free(bus->exec_argv);
189 int sd_bus_set_bus_client(sd_bus *bus, int b) {
192 if (bus->state != BUS_UNSET)
195 bus->bus_client = !!b;
199 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
202 if (bus->state != BUS_UNSET)
205 bus->negotiate_fds = !!b;
209 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
212 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
214 if (bus->state != BUS_UNSET)
217 bus->is_server = !!b;
218 bus->server_id = server_id;
222 int sd_bus_set_anonymous(sd_bus *bus, int b) {
225 if (bus->state != BUS_UNSET)
228 bus->anonymous_auth = !!b;
232 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
237 assert(bus->state == BUS_HELLO);
240 r = bus_message_to_errno(reply);
244 r = sd_bus_message_read(reply, "s", &s);
248 if (!service_name_is_valid(s) || s[0] != ':')
251 bus->unique_name = strdup(s);
252 if (!bus->unique_name)
255 bus->state = BUS_RUNNING;
260 static int bus_send_hello(sd_bus *bus) {
261 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
266 if (!bus->bus_client || bus->is_kernel)
269 r = sd_bus_message_new_method_call(
271 "org.freedesktop.DBus",
273 "org.freedesktop.DBus",
279 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
282 int bus_start_running(sd_bus *bus) {
285 if (bus->bus_client && !bus->is_kernel) {
286 bus->state = BUS_HELLO;
290 bus->state = BUS_RUNNING;
294 static int parse_address_key(const char **p, const char *key, char **value) {
305 if (strncmp(*p, key, l) != 0)
318 while (*a != ';' && *a != ',' && *a != 0) {
336 c = (char) ((x << 4) | y);
343 t = realloc(r, n + 2);
371 static void skip_address_key(const char **p) {
375 *p += strcspn(*p, ",");
381 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
382 _cleanup_free_ char *path = NULL, *abstract = NULL;
391 while (**p != 0 && **p != ';') {
392 r = parse_address_key(p, "guid", guid);
398 r = parse_address_key(p, "path", &path);
404 r = parse_address_key(p, "abstract", &abstract);
413 if (!path && !abstract)
416 if (path && abstract)
421 if (l > sizeof(b->sockaddr.un.sun_path))
424 b->sockaddr.un.sun_family = AF_UNIX;
425 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
426 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
427 } else if (abstract) {
428 l = strlen(abstract);
429 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
432 b->sockaddr.un.sun_family = AF_UNIX;
433 b->sockaddr.un.sun_path[0] = 0;
434 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
435 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
441 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
442 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
444 struct addrinfo *result, hints = {
445 .ai_socktype = SOCK_STREAM,
446 .ai_flags = AI_ADDRCONFIG,
454 while (**p != 0 && **p != ';') {
455 r = parse_address_key(p, "guid", guid);
461 r = parse_address_key(p, "host", &host);
467 r = parse_address_key(p, "port", &port);
473 r = parse_address_key(p, "family", &family);
486 if (streq(family, "ipv4"))
487 hints.ai_family = AF_INET;
488 else if (streq(family, "ipv6"))
489 hints.ai_family = AF_INET6;
494 r = getaddrinfo(host, port, &hints, &result);
498 return -EADDRNOTAVAIL;
500 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
501 b->sockaddr_size = result->ai_addrlen;
503 freeaddrinfo(result);
508 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
510 unsigned n_argv = 0, j;
519 while (**p != 0 && **p != ';') {
520 r = parse_address_key(p, "guid", guid);
526 r = parse_address_key(p, "path", &path);
532 if (startswith(*p, "argv")) {
536 ul = strtoul(*p + 4, (char**) p, 10);
537 if (errno > 0 || **p != '=' || ul > 256) {
547 x = realloc(argv, sizeof(char*) * (ul + 2));
553 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
559 r = parse_address_key(p, NULL, argv + ul);
574 /* Make sure there are no holes in the array, with the
575 * exception of argv[0] */
576 for (j = 1; j < n_argv; j++)
582 if (argv && argv[0] == NULL) {
583 argv[0] = strdup(path);
595 for (j = 0; j < n_argv; j++)
603 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
604 _cleanup_free_ char *path = NULL;
612 while (**p != 0 && **p != ';') {
613 r = parse_address_key(p, "guid", guid);
619 r = parse_address_key(p, "path", &path);
638 static void bus_reset_parsed_address(sd_bus *b) {
642 b->sockaddr_size = 0;
643 strv_free(b->exec_argv);
647 b->server_id = SD_ID128_NULL;
652 static int bus_parse_next_address(sd_bus *b) {
653 _cleanup_free_ char *guid = NULL;
661 if (b->address[b->address_index] == 0)
664 bus_reset_parsed_address(b);
666 a = b->address + b->address_index;
675 if (startswith(a, "unix:")) {
678 r = parse_unix_address(b, &a, &guid);
683 } else if (startswith(a, "tcp:")) {
686 r = parse_tcp_address(b, &a, &guid);
692 } else if (startswith(a, "unixexec:")) {
695 r = parse_exec_address(b, &a, &guid);
701 } else if (startswith(a, "kernel:")) {
704 r = parse_kernel_address(b, &a, &guid);
717 r = sd_id128_from_string(guid, &b->server_id);
722 b->address_index = a - b->address;
726 static int bus_start_address(sd_bus *b) {
734 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
736 r = bus_socket_connect(b);
740 b->last_connect_error = -r;
742 } else if (b->exec_path) {
744 r = bus_socket_exec(b);
748 b->last_connect_error = -r;
749 } else if (b->kernel) {
751 r = bus_kernel_connect(b);
755 b->last_connect_error = -r;
758 r = bus_parse_next_address(b);
762 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
766 int bus_next_address(sd_bus *b) {
769 bus_reset_parsed_address(b);
770 return bus_start_address(b);
773 static int bus_start_fd(sd_bus *b) {
778 assert(b->input_fd >= 0);
779 assert(b->output_fd >= 0);
781 r = fd_nonblock(b->input_fd, true);
785 r = fd_cloexec(b->input_fd, true);
789 if (b->input_fd != b->output_fd) {
790 r = fd_nonblock(b->output_fd, true);
794 r = fd_cloexec(b->output_fd, true);
799 if (fstat(b->input_fd, &st) < 0)
802 if (S_ISCHR(b->input_fd))
803 return bus_kernel_take_fd(b);
805 return bus_socket_take_fd(b);
808 int sd_bus_start(sd_bus *bus) {
813 if (bus->state != BUS_UNSET)
816 bus->state = BUS_OPENING;
818 if (bus->is_server && bus->bus_client)
821 if (bus->input_fd >= 0)
822 r = bus_start_fd(bus);
823 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
824 r = bus_start_address(bus);
831 return bus_send_hello(bus);
834 int sd_bus_open_system(sd_bus **ret) {
846 e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
848 r = sd_bus_set_address(b, e);
852 b->sockaddr.un.sun_family = AF_UNIX;
853 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
854 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
857 b->bus_client = true;
871 int sd_bus_open_user(sd_bus **ret) {
884 e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
886 r = sd_bus_set_address(b, e);
890 e = secure_getenv("XDG_RUNTIME_DIR");
897 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
902 b->sockaddr.un.sun_family = AF_UNIX;
903 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
904 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
907 b->bus_client = true;
921 void sd_bus_close(sd_bus *bus) {
925 if (bus->input_fd >= 0)
926 close_nointr_nofail(bus->input_fd);
927 if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
928 close_nointr_nofail(bus->output_fd);
930 bus->input_fd = bus->output_fd = -1;
933 sd_bus *sd_bus_ref(sd_bus *bus) {
937 assert_se(REFCNT_INC(bus->n_ref) >= 2);
942 sd_bus *sd_bus_unref(sd_bus *bus) {
946 if (REFCNT_DEC(bus->n_ref) <= 0)
952 int sd_bus_is_open(sd_bus *bus) {
956 return bus->state != BUS_UNSET && bus->input_fd >= 0;
959 int sd_bus_can_send(sd_bus *bus, char type) {
964 if (bus->output_fd < 0)
967 if (type == SD_BUS_TYPE_UNIX_FD) {
968 if (!bus->negotiate_fds)
971 r = bus_ensure_running(bus);
978 return bus_type_is_valid(type);
981 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
989 r = bus_ensure_running(bus);
993 *server_id = bus->server_id;
997 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1000 if (m->header->version > b->message_version)
1006 return bus_message_seal(m, ++b->serial);
1009 static int dispatch_wqueue(sd_bus *bus) {
1013 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1015 if (bus->output_fd < 0)
1018 while (bus->wqueue_size > 0) {
1021 r = bus_kernel_write_message(bus, bus->wqueue[0]);
1023 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1029 /* Didn't do anything this time */
1031 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1032 /* Fully written. Let's drop the entry from
1035 * This isn't particularly optimized, but
1036 * well, this is supposed to be our worst-case
1037 * buffer only, and the socket buffer is
1038 * supposed to be our primary buffer, and if
1039 * it got full, then all bets are off
1042 sd_bus_message_unref(bus->wqueue[0]);
1043 bus->wqueue_size --;
1044 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1054 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1055 sd_bus_message *z = NULL;
1060 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1062 if (bus->input_fd < 0)
1065 if (bus->rqueue_size > 0) {
1066 /* Dispatch a queued message */
1068 *m = bus->rqueue[0];
1069 bus->rqueue_size --;
1070 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1074 /* Try to read a new message */
1077 r = bus_kernel_read_message(bus, &z);
1079 r = bus_socket_read_message(bus, &z);
1095 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1100 if (bus->state == BUS_UNSET)
1102 if (bus->output_fd < 0)
1108 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1115 /* If the serial number isn't kept, then we know that no reply
1117 if (!serial && !m->sealed)
1118 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1120 r = bus_seal_message(bus, m);
1124 /* If this is a reply and no reply was requested, then let's
1125 * suppress this, if we can */
1126 if (m->dont_send && !serial)
1129 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1133 r = bus_kernel_write_message(bus, m);
1135 r = bus_socket_write_message(bus, m, &idx);
1140 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
1141 /* Wasn't fully written. So let's remember how
1142 * much was written. Note that the first entry
1143 * of the wqueue array is always allocated so
1144 * that we always can remember how much was
1146 bus->wqueue[0] = sd_bus_message_ref(m);
1147 bus->wqueue_size = 1;
1153 /* Just append it to the queue. */
1155 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1158 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1163 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1167 *serial = BUS_MESSAGE_SERIAL(m);
1172 static usec_t calc_elapse(uint64_t usec) {
1173 if (usec == (uint64_t) -1)
1177 usec = BUS_DEFAULT_TIMEOUT;
1179 return now(CLOCK_MONOTONIC) + usec;
1182 static int timeout_compare(const void *a, const void *b) {
1183 const struct reply_callback *x = a, *y = b;
1185 if (x->timeout != 0 && y->timeout == 0)
1188 if (x->timeout == 0 && y->timeout != 0)
1191 if (x->timeout < y->timeout)
1194 if (x->timeout > y->timeout)
1200 int sd_bus_send_with_reply(
1203 sd_bus_message_handler_t callback,
1208 struct reply_callback *c;
1213 if (bus->state == BUS_UNSET)
1215 if (bus->output_fd < 0)
1221 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1223 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1226 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1230 if (usec != (uint64_t) -1) {
1231 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1236 r = bus_seal_message(bus, m);
1240 c = new0(struct reply_callback, 1);
1244 c->callback = callback;
1245 c->userdata = userdata;
1246 c->serial = BUS_MESSAGE_SERIAL(m);
1247 c->timeout = calc_elapse(usec);
1249 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1255 if (c->timeout != 0) {
1256 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1259 sd_bus_send_with_reply_cancel(bus, c->serial);
1264 r = sd_bus_send(bus, m, serial);
1266 sd_bus_send_with_reply_cancel(bus, c->serial);
1273 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1274 struct reply_callback *c;
1281 c = hashmap_remove(bus->reply_callbacks, &serial);
1285 if (c->timeout != 0)
1286 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1292 int bus_ensure_running(sd_bus *bus) {
1297 if (bus->input_fd < 0)
1299 if (bus->state == BUS_UNSET)
1302 if (bus->state == BUS_RUNNING)
1306 r = sd_bus_process(bus, NULL);
1309 if (bus->state == BUS_RUNNING)
1314 r = sd_bus_wait(bus, (uint64_t) -1);
1320 int sd_bus_send_with_reply_and_block(
1324 sd_bus_error *error,
1325 sd_bus_message **reply) {
1334 if (bus->output_fd < 0)
1336 if (bus->state == BUS_UNSET)
1340 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1342 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1344 if (bus_error_is_dirty(error))
1347 r = bus_ensure_running(bus);
1351 r = sd_bus_send(bus, m, &serial);
1355 timeout = calc_elapse(usec);
1359 sd_bus_message *incoming = NULL;
1364 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1367 /* Make sure there's room for queuing this
1368 * locally, before we read the message */
1370 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1379 r = bus_kernel_read_message(bus, &incoming);
1381 r = bus_socket_read_message(bus, &incoming);
1386 if (incoming->reply_serial == serial) {
1387 /* Found a match! */
1389 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1394 sd_bus_message_unref(incoming);
1399 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1402 r = sd_bus_error_copy(error, &incoming->error);
1404 sd_bus_message_unref(incoming);
1408 k = bus_error_to_errno(&incoming->error);
1409 sd_bus_message_unref(incoming);
1413 sd_bus_message_unref(incoming);
1417 /* There's already guaranteed to be room for
1418 * this, so need to resize things here */
1419 bus->rqueue[bus->rqueue_size ++] = incoming;
1422 /* Try to read more, right-away */
1431 n = now(CLOCK_MONOTONIC);
1437 left = (uint64_t) -1;
1439 r = bus_poll(bus, true, left);
1443 r = dispatch_wqueue(bus);
1449 int sd_bus_get_fd(sd_bus *bus) {
1452 if (bus->input_fd < 0)
1454 if (bus->input_fd != bus->output_fd)
1457 return bus->input_fd;
1460 int sd_bus_get_events(sd_bus *bus) {
1465 if (bus->state == BUS_UNSET)
1467 if (bus->input_fd < 0)
1470 if (bus->state == BUS_OPENING)
1472 else if (bus->state == BUS_AUTHENTICATING) {
1474 if (bus_socket_auth_needs_write(bus))
1479 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1480 if (bus->rqueue_size <= 0)
1482 if (bus->wqueue_size > 0)
1489 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1490 struct reply_callback *c;
1496 if (bus->state == BUS_UNSET)
1498 if (bus->input_fd < 0)
1501 if (bus->state == BUS_AUTHENTICATING) {
1502 *timeout_usec = bus->auth_timeout;
1506 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1507 *timeout_usec = (uint64_t) -1;
1511 c = prioq_peek(bus->reply_callbacks_prioq);
1513 *timeout_usec = (uint64_t) -1;
1517 *timeout_usec = c->timeout;
1521 static int process_timeout(sd_bus *bus) {
1522 _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1523 struct reply_callback *c;
1529 c = prioq_peek(bus->reply_callbacks_prioq);
1533 n = now(CLOCK_MONOTONIC);
1537 r = bus_message_new_synthetic_error(
1540 &SD_BUS_ERROR_MAKE("org.freedesktop.DBus.Error.Timeout", "Timed out"),
1545 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1546 hashmap_remove(bus->reply_callbacks, &c->serial);
1548 r = c->callback(bus, m, c->userdata);
1551 return r < 0 ? r : 1;
1554 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1558 if (bus->state != BUS_HELLO)
1561 /* Let's make sure the first message on the bus is the HELLO
1562 * reply. But note that we don't actually parse the message
1563 * here (we leave that to the usual handling), we just verify
1564 * we don't let any earlier msg through. */
1566 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1567 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1570 if (m->reply_serial != bus->hello_serial)
1576 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1577 struct reply_callback *c;
1583 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1584 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1587 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1591 if (c->timeout != 0)
1592 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1594 r = sd_bus_message_rewind(m, true);
1598 r = c->callback(bus, m, c->userdata);
1604 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1605 struct filter_callback *l;
1612 bus->filter_callbacks_modified = false;
1614 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1616 if (bus->filter_callbacks_modified)
1619 /* Don't run this more than once per iteration */
1620 if (l->last_iteration == bus->iteration_counter)
1623 l->last_iteration = bus->iteration_counter;
1625 r = sd_bus_message_rewind(m, true);
1629 r = l->callback(bus, m, l->userdata);
1635 } while (bus->filter_callbacks_modified);
1640 static int process_match(sd_bus *bus, sd_bus_message *m) {
1647 bus->match_callbacks_modified = false;
1649 r = bus_match_run(bus, &bus->match_callbacks, m);
1653 } while (bus->match_callbacks_modified);
1658 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1659 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1665 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1668 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1671 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1674 if (streq_ptr(m->member, "Ping"))
1675 r = sd_bus_message_new_method_return(bus, m, &reply);
1676 else if (streq_ptr(m->member, "GetMachineId")) {
1680 r = sd_id128_get_machine(&id);
1684 r = sd_bus_message_new_method_return(bus, m, &reply);
1688 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1690 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1692 sd_bus_error_set(&error,
1693 "org.freedesktop.DBus.Error.UnknownMethod",
1694 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1696 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1702 r = sd_bus_send(bus, reply, NULL);
1709 static int process_object(sd_bus *bus, sd_bus_message *m) {
1710 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1711 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1712 struct object_callback *c;
1720 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1723 if (hashmap_isempty(bus->object_callbacks))
1726 pl = strlen(m->path);
1731 bus->object_callbacks_modified = false;
1733 c = hashmap_get(bus->object_callbacks, m->path);
1734 if (c && c->last_iteration != bus->iteration_counter) {
1736 c->last_iteration = bus->iteration_counter;
1738 r = sd_bus_message_rewind(m, true);
1742 r = c->callback(bus, m, c->userdata);
1749 /* Look for fallback prefixes */
1754 if (bus->object_callbacks_modified)
1757 e = strrchr(p, '/');
1763 c = hashmap_get(bus->object_callbacks, p);
1764 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1766 c->last_iteration = bus->iteration_counter;
1768 r = sd_bus_message_rewind(m, true);
1772 r = c->callback(bus, m, c->userdata);
1780 } while (bus->object_callbacks_modified);
1782 /* We found some handlers but none wanted to take this, then
1783 * return this -- with one exception, we can handle
1784 * introspection minimally ourselves */
1785 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1788 sd_bus_error_set(&error,
1789 "org.freedesktop.DBus.Error.UnknownMethod",
1790 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1792 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1796 r = sd_bus_send(bus, reply, NULL);
1803 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1804 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1805 _cleanup_free_ char *introspection = NULL;
1806 _cleanup_set_free_free_ Set *s = NULL;
1807 _cleanup_fclose_ FILE *f = NULL;
1808 struct object_callback *c;
1817 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1823 s = set_new(string_hash_func, string_compare_func);
1827 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1831 if (streq(c->path, "/"))
1834 if (streq(m->path, "/"))
1837 e = startswith(c->path, m->path);
1838 if (!e || *e != '/')
1850 r = set_consume(s, a);
1851 if (r < 0 && r != -EEXIST)
1855 f = open_memstream(&introspection, &size);
1859 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1860 fputs("<node>\n", f);
1861 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1862 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1864 while ((node = set_steal_first(s))) {
1865 fprintf(f, " <node name=\"%s\"/>\n", node);
1869 fputs("</node>\n", f);
1876 r = sd_bus_message_new_method_return(bus, m, &reply);
1880 r = sd_bus_message_append(reply, "s", introspection);
1884 r = sd_bus_send(bus, reply, NULL);
1891 static int process_message(sd_bus *bus, sd_bus_message *m) {
1897 bus->iteration_counter++;
1899 r = process_hello(bus, m);
1903 r = process_reply(bus, m);
1907 r = process_filter(bus, m);
1911 r = process_match(bus, m);
1915 r = process_builtin(bus, m);
1919 r = process_object(bus, m);
1923 return process_introspect(bus, m);
1926 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1927 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1931 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1933 r = process_timeout(bus);
1937 r = dispatch_wqueue(bus);
1941 r = dispatch_rqueue(bus, &m);
1947 r = process_message(bus, m);
1952 r = sd_bus_message_rewind(m, true);
1961 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1962 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1963 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1965 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1967 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1971 r = sd_bus_send(bus, reply, NULL);
1985 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1988 /* Returns 0 when we didn't do anything. This should cause the
1989 * caller to invoke sd_bus_wait() before returning the next
1990 * time. Returns > 0 when we did something, which possibly
1991 * means *ret is filled in with an unprocessed message. */
1995 if (bus->input_fd < 0)
1998 /* We don't allow recursively invoking sd_bus_process(). */
1999 if (bus->processing)
2002 switch (bus->state) {
2008 r = bus_socket_process_opening(bus);
2015 case BUS_AUTHENTICATING:
2017 r = bus_socket_process_authenticating(bus);
2027 bus->processing = true;
2028 r = process_running(bus, ret);
2029 bus->processing = false;
2034 assert_not_reached("Unknown state");
2037 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2038 struct pollfd p[2] = {};
2045 if (bus->input_fd < 0)
2048 e = sd_bus_get_events(bus);
2055 r = sd_bus_get_timeout(bus, &until);
2062 nw = now(CLOCK_MONOTONIC);
2063 m = until > nw ? until - nw : 0;
2066 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2069 p[0].fd = bus->input_fd;
2070 if (bus->output_fd == bus->input_fd) {
2074 p[0].events = e & POLLIN;
2075 p[1].fd = bus->output_fd;
2076 p[1].events = e & POLLOUT;
2080 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2084 return r > 0 ? 1 : 0;
2087 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2091 if (bus->state == BUS_UNSET)
2093 if (bus->input_fd < 0)
2095 if (bus->rqueue_size > 0)
2098 return bus_poll(bus, false, timeout_usec);
2101 int sd_bus_flush(sd_bus *bus) {
2106 if (bus->state == BUS_UNSET)
2108 if (bus->output_fd < 0)
2111 r = bus_ensure_running(bus);
2115 if (bus->wqueue_size <= 0)
2119 r = dispatch_wqueue(bus);
2123 if (bus->wqueue_size <= 0)
2126 r = bus_poll(bus, false, (uint64_t) -1);
2132 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2133 struct filter_callback *f;
2140 f = new0(struct filter_callback, 1);
2143 f->callback = callback;
2144 f->userdata = userdata;
2146 bus->filter_callbacks_modified = true;
2147 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
2151 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2152 struct filter_callback *f;
2159 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2160 if (f->callback == callback && f->userdata == userdata) {
2161 bus->filter_callbacks_modified = true;
2162 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2171 static int bus_add_object(
2175 sd_bus_message_handler_t callback,
2178 struct object_callback *c;
2188 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2192 c = new0(struct object_callback, 1);
2196 c->path = strdup(path);
2202 c->callback = callback;
2203 c->userdata = userdata;
2204 c->is_fallback = fallback;
2206 bus->object_callbacks_modified = true;
2207 r = hashmap_put(bus->object_callbacks, c->path, c);
2217 static int bus_remove_object(
2221 sd_bus_message_handler_t callback,
2224 struct object_callback *c;
2233 c = hashmap_get(bus->object_callbacks, path);
2237 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2240 bus->object_callbacks_modified = true;
2241 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2249 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2250 return bus_add_object(bus, false, path, callback, userdata);
2253 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2254 return bus_remove_object(bus, false, path, callback, userdata);
2257 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2258 return bus_add_object(bus, true, prefix, callback, userdata);
2261 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2262 return bus_remove_object(bus, true, prefix, callback, userdata);
2265 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2273 if (bus->bus_client) {
2274 r = bus_add_match_internal(bus, match);
2280 bus->match_callbacks_modified = true;
2281 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2284 if (bus->bus_client)
2285 bus_remove_match_internal(bus, match);
2292 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2300 if (bus->bus_client)
2301 r = bus_remove_match_internal(bus, match);
2304 bus->match_callbacks_modified = true;
2305 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
2313 int sd_bus_emit_signal(
2316 const char *interface,
2318 const char *types, ...) {
2320 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2327 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2331 va_start(ap, types);
2332 r = bus_message_append_ap(m, types, ap);
2337 return sd_bus_send(bus, m, NULL);
2340 int sd_bus_call_method(
2342 const char *destination,
2344 const char *interface,
2346 sd_bus_error *error,
2347 sd_bus_message **reply,
2348 const char *types, ...) {
2350 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2357 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2361 va_start(ap, types);
2362 r = bus_message_append_ap(m, types, ap);
2367 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
2370 int sd_bus_reply_method_return(
2372 sd_bus_message *call,
2373 const char *types, ...) {
2375 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2385 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2388 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2391 r = sd_bus_message_new_method_return(bus, call, &m);
2395 va_start(ap, types);
2396 r = bus_message_append_ap(m, types, ap);
2401 return sd_bus_send(bus, m, NULL);
2404 int sd_bus_reply_method_error(
2406 sd_bus_message *call,
2407 const sd_bus_error *e) {
2409 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2418 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2420 if (!sd_bus_error_is_set(e))
2423 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2426 r = sd_bus_message_new_method_error(bus, call, e, &m);
2430 return sd_bus_send(bus, m, NULL);