1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
39 #include "bus-socket.h"
40 #include "bus-control.h"
42 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
44 static void bus_free(sd_bus *b) {
45 struct filter_callback *f;
46 struct object_callback *c;
59 strv_free(b->exec_argv);
61 close_many(b->fds, b->n_fds);
64 for (i = 0; i < b->rqueue_size; i++)
65 sd_bus_message_unref(b->rqueue[i]);
68 for (i = 0; i < b->wqueue_size; i++)
69 sd_bus_message_unref(b->wqueue[i]);
72 hashmap_free_free(b->reply_callbacks);
73 prioq_free(b->reply_callbacks_prioq);
75 while ((f = b->filter_callbacks)) {
76 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
80 while ((c = hashmap_steal_first(b->object_callbacks))) {
85 hashmap_free(b->object_callbacks);
87 bus_match_free(&b->match_callbacks);
92 int sd_bus_new(sd_bus **ret) {
103 r->input_fd = r->output_fd = -1;
104 r->message_version = 1;
105 r->negotiate_fds = true;
107 /* We guarantee that wqueue always has space for at least one
109 r->wqueue = new(sd_bus_message*, 1);
119 int sd_bus_set_address(sd_bus *bus, const char *address) {
124 if (bus->state != BUS_UNSET)
139 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
142 if (bus->state != BUS_UNSET)
149 bus->input_fd = input_fd;
150 bus->output_fd = output_fd;
154 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
159 if (bus->state != BUS_UNSET)
163 if (strv_isempty(argv))
176 free(bus->exec_path);
177 strv_free(bus->exec_argv);
185 int sd_bus_set_bus_client(sd_bus *bus, int b) {
188 if (bus->state != BUS_UNSET)
191 bus->bus_client = !!b;
195 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
198 if (bus->state != BUS_UNSET)
201 bus->negotiate_fds = !!b;
205 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
208 if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
210 if (bus->state != BUS_UNSET)
213 bus->is_server = !!b;
214 bus->server_id = server_id;
218 int sd_bus_set_anonymous(sd_bus *bus, int b) {
221 if (bus->state != BUS_UNSET)
224 bus->anonymous_auth = !!b;
228 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
233 assert(bus->state == BUS_HELLO);
240 r = sd_bus_message_read(reply, "s", &s);
244 if (!service_name_is_valid(s) || s[0] != ':')
247 bus->unique_name = strdup(s);
248 if (!bus->unique_name)
251 bus->state = BUS_RUNNING;
256 static int bus_send_hello(sd_bus *bus) {
257 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
262 if (!bus->bus_client)
265 r = sd_bus_message_new_method_call(
267 "org.freedesktop.DBus",
269 "org.freedesktop.DBus",
275 return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
278 int bus_start_running(sd_bus *bus) {
281 if (bus->bus_client) {
282 bus->state = BUS_HELLO;
286 bus->state = BUS_RUNNING;
290 static int parse_address_key(const char **p, const char *key, char **value) {
301 if (strncmp(*p, key, l) != 0)
314 while (*a != ';' && *a != ',' && *a != 0) {
332 c = (char) ((x << 4) | y);
339 t = realloc(r, n + 2);
367 static void skip_address_key(const char **p) {
371 *p += strcspn(*p, ",");
377 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
378 _cleanup_free_ char *path = NULL, *abstract = NULL;
387 while (**p != 0 && **p != ';') {
388 r = parse_address_key(p, "guid", guid);
394 r = parse_address_key(p, "path", &path);
400 r = parse_address_key(p, "abstract", &abstract);
409 if (!path && !abstract)
412 if (path && abstract)
417 if (l > sizeof(b->sockaddr.un.sun_path))
420 b->sockaddr.un.sun_family = AF_UNIX;
421 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
422 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
423 } else if (abstract) {
424 l = strlen(abstract);
425 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
428 b->sockaddr.un.sun_family = AF_UNIX;
429 b->sockaddr.un.sun_path[0] = 0;
430 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
431 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
437 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
438 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
439 struct addrinfo hints, *result;
447 while (**p != 0 && **p != ';') {
448 r = parse_address_key(p, "guid", guid);
454 r = parse_address_key(p, "host", &host);
460 r = parse_address_key(p, "port", &port);
466 r = parse_address_key(p, "family", &family);
479 hints.ai_socktype = SOCK_STREAM;
480 hints.ai_flags = AI_ADDRCONFIG;
483 if (streq(family, "ipv4"))
484 hints.ai_family = AF_INET;
485 else if (streq(family, "ipv6"))
486 hints.ai_family = AF_INET6;
491 r = getaddrinfo(host, port, &hints, &result);
495 return -EADDRNOTAVAIL;
497 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
498 b->sockaddr_size = result->ai_addrlen;
500 freeaddrinfo(result);
505 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
507 unsigned n_argv = 0, j;
516 while (**p != 0 && **p != ';') {
517 r = parse_address_key(p, "guid", guid);
523 r = parse_address_key(p, "path", &path);
529 if (startswith(*p, "argv")) {
533 ul = strtoul(*p + 4, (char**) p, 10);
534 if (errno > 0 || **p != '=' || ul > 256) {
544 x = realloc(argv, sizeof(char*) * (ul + 2));
550 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
556 r = parse_address_key(p, NULL, argv + ul);
571 /* Make sure there are no holes in the array, with the
572 * exception of argv[0] */
573 for (j = 1; j < n_argv; j++)
579 if (argv && argv[0] == NULL) {
580 argv[0] = strdup(path);
592 for (j = 0; j < n_argv; j++)
600 static void bus_reset_parsed_address(sd_bus *b) {
604 b->sockaddr_size = 0;
605 strv_free(b->exec_argv);
609 b->server_id = SD_ID128_NULL;
612 static int bus_parse_next_address(sd_bus *b) {
613 _cleanup_free_ char *guid = NULL;
621 if (b->address[b->address_index] == 0)
624 bus_reset_parsed_address(b);
626 a = b->address + b->address_index;
635 if (startswith(a, "unix:")) {
638 r = parse_unix_address(b, &a, &guid);
643 } else if (startswith(a, "tcp:")) {
646 r = parse_tcp_address(b, &a, &guid);
652 } else if (startswith(a, "unixexec:")) {
655 r = parse_exec_address(b, &a, &guid);
669 r = sd_id128_from_string(guid, &b->server_id);
674 b->address_index = a - b->address;
678 static int bus_start_address(sd_bus *b) {
686 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
688 r = bus_socket_connect(b);
692 b->last_connect_error = -r;
694 } else if (b->exec_path) {
696 r = bus_socket_exec(b);
700 b->last_connect_error = -r;
703 r = bus_parse_next_address(b);
707 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
711 int bus_next_address(sd_bus *b) {
714 bus_reset_parsed_address(b);
715 return bus_start_address(b);
718 static int bus_start_fd(sd_bus *b) {
722 assert(b->input_fd >= 0);
723 assert(b->output_fd >= 0);
725 r = fd_nonblock(b->input_fd, true);
729 r = fd_cloexec(b->input_fd, true);
733 if (b->input_fd != b->output_fd) {
734 r = fd_nonblock(b->output_fd, true);
738 r = fd_cloexec(b->output_fd, true);
743 return bus_socket_take_fd(b);
746 int sd_bus_start(sd_bus *bus) {
751 if (bus->state != BUS_UNSET)
754 bus->state = BUS_OPENING;
756 if (bus->is_server && bus->bus_client)
759 if (bus->input_fd >= 0)
760 r = bus_start_fd(bus);
761 else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path)
762 r = bus_start_address(bus);
769 return bus_send_hello(bus);
772 int sd_bus_open_system(sd_bus **ret) {
784 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
786 r = sd_bus_set_address(b, e);
790 b->sockaddr.un.sun_family = AF_UNIX;
791 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
792 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
795 b->bus_client = true;
809 int sd_bus_open_user(sd_bus **ret) {
822 e = getenv("DBUS_SESSION_BUS_ADDRESS");
824 r = sd_bus_set_address(b, e);
828 e = getenv("XDG_RUNTIME_DIR");
835 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
840 b->sockaddr.un.sun_family = AF_UNIX;
841 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
842 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
845 b->bus_client = true;
859 void sd_bus_close(sd_bus *bus) {
863 if (bus->input_fd >= 0)
864 close_nointr_nofail(bus->input_fd);
865 if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
866 close_nointr_nofail(bus->output_fd);
868 bus->input_fd = bus->output_fd = -1;
871 sd_bus *sd_bus_ref(sd_bus *bus) {
875 assert(bus->n_ref > 0);
881 sd_bus *sd_bus_unref(sd_bus *bus) {
885 assert(bus->n_ref > 0);
894 int sd_bus_is_open(sd_bus *bus) {
898 return bus->state != BUS_UNSET && bus->input_fd >= 0;
901 int sd_bus_can_send(sd_bus *bus, char type) {
906 if (bus->output_fd < 0)
909 if (type == SD_BUS_TYPE_UNIX_FD) {
910 if (!bus->negotiate_fds)
913 r = bus_ensure_running(bus);
920 return bus_type_is_valid(type);
923 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
931 r = bus_ensure_running(bus);
935 *server_id = bus->server_id;
939 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
942 if (m->header->version > b->message_version)
948 return bus_message_seal(m, ++b->serial);
951 static int dispatch_wqueue(sd_bus *bus) {
955 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
957 if (bus->output_fd < 0)
960 while (bus->wqueue_size > 0) {
962 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
967 /* Didn't do anything this time */
969 else if (bus->windex >= bus->wqueue[0]->size) {
970 /* Fully written. Let's drop the entry from
973 * This isn't particularly optimized, but
974 * well, this is supposed to be our worst-case
975 * buffer only, and the socket buffer is
976 * supposed to be our primary buffer, and if
977 * it got full, then all bets are off
980 sd_bus_message_unref(bus->wqueue[0]);
982 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
992 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
993 sd_bus_message *z = NULL;
998 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1000 if (bus->input_fd < 0)
1003 if (bus->rqueue_size > 0) {
1004 /* Dispatch a queued message */
1006 *m = bus->rqueue[0];
1007 bus->rqueue_size --;
1008 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1012 /* Try to read a new message */
1014 r = bus_socket_read_message(bus, &z);
1029 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1034 if (bus->state == BUS_UNSET)
1036 if (bus->output_fd < 0)
1042 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1049 /* If the serial number isn't kept, then we know that no reply
1051 if (!serial && !m->sealed)
1052 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1054 r = bus_seal_message(bus, m);
1058 /* If this is a reply and no reply was requested, then let's
1059 * suppress this, if we can */
1060 if (m->dont_send && !serial)
1063 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1066 r = bus_socket_write_message(bus, m, &idx);
1070 } else if (idx < m->size) {
1071 /* Wasn't fully written. So let's remember how
1072 * much was written. Note that the first entry
1073 * of the wqueue array is always allocated so
1074 * that we always can remember how much was
1076 bus->wqueue[0] = sd_bus_message_ref(m);
1077 bus->wqueue_size = 1;
1083 /* Just append it to the queue. */
1085 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1088 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1093 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1097 *serial = BUS_MESSAGE_SERIAL(m);
1102 static usec_t calc_elapse(uint64_t usec) {
1103 if (usec == (uint64_t) -1)
1107 usec = BUS_DEFAULT_TIMEOUT;
1109 return now(CLOCK_MONOTONIC) + usec;
1112 static int timeout_compare(const void *a, const void *b) {
1113 const struct reply_callback *x = a, *y = b;
1115 if (x->timeout != 0 && y->timeout == 0)
1118 if (x->timeout == 0 && y->timeout != 0)
1121 if (x->timeout < y->timeout)
1124 if (x->timeout > y->timeout)
1130 int sd_bus_send_with_reply(
1133 sd_bus_message_handler_t callback,
1138 struct reply_callback *c;
1143 if (bus->state == BUS_UNSET)
1145 if (bus->output_fd < 0)
1151 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1153 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1156 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1160 if (usec != (uint64_t) -1) {
1161 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1166 r = bus_seal_message(bus, m);
1170 c = new0(struct reply_callback, 1);
1174 c->callback = callback;
1175 c->userdata = userdata;
1176 c->serial = BUS_MESSAGE_SERIAL(m);
1177 c->timeout = calc_elapse(usec);
1179 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1185 if (c->timeout != 0) {
1186 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1189 sd_bus_send_with_reply_cancel(bus, c->serial);
1194 r = sd_bus_send(bus, m, serial);
1196 sd_bus_send_with_reply_cancel(bus, c->serial);
1203 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1204 struct reply_callback *c;
1211 c = hashmap_remove(bus->reply_callbacks, &serial);
1215 if (c->timeout != 0)
1216 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1222 int bus_ensure_running(sd_bus *bus) {
1227 if (bus->input_fd < 0)
1229 if (bus->state == BUS_UNSET)
1232 if (bus->state == BUS_RUNNING)
1236 r = sd_bus_process(bus, NULL);
1239 if (bus->state == BUS_RUNNING)
1244 r = sd_bus_wait(bus, (uint64_t) -1);
1250 int sd_bus_send_with_reply_and_block(
1254 sd_bus_error *error,
1255 sd_bus_message **reply) {
1264 if (bus->output_fd < 0)
1266 if (bus->state == BUS_UNSET)
1270 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1272 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1274 if (bus_error_is_dirty(error))
1277 r = bus_ensure_running(bus);
1281 r = sd_bus_send(bus, m, &serial);
1285 timeout = calc_elapse(usec);
1289 sd_bus_message *incoming = NULL;
1294 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1297 /* Make sure there's room for queuing this
1298 * locally, before we read the message */
1300 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1308 r = bus_socket_read_message(bus, &incoming);
1313 if (incoming->reply_serial == serial) {
1314 /* Found a match! */
1316 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1321 sd_bus_message_unref(incoming);
1326 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1329 r = sd_bus_error_copy(error, &incoming->error);
1331 sd_bus_message_unref(incoming);
1335 k = bus_error_to_errno(&incoming->error);
1336 sd_bus_message_unref(incoming);
1340 sd_bus_message_unref(incoming);
1344 /* There's already guaranteed to be room for
1345 * this, so need to resize things here */
1346 bus->rqueue[bus->rqueue_size ++] = incoming;
1349 /* Try to read more, right-away */
1358 n = now(CLOCK_MONOTONIC);
1364 left = (uint64_t) -1;
1366 r = bus_poll(bus, true, left);
1370 r = dispatch_wqueue(bus);
1376 int sd_bus_get_fd(sd_bus *bus) {
1379 if (bus->input_fd < 0)
1381 if (bus->input_fd != bus->output_fd)
1384 return bus->input_fd;
1387 int sd_bus_get_events(sd_bus *bus) {
1392 if (bus->state == BUS_UNSET)
1394 if (bus->input_fd < 0)
1397 if (bus->state == BUS_OPENING)
1399 else if (bus->state == BUS_AUTHENTICATING) {
1401 if (bus_socket_auth_needs_write(bus))
1406 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1407 if (bus->rqueue_size <= 0)
1409 if (bus->wqueue_size > 0)
1416 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1417 struct reply_callback *c;
1423 if (bus->state == BUS_UNSET)
1425 if (bus->input_fd < 0)
1428 if (bus->state == BUS_AUTHENTICATING) {
1429 *timeout_usec = bus->auth_timeout;
1433 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1434 *timeout_usec = (uint64_t) -1;
1438 c = prioq_peek(bus->reply_callbacks_prioq);
1440 *timeout_usec = (uint64_t) -1;
1444 *timeout_usec = c->timeout;
1448 static int process_timeout(sd_bus *bus) {
1449 struct reply_callback *c;
1455 c = prioq_peek(bus->reply_callbacks_prioq);
1459 n = now(CLOCK_MONOTONIC);
1463 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1464 hashmap_remove(bus->reply_callbacks, &c->serial);
1466 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1469 return r < 0 ? r : 1;
1472 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1476 if (bus->state != BUS_HELLO)
1479 /* Let's make sure the first message on the bus is the HELLO
1480 * reply. But note that we don't actually parse the message
1481 * here (we leave that to the usual handling), we just verify
1482 * we don't let any earlier msg through. */
1484 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1485 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1488 if (m->reply_serial != bus->hello_serial)
1494 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1495 struct reply_callback *c;
1501 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1502 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1505 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1509 if (c->timeout != 0)
1510 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1512 r = c->callback(bus, 0, m, c->userdata);
1518 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1519 struct filter_callback *l;
1526 bus->filter_callbacks_modified = false;
1528 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1530 if (bus->filter_callbacks_modified)
1533 /* Don't run this more than once per iteration */
1534 if (l->last_iteration == bus->iteration_counter)
1537 l->last_iteration = bus->iteration_counter;
1539 r = l->callback(bus, 0, m, l->userdata);
1545 } while (bus->filter_callbacks_modified);
1550 static int process_match(sd_bus *bus, sd_bus_message *m) {
1557 bus->match_callbacks_modified = false;
1559 r = bus_match_run(bus, &bus->match_callbacks, 0, m);
1563 } while (bus->match_callbacks_modified);
1568 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1569 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1575 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1578 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1581 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1584 if (streq_ptr(m->member, "Ping"))
1585 r = sd_bus_message_new_method_return(bus, m, &reply);
1586 else if (streq_ptr(m->member, "GetMachineId")) {
1590 r = sd_id128_get_machine(&id);
1594 r = sd_bus_message_new_method_return(bus, m, &reply);
1598 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1600 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1602 sd_bus_error_set(&error,
1603 "org.freedesktop.DBus.Error.UnknownMethod",
1604 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1606 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1612 r = sd_bus_send(bus, reply, NULL);
1619 static int process_object(sd_bus *bus, sd_bus_message *m) {
1620 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1621 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1622 struct object_callback *c;
1630 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1633 if (hashmap_isempty(bus->object_callbacks))
1636 pl = strlen(m->path);
1641 bus->object_callbacks_modified = false;
1643 c = hashmap_get(bus->object_callbacks, m->path);
1644 if (c && c->last_iteration != bus->iteration_counter) {
1646 c->last_iteration = bus->iteration_counter;
1648 r = c->callback(bus, 0, m, c->userdata);
1655 /* Look for fallback prefixes */
1660 if (bus->object_callbacks_modified)
1663 e = strrchr(p, '/');
1669 c = hashmap_get(bus->object_callbacks, p);
1670 if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1672 c->last_iteration = bus->iteration_counter;
1674 r = c->callback(bus, 0, m, c->userdata);
1682 } while (bus->object_callbacks_modified);
1684 /* We found some handlers but none wanted to take this, then
1685 * return this -- with one exception, we can handle
1686 * introspection minimally ourselves */
1687 if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1690 sd_bus_error_set(&error,
1691 "org.freedesktop.DBus.Error.UnknownMethod",
1692 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1694 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1698 r = sd_bus_send(bus, reply, NULL);
1705 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1706 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1707 _cleanup_free_ char *introspection = NULL;
1708 _cleanup_set_free_free_ Set *s = NULL;
1709 _cleanup_fclose_ FILE *f = NULL;
1710 struct object_callback *c;
1719 if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1725 s = set_new(string_hash_func, string_compare_func);
1729 HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1733 if (streq(c->path, "/"))
1736 if (streq(m->path, "/"))
1739 e = startswith(c->path, m->path);
1740 if (!e || *e != '/')
1761 f = open_memstream(&introspection, &size);
1765 fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1766 fputs("<node>\n", f);
1767 fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1768 fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1770 while ((node = set_steal_first(s))) {
1771 fprintf(f, " <node name=\"%s\"/>\n", node);
1775 fputs("</node>\n", f);
1782 r = sd_bus_message_new_method_return(bus, m, &reply);
1786 r = sd_bus_message_append(reply, "s", introspection);
1790 r = sd_bus_send(bus, reply, NULL);
1797 static int process_message(sd_bus *bus, sd_bus_message *m) {
1803 bus->iteration_counter++;
1805 r = process_hello(bus, m);
1809 r = process_reply(bus, m);
1813 r = process_filter(bus, m);
1817 r = process_match(bus, m);
1821 r = process_builtin(bus, m);
1825 r = process_object(bus, m);
1829 return process_introspect(bus, m);
1832 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1833 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1837 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1839 r = process_timeout(bus);
1843 r = dispatch_wqueue(bus);
1847 r = dispatch_rqueue(bus, &m);
1853 r = process_message(bus, m);
1863 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1864 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1865 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1867 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1869 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1873 r = sd_bus_send(bus, reply, NULL);
1887 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1890 /* Returns 0 when we didn't do anything. This should cause the
1891 * caller to invoke sd_bus_wait() before returning the next
1892 * time. Returns > 0 when we did something, which possibly
1893 * means *ret is filled in with an unprocessed message. */
1897 if (bus->input_fd < 0)
1900 /* We don't allow recursively invoking sd_bus_process(). */
1901 if (bus->processing)
1904 switch (bus->state) {
1910 r = bus_socket_process_opening(bus);
1917 case BUS_AUTHENTICATING:
1919 r = bus_socket_process_authenticating(bus);
1929 bus->processing = true;
1930 r = process_running(bus, ret);
1931 bus->processing = false;
1936 assert_not_reached("Unknown state");
1939 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1947 if (bus->input_fd < 0)
1950 e = sd_bus_get_events(bus);
1957 r = sd_bus_get_timeout(bus, &until);
1964 nw = now(CLOCK_MONOTONIC);
1965 m = until > nw ? until - nw : 0;
1968 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1972 p[0].fd = bus->input_fd;
1974 if (bus->output_fd == bus->input_fd) {
1978 p[0].events = e & POLLIN;
1979 p[1].fd = bus->output_fd;
1980 p[1].events = e & POLLOUT;
1984 r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1988 return r > 0 ? 1 : 0;
1991 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1995 if (bus->state == BUS_UNSET)
1997 if (bus->input_fd < 0)
1999 if (bus->rqueue_size > 0)
2002 return bus_poll(bus, false, timeout_usec);
2005 int sd_bus_flush(sd_bus *bus) {
2010 if (bus->state == BUS_UNSET)
2012 if (bus->output_fd < 0)
2015 r = bus_ensure_running(bus);
2019 if (bus->wqueue_size <= 0)
2023 r = dispatch_wqueue(bus);
2027 if (bus->wqueue_size <= 0)
2030 r = bus_poll(bus, false, (uint64_t) -1);
2036 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2037 struct filter_callback *f;
2044 f = new0(struct filter_callback, 1);
2047 f->callback = callback;
2048 f->userdata = userdata;
2050 bus->filter_callbacks_modified = true;
2051 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
2055 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2056 struct filter_callback *f;
2063 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2064 if (f->callback == callback && f->userdata == userdata) {
2065 bus->filter_callbacks_modified = true;
2066 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2075 static int bus_add_object(
2079 sd_bus_message_handler_t callback,
2082 struct object_callback *c;
2092 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2096 c = new0(struct object_callback, 1);
2100 c->path = strdup(path);
2106 c->callback = callback;
2107 c->userdata = userdata;
2108 c->is_fallback = fallback;
2110 bus->object_callbacks_modified = true;
2111 r = hashmap_put(bus->object_callbacks, c->path, c);
2121 static int bus_remove_object(
2125 sd_bus_message_handler_t callback,
2128 struct object_callback *c;
2137 c = hashmap_get(bus->object_callbacks, path);
2141 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2144 bus->object_callbacks_modified = true;
2145 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2153 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2154 return bus_add_object(bus, false, path, callback, userdata);
2157 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2158 return bus_remove_object(bus, false, path, callback, userdata);
2161 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2162 return bus_add_object(bus, true, prefix, callback, userdata);
2165 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2166 return bus_remove_object(bus, true, prefix, callback, userdata);
2169 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2177 if (bus->bus_client) {
2178 r = bus_add_match_internal(bus, match);
2184 bus->match_callbacks_modified = true;
2185 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2188 if (bus->bus_client)
2189 bus_remove_match_internal(bus, match);
2196 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2204 if (bus->bus_client)
2205 r = bus_remove_match_internal(bus, match);
2208 bus->match_callbacks_modified = true;
2209 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
2217 int sd_bus_emit_signal(
2220 const char *interface,
2222 const char *types, ...) {
2224 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2231 r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2235 va_start(ap, types);
2236 r = bus_message_append_ap(m, types, ap);
2241 return sd_bus_send(bus, m, NULL);
2244 int sd_bus_call_method(
2246 const char *destination,
2248 const char *interface,
2250 sd_bus_error *error,
2251 sd_bus_message **reply,
2252 const char *types, ...) {
2254 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2261 r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2265 va_start(ap, types);
2266 r = bus_message_append_ap(m, types, ap);
2271 return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
2274 int sd_bus_reply_method_return(
2276 sd_bus_message *call,
2277 const char *types, ...) {
2279 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2289 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2292 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2295 r = sd_bus_message_new_method_return(bus, call, &m);
2299 va_start(ap, types);
2300 r = bus_message_append_ap(m, types, ap);
2305 return sd_bus_send(bus, m, NULL);
2308 int sd_bus_reply_method_error(
2310 sd_bus_message *call,
2311 const sd_bus_error *e) {
2313 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2322 if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2324 if (!sd_bus_error_is_set(e))
2327 if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2330 r = sd_bus_message_new_method_error(bus, call, e, &m);
2334 return sd_bus_send(bus, m, NULL);