1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
39 #include "bus-socket.h"
40 #include "bus-control.h"
42 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Releases what a bus object owns: the exec argument vector, the stored
 * file descriptors, the references held on messages still sitting in the
 * read and write queues, and the reply, filter, object and match callback
 * bookkeeping. */
44 static void bus_free(sd_bus *b) {
45 struct filter_callback *f;
46 struct object_callback *c;
59 strv_free(b->exec_argv);
61 close_many(b->fds, b->n_fds);
64 for (i = 0; i < b->rqueue_size; i++)
65 sd_bus_message_unref(b->rqueue[i]);
68 for (i = 0; i < b->wqueue_size; i++)
69 sd_bus_message_unref(b->wqueue[i]);
72 hashmap_free_free(b->reply_callbacks);
73 prioq_free(b->reply_callbacks_prioq);
75 while ((f = b->filter_callbacks)) {
76 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
80 while ((c = hashmap_steal_first(b->object_callbacks))) {
85 hashmap_free(b->object_callbacks);
87 bus_match_free(&b->match_callbacks);
/* Sets up a fresh bus object with its defaults: no input/output fd yet
 * (-1), message version 1, fd negotiation switched on, and a write queue
 * with a single slot allocated up front.
 * NOTE(review): presumably the object is handed back through ret; the
 * assignment is not among the lines visible here, confirm. */
92 int sd_bus_new(sd_bus **ret) {
103 r->input_fd = r->output_fd = -1;
104 r->message_version = 1;
105 r->negotiate_fds = true;
107 /* We guarantee that wqueue always has space for at least one
109 r->wqueue = new(sd_bus_message*, 1);
/* Configuration setter for the address string of the bus. The state check
 * gates the change on the bus still being in BUS_UNSET.
 * NOTE(review): what happens when the check fails, and how the address is
 * stored, is not visible here. */
119 int sd_bus_set_address(sd_bus *bus, const char *address) {
124 if (bus->state != BUS_UNSET)
/* Records the caller-supplied input and output file descriptors on the
 * bus. The state check gates this on the bus still being in BUS_UNSET. */
139 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
142 if (bus->state != BUS_UNSET)
149 bus->input_fd = input_fd;
150 bus->output_fd = output_fd;
/* Configures a program (path plus argument vector) for the bus. Checks
 * that the bus is still in BUS_UNSET and tests whether argv is empty, then
 * releases any previously stored exec_path / exec_argv.
 * NOTE(review): presumably the new path and a copy of argv are installed
 * afterwards; those lines are not visible here. */
154 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
159 if (bus->state != BUS_UNSET)
163 if (strv_isempty(argv))
176 free(bus->exec_path);
177 strv_free(bus->exec_argv);
/* Stores b, normalised to 0/1, in bus->bus_client. The state check gates
 * this on the bus still being in BUS_UNSET. */
185 int sd_bus_set_bus_client(sd_bus *bus, int b) {
188 if (bus->state != BUS_UNSET)
191 bus->bus_client = !!b;
/* Stores b, normalised to 0/1, in bus->negotiate_fds. The state check
 * gates this on the bus still being in BUS_UNSET. */
195 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
198 if (bus->state != BUS_UNSET)
201 bus->negotiate_fds = !!b;
/* Switches server mode on or off and records the server id. The first
 * check singles out the combination "b is false but server_id is not the
 * null id"; the second gates the change on the bus being in BUS_UNSET.
 * NOTE(review): presumably both checks lead to an error return; the
 * consequences are not visible here. */
205 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
208 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
210 if (bus->state != BUS_UNSET)
213 bus->is_server = !!b;
214 bus->server_id = server_id;
/* Stores b, normalised to 0/1, in bus->anonymous_auth. The state check
 * gates this on the bus still being in BUS_UNSET. */
218 int sd_bus_set_anonymous(sd_bus *bus, int b) {
221 if (bus->state != BUS_UNSET)
224 bus->anonymous_auth = !!b;
/* Reply handler that bus_send_hello() registers for its method call.
 * Expects the bus to be in BUS_HELLO, reads one string from the reply,
 * checks that it is a valid service name beginning with ':', keeps a copy
 * in bus->unique_name and moves the bus to BUS_RUNNING. */
228 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
233 assert(bus->state == BUS_HELLO);
240 r = sd_bus_message_read(reply, "s", &s);
244 if (!service_name_is_valid(s) || s[0] != ':')
247 bus->unique_name = strdup(s);
248 if (!bus->unique_name)
251 bus->state = BUS_RUNNING;
/* Builds a method call whose visible arguments name org.freedesktop.DBus
 * and sends it with hello_callback as the reply handler, remembering the
 * serial in bus->hello_serial. The bus_client check at the top tests for
 * connections that are not bus clients.
 * NOTE(review): presumably those return early without sending anything;
 * the early-exit line is not visible here. */
256 static int bus_send_hello(sd_bus *bus) {
257 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
262 if (!bus->bus_client)
265 r = sd_bus_message_new_method_call(
267 "org.freedesktop.DBus",
269 "org.freedesktop.DBus",
275 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
/* Picks the state the bus enters next: BUS_HELLO when it is a bus client,
 * BUS_RUNNING for the remaining case.
 * NOTE(review): the bus_client branch presumably returns before the second
 * assignment; that line is not visible here. */
278 int bus_start_running(sd_bus *bus) {
281 if (bus->bus_client) {
282 bus->state = BUS_HELLO;
286 bus->state = BUS_RUNNING;
/* Parses one "key=value" item of an address string at *p. The text at *p
 * is compared against key, then the value is scanned up to the next ';',
 * ',' or end of string while the result buffer is grown with realloc().
 * Two 4-bit quantities x and y are combined into a single output byte.
 * NOTE(review): that combination presumably decodes %XX escapes; confirm.
 * parse_exec_address() also calls this with key == NULL. */
290 static int parse_address_key(const char **p, const char *key, char **value) {
301 if (strncmp(*p, key, l) != 0)
314 while (*a != ';' && *a != ',' && *a != 0) {
332 c = (char) ((x << 4) | y);
339 t = realloc(r, n + 2);
/* Advances *p over everything up to (not including) the next ',' or the
 * end of the string, whichever comes first. */
367 static void skip_address_key(const char **p) {
371 *p += strcspn(*p, ",");
/* Parses the keys of a unix address (guid, path, abstract) up to the next
 * ';' or end of string and fills in b->sockaddr as an AF_UNIX address.
 * The two checks after the loop test for "neither path nor abstract" and
 * for "both path and abstract". For a file system path the name is copied
 * into sun_path; for an abstract name sun_path[0] is set to 0 and the name
 * follows at offset 1. In both cases the length is compared against the
 * size of sun_path first, and sockaddr_size covers only the bytes used. */
377 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
378 _cleanup_free_ char *path = NULL, *abstract = NULL;
387 while (**p != 0 && **p != ';') {
388 r = parse_address_key(p, "guid", guid);
394 r = parse_address_key(p, "path", &path);
400 r = parse_address_key(p, "abstract", &abstract);
409 if (!path && !abstract)
412 if (path && abstract)
417 if (l > sizeof(b->sockaddr.un.sun_path))
420 b->sockaddr.un.sun_family = AF_UNIX;
421 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
422 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
423 } else if (abstract) {
424 l = strlen(abstract);
425 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
428 b->sockaddr.un.sun_family = AF_UNIX;
429 b->sockaddr.un.sun_path[0] = 0;
430 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
431 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
/* Parses the keys of a tcp address (guid, host, port, family) up to the
 * next ';' or end of string. A family of "ipv4" or "ipv6" narrows the
 * lookup to AF_INET or AF_INET6. host and port are resolved with
 * getaddrinfo() for a stream socket, the address of the first result is
 * copied into b->sockaddr together with its length, and the result list is
 * released again.
 * NOTE(review): -EADDRNOTAVAIL is presumably the answer to a failed
 * lookup; the condition guarding it is not visible here. */
437 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
438 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
440 struct addrinfo *result, hints = {
441 .ai_socktype = SOCK_STREAM,
442 .ai_flags = AI_ADDRCONFIG,
450 while (**p != 0 && **p != ';') {
451 r = parse_address_key(p, "guid", guid);
457 r = parse_address_key(p, "host", &host);
463 r = parse_address_key(p, "port", &port);
469 r = parse_address_key(p, "family", &family);
482 if (streq(family, "ipv4"))
483 hints.ai_family = AF_INET;
484 else if (streq(family, "ipv6"))
485 hints.ai_family = AF_INET6;
490 r = getaddrinfo(host, port, &hints, &result);
494 return -EADDRNOTAVAIL;
496 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
497 b->sockaddr_size = result->ai_addrlen;
499 freeaddrinfo(result);
/* Parses the keys of a unixexec address up to the next ';' or end of
 * string: guid, path, and numbered "argv<N>" keys. For an argv key the
 * index is read with strtoul() and checked (conversion error, missing '=',
 * index above 256), the argument array is grown and zero-filled as needed,
 * and the value is parsed straight into slot N. Afterwards the array is
 * scanned for holes from index 1 on, and an unset argv[0] is filled with a
 * copy of path. The last loop walks all n_argv entries.
 * NOTE(review): that last loop presumably frees the entries on a failure
 * path; its body is not visible here. */
504 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
506 unsigned n_argv = 0, j;
515 while (**p != 0 && **p != ';') {
516 r = parse_address_key(p, "guid", guid);
522 r = parse_address_key(p, "path", &path);
528 if (startswith(*p, "argv")) {
532 ul = strtoul(*p + 4, (char**) p, 10);
533 if (errno > 0 || **p != '=' || ul > 256) {
543 x = realloc(argv, sizeof(char*) * (ul + 2));
549 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
555 r = parse_address_key(p, NULL, argv + ul);
570 /* Make sure there are no holes in the array, with the
571 * exception of argv[0] */
572 for (j = 1; j < n_argv; j++)
578 if (argv && argv[0] == NULL) {
579 argv[0] = strdup(path);
591 for (j = 0; j < n_argv; j++)
/* Discards the result of a previous address parse: the socket address
 * size goes back to 0, the exec argument vector is freed and the server
 * id is reset to the null id. */
599 static void bus_reset_parsed_address(sd_bus *b) {
603 b->sockaddr_size = 0;
604 strv_free(b->exec_argv);
608 b->server_id = SD_ID128_NULL;
/* Parses the next entry of b->address, starting at b->address_index.
 * Tests for the end of the address string, clears the previous parse
 * result, then dispatches on the "unix:", "tcp:" or "unixexec:" prefix to
 * the matching parser. A guid is converted into b->server_id, and
 * address_index is moved to where parsing stopped. */
611 static int bus_parse_next_address(sd_bus *b) {
612 _cleanup_free_ char *guid = NULL;
620 if (b->address[b->address_index] == 0)
623 bus_reset_parsed_address(b);
625 a = b->address + b->address_index;
634 if (startswith(a, "unix:")) {
637 r = parse_unix_address(b, &a, &guid);
642 } else if (startswith(a, "tcp:")) {
645 r = parse_tcp_address(b, &a, &guid);
651 } else if (startswith(a, "unixexec:")) {
654 r = parse_exec_address(b, &a, &guid);
668 r = sd_id128_from_string(guid, &b->server_id);
673 b->address_index = a - b->address;
/* Tries to bring up a connection from the currently parsed address: a
 * socket connect when a socket address family is set, otherwise an exec
 * when exec_path is set. The negated result of an attempt is remembered in
 * last_connect_error, and the next address entry is parsed. The final
 * return reports the last recorded connect error, or -ECONNREFUSED when
 * none was recorded.
 * NOTE(review): the retry loop around these steps is not visible here. */
677 static int bus_start_address(sd_bus *b) {
685 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
687 r = bus_socket_connect(b);
691 b->last_connect_error = -r;
693 } else if (b->exec_path) {
695 r = bus_socket_exec(b);
699 b->last_connect_error = -r;
702 r = bus_parse_next_address(b);
706 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
/* Drops the currently parsed address and starts over with
 * bus_start_address(). */
710 int bus_next_address(sd_bus *b) {
713 bus_reset_parsed_address(b);
714 return bus_start_address(b);
/* Prepares caller-supplied file descriptors for use: both must be valid.
 * The input fd is switched to non-blocking and close-on-exec, and the
 * same is done for the output fd when it is a different descriptor. The
 * socket layer then takes the fds over via bus_socket_take_fd(). */
717 static int bus_start_fd(sd_bus *b) {
721 assert(b->input_fd >= 0);
722 assert(b->output_fd >= 0);
724 r = fd_nonblock(b->input_fd, true);
728 r = fd_cloexec(b->input_fd, true);
732 if (b->input_fd != b->output_fd) {
733 r = fd_nonblock(b->output_fd, true);
737 r = fd_cloexec(b->output_fd, true);
742 return bus_socket_take_fd(b);
/* Starts a configured bus. The bus has to be in BUS_UNSET and is moved to
 * BUS_OPENING. The combination "server and bus client at the same time"
 * is tested separately. A preset input fd selects bus_start_fd(); an
 * address, a socket address or an exec path selects bus_start_address().
 * The function ends by calling bus_send_hello(). */
745 int sd_bus_start(sd_bus *bus) {
750 if (bus->state != BUS_UNSET)
753 bus->state = BUS_OPENING;
755 if (bus->is_server && bus->bus_client)
758 if (bus->input_fd >= 0)
759 r = bus_start_fd(bus);
760 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path)
761 r = bus_start_address(bus);
768 return bus_send_hello(bus);
/* Sets up a bus client for the system bus. The address comes from the
 * DBUS_SYSTEM_BUS_ADDRESS environment variable via sd_bus_set_address();
 * the fallback is the AF_UNIX socket /run/dbus/system_bus_socket, with
 * sockaddr_size covering the path without its trailing NUL.
 * NOTE(review): presumably the fallback is used when the variable is
 * unset; the branch condition is not visible here. */
771 int sd_bus_open_system(sd_bus **ret) {
783 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
785 r = sd_bus_set_address(b, e);
789 b->sockaddr.un.sun_family = AF_UNIX;
790 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
791 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
794 b->bus_client = true;
/* Sets up a bus client for the user bus. The address comes from the
 * DBUS_SESSION_BUS_ADDRESS environment variable via sd_bus_set_address();
 * the fallback is an AF_UNIX socket named "bus" below XDG_RUNTIME_DIR,
 * built by appending "/bus" after checking that the result fits into
 * sun_path.
 * NOTE(review): presumably the fallback is used when the first variable
 * is unset; the branch condition is not visible here. */
808 int sd_bus_open_user(sd_bus **ret) {
821 e = getenv("DBUS_SESSION_BUS_ADDRESS");
823 r = sd_bus_set_address(b, e);
827 e = getenv("XDG_RUNTIME_DIR");
834 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
839 b->sockaddr.un.sun_family = AF_UNIX;
840 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
841 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
844 b->bus_client = true;
/* Closes the connection's file descriptors. The output fd is closed
 * separately only when it is a different descriptor from the input fd, so
 * a shared fd is closed once. Both fields are then reset to -1. */
858 void sd_bus_close(sd_bus *bus) {
862 if (bus->input_fd >= 0)
863 close_nointr_nofail(bus->input_fd);
864 if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
865 close_nointr_nofail(bus->output_fd);
867 bus->input_fd = bus->output_fd = -1;
/* Reference-count entry point; asserts that the bus currently has a
 * positive reference count.
 * NOTE(review): presumably increments n_ref and returns bus; those lines
 * are not visible here. */
870 sd_bus *sd_bus_ref(sd_bus *bus) {
874 assert(bus->n_ref > 0);
/* Reference-count exit point; asserts that the bus currently has a
 * positive reference count.
 * NOTE(review): presumably decrements n_ref and frees the bus when it
 * reaches zero; those lines are not visible here. */
880 sd_bus *sd_bus_unref(sd_bus *bus) {
884 assert(bus->n_ref > 0);
/* Reports whether the bus has left BUS_UNSET and has a valid input fd;
 * the result is 1 when both hold and 0 otherwise. */
893 int sd_bus_is_open(sd_bus *bus) {
897 return bus->state != BUS_UNSET && bus->input_fd >= 0;
/* Answers whether a value of the given type can be sent on this bus.
 * Needs a valid output fd. For SD_BUS_TYPE_UNIX_FD the negotiate_fds
 * setting is consulted and the connection is driven to the running state
 * via bus_ensure_running(). The visible final return is the result of
 * bus_type_is_valid(type). */
900 int sd_bus_can_send(sd_bus *bus, char type) {
905 if (bus->output_fd < 0)
908 if (type == SD_BUS_TYPE_UNIX_FD) {
909 if (!bus->negotiate_fds)
912 r = bus_ensure_running(bus);
919 return bus_type_is_valid(type);
/* Copies the server id of the connection into *server_id after driving
 * the connection to the running state via bus_ensure_running(). */
922 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
930 r = bus_ensure_running(bus);
934 *server_id = bus->server_id;
/* Seals message m for sending on bus b. Compares the message's header
 * version against the version the bus speaks, then seals the message with
 * the next serial, incrementing b->serial in the process. */
938 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
941 if (m->header->version > b->message_version)
947 return bus_message_seal(m, ++b->serial);
/* Flushes the write queue. Only valid in BUS_RUNNING or BUS_HELLO. While
 * entries remain, the head message is handed to
 * bus_socket_write_message(), with bus->windex tracking how much of it
 * has gone out. Once windex reaches the message size, the entry is
 * unreferenced and the rest of the queue is shifted down by one. */
950 static int dispatch_wqueue(sd_bus *bus) {
954 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
956 if (bus->output_fd < 0)
959 while (bus->wqueue_size > 0) {
961 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
966 /* Didn't do anything this time */
968 else if (bus->windex >= bus->wqueue[0]->size) {
969 /* Fully written. Let's drop the entry from
972 * This isn't particularly optimized, but
973 * well, this is supposed to be our worst-case
974 * buffer only, and the socket buffer is
975 * supposed to be our primary buffer, and if
976 * it got full, then all bets are off
979 sd_bus_message_unref(bus->wqueue[0]);
981 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetches the next incoming message. Only valid in BUS_RUNNING or
 * BUS_HELLO. When the read queue holds entries, the head is handed out
 * through *m and the remaining entries are shifted down; otherwise a new
 * message is read from the socket. */
991 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
992 sd_bus_message *z = NULL;
997 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
999 if (bus->input_fd < 0)
1002 if (bus->rqueue_size > 0) {
1003 /* Dispatch a queued message */
1005 *m = bus->rqueue[0];
1006 bus->rqueue_size --;
1007 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1011 /* Try to read a new message */
1013 r = bus_socket_read_message(bus, &z);
/* Sends message m on the bus, optionally reporting its serial through
 * *serial. When the caller passes no serial pointer and the message is
 * not sealed yet, it is flagged as expecting no reply before being
 * sealed. With the bus in BUS_RUNNING or BUS_HELLO and an empty write
 * queue, the message is written to the socket directly; a partial write
 * leaves a reference to it in the pre-allocated first queue slot.
 * Otherwise the message is appended to the write queue, which is grown by
 * one entry and capped at BUS_WQUEUE_MAX. */
1028 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1033 if (bus->state == BUS_UNSET)
1035 if (bus->output_fd < 0)
1041 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1048 /* If the serial number isn't kept, then we know that no reply
1050 if (!serial && !m->sealed)
1051 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1053 r = bus_seal_message(bus, m);
1057 /* If this is a reply and no reply was requested, then let's
1058 * suppress this, if we can */
1059 if (m->dont_send && !serial)
1062 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1065 r = bus_socket_write_message(bus, m, &idx);
1069 } else if (idx < m->size) {
1070 /* Wasn't fully written. So let's remember how
1071 * much was written. Note that the first entry
1072 * of the wqueue array is always allocated so
1073 * that we always can remember how much was
1075 bus->wqueue[0] = sd_bus_message_ref(m);
1076 bus->wqueue_size = 1;
1082 /* Just append it to the queue. */
1084 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1087 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1092 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1096 *serial = BUS_MESSAGE_SERIAL(m);
/* Turns a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC deadline. (uint64_t) -1 is treated as a special input,
 * and BUS_DEFAULT_TIMEOUT can be substituted for the value passed in.
 * NOTE(review): presumably -1 maps to "no deadline" (0) and the default
 * replaces a 0 input; the corresponding lines are not visible here. */
1101 static usec_t calc_elapse(uint64_t usec) {
1102 if (usec == (uint64_t) -1)
1106 usec = BUS_DEFAULT_TIMEOUT;
1108 return now(CLOCK_MONOTONIC) + usec;
/* Ordering function for reply callbacks, keyed on their timeout field
 * (it is the comparator handed to the reply-callback priority queue).
 * The first two tests separate entries that have a timeout from entries
 * whose timeout is 0; the last two order entries by timeout value.
 * NOTE(review): the returned values are not visible here. */
1111 static int timeout_compare(const void *a, const void *b) {
1112 const struct reply_callback *x = a, *y = b;
1114 if (x->timeout != 0 && y->timeout == 0)
1117 if (x->timeout == 0 && y->timeout != 0)
1120 if (x->timeout < y->timeout)
1123 if (x->timeout > y->timeout)
/* Sends a method call and registers callback to be invoked with its
 * reply. The message type and the no-reply flag are checked first. The
 * reply-callback hashmap is set up on demand, as is the priority queue
 * when usec is not (uint64_t) -1. After sealing the message a
 * reply_callback record is filled in (callback, userdata, message serial,
 * deadline from calc_elapse()) and stored in the hashmap under its
 * serial; records with a non-zero deadline also enter the priority queue.
 * The message is then sent with sd_bus_send(). The two calls to
 * sd_bus_send_with_reply_cancel() undo the registration.
 * NOTE(review): presumably those calls sit on the failure paths of
 * prioq_put() and sd_bus_send(); the guarding conditions are not visible
 * here. */
1129 int sd_bus_send_with_reply(
1132 sd_bus_message_handler_t callback,
1137 struct reply_callback *c;
1142 if (bus->state == BUS_UNSET)
1144 if (bus->output_fd < 0)
1150 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1152 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1155 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1159 if (usec != (uint64_t) -1) {
1160 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1165 r = bus_seal_message(bus, m);
1169 c = new0(struct reply_callback, 1);
1173 c->callback = callback;
1174 c->userdata = userdata;
1175 c->serial = BUS_MESSAGE_SERIAL(m);
1176 c->timeout = calc_elapse(usec);
1178 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1184 if (c->timeout != 0) {
1185 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1188 sd_bus_send_with_reply_cancel(bus, c->serial);
1193 r = sd_bus_send(bus, m, serial);
1195 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Withdraws the reply callback registered for the given serial: the
 * record is removed from the hashmap and, when it carries a deadline,
 * from the timeout priority queue as well. */
1202 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1203 struct reply_callback *c;
1210 c = hashmap_remove(bus->reply_callbacks, &serial);
1214 if (c->timeout != 0)
1215 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the connection until it has reached BUS_RUNNING. Needs a valid
 * input fd and a bus that has left BUS_UNSET. The state is tested before
 * and after a call to sd_bus_process(), and sd_bus_wait() is called with
 * no timeout when there is nothing to process.
 * NOTE(review): presumably these steps repeat in a loop; the loop
 * construct is not visible here. */
1221 int bus_ensure_running(sd_bus *bus) {
1226 if (bus->input_fd < 0)
1228 if (bus->state == BUS_UNSET)
1231 if (bus->state == BUS_RUNNING)
1235 r = sd_bus_process(bus, NULL);
1238 if (bus->state == BUS_RUNNING)
1243 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and waits synchronously for its reply. After the
 * argument and state checks the connection is driven to the running
 * state, the message is sent and a deadline is derived from usec. While
 * waiting, room for one more read-queue entry is reserved (capped at
 * BUS_RQUEUE_MAX) before each read. An incoming message whose
 * reply_serial matches is handled by type: a method return is one case,
 * and a method error is copied into *error and mapped to an errno-style
 * code. Messages that do not match are appended to the read queue.
 * Between reads the function polls for the remaining time and flushes the
 * write queue. */
1249 int sd_bus_send_with_reply_and_block(
1253 sd_bus_error *error,
1254 sd_bus_message **reply) {
1263 if (bus->output_fd < 0)
1265 if (bus->state == BUS_UNSET)
1269 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1271 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1273 if (bus_error_is_dirty(error))
1276 r = bus_ensure_running(bus);
1280 r = sd_bus_send(bus, m, &serial);
1284 timeout = calc_elapse(usec);
1288 sd_bus_message *incoming = NULL;
1293 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1296 /* Make sure there's room for queuing this
1297 * locally, before we read the message */
1299 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1307 r = bus_socket_read_message(bus, &incoming);
1312 if (incoming->reply_serial == serial) {
1313 /* Found a match! */
1315 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1320 sd_bus_message_unref(incoming);
1325 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1328 r = sd_bus_error_copy(error, &incoming->error);
1330 sd_bus_message_unref(incoming);
1334 k = bus_error_to_errno(&incoming->error);
1335 sd_bus_message_unref(incoming);
1339 sd_bus_message_unref(incoming);
1343 /* There's already guaranteed to be room for
1344 * this, so no need to resize things here */
1345 bus->rqueue[bus->rqueue_size ++] = incoming;
1348 /* Try to read more, right-away */
1357 n = now(CLOCK_MONOTONIC);
1363 left = (uint64_t) -1;
1365 r = bus_poll(bus, true, left);
1369 r = dispatch_wqueue(bus);
/* Returns the file descriptor of the connection (the input fd). Two
 * conditions are tested first: the input fd is unset, or the input and
 * output fds differ.
 * NOTE(review): presumably both conditions are answered with an error,
 * since a single return value cannot describe two descriptors; the
 * consequences are not visible here. */
1375 int sd_bus_get_fd(sd_bus *bus) {
1378 if (bus->input_fd < 0)
1380 if (bus->input_fd != bus->output_fd)
1383 return bus->input_fd;
/* Works out which poll events are of interest, depending on the state:
 * BUS_OPENING and BUS_AUTHENTICATING are handled individually (the latter
 * asking the socket layer whether authentication has data to write). In
 * BUS_RUNNING or BUS_HELLO the decision looks at whether the read queue
 * is empty and whether the write queue has entries.
 * NOTE(review): the event flags assigned in each branch are not visible
 * here. */
1386 int sd_bus_get_events(sd_bus *bus) {
1391 if (bus->state == BUS_UNSET)
1393 if (bus->input_fd < 0)
1396 if (bus->state == BUS_OPENING)
1398 else if (bus->state == BUS_AUTHENTICATING) {
1400 if (bus_socket_auth_needs_write(bus))
1405 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1406 if (bus->rqueue_size <= 0)
1408 if (bus->wqueue_size > 0)
/* Reports through *timeout_usec the point in time the caller should wake
 * up at. While authenticating this is bus->auth_timeout; in any state
 * other than BUS_RUNNING / BUS_HELLO it is (uint64_t) -1. Otherwise the
 * head of the reply-callback priority queue is consulted and its timeout
 * is reported.
 * NOTE(review): the second (uint64_t) -1 assignment presumably covers an
 * empty queue; its condition is not visible here. */
1415 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1416 struct reply_callback *c;
1422 if (bus->state == BUS_UNSET)
1424 if (bus->input_fd < 0)
1427 if (bus->state == BUS_AUTHENTICATING) {
1428 *timeout_usec = bus->auth_timeout;
1432 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1433 *timeout_usec = (uint64_t) -1;
1437 c = prioq_peek(bus->reply_callbacks_prioq);
1439 *timeout_usec = (uint64_t) -1;
1443 *timeout_usec = c->timeout;
/* Handles one expired reply callback. The head of the timeout priority
 * queue is inspected against the current CLOCK_MONOTONIC time, then
 * popped, removed from the reply hashmap and invoked with ETIMEDOUT and
 * no message. A negative callback result is passed on; otherwise 1 is
 * returned.
 * NOTE(review): the "not yet expired" early exit is not visible here. */
1447 static int process_timeout(sd_bus *bus) {
1448 struct reply_callback *c;
1454 c = prioq_peek(bus->reply_callbacks_prioq);
1458 n = now(CLOCK_MONOTONIC);
1462 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1463 hashmap_remove(bus->reply_callbacks, &c->serial);
1465 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1468 return r < 0 ? r : 1;
/* Gatekeeper used while the bus is in BUS_HELLO: checks that message m
 * is a method return or method error and that its reply_serial is the
 * serial recorded for the hello call. */
1471 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1475 if (bus->state != BUS_HELLO)
1478 /* Let's make sure the first message on the bus is the HELLO
1479 * reply. But note that we don't actually parse the message
1480 * here (we leave that to the usual handling), we just verify
1481 * we don't let any earlier msg through. */
1483 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1484 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1487 if (m->reply_serial != bus->hello_serial)
/* Routes a method return or method error to the callback registered for
 * its reply_serial. The record is removed from the reply hashmap and,
 * when it carries a deadline, from the timeout priority queue, before the
 * callback is invoked with error code 0 and the message. */
1493 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1494 struct reply_callback *c;
1500 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1501 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1504 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1508 if (c->timeout != 0)
1509 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1511 r = c->callback(bus, 0, m, c->userdata);
/* Runs message m past every registered filter callback. The pass is
 * repeated for as long as a callback changed the filter list
 * (filter_callbacks_modified). Comparing each entry's last_iteration
 * with the bus iteration counter keeps a filter from running twice for
 * the same message. */
1517 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1518 struct filter_callback *l;
1525 bus->filter_callbacks_modified = false;
1527 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1529 if (bus->filter_callbacks_modified)
1532 /* Don't run this more than once per iteration */
1533 if (l->last_iteration == bus->iteration_counter)
1536 l->last_iteration = bus->iteration_counter;
1538 r = l->callback(bus, 0, m, l->userdata);
1544 } while (bus->filter_callbacks_modified);
/* Runs message m through the match callbacks via bus_match_run(),
 * repeating the run for as long as the match set was modified during
 * it. */
1549 static int process_match(sd_bus *bus, sd_bus_message *m) {
1556 bus->match_callbacks_modified = false;
1558 r = bus_match_run(bus, &bus->match_callbacks, 0, m);
1562 } while (bus->match_callbacks_modified);
/* Built-in handling of method calls on the org.freedesktop.DBus.Peer
 * interface. "Ping" is answered with a plain method return and
 * "GetMachineId" with a method return carrying the machine id as a
 * string. Any other member gets an
 * org.freedesktop.DBus.Error.UnknownMethod error. The reply is sent with
 * sd_bus_send(). The no-reply-expected flag of the call is tested before
 * a reply is built. */
1567 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1568 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1574 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1577 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1580 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1583 if (streq_ptr(m->member, "Ping"))
1584 r = sd_bus_message_new_method_return(bus, m, &reply);
1585 else if (streq_ptr(m->member, "GetMachineId")) {
1589 r = sd_id128_get_machine(&id);
1593 r = sd_bus_message_new_method_return(bus, m, &reply);
1597 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1599 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1601 sd_bus_error_set(&error,
1602 "org.freedesktop.DBus.Error.UnknownMethod",
1603 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1605 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1611 r = sd_bus_send(bus, reply, NULL);
/* Dispatches a method call to registered object callbacks. A callback
 * registered for exactly m->path is tried first. Then prefixes of the
 * path, found by searching for the last '/', are looked up, and a hit is
 * used only when it was registered as a fallback. last_iteration keeps a
 * callback from running twice for the same message, and the whole lookup
 * is repeated while the callback table was modified. The test after the
 * loop covers "no handler found" and Introspect calls. Past that test an
 * org.freedesktop.DBus.Error.UnknownMethod error reply is built and
 * sent. */
1618 static int process_object(sd_bus *bus, sd_bus_message *m) {
1619 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1620 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1621 struct object_callback *c;
1629 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1632 if (hashmap_isempty(bus->object_callbacks))
1635 pl = strlen(m->path);
1640 bus->object_callbacks_modified = false;
1642 c = hashmap_get(bus->object_callbacks, m->path);
1643 if (c && c->last_iteration != bus->iteration_counter) {
1645 c->last_iteration = bus->iteration_counter;
1647 r = c->callback(bus, 0, m, c->userdata);
1654 /* Look for fallback prefixes */
1659 if (bus->object_callbacks_modified)
1662 e = strrchr(p, '/');
1668 c = hashmap_get(bus->object_callbacks, p);
1669 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1671 c->last_iteration = bus->iteration_counter;
1673 r = c->callback(bus, 0, m, c->userdata);
1681 } while (bus->object_callbacks_modified);
1683 /* We found some handlers but none wanted to take this, then
1684 * return this -- with one exception, we can handle
1685 * introspection minimally ourselves */
1686 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1689 sd_bus_error_set(&error,
1690 "org.freedesktop.DBus.Error.UnknownMethod",
1691 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1693 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1697 r = sd_bus_send(bus, reply, NULL);
/* Minimal built-in answer to org.freedesktop.DBus.Introspectable
 * Introspect calls. Registered object paths are scanned for entries
 * located below m->path (a path prefix followed by '/'), with the root
 * path "/" treated specially on both sides. The collected node names go
 * into a set. The XML document is written into a memory stream: doctype,
 * the Peer and Introspectable interface descriptions, and one node
 * element per collected name. The resulting string is sent back as a
 * method return. */
1704 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1705 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1706 _cleanup_free_ char *introspection = NULL;
1707 _cleanup_set_free_free_ Set *s = NULL;
1708 _cleanup_fclose_ FILE *f = NULL;
1709 struct object_callback *c;
1718 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1724 s = set_new(string_hash_func, string_compare_func);
1728 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1732 if (streq(c->path, "/"))
1735 if (streq(m->path, "/"))
1738 e = startswith(c->path, m->path);
1739 if (!e || *e != '/')
1760 f = open_memstream(&introspection, &size);
1764 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1765 fputs("<node>\n", f);
1766 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1767 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1769 while ((node = set_steal_first(s))) {
1770 fprintf(f, " <node name=\"%s\"/>\n", node);
1774 fputs("</node>\n", f);
1781 r = sd_bus_message_new_method_return(bus, m, &reply);
1785 r = sd_bus_message_append(reply, "s", introspection);
1789 r = sd_bus_send(bus, reply, NULL);
/* Runs one incoming message through the processing stages in a fixed
 * order: hello check, reply callbacks, filters, matches, built-in Peer
 * interface, object callbacks and finally introspection. The iteration
 * counter is bumped first so the per-callback "already ran for this
 * message" checks start fresh.
 * NOTE(review): presumably a non-zero result from a stage ends the chain
 * early; the checks between the stages are not visible here. */
1796 static int process_message(sd_bus *bus, sd_bus_message *m) {
1802 bus->iteration_counter++;
1804 r = process_hello(bus, m);
1808 r = process_reply(bus, m);
1812 r = process_filter(bus, m);
1816 r = process_match(bus, m);
1820 r = process_builtin(bus, m);
1824 r = process_object(bus, m);
1828 return process_introspect(bus, m);
/* One processing step for a bus in BUS_RUNNING or BUS_HELLO: expired
 * reply timeouts first, then the write queue, then one message from the
 * read queue, which is run through process_message(). For a method call
 * an org.freedesktop.DBus.Error.UnknownObject error reply naming the
 * path is built and sent.
 * NOTE(review): presumably that reply is only reached when no stage
 * handled the call and no ret pointer took the message over; the
 * conditions are not visible here. */
1831 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1832 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1836 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1838 r = process_timeout(bus);
1842 r = dispatch_wqueue(bus);
1846 r = dispatch_rqueue(bus, &m);
1852 r = process_message(bus, m);
1862 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1863 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1864 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1866 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1868 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1872 r = sd_bus_send(bus, reply, NULL);
/* Main processing entry point; performs one step of work according to
 * the connection state. Opening and authenticating are delegated to the
 * socket layer. The running case calls process_running() with
 * bus->processing set for the duration, and that flag is what the
 * recursion check at the top tests. */
1886 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1889 /* Returns 0 when we didn't do anything. This should cause the
1890 * caller to invoke sd_bus_wait() before returning the next
1891 * time. Returns > 0 when we did something, which possibly
1892 * means *ret is filled in with an unprocessed message. */
1896 if (bus->input_fd < 0)
1899 /* We don't allow recursively invoking sd_bus_process(). */
1900 if (bus->processing)
1903 switch (bus->state) {
1909 r = bus_socket_process_opening(bus);
1916 case BUS_AUTHENTICATING:
1918 r = bus_socket_process_authenticating(bus);
1928 bus->processing = true;
1929 r = process_running(bus, ret);
1930 bus->processing = false;
1935 assert_not_reached("Unknown state");
/* Waits for activity on the connection. The events of interest come from
 * sd_bus_get_events(). The absolute deadline from sd_bus_get_timeout()
 * is converted into a time span relative to now (0 when already past),
 * and the caller's timeout_usec is compared against that span. One or
 * two pollfd slots are filled in, depending on whether input and output
 * share a descriptor; with separate descriptors the input slot waits for
 * POLLIN and the output slot for POLLOUT. ppoll() is called with no
 * timeout when the span is (uint64_t) -1. Returns 1 when ppoll()
 * reported ready descriptors, 0 otherwise. */
1938 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1939 struct pollfd p[2] = {};
1946 if (bus->input_fd < 0)
1949 e = sd_bus_get_events(bus);
1956 r = sd_bus_get_timeout(bus, &until);
1963 nw = now(CLOCK_MONOTONIC);
1964 m = until > nw ? until - nw : 0;
1967 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1970 p[0].fd = bus->input_fd;
1971 if (bus->output_fd == bus->input_fd) {
1975 p[0].events = e & POLLIN;
1976 p[1].fd = bus->output_fd;
1977 p[1].events = e & POLLOUT;
1981 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1985 return r > 0 ? 1 : 0;
/* Waits up to timeout_usec for something to happen on the bus by calling
 * bus_poll(). The bus state, the input fd and a non-empty read queue are
 * tested beforehand.
 * NOTE(review): presumably a non-empty read queue makes the call return
 * right away; the consequences of these tests are not visible here. */
1988 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1992 if (bus->state == BUS_UNSET)
1994 if (bus->input_fd < 0)
1996 if (bus->rqueue_size > 0)
1999 return bus_poll(bus, false, timeout_usec);
/* Writes out everything in the write queue. The connection is driven to
 * the running state first. The queue is then dispatched, and
 * bus_poll() is called without a timeout when entries remain; the queue
 * size is tested before and after each dispatch.
 * NOTE(review): presumably dispatch and poll alternate in a loop until
 * the queue is empty; the loop construct is not visible here. */
2002 int sd_bus_flush(sd_bus *bus) {
2007 if (bus->state == BUS_UNSET)
2009 if (bus->output_fd < 0)
2012 r = bus_ensure_running(bus);
2016 if (bus->wqueue_size <= 0)
2020 r = dispatch_wqueue(bus);
2024 if (bus->wqueue_size <= 0)
2027 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback: a zero-initialised record is allocated,
 * filled with callback and userdata, and prepended to the bus's filter
 * list. filter_callbacks_modified is raised so that a filter pass in
 * progress notices the change. */
2033 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2034 struct filter_callback *f;
2041 f = new0(struct filter_callback, 1);
2044 f->callback = callback;
2045 f->userdata = userdata;
2047 bus->filter_callbacks_modified = true;
2048 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregisters a filter callback: the filter list is searched for an
 * entry with the same callback and userdata, which is then unlinked.
 * filter_callbacks_modified is raised so that a filter pass in progress
 * notices the change. */
2052 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2053 struct filter_callback *f;
2060 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2061 if (f->callback == callback && f->userdata == userdata) {
2062 bus->filter_callbacks_modified = true;
2063 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Shared implementation behind sd_bus_add_object() and
 * sd_bus_add_fallback(). The object-callback hashmap is set up on
 * demand. A zero-initialised record gets its own copy of the path plus
 * callback, userdata and the fallback flag, and is inserted into the
 * hashmap under that path. object_callbacks_modified is raised so that
 * a dispatch in progress notices the change. */
2072 static int bus_add_object(
2076 sd_bus_message_handler_t callback,
2079 struct object_callback *c;
2089 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2093 c = new0(struct object_callback, 1);
2097 c->path = strdup(path);
2103 c->callback = callback;
2104 c->userdata = userdata;
2105 c->is_fallback = fallback;
2107 bus->object_callbacks_modified = true;
2108 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Shared implementation behind sd_bus_remove_object() and
 * sd_bus_remove_fallback(). The record registered for path is looked up
 * and compared against callback, userdata and the fallback flag, then
 * taken out of the hashmap. object_callbacks_modified is raised so that
 * a dispatch in progress notices the change. */
2118 static int bus_remove_object(
2122 sd_bus_message_handler_t callback,
2125 struct object_callback *c;
2134 c = hashmap_get(bus->object_callbacks, path);
2138 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2141 bus->object_callbacks_modified = true;
2142 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Registers callback for the given object path; thin wrapper around
 * bus_add_object() with the fallback flag cleared. */
2150 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2151 return bus_add_object(bus, false, path, callback, userdata);
/* Removes a registration made for an object path; thin wrapper around
 * bus_remove_object() with the fallback flag cleared. */
2154 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2155 return bus_remove_object(bus, false, path, callback, userdata);
/* Registers callback as a fallback for the given path prefix; thin
 * wrapper around bus_add_object() with the fallback flag set. */
2158 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2159 return bus_add_object(bus, true, prefix, callback, userdata);
/* Removes a fallback registration for the given path prefix; thin
 * wrapper around bus_remove_object() with the fallback flag set. */
2162 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2163 return bus_remove_object(bus, true, prefix, callback, userdata);
/* Installs a match rule with an associated callback. For bus clients the
 * rule is first handed to bus_add_match_internal(), then it is added to
 * the local match table. The trailing bus_remove_match_internal() call
 * takes the rule back from the bus.
 * NOTE(review): presumably that call runs only when the local add
 * failed; the guarding condition is not visible here. */
2166 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2174 if (bus->bus_client) {
2175 r = bus_add_match_internal(bus, match);
2181 bus->match_callbacks_modified = true;
2182 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2185 if (bus->bus_client)
2186 bus_remove_match_internal(bus, match);
/* Removes a match rule: for bus clients via
 * bus_remove_match_internal(), and from the local match table via
 * bus_match_remove(). The two results are kept in separate variables (r
 * and q). */
2193 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2201 if (bus->bus_client)
2202 r = bus_remove_match_internal(bus, match);
2205 bus->match_callbacks_modified = true;
2206 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
/* Convenience call that builds a signal message for path, interface and
 * member, appends the variadic arguments described by the types string
 * and sends it without asking for the serial. */
2214 int sd_bus_emit_signal(
2217 const char *interface,
2219 const char *types, ...) {
2221 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2228 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2232 va_start(ap, types);
2233 r = bus_message_append_ap(m, types, ap);
2238 return sd_bus_send(bus, m, NULL);
/* Convenience call that builds a method call for destination, path,
 * interface and member, appends the variadic arguments described by the
 * types string, and performs the call synchronously through
 * sd_bus_send_with_reply_and_block(), passing 0 as the timeout
 * argument. */
2241 int sd_bus_call_method(
2243 const char *destination,
2245 const char *interface,
2247 sd_bus_error *error,
2248 sd_bus_message **reply,
2249 const char *types, ...) {
2251 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2258 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2262 va_start(ap, types);
2263 r = bus_message_append_ap(m, types, ap);
2268 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
/* Convenience call that answers a method call with a method return
 * carrying the variadic arguments described by the types string. The
 * call is checked for being a method call, and its no-reply-expected
 * flag is tested before a reply is built.
 * NOTE(review): presumably the flag check returns early without sending
 * anything; that line is not visible here. */
2271 int sd_bus_reply_method_return(
2273 sd_bus_message *call,
2274 const char *types, ...) {
2276 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2286 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2289 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2292 r = sd_bus_message_new_method_return(bus, call, &m);
2296 va_start(ap, types);
2297 r = bus_message_append_ap(m, types, ap);
2302 return sd_bus_send(bus, m, NULL);
/* Convenience call that answers a method call with the error e. The call
 * is checked for being a method call and e for being set, and the call's
 * no-reply-expected flag is tested before the error reply is built and
 * sent.
 * NOTE(review): presumably the flag check returns early without sending
 * anything; that line is not visible here. */
2305 int sd_bus_reply_method_error(
2307 sd_bus_message *call,
2308 const sd_bus_error *e) {
2310 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2319 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2321 if (!sd_bus_error_is_set(e))
2324 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2327 r = sd_bus_message_new_method_error(bus, call, e, &m);
2331 return sd_bus_send(bus, m, NULL);