1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
36 #include "bus-internal.h"
37 #include "bus-message.h"
40 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
42 static void bus_free(sd_bus *b) {
43 struct filter_callback *f;
44 struct object_callback *c;
50 close_nointr_nofail(b->fd);
58 strv_free(b->exec_argv);
60 close_many(b->fds, b->n_fds);
63 for (i = 0; i < b->rqueue_size; i++)
64 sd_bus_message_unref(b->rqueue[i]);
67 for (i = 0; i < b->wqueue_size; i++)
68 sd_bus_message_unref(b->wqueue[i]);
71 hashmap_free_free(b->reply_callbacks);
72 prioq_free(b->reply_callbacks_prioq);
74 while ((f = b->filter_callbacks)) {
75 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
79 while ((c = hashmap_steal_first(b->object_callbacks))) {
84 hashmap_free(b->object_callbacks);
89 int sd_bus_new(sd_bus **ret) {
101 r->message_version = 1;
102 r->negotiate_fds = true;
104 /* We guarantee that wqueue always has space for at least one
106 r->wqueue = new(sd_bus_message*, 1);
116 int sd_bus_set_address(sd_bus *bus, const char *address) {
121 if (bus->state != BUS_UNSET)
136 int sd_bus_set_fd(sd_bus *bus, int fd) {
139 if (bus->state != BUS_UNSET)
148 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
153 if (bus->state != BUS_UNSET)
157 if (strv_isempty(argv))
170 free(bus->exec_path);
171 strv_free(bus->exec_argv);
179 int sd_bus_set_hello(sd_bus *bus, int b) {
182 if (bus->state != BUS_UNSET)
185 bus->send_hello = !!b;
189 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
192 if (bus->state != BUS_UNSET)
195 bus->negotiate_fds = !!b;
199 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
204 assert(bus->state == BUS_HELLO);
211 r = sd_bus_message_read(reply, "s", &s);
215 if (!service_name_is_valid(s) || s[0] != ':')
218 bus->unique_name = strdup(s);
219 if (!bus->unique_name)
222 bus->state = BUS_RUNNING;
227 static int bus_send_hello(sd_bus *bus) {
228 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
233 if (!bus->send_hello)
236 r = sd_bus_message_new_method_call(
238 "org.freedesktop.DBus",
240 "org.freedesktop.DBus",
246 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
253 static int bus_start_running(sd_bus *bus) {
256 if (bus->send_hello) {
257 bus->state = BUS_HELLO;
261 bus->state = BUS_RUNNING;
265 static int parse_address_key(const char **p, const char *key, char **value) {
276 if (strncmp(*p, key, l) != 0)
289 while (*a != ';' && *a != ',' && *a != 0) {
307 c = (char) ((x << 4) | y);
314 t = realloc(r, n + 2);
342 static void skip_address_key(const char **p) {
346 *p += strcspn(*p, ",");
352 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
353 _cleanup_free_ char *path = NULL, *abstract = NULL;
362 while (**p != 0 && **p != ';') {
363 r = parse_address_key(p, "guid", guid);
369 r = parse_address_key(p, "path", &path);
375 r = parse_address_key(p, "abstract", &abstract);
384 if (!path && !abstract)
387 if (path && abstract)
392 if (l > sizeof(b->sockaddr.un.sun_path))
395 b->sockaddr.un.sun_family = AF_UNIX;
396 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
397 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
398 } else if (abstract) {
399 l = strlen(abstract);
400 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
403 b->sockaddr.un.sun_family = AF_UNIX;
404 b->sockaddr.un.sun_path[0] = 0;
405 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
406 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
412 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
413 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
414 struct addrinfo hints, *result;
422 while (**p != 0 && **p != ';') {
423 r = parse_address_key(p, "guid", guid);
429 r = parse_address_key(p, "host", &host);
435 r = parse_address_key(p, "port", &port);
441 r = parse_address_key(p, "family", &family);
454 hints.ai_socktype = SOCK_STREAM;
455 hints.ai_flags = AI_ADDRCONFIG;
458 if (streq(family, "ipv4"))
459 hints.ai_family = AF_INET;
460 else if (streq(family, "ipv6"))
461 hints.ai_family = AF_INET6;
466 r = getaddrinfo(host, port, &hints, &result);
470 return -EADDRNOTAVAIL;
472 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
473 b->sockaddr_size = result->ai_addrlen;
475 freeaddrinfo(result);
480 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
482 unsigned n_argv = 0, j;
491 while (**p != 0 && **p != ';') {
492 r = parse_address_key(p, "guid", guid);
498 r = parse_address_key(p, "path", &path);
504 if (startswith(*p, "argv")) {
508 ul = strtoul(*p + 4, (char**) p, 10);
509 if (errno != 0 || **p != '=' || ul > 256) {
519 x = realloc(argv, sizeof(char*) * (ul + 2));
525 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
531 r = parse_address_key(p, NULL, argv + ul);
544 /* Make sure there are no holes in the array, with the
545 * exception of argv[0] */
546 for (j = 1; j < n_argv; j++)
552 if (argv && argv[0] == NULL) {
553 argv[0] = strdup(path);
565 for (j = 0; j < n_argv; j++)
573 static void bus_reset_parsed_address(sd_bus *b) {
577 b->sockaddr_size = 0;
578 strv_free(b->exec_argv);
582 b->peer = SD_ID128_NULL;
585 static int bus_parse_next_address(sd_bus *b) {
586 _cleanup_free_ char *guid = NULL;
594 if (b->address[b->address_index] == 0)
597 bus_reset_parsed_address(b);
599 a = b->address + b->address_index;
608 if (startswith(a, "unix:")) {
611 r = parse_unix_address(b, &a, &guid);
616 } else if (startswith(a, "tcp:")) {
619 r = parse_tcp_address(b, &a, &guid);
625 } else if (startswith(a, "unixexec:")) {
628 r = parse_exec_address(b, &a, &guid);
642 r = sd_id128_from_string(guid, &b->peer);
647 b->address_index = a - b->address;
651 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
654 struct iovec *i = iov + *idx;
656 if (i->iov_len > size) {
657 i->iov_base = (uint8_t*) i->iov_base + size;
671 static int bus_write_auth(sd_bus *b) {
676 assert(b->state == BUS_AUTHENTICATING);
678 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
681 if (b->auth_timeout == 0)
682 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
685 mh.msg_iov = b->auth_iovec + b->auth_index;
686 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
688 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
690 return errno == EAGAIN ? 0 : -errno;
692 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
697 static int bus_auth_verify(sd_bus *b) {
703 /* We expect two response lines: "OK" and possibly
706 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
710 if (b->negotiate_fds) {
711 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
721 /* Nice! We got all the lines we need. First check the OK
724 if (e - (char*) b->rbuffer != 3 + 32)
727 if (memcmp(b->rbuffer, "OK ", 3))
730 for (i = 0; i < 32; i += 2) {
733 x = unhexchar(((char*) b->rbuffer)[3 + i]);
734 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
739 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
742 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
743 !sd_id128_equal(b->peer, peer))
748 /* And possibly check the second line, too */
752 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
753 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
755 b->rbuffer_size -= (start - (char*) b->rbuffer);
756 memmove(b->rbuffer, start, b->rbuffer_size);
758 r = bus_start_running(b);
765 static int bus_read_auth(sd_bus *b) {
775 r = bus_auth_verify(b);
779 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
781 if (n > BUS_AUTH_SIZE_MAX)
782 n = BUS_AUTH_SIZE_MAX;
784 if (b->rbuffer_size >= n)
787 p = realloc(b->rbuffer, n);
794 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
795 iov.iov_len = n - b->rbuffer_size;
801 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
803 return errno == EAGAIN ? 0 : -errno;
807 b->rbuffer_size += k;
809 r = bus_auth_verify(b);
816 static int bus_setup_fd(sd_bus *b) {
821 /* Enable SO_PASSCRED + SO_PASSSEC. We try this on any socket,
822 * just in case. This is actually irrelevant for non-AF_UNIX sockets, so failures are ignored. */
824 setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
825 setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
827 /* Increase the buffers to a MB */
828 fd_inc_rcvbuf(b->fd, 1024*1024);
829 fd_inc_sndbuf(b->fd, 1024*1024);
834 static int bus_start_auth(sd_bus *b) {
835 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
836 static const char auth_suffix_with_unix_fd[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
837 static const char auth_suffix_without_unix_fd[] = "\r\nBEGIN\r\n";
839 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
841 const char *auth_suffix;
847 b->state = BUS_AUTHENTICATING;
850 r = getsockopt(b->fd, SOL_SOCKET, SO_DOMAIN, &domain, &sl);
854 if (domain != AF_UNIX)
855 b->negotiate_fds = false;
857 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
861 b->auth_uid = hexmem(text, l);
865 auth_suffix = b->negotiate_fds ? auth_suffix_with_unix_fd : auth_suffix_without_unix_fd;
867 b->auth_iovec[0].iov_base = (void*) auth_prefix;
868 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
869 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
870 b->auth_iovec[1].iov_len = l * 2;
871 b->auth_iovec[2].iov_base = (void*) auth_suffix;
872 b->auth_iovec[2].iov_len = strlen(auth_suffix);
873 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + strlen(auth_suffix); /* auth_suffix is a pointer, so sizeof() would give the pointer size, not the suffix length */
875 return bus_write_auth(b);
878 static int bus_connect(sd_bus *b) {
883 assert(b->sockaddr.sa.sa_family != AF_UNSPEC);
885 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
893 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
895 if (errno == EINPROGRESS)
901 return bus_start_auth(b);
904 static int bus_exec(sd_bus *b) {
910 assert(b->exec_path);
912 b->fd = socketpair(AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0, s);
925 close_nointr_nofail(s[0]);
927 assert_se(dup3(s[1], STDIN_FILENO, 0) == STDIN_FILENO);
928 assert_se(dup3(s[1], STDOUT_FILENO, 0) == STDOUT_FILENO);
930 if (s[1] != STDIN_FILENO && s[1] != STDOUT_FILENO)
931 close_nointr_nofail(s[1]);
933 fd_cloexec(STDIN_FILENO, false);
934 fd_cloexec(STDOUT_FILENO, false);
935 fd_nonblock(STDIN_FILENO, false);
936 fd_nonblock(STDOUT_FILENO, false);
939 execvp(b->exec_path, b->exec_argv);
941 const char *argv[] = { b->exec_path, NULL };
942 execvp(b->exec_path, (char**) argv);
948 close_nointr_nofail(s[1]);
951 return bus_start_auth(b);
954 static int bus_start_connect(sd_bus *b) {
961 close_nointr_nofail(b->fd);
965 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
970 b->last_connect_error = -r;
972 } else if (b->exec_path) {
978 b->last_connect_error = -r;
981 r = bus_parse_next_address(b);
985 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
989 static int bus_start_fd(sd_bus *b) {
994 r = fd_nonblock(b->fd, true);
998 r = fd_cloexec(b->fd, true);
1002 r = bus_setup_fd(b);
1006 return bus_start_auth(b);
1009 int sd_bus_start(sd_bus *bus) {
1014 if (bus->state != BUS_UNSET)
1017 bus->state = BUS_OPENING;
1020 r = bus_start_fd(bus);
1021 else if (bus->address)
1022 r = bus_start_connect(bus);
1029 return bus_send_hello(bus);
1032 int sd_bus_open_system(sd_bus **ret) {
1044 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
1046 r = sd_bus_set_address(b, e);
1050 b->sockaddr.un.sun_family = AF_UNIX;
1051 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1052 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1055 b->send_hello = true;
1057 r = sd_bus_start(b);
1069 int sd_bus_open_user(sd_bus **ret) {
1082 e = getenv("DBUS_SESSION_BUS_ADDRESS");
1084 r = sd_bus_set_address(b, e);
1088 e = getenv("XDG_RUNTIME_DIR");
1095 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1100 b->sockaddr.un.sun_family = AF_UNIX;
1101 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1102 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1105 b->send_hello = true;
1107 r = sd_bus_start(b);
1119 void sd_bus_close(sd_bus *bus) {
1125 close_nointr_nofail(bus->fd);
1129 sd_bus *sd_bus_ref(sd_bus *bus) {
1133 assert(bus->n_ref > 0);
1139 sd_bus *sd_bus_unref(sd_bus *bus) {
1143 assert(bus->n_ref > 0);
1146 if (bus->n_ref <= 0)
1152 int sd_bus_is_open(sd_bus *bus) {
1156 return bus->state != BUS_UNSET && bus->fd >= 0;
1159 int sd_bus_can_send(sd_bus *bus, char type) {
1167 if (type == SD_BUS_TYPE_UNIX_FD) {
1168 if (!bus->negotiate_fds)
1171 r = bus_ensure_running(bus);
1175 return bus->can_fds;
1178 return bus_type_is_valid(type);
1181 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
1189 r = bus_ensure_running(bus);
1197 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1200 if (m->header->version > b->message_version)
1206 return bus_message_seal(m, ++b->serial);
1209 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
1219 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1221 if (*idx >= m->size)
1226 struct cmsghdr *control;
1227 control = alloca(CMSG_SPACE(sizeof(int) * m->n_fds));
1229 mh.msg_control = control;
1230 control->cmsg_level = SOL_SOCKET;
1231 control->cmsg_type = SCM_RIGHTS;
1232 mh.msg_controllen = control->cmsg_len = CMSG_LEN(sizeof(int) * m->n_fds);
1233 memcpy(CMSG_DATA(control), m->fds, sizeof(int) * m->n_fds);
1236 n = m->n_iovec * sizeof(struct iovec);
1238 memcpy(iov, m->iovec, n);
1241 iovec_advance(iov, &j, *idx);
1244 mh.msg_iovlen = m->n_iovec;
1246 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
1248 return errno == EAGAIN ? 0 : -errno;
1254 static int message_read_need(sd_bus *bus, size_t *need) {
1261 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1263 if (bus->rbuffer_size < sizeof(struct bus_header)) {
1264 *need = sizeof(struct bus_header) + 8;
1266 /* Minimum message size:
1270 * Method Call: +2 string headers
1271 * Signal: +3 string headers
1272 * Method Error: +1 string headers
1274 * Method Reply: +1 uint32 headers
1276 * A string header is at least 9 bytes
1277 * A uint32 header is at least 8 bytes
1279 * Hence the minimum message size of a valid message
1280 * is header + 8 bytes */
1285 a = ((const uint32_t*) bus->rbuffer)[1];
1286 b = ((const uint32_t*) bus->rbuffer)[3];
1288 e = ((const uint8_t*) bus->rbuffer)[0];
1289 if (e == SD_BUS_LITTLE_ENDIAN) {
1292 } else if (e == SD_BUS_BIG_ENDIAN) {
1298 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
1299 if (sum >= BUS_MESSAGE_SIZE_MAX)
1302 *need = (size_t) sum;
1306 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
1313 assert(bus->rbuffer_size >= size);
1314 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1316 if (bus->rbuffer_size > size) {
1317 b = memdup((const uint8_t*) bus->rbuffer + size,
1318 bus->rbuffer_size - size);
1324 r = bus_message_from_malloc(bus->rbuffer, size,
1325 bus->fds, bus->n_fds,
1326 bus->ucred_valid ? &bus->ucred : NULL,
1327 bus->label[0] ? bus->label : NULL,
1335 bus->rbuffer_size -= size;
1344 static int message_read(sd_bus *bus, sd_bus_message **m) {
1352 struct cmsghdr cmsghdr;
1353 uint8_t buf[CMSG_SPACE(sizeof(int) * BUS_FDS_MAX) +
1354 CMSG_SPACE(sizeof(struct ucred)) +
1355 CMSG_SPACE(NAME_MAX)]; /*selinux label */
1357 struct cmsghdr *cmsg;
1361 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1363 r = message_read_need(bus, &need);
1367 if (bus->rbuffer_size >= need)
1368 return message_make(bus, need, m);
1370 b = realloc(bus->rbuffer, need);
1377 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1378 iov.iov_len = need - bus->rbuffer_size;
1383 mh.msg_control = &control;
1384 mh.msg_controllen = sizeof(control);
1386 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1388 return errno == EAGAIN ? 0 : -errno;
1392 bus->rbuffer_size += k;
1394 for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1395 if (cmsg->cmsg_level == SOL_SOCKET &&
1396 cmsg->cmsg_type == SCM_RIGHTS) {
1399 n = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
1401 f = realloc(bus->fds, sizeof(int) * (bus->n_fds + n)); /* room for all old plus n new fds: element size times count, not their sum */
1403 close_many((int*) CMSG_DATA(cmsg), n);
1407 memcpy(f + bus->n_fds, CMSG_DATA(cmsg), n * sizeof(int));
1410 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1411 cmsg->cmsg_type == SCM_CREDENTIALS &&
1412 cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1414 memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1415 bus->ucred_valid = true;
1417 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1418 cmsg->cmsg_type == SCM_SECURITY) {
1421 l = cmsg->cmsg_len - CMSG_LEN(0);
1422 memcpy(&bus->label, CMSG_DATA(cmsg), l);
1427 r = message_read_need(bus, &need);
1431 if (bus->rbuffer_size >= need)
1432 return message_make(bus, need, m);
1437 static int dispatch_wqueue(sd_bus *bus) {
1441 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1446 while (bus->wqueue_size > 0) {
1448 r = message_write(bus, bus->wqueue[0], &bus->windex);
1453 /* Didn't do anything this time */
1455 else if (bus->windex >= bus->wqueue[0]->size) {
1456 /* Fully written. Let's drop the entry from
1459 * This isn't particularly optimized, but
1460 * well, this is supposed to be our worst-case
1461 * buffer only, and the socket buffer is
1462 * supposed to be our primary buffer, and if
1463 * it got full, then all bets are off
1466 sd_bus_message_unref(bus->wqueue[0]);
1467 bus->wqueue_size --;
1468 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1478 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1479 sd_bus_message *z = NULL;
1484 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1489 if (bus->rqueue_size > 0) {
1490 /* Dispatch a queued message */
1492 *m = bus->rqueue[0];
1493 bus->rqueue_size --;
1494 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1498 /* Try to read a new message */
1500 r = message_read(bus, &z);
1515 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1520 if (bus->state == BUS_UNSET)
1528 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1535 /* If the serial number isn't kept, then we know that no reply
1537 if (!serial && !m->sealed)
1538 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1540 r = bus_seal_message(bus, m);
1544 /* If this is a reply and no reply was requested, then let's
1545 * suppress this, if we can */
1546 if (m->dont_send && !serial)
1549 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1552 r = message_write(bus, m, &idx);
1556 } else if (idx < m->size) {
1557 /* Wasn't fully written. So let's remember how
1558 * much was written. Note that the first entry
1559 * of the wqueue array is always allocated so
1560 * that we always can remember how much was
1562 bus->wqueue[0] = sd_bus_message_ref(m);
1563 bus->wqueue_size = 1;
1569 /* Just append it to the queue. */
1571 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1574 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1579 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1583 *serial = BUS_MESSAGE_SERIAL(m);
1588 static usec_t calc_elapse(uint64_t usec) {
1589 if (usec == (uint64_t) -1)
1593 usec = BUS_DEFAULT_TIMEOUT;
1595 return now(CLOCK_MONOTONIC) + usec;
1598 static int timeout_compare(const void *a, const void *b) {
1599 const struct reply_callback *x = a, *y = b;
1601 if (x->timeout != 0 && y->timeout == 0)
1604 if (x->timeout == 0 && y->timeout != 0)
1607 if (x->timeout < y->timeout)
1610 if (x->timeout > y->timeout)
1616 int sd_bus_send_with_reply(
1619 sd_message_handler_t callback,
1624 struct reply_callback *c;
1629 if (bus->state == BUS_UNSET)
1637 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1639 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1642 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1646 if (usec != (uint64_t) -1) {
1647 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1652 r = bus_seal_message(bus, m);
1656 c = new(struct reply_callback, 1);
1660 c->callback = callback;
1661 c->userdata = userdata;
1662 c->serial = BUS_MESSAGE_SERIAL(m);
1663 c->timeout = calc_elapse(usec);
1665 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1671 if (c->timeout != 0) {
1672 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1675 sd_bus_send_with_reply_cancel(bus, c->serial);
1680 r = sd_bus_send(bus, m, serial);
1682 sd_bus_send_with_reply_cancel(bus, c->serial);
1689 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1690 struct reply_callback *c;
1697 c = hashmap_remove(bus->reply_callbacks, &serial);
1701 if (c->timeout != 0)
1702 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1708 int bus_ensure_running(sd_bus *bus) {
1715 if (bus->state == BUS_UNSET)
1718 if (bus->state == BUS_RUNNING)
1722 r = sd_bus_process(bus, NULL);
1725 if (bus->state == BUS_RUNNING)
1730 r = sd_bus_wait(bus, (uint64_t) -1);
1736 int sd_bus_send_with_reply_and_block(
1740 sd_bus_error *error,
1741 sd_bus_message **reply) {
1752 if (bus->state == BUS_UNSET)
1756 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1758 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1760 if (bus_error_is_dirty(error))
1763 r = bus_ensure_running(bus);
1767 r = sd_bus_send(bus, m, &serial);
1771 timeout = calc_elapse(usec);
1775 sd_bus_message *incoming = NULL;
1780 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1783 /* Make sure there's room for queuing this
1784 * locally, before we read the message */
1786 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1794 r = message_read(bus, &incoming);
1799 if (incoming->reply_serial == serial) {
1800 /* Found a match! */
1802 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1807 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1810 r = sd_bus_error_copy(error, &incoming->error);
1812 sd_bus_message_unref(incoming);
1816 k = bus_error_to_errno(&incoming->error);
1817 sd_bus_message_unref(incoming);
1821 sd_bus_message_unref(incoming);
1825 /* There's already guaranteed to be room for
1826 * this, so no need to resize things here */
1827 bus->rqueue[bus->rqueue_size ++] = incoming;
1830 /* Try to read more, right-away */
1839 n = now(CLOCK_MONOTONIC);
1845 left = (uint64_t) -1;
1847 r = bus_poll(bus, true, left);
1851 r = dispatch_wqueue(bus);
1857 int sd_bus_get_fd(sd_bus *bus) {
1867 int sd_bus_get_events(sd_bus *bus) {
1872 if (bus->state == BUS_UNSET)
1877 if (bus->state == BUS_OPENING)
1879 else if (bus->state == BUS_AUTHENTICATING) {
1881 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1886 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1887 if (bus->rqueue_size <= 0)
1889 if (bus->wqueue_size > 0)
1896 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1897 struct reply_callback *c;
1903 if (bus->state == BUS_UNSET)
1908 if (bus->state == BUS_AUTHENTICATING) {
1909 *timeout_usec = bus->auth_timeout;
1913 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1916 c = prioq_peek(bus->reply_callbacks_prioq);
1920 *timeout_usec = c->timeout;
1924 static int process_timeout(sd_bus *bus) {
1925 struct reply_callback *c;
1931 c = prioq_peek(bus->reply_callbacks_prioq);
1935 n = now(CLOCK_MONOTONIC);
1939 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1940 hashmap_remove(bus->reply_callbacks, &c->serial);
1942 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1945 return r < 0 ? r : 1;
1948 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1949 struct reply_callback *c;
1955 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1956 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1959 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1963 if (c->timeout != 0)
1964 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1966 r = c->callback(bus, 0, m, c->userdata);
1972 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1973 struct filter_callback *l;
1976 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1977 r = l->callback(bus, 0, m, l->userdata);
1985 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1986 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1992 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1995 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1998 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2001 if (streq_ptr(m->member, "Ping"))
2002 r = sd_bus_message_new_method_return(bus, m, &reply);
2003 else if (streq_ptr(m->member, "GetMachineId")) {
2007 r = sd_id128_get_machine(&id);
2011 r = sd_bus_message_new_method_return(bus, m, &reply);
2015 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2017 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
2019 sd_bus_error_set(&error,
2020 "org.freedesktop.DBus.Error.UnknownMethod",
2021 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2023 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
2029 r = sd_bus_send(bus, reply, NULL);
2036 static int process_object(sd_bus *bus, sd_bus_message *m) {
2037 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
2038 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2039 struct object_callback *c;
2047 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2050 if (hashmap_isempty(bus->object_callbacks))
2053 c = hashmap_get(bus->object_callbacks, m->path);
2055 r = c->callback(bus, 0, m, c->userdata);
2062 /* Look for fallback prefixes */
2063 p = strdupa(m->path);
2067 e = strrchr(p, '/');
2073 c = hashmap_get(bus->object_callbacks, p);
2074 if (c && c->is_fallback) {
2075 r = c->callback(bus, 0, m, c->userdata);
2086 sd_bus_error_set(&error,
2087 "org.freedesktop.DBus.Error.UnknownMethod",
2088 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
2090 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
2094 r = sd_bus_send(bus, reply, NULL);
2101 static int process_message(sd_bus *bus, sd_bus_message *m) {
2107 r = process_reply(bus, m);
2111 r = process_filter(bus, m);
2115 r = process_builtin(bus, m);
2119 return process_object(bus, m);
2122 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2125 /* Returns 0 when we didn't do anything. This should cause the
2126 * caller to invoke sd_bus_wait() before returning the next
2127 * time. Returns > 0 when we did something, which possibly
2128 * means *ret is filled in with an unprocessed message. */
2132 if (bus->state == BUS_UNSET)
2137 if (bus->state == BUS_OPENING) {
2148 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
2150 socklen_t slen = sizeof(error);
2152 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
2154 bus->last_connect_error = errno;
2155 else if (error != 0)
2156 bus->last_connect_error = error;
2157 else if (p.revents & (POLLERR|POLLHUP))
2158 bus->last_connect_error = ECONNREFUSED;
2160 r = bus_start_auth(bus);
2164 /* Try next address */
2165 bus_reset_parsed_address(bus);
2166 r = bus_start_connect(bus);
2173 } else if (bus->state == BUS_AUTHENTICATING) {
2175 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
2178 r = bus_write_auth(bus);
2182 r = bus_read_auth(bus);
2185 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
2186 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2189 r = process_timeout(bus);
2193 r = dispatch_wqueue(bus);
2198 r = dispatch_rqueue(bus, &m);
2207 r = process_message(bus, m);
2217 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
2218 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2219 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
2221 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
2223 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
2227 r = sd_bus_send(bus, reply, NULL);
2235 assert_not_reached("Unknown state");
2244 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2255 e = sd_bus_get_events(bus);
2262 r = sd_bus_get_timeout(bus, &until);
2269 n = now(CLOCK_MONOTONIC);
2270 m = until > n ? until - n : 0;
2273 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2280 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2284 return r > 0 ? 1 : 0;
2287 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2291 if (bus->state == BUS_UNSET)
2295 if (bus->rqueue_size > 0)
2298 return bus_poll(bus, false, timeout_usec);
2301 int sd_bus_flush(sd_bus *bus) {
2306 if (bus->state == BUS_UNSET)
2311 r = bus_ensure_running(bus);
2315 if (bus->wqueue_size <= 0)
2319 r = dispatch_wqueue(bus);
2323 if (bus->wqueue_size <= 0)
2326 r = bus_poll(bus, false, (uint64_t) -1);
2332 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
2333 struct filter_callback *f;
2340 f = new(struct filter_callback, 1);
2343 f->callback = callback;
2344 f->userdata = userdata;
2346 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
2350 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
2351 struct filter_callback *f;
2358 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2359 if (f->callback == callback && f->userdata == userdata) {
2360 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2369 static int bus_add_object(
2373 sd_message_handler_t callback,
2376 struct object_callback *c;
2386 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2390 c = new(struct object_callback, 1);
2394 c->path = strdup(path);
2400 c->callback = callback;
2401 c->userdata = userdata;
2402 c->is_fallback = fallback;
2404 r = hashmap_put(bus->object_callbacks, c->path, c);
2414 static int bus_remove_object(
2418 sd_message_handler_t callback,
2421 struct object_callback *c;
2430 c = hashmap_get(bus->object_callbacks, path);
2434 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2437 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2445 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2446 return bus_add_object(bus, false, path, callback, userdata);
2449 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2450 return bus_remove_object(bus, false, path, callback, userdata);
2453 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2454 return bus_add_object(bus, true, prefix, callback, userdata);
2457 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2458 return bus_remove_object(bus, true, prefix, callback, userdata);