1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
39 #include "bus-socket.h"
40 #include "bus-control.h"
42 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
44 static void bus_free(sd_bus *b) {
45 struct filter_callback *f;
46 struct object_callback *c;
59 strv_free(b->exec_argv);
61 close_many(b->fds, b->n_fds);
64 for (i = 0; i < b->rqueue_size; i++)
65 sd_bus_message_unref(b->rqueue[i]);
68 for (i = 0; i < b->wqueue_size; i++)
69 sd_bus_message_unref(b->wqueue[i]);
72 hashmap_free_free(b->reply_callbacks);
73 prioq_free(b->reply_callbacks_prioq);
75 while ((f = b->filter_callbacks)) {
76 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
80 while ((c = hashmap_steal_first(b->object_callbacks))) {
85 hashmap_free(b->object_callbacks);
87 bus_match_free(&b->match_callbacks);
92 int sd_bus_new(sd_bus **ret) {
103 r->input_fd = r->output_fd = -1;
104 r->message_version = 1;
105 r->negotiate_fds = true;
107 /* We guarantee that wqueue always has space for at least one
109 r->wqueue = new(sd_bus_message*, 1);
119 int sd_bus_set_address(sd_bus *bus, const char *address) {
124 if (bus->state != BUS_UNSET)
139 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
142 if (bus->state != BUS_UNSET)
149 bus->input_fd = input_fd;
150 bus->output_fd = output_fd;
154 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
159 if (bus->state != BUS_UNSET)
163 if (strv_isempty(argv))
176 free(bus->exec_path);
177 strv_free(bus->exec_argv);
185 int sd_bus_set_bus_client(sd_bus *bus, int b) {
188 if (bus->state != BUS_UNSET)
191 bus->bus_client = !!b;
195 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
198 if (bus->state != BUS_UNSET)
201 bus->negotiate_fds = !!b;
205 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
208 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
210 if (bus->state != BUS_UNSET)
213 bus->is_server = !!b;
214 bus->server_id = server_id;
218 int sd_bus_set_anonymous(sd_bus *bus, int b) {
221 if (bus->state != BUS_UNSET)
224 bus->anonymous_auth = !!b;
228 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
233 assert(bus->state == BUS_HELLO);
240 r = sd_bus_message_read(reply, "s", &s);
244 if (!service_name_is_valid(s) || s[0] != ':')
247 bus->unique_name = strdup(s);
248 if (!bus->unique_name)
251 bus->state = BUS_RUNNING;
256 static int bus_send_hello(sd_bus *bus) {
257 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
262 if (!bus->bus_client)
265 r = sd_bus_message_new_method_call(
267 "org.freedesktop.DBus",
269 "org.freedesktop.DBus",
275 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
278 int bus_start_running(sd_bus *bus) {
281 if (bus->bus_client) {
282 bus->state = BUS_HELLO;
286 bus->state = BUS_RUNNING;
290 static int parse_address_key(const char **p, const char *key, char **value) {
301 if (strncmp(*p, key, l) != 0)
314 while (*a != ';' && *a != ',' && *a != 0) {
332 c = (char) ((x << 4) | y);
339 t = realloc(r, n + 2);
367 static void skip_address_key(const char **p) {
371 *p += strcspn(*p, ",");
377 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
378 _cleanup_free_ char *path = NULL, *abstract = NULL;
387 while (**p != 0 && **p != ';') {
388 r = parse_address_key(p, "guid", guid);
394 r = parse_address_key(p, "path", &path);
400 r = parse_address_key(p, "abstract", &abstract);
409 if (!path && !abstract)
412 if (path && abstract)
417 if (l > sizeof(b->sockaddr.un.sun_path))
420 b->sockaddr.un.sun_family = AF_UNIX;
421 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
422 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
423 } else if (abstract) {
424 l = strlen(abstract);
425 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
428 b->sockaddr.un.sun_family = AF_UNIX;
429 b->sockaddr.un.sun_path[0] = 0;
430 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
431 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
437 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
438 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
439 struct addrinfo hints, *result;
447 while (**p != 0 && **p != ';') {
448 r = parse_address_key(p, "guid", guid);
454 r = parse_address_key(p, "host", &host);
460 r = parse_address_key(p, "port", &port);
466 r = parse_address_key(p, "family", &family);
479 hints.ai_socktype = SOCK_STREAM;
480 hints.ai_flags = AI_ADDRCONFIG;
483 if (streq(family, "ipv4"))
484 hints.ai_family = AF_INET;
485 else if (streq(family, "ipv6"))
486 hints.ai_family = AF_INET6;
491 r = getaddrinfo(host, port, &hints, &result);
495 return -EADDRNOTAVAIL;
497 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
498 b->sockaddr_size = result->ai_addrlen;
500 freeaddrinfo(result);
505 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
507 unsigned n_argv = 0, j;
516 while (**p != 0 && **p != ';') {
517 r = parse_address_key(p, "guid", guid);
523 r = parse_address_key(p, "path", &path);
529 if (startswith(*p, "argv")) {
533 ul = strtoul(*p + 4, (char**) p, 10);
534 if (errno > 0 || **p != '=' || ul > 256) {
544 x = realloc(argv, sizeof(char*) * (ul + 2));
550 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
556 r = parse_address_key(p, NULL, argv + ul);
571 /* Make sure there are no holes in the array, with the
572 * exception of argv[0] */
573 for (j = 1; j < n_argv; j++)
579 if (argv && argv[0] == NULL) {
580 argv[0] = strdup(path);
592 for (j = 0; j < n_argv; j++)
600 static void bus_reset_parsed_address(sd_bus *b) {
604 b->sockaddr_size = 0;
605 strv_free(b->exec_argv);
609 b->server_id = SD_ID128_NULL;
612 static int bus_parse_next_address(sd_bus *b) {
613 _cleanup_free_ char *guid = NULL;
621 if (b->address[b->address_index] == 0)
624 bus_reset_parsed_address(b);
626 a = b->address + b->address_index;
635 if (startswith(a, "unix:")) {
638 r = parse_unix_address(b, &a, &guid);
643 } else if (startswith(a, "tcp:")) {
646 r = parse_tcp_address(b, &a, &guid);
652 } else if (startswith(a, "unixexec:")) {
655 r = parse_exec_address(b, &a, &guid);
669 r = sd_id128_from_string(guid, &b->server_id);
674 b->address_index = a - b->address;
678 static int bus_start_address(sd_bus *b) {
686 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
688 r = bus_socket_connect(b);
692 b->last_connect_error = -r;
694 } else if (b->exec_path) {
696 r = bus_socket_exec(b);
700 b->last_connect_error = -r;
703 r = bus_parse_next_address(b);
707 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
711 int bus_next_address(sd_bus *b) {
714 bus_reset_parsed_address(b);
715 return bus_start_address(b);
718 static int bus_start_fd(sd_bus *b) {
722 assert(b->input_fd >= 0);
723 assert(b->output_fd >= 0);
725 r = fd_nonblock(b->input_fd, true);
729 r = fd_cloexec(b->input_fd, true);
733 if (b->input_fd != b->output_fd) {
734 r = fd_nonblock(b->output_fd, true);
738 r = fd_cloexec(b->output_fd, true);
743 return bus_socket_take_fd(b);
746 int sd_bus_start(sd_bus *bus) {
751 if (bus->state != BUS_UNSET)
754 bus->state = BUS_OPENING;
756 if (bus->is_server && bus->bus_client)
759 if (bus->input_fd >= 0)
760 r = bus_start_fd(bus);
761 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path)
762 r = bus_start_address(bus);
769 return bus_send_hello(bus);
772 int sd_bus_open_system(sd_bus **ret) {
784 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
786 r = sd_bus_set_address(b, e);
790 b->sockaddr.un.sun_family = AF_UNIX;
791 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
792 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
795 b->bus_client = true;
809 int sd_bus_open_user(sd_bus **ret) {
822 e = getenv("DBUS_SESSION_BUS_ADDRESS");
824 r = sd_bus_set_address(b, e);
828 e = getenv("XDG_RUNTIME_DIR");
835 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
840 b->sockaddr.un.sun_family = AF_UNIX;
841 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
842 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
845 b->bus_client = true;
859 void sd_bus_close(sd_bus *bus) {
863 if (bus->input_fd >= 0)
864 close_nointr_nofail(bus->input_fd);
865 if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
866 close_nointr_nofail(bus->output_fd);
868 bus->input_fd = bus->output_fd = -1;
871 sd_bus *sd_bus_ref(sd_bus *bus) {
875 assert(bus->n_ref > 0);
881 sd_bus *sd_bus_unref(sd_bus *bus) {
885 assert(bus->n_ref > 0);
894 int sd_bus_is_open(sd_bus *bus) {
898 return bus->state != BUS_UNSET && bus->input_fd >= 0;
901 int sd_bus_can_send(sd_bus *bus, char type) {
906 if (bus->output_fd < 0)
909 if (type == SD_BUS_TYPE_UNIX_FD) {
910 if (!bus->negotiate_fds)
913 r = bus_ensure_running(bus);
920 return bus_type_is_valid(type);
923 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
931 r = bus_ensure_running(bus);
935 *server_id = bus->server_id;
939 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
942 if (m->header->version > b->message_version)
948 return bus_message_seal(m, ++b->serial);
951 static int dispatch_wqueue(sd_bus *bus) {
955 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
957 if (bus->output_fd < 0)
960 while (bus->wqueue_size > 0) {
962 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
967 /* Didn't do anything this time */
969 else if (bus->windex >= bus->wqueue[0]->size) {
970 /* Fully written. Let's drop the entry from
973 * This isn't particularly optimized, but
974 * well, this is supposed to be our worst-case
975 * buffer only, and the socket buffer is
976 * supposed to be our primary buffer, and if
977 * it got full, then all bets are off
980 sd_bus_message_unref(bus->wqueue[0]);
982 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
992 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
993 sd_bus_message *z = NULL;
998 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1000 if (bus->input_fd < 0)
1003 if (bus->rqueue_size > 0) {
1004 /* Dispatch a queued message */
1006 *m = bus->rqueue[0];
1007 bus->rqueue_size --;
1008 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1012 /* Try to read a new message */
1014 r = bus_socket_read_message(bus, &z);
1029 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1034 if (bus->state == BUS_UNSET)
1036 if (bus->output_fd < 0)
1042 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1049 /* If the serial number isn't kept, then we know that no reply
1051 if (!serial && !m->sealed)
1052 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1054 r = bus_seal_message(bus, m);
1058 /* If this is a reply and no reply was requested, then let's
1059 * suppress this, if we can */
1060 if (m->dont_send && !serial)
1063 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1066 r = bus_socket_write_message(bus, m, &idx);
1070 } else if (idx < m->size) {
1071 /* Wasn't fully written. So let's remember how
1072 * much was written. Note that the first entry
1073 * of the wqueue array is always allocated so
1074 * that we always can remember how much was
1076 bus->wqueue[0] = sd_bus_message_ref(m);
1077 bus->wqueue_size = 1;
1083 /* Just append it to the queue. */
1085 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1088 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1093 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1097 *serial = BUS_MESSAGE_SERIAL(m);
1102 static usec_t calc_elapse(uint64_t usec) {
1103 if (usec == (uint64_t) -1)
1107 usec = BUS_DEFAULT_TIMEOUT;
1109 return now(CLOCK_MONOTONIC) + usec;
1112 static int timeout_compare(const void *a, const void *b) {
1113 const struct reply_callback *x = a, *y = b;
1115 if (x->timeout != 0 && y->timeout == 0)
1118 if (x->timeout == 0 && y->timeout != 0)
1121 if (x->timeout < y->timeout)
1124 if (x->timeout > y->timeout)
1130 int sd_bus_send_with_reply(
1133 sd_bus_message_handler_t callback,
1138 struct reply_callback *c;
1143 if (bus->state == BUS_UNSET)
1145 if (bus->output_fd < 0)
1151 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1153 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1156 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1160 if (usec != (uint64_t) -1) {
1161 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1166 r = bus_seal_message(bus, m);
1170 c = new0(struct reply_callback, 1);
1174 c->callback = callback;
1175 c->userdata = userdata;
1176 c->serial = BUS_MESSAGE_SERIAL(m);
1177 c->timeout = calc_elapse(usec);
1179 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1185 if (c->timeout != 0) {
1186 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1189 sd_bus_send_with_reply_cancel(bus, c->serial);
1194 r = sd_bus_send(bus, m, serial);
1196 sd_bus_send_with_reply_cancel(bus, c->serial);
1203 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1204 struct reply_callback *c;
1211 c = hashmap_remove(bus->reply_callbacks, &serial);
1215 if (c->timeout != 0)
1216 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1222 int bus_ensure_running(sd_bus *bus) {
1227 if (bus->input_fd < 0)
1229 if (bus->state == BUS_UNSET)
1232 if (bus->state == BUS_RUNNING)
1236 r = sd_bus_process(bus, NULL);
1239 if (bus->state == BUS_RUNNING)
1244 r = sd_bus_wait(bus, (uint64_t) -1);
1250 int sd_bus_send_with_reply_and_block(
1254 sd_bus_error *error,
1255 sd_bus_message **reply) {
1264 if (bus->output_fd < 0)
1266 if (bus->state == BUS_UNSET)
1270 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1272 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1274 if (bus_error_is_dirty(error))
1277 r = bus_ensure_running(bus);
1281 r = sd_bus_send(bus, m, &serial);
1285 timeout = calc_elapse(usec);
1289 sd_bus_message *incoming = NULL;
1294 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1297 /* Make sure there's room for queuing this
1298 * locally, before we read the message */
1300 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1308 r = bus_socket_read_message(bus, &incoming);
1313 if (incoming->reply_serial == serial) {
1314 /* Found a match! */
1316 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1321 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1324 r = sd_bus_error_copy(error, &incoming->error);
1326 sd_bus_message_unref(incoming);
1330 k = bus_error_to_errno(&incoming->error);
1331 sd_bus_message_unref(incoming);
1335 sd_bus_message_unref(incoming);
1339 /* There's already guaranteed to be room for
1340 * this, so need to resize things here */
1341 bus->rqueue[bus->rqueue_size ++] = incoming;
1344 /* Try to read more, right-away */
1353 n = now(CLOCK_MONOTONIC);
1359 left = (uint64_t) -1;
1361 r = bus_poll(bus, true, left);
1365 r = dispatch_wqueue(bus);
1371 int sd_bus_get_fd(sd_bus *bus) {
1374 if (bus->input_fd < 0)
1376 if (bus->input_fd != bus->output_fd)
1379 return bus->input_fd;
1382 int sd_bus_get_events(sd_bus *bus) {
1387 if (bus->state == BUS_UNSET)
1389 if (bus->input_fd < 0)
1392 if (bus->state == BUS_OPENING)
1394 else if (bus->state == BUS_AUTHENTICATING) {
1396 if (bus_socket_auth_needs_write(bus))
1401 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1402 if (bus->rqueue_size <= 0)
1404 if (bus->wqueue_size > 0)
1411 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1412 struct reply_callback *c;
1418 if (bus->state == BUS_UNSET)
1420 if (bus->input_fd < 0)
1423 if (bus->state == BUS_AUTHENTICATING) {
1424 *timeout_usec = bus->auth_timeout;
1428 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1429 *timeout_usec = (uint64_t) -1;
1433 c = prioq_peek(bus->reply_callbacks_prioq);
1435 *timeout_usec = (uint64_t) -1;
1439 *timeout_usec = c->timeout;
1443 static int process_timeout(sd_bus *bus) {
1444 struct reply_callback *c;
1450 c = prioq_peek(bus->reply_callbacks_prioq);
1454 n = now(CLOCK_MONOTONIC);
1458 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1459 hashmap_remove(bus->reply_callbacks, &c->serial);
1461 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1464 return r < 0 ? r : 1;
1467 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1471 if (bus->state != BUS_HELLO)
1474 /* Let's make sure the first message on the bus is the HELLO
1475 * reply. But note that we don't actually parse the message
1476 * here (we leave that to the usual handling), we just verify
1477 * we don't let any earlier msg through. */
1479 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1480 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1483 if (m->reply_serial != bus->hello_serial)
1489 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1490 struct reply_callback *c;
1496 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1497 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1500 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1504 if (c->timeout != 0)
1505 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1507 r = c->callback(bus, 0, m, c->userdata);
1513 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1514 struct filter_callback *l;
1521 bus->filter_callbacks_modified = false;
1523 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1525 if (bus->filter_callbacks_modified)
1528 /* Don't run this more than once per iteration */
1529 if (l->last_iteration == bus->iteration_counter)
1532 l->last_iteration = bus->iteration_counter;
1534 r = l->callback(bus, 0, m, l->userdata);
1540 } while (bus->filter_callbacks_modified);
1545 static int process_match(sd_bus *bus, sd_bus_message *m) {
1552 bus->match_callbacks_modified = false;
1554 r = bus_match_run(bus, &bus->match_callbacks, 0, m);
1558 } while (bus->match_callbacks_modified);
1563 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1564 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1570 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1573 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1576 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1579 if (streq_ptr(m->member, "Ping"))
1580 r = sd_bus_message_new_method_return(bus, m, &reply);
1581 else if (streq_ptr(m->member, "GetMachineId")) {
1585 r = sd_id128_get_machine(&id);
1589 r = sd_bus_message_new_method_return(bus, m, &reply);
1593 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1595 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1597 sd_bus_error_set(&error,
1598 "org.freedesktop.DBus.Error.UnknownMethod",
1599 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1601 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1607 r = sd_bus_send(bus, reply, NULL);
1614 static int process_object(sd_bus *bus, sd_bus_message *m) {
1615 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1616 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1617 struct object_callback *c;
1625 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1628 if (hashmap_isempty(bus->object_callbacks))
1631 pl = strlen(m->path);
1636 bus->object_callbacks_modified = false;
1638 c = hashmap_get(bus->object_callbacks, m->path);
1639 if (c && c->last_iteration != bus->iteration_counter) {
1641 c->last_iteration = bus->iteration_counter;
1643 r = c->callback(bus, 0, m, c->userdata);
1650 /* Look for fallback prefixes */
1655 if (bus->object_callbacks_modified)
1658 e = strrchr(p, '/');
1664 c = hashmap_get(bus->object_callbacks, p);
1665 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1667 c->last_iteration = bus->iteration_counter;
1669 r = c->callback(bus, 0, m, c->userdata);
1677 } while (bus->object_callbacks_modified);
1679 /* We found some handlers but none wanted to take this, then
1680 * return this -- with one exception, we can handle
1681 * introspection minimally ourselves */
1682 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1685 sd_bus_error_set(&error,
1686 "org.freedesktop.DBus.Error.UnknownMethod",
1687 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1689 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1693 r = sd_bus_send(bus, reply, NULL);
1700 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1701 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1702 _cleanup_free_ char *introspection = NULL;
1703 _cleanup_set_free_free_ Set *s = NULL;
1704 _cleanup_fclose_ FILE *f = NULL;
1705 struct object_callback *c;
1714 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1720 s = set_new(string_hash_func, string_compare_func);
1724 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1728 if (streq(c->path, "/"))
1731 if (streq(m->path, "/"))
1734 e = startswith(c->path, m->path);
1735 if (!e || *e != '/')
1756 f = open_memstream(&introspection, &size);
1760 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1761 fputs("<node>\n", f);
1762 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1763 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1765 while ((node = set_steal_first(s))) {
1766 fprintf(f, " <node name=\"%s\"/>\n", node);
1770 fputs("</node>\n", f);
1777 r = sd_bus_message_new_method_return(bus, m, &reply);
1781 r = sd_bus_message_append(reply, "s", introspection);
1785 r = sd_bus_send(bus, reply, NULL);
1792 static int process_message(sd_bus *bus, sd_bus_message *m) {
1798 bus->iteration_counter++;
1800 r = process_hello(bus, m);
1804 r = process_reply(bus, m);
1808 r = process_filter(bus, m);
1812 r = process_match(bus, m);
1816 r = process_builtin(bus, m);
1820 r = process_object(bus, m);
1824 return process_introspect(bus, m);
1827 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1828 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1832 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1834 r = process_timeout(bus);
1838 r = dispatch_wqueue(bus);
1842 r = dispatch_rqueue(bus, &m);
1848 r = process_message(bus, m);
1858 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1859 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1860 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1862 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1864 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1868 r = sd_bus_send(bus, reply, NULL);
1882 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1885 /* Returns 0 when we didn't do anything. This should cause the
1886 * caller to invoke sd_bus_wait() before returning the next
1887 * time. Returns > 0 when we did something, which possibly
1888 * means *ret is filled in with an unprocessed message. */
1892 if (bus->input_fd < 0)
1895 /* We don't allow recursively invoking sd_bus_process(). */
1896 if (bus->processing)
1899 switch (bus->state) {
1905 r = bus_socket_process_opening(bus);
1912 case BUS_AUTHENTICATING:
1914 r = bus_socket_process_authenticating(bus);
1924 bus->processing = true;
1925 r = process_running(bus, ret);
1926 bus->processing = false;
1931 assert_not_reached("Unknown state");
1934 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1942 if (bus->input_fd < 0)
1945 e = sd_bus_get_events(bus);
1952 r = sd_bus_get_timeout(bus, &until);
1959 nw = now(CLOCK_MONOTONIC);
1960 m = until > nw ? until - nw : 0;
1963 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1967 p[0].fd = bus->input_fd;
1969 if (bus->output_fd == bus->input_fd) {
1973 p[0].events = e & POLLIN;
1974 p[1].fd = bus->output_fd;
1975 p[1].events = e & POLLOUT;
1979 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1983 return r > 0 ? 1 : 0;
1986 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1990 if (bus->state == BUS_UNSET)
1992 if (bus->input_fd < 0)
1994 if (bus->rqueue_size > 0)
1997 return bus_poll(bus, false, timeout_usec);
2000 int sd_bus_flush(sd_bus *bus) {
2005 if (bus->state == BUS_UNSET)
2007 if (bus->output_fd < 0)
2010 r = bus_ensure_running(bus);
2014 if (bus->wqueue_size <= 0)
2018 r = dispatch_wqueue(bus);
2022 if (bus->wqueue_size <= 0)
2025 r = bus_poll(bus, false, (uint64_t) -1);
2031 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2032 struct filter_callback *f;
2039 f = new0(struct filter_callback, 1);
2042 f->callback = callback;
2043 f->userdata = userdata;
2045 bus->filter_callbacks_modified = true;
2046 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
2050 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2051 struct filter_callback *f;
2058 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2059 if (f->callback == callback && f->userdata == userdata) {
2060 bus->filter_callbacks_modified = true;
2061 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2070 static int bus_add_object(
2074 sd_bus_message_handler_t callback,
2077 struct object_callback *c;
2087 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2091 c = new0(struct object_callback, 1);
2095 c->path = strdup(path);
2101 c->callback = callback;
2102 c->userdata = userdata;
2103 c->is_fallback = fallback;
2105 bus->object_callbacks_modified = true;
2106 r = hashmap_put(bus->object_callbacks, c->path, c);
2116 static int bus_remove_object(
2120 sd_bus_message_handler_t callback,
2123 struct object_callback *c;
2132 c = hashmap_get(bus->object_callbacks, path);
2136 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2139 bus->object_callbacks_modified = true;
2140 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2148 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2149 return bus_add_object(bus, false, path, callback, userdata);
2152 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2153 return bus_remove_object(bus, false, path, callback, userdata);
2156 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2157 return bus_add_object(bus, true, prefix, callback, userdata);
2160 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2161 return bus_remove_object(bus, true, prefix, callback, userdata);
2164 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2172 if (bus->bus_client) {
2173 r = bus_add_match_internal(bus, match);
2179 bus->match_callbacks_modified = true;
2180 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2183 if (bus->bus_client)
2184 bus_remove_match_internal(bus, match);
2191 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2199 if (bus->bus_client)
2200 r = bus_remove_match_internal(bus, match);
2203 bus->match_callbacks_modified = true;
2204 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
2212 int sd_bus_emit_signal(
2215 const char *interface,
2217 const char *types, ...) {
2219 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2226 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2230 va_start(ap, types);
2231 r = bus_message_append_ap(m, types, ap);
2236 return sd_bus_send(bus, m, NULL);
2239 int sd_bus_call_method(
2241 const char *destination,
2243 const char *interface,
2245 sd_bus_error *error,
2246 sd_bus_message **reply,
2247 const char *types, ...) {
2249 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2256 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2260 va_start(ap, types);
2261 r = bus_message_append_ap(m, types, ap);
2266 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
2269 int sd_bus_reply_method_return(
2271 sd_bus_message *call,
2272 const char *types, ...) {
2274 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2284 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2287 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2290 r = sd_bus_message_new_method_return(bus, call, &m);
2294 va_start(ap, types);
2295 r = bus_message_append_ap(m, types, ap);
2300 return sd_bus_send(bus, m, NULL);
2303 int sd_bus_reply_method_error(
2305 sd_bus_message *call,
2306 const sd_bus_error *e) {
2308 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2317 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2319 if (!sd_bus_error_is_set(e))
2322 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2325 r = sd_bus_message_new_method_error(bus, call, e, &m);
2329 return sd_bus_send(bus, m, NULL);