1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
35 #include "bus-internal.h"
36 #include "bus-message.h"
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Tears down a bus object: closes its socket and any queued file
 * descriptors, drops the references held on every message in the read
 * and write queues, releases the reply-callback hashmap and priority
 * queue, unlinks all filter callbacks and drains the object-callback
 * hashmap before freeing it. */
41 static void bus_free(sd_bus *b) {
42 struct filter_callback *f;
43 struct object_callback *c;
49 close_nointr_nofail(b->fd);
56 close_many(b->fds, b->n_fds);
59 for (i = 0; i < b->rqueue_size; i++)
60 sd_bus_message_unref(b->rqueue[i]);
63 for (i = 0; i < b->wqueue_size; i++)
64 sd_bus_message_unref(b->wqueue[i]);
67 hashmap_free_free(b->reply_callbacks);
68 prioq_free(b->reply_callbacks_prioq);
70 while ((f = b->filter_callbacks)) {
71 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
75 while ((c = hashmap_steal_first(b->object_callbacks))) {
80 hashmap_free(b->object_callbacks);
/* Sets up a fresh bus object with its defaults: message version 1,
 * file descriptor negotiation enabled, and a write queue that is
 * pre-allocated with room for one message. */
85 int sd_bus_new(sd_bus **ret) {
97 r->message_version = 1;
98 r->negotiate_fds = true;
100 /* We guarantee that wqueue always has space for at least one
102 r->wqueue = new(sd_bus_message*, 1);
/* Configures the address string the bus connects to. The bus state is
 * compared against BUS_UNSET first.
 * NOTE(review): presumably this is refused once the bus was started -
 * confirm against the full body. */
112 int sd_bus_set_address(sd_bus *bus, const char *address) {
117 if (bus->state != BUS_UNSET)
/* Configures an already existing file descriptor for the bus. The bus
 * state is compared against BUS_UNSET first.
 * NOTE(review): presumably this is refused once the bus was started -
 * confirm against the full body. */
132 int sd_bus_set_fd(sd_bus *bus, int fd) {
135 if (bus->state != BUS_UNSET)
/* Stores b, normalized to a boolean with "!!", in bus->send_hello. The
 * bus state is compared against BUS_UNSET before the flag is written. */
144 int sd_bus_set_hello(sd_bus *bus, int b) {
147 if (bus->state != BUS_UNSET)
150 bus->send_hello = !!b;
/* Stores b, normalized to a boolean with "!!", in bus->negotiate_fds.
 * The bus state is compared against BUS_UNSET before the flag is
 * written. */
154 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
157 if (bus->state != BUS_UNSET)
160 bus->negotiate_fds = !!b;
/* Reply handler that bus_send_hello() registers for its method call.
 * Expects the bus to be in BUS_HELLO, reads one string from the reply,
 * requires it to be a valid service name starting with ':', stores a
 * copy in bus->unique_name and moves the bus to BUS_RUNNING. */
164 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
169 assert(bus->state == BUS_HELLO);
176 r = sd_bus_message_read(reply, "s", &s);
180 if (!service_name_is_valid(s) || s[0] != ':')
183 bus->unique_name = strdup(s);
184 if (!bus->unique_name)
187 bus->state = BUS_RUNNING;
/* Checks bus->send_hello, then builds a method call addressed to
 * "org.freedesktop.DBus" and sends it with hello_callback() installed
 * as the reply handler. The message reference is dropped automatically
 * through the _cleanup_ attribute. */
192 static int bus_send_hello(sd_bus *bus) {
193 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
198 if (!bus->send_hello)
201 r = sd_bus_message_new_method_call(
203 "org.freedesktop.DBus",
205 "org.freedesktop.DBus",
211 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
/* Picks the state that follows authentication: BUS_HELLO when
 * bus->send_hello is set.
 * NOTE(review): the BUS_RUNNING assignment is presumably the path taken
 * when no hello is sent - confirm against the full body. */
218 static int bus_start_running(sd_bus *bus) {
221 if (bus->send_hello) {
222 bus->state = BUS_HELLO;
226 bus->state = BUS_RUNNING;
/* Parses one element of an address string at *p for the given key. The
 * key is compared with strncmp(), the value is scanned up to the next
 * ',' or the end of the string, bytes may be assembled from two nibble
 * values ((x << 4) | y), and the output buffer is grown with realloc()
 * as characters are appended. */
230 static int parse_address_key(const char **p, const char *key, char **value) {
241 if (strncmp(*p, key, l) != 0)
251 while (*a != ',' && *a != 0) {
269 c = (char) ((x << 4) | y);
276 t = realloc(r, n + 2);
/* Advances *p past the current address element: strcspn() moves it to
 * the next ',' or, if there is none, to the terminating NUL. */
301 static void skip_address_key(const char **p) {
305 *p += strcspn(*p, ",");
/* Parses the next entry of b->address, starting at b->address_index,
 * into b->sockaddr / b->sockaddr_size.
 *
 * The previously parsed socket address size and peer ID are reset
 * first. Two transports are recognized:
 *   "unix:" with the keys guid, path and abstract. Having neither or
 *           both of path and abstract is checked for. An abstract name
 *           is stored after a leading NUL byte in sun_path.
 *   "tcp:"  with the keys guid, host, port and family ("ipv4" or
 *           "ipv6"), resolved through getaddrinfo(); the first result
 *           is copied into b->sockaddr.
 * Unknown keys are skipped with skip_address_key(). A guid is converted
 * into b->peer, and b->address_index is updated to the position where
 * parsing stopped. */
311 static int bus_parse_next_address(sd_bus *b) {
313 _cleanup_free_ char *guid = NULL;
320 if (b->address[b->address_index] == 0)
323 a = b->address + b->address_index;
326 b->sockaddr_size = 0;
327 b->peer = SD_ID128_NULL;
329 if (startswith(a, "unix:")) {
330 _cleanup_free_ char *path = NULL, *abstract = NULL;
334 r = parse_address_key(&p, "guid", &guid);
340 r = parse_address_key(&p, "path", &path);
346 r = parse_address_key(&p, "abstract", &abstract);
352 skip_address_key(&p);
355 if (!path && !abstract)
358 if (path && abstract)
365 if (l > sizeof(b->sockaddr.un.sun_path))
368 b->sockaddr.un.sun_family = AF_UNIX;
369 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
370 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
371 } else if (abstract) {
374 l = strlen(abstract);
375 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
378 b->sockaddr.un.sun_family = AF_UNIX;
379 b->sockaddr.un.sun_path[0] = 0;
380 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
381 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
384 } else if (startswith(a, "tcp:")) {
385 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
386 struct addrinfo hints, *result;
390 r = parse_address_key(&p, "guid", &guid);
396 r = parse_address_key(&p, "host", &host);
402 r = parse_address_key(&p, "port", &port);
408 r = parse_address_key(&p, "family", &family);
414 skip_address_key(&p);
421 hints.ai_socktype = SOCK_STREAM;
422 hints.ai_flags = AI_ADDRCONFIG;
425 if (streq(family, "ipv4"))
426 hints.ai_family = AF_INET;
427 else if (streq(family, "ipv6"))
428 hints.ai_family = AF_INET6;
433 r = getaddrinfo(host, port, &hints, &result);
437 return -EADDRNOTAVAIL;
439 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
440 b->sockaddr_size = result->ai_addrlen;
442 freeaddrinfo(result);
446 r = sd_id128_from_string(guid, &b->peer);
451 b->address_index = p - b->address;
/* Accounts for size bytes having been consumed from the iovec array iov,
 * starting at element *idx. When the current element is longer than
 * size, its base pointer is moved forward by size. */
455 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
458 struct iovec *i = iov + *idx;
460 if (i->iov_len > size) {
461 i->iov_base = (uint8_t*) i->iov_base + size;
/* Sends the not yet written part of the authentication request.
 *
 * Requires the bus to be in BUS_AUTHENTICATING. b->auth_index is checked
 * against the size of b->auth_iovec. The authentication deadline is
 * armed on the first call (now + BUS_DEFAULT_TIMEOUT). The remaining
 * iovecs are passed to a non-blocking sendmsg(); on failure EAGAIN is
 * reported as 0 and any other error as -errno. After a successful send
 * the iovec position is advanced by the number of bytes written. */
475 static int bus_write_auth(sd_bus *b) {
480 assert(b->state == BUS_AUTHENTICATING);
482 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
485 if (b->auth_timeout == 0)
486 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
489 mh.msg_iov = b->auth_iovec + b->auth_index;
490 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
492 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
494 return errno == EAGAIN ? 0 : -errno;
496 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
/* Validates the server's authentication reply collected in b->rbuffer.
 *
 * The first CRLF-terminated line is located and, when fd negotiation
 * was requested, a second one as well. The first line must be exactly
 * "OK " followed by 32 hex digits; these are decoded into a 128-bit
 * peer ID which has to match b->peer unless b->peer is the null ID.
 * The second line is compared against "AGREE_UNIX_FD". The consumed
 * bytes are then removed from the front of the read buffer and
 * bus_start_running() is invoked. */
501 static int bus_auth_verify(sd_bus *b) {
507 /* We expect two response lines: "OK" and possibly
510 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
514 if (b->negotiate_fds) {
515 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
525 /* Nice! We got all the lines we need. First check the OK
528 if (e - (char*) b->rbuffer != 3 + 32)
531 if (memcmp(b->rbuffer, "OK ", 3))
534 for (i = 0; i < 32; i += 2) {
537 x = unhexchar(((char*) b->rbuffer)[3 + i]);
538 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
543 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
546 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
547 !sd_id128_equal(b->peer, peer))
552 /* And possibly check the second line, too */
556 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
557 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
559 b->rbuffer_size -= (start - (char*) b->rbuffer);
560 memmove(b->rbuffer, start, b->rbuffer_size);
562 r = bus_start_running(b);
/* Reads more of the server's authentication reply into b->rbuffer.
 *
 * bus_auth_verify() is tried on the data already buffered. The buffer is
 * then grown to at least the size of a full "OK <id>" + "AGREE_UNIX_FD"
 * reply, or to twice the current fill level, capped at
 * BUS_AUTH_SIZE_MAX; a buffer that is already that full is checked for.
 * A non-blocking recvmsg() appends to the buffer (EAGAIN is reported as
 * 0, other errors as -errno) and bus_auth_verify() is run again. */
569 static int bus_read_auth(sd_bus *b) {
579 r = bus_auth_verify(b);
583 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
585 if (n > BUS_AUTH_SIZE_MAX)
586 n = BUS_AUTH_SIZE_MAX;
588 if (b->rbuffer_size >= n)
591 p = realloc(b->rbuffer, n);
598 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
599 iov.iov_len = n - b->rbuffer_size;
605 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
607 return errno == EAGAIN ? 0 : -errno;
611 b->rbuffer_size += k;
613 r = bus_auth_verify(b);
/* Prepares the bus socket: asks for credential and security-label
 * ancillary data and enlarges both socket buffers. The setsockopt()
 * results are not checked. */
620 static int bus_setup_fd(sd_bus *b) {
625 /* Enable SO_PASSCRED + SO_PASSSEC. We try this on any socket,
626 * just in case. This is actually irrelevant for */
628 setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
629 setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
631 /* Increase the buffers to a MB */
632 fd_inc_rcvbuf(b->fd, 1024*1024);
633 fd_inc_sndbuf(b->fd, 1024*1024);
/* Begins the client side of the authentication handshake.
 *
 * The bus is switched to BUS_AUTHENTICATING, the effective UID is
 * formatted as a decimal string and passed through hexmem() for the
 * "AUTH EXTERNAL" line, and prefix, UID and suffix are queued as three
 * iovecs. The suffix asks for unix fd passing only when
 * b->negotiate_fds is set. bus_write_auth() then starts sending; its
 * result is returned. */
638 static int bus_start_auth(sd_bus *b) {
639 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
640 static const char auth_suffix_with_unix_fd[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
641 static const char auth_suffix_without_unix_fd[] = "\r\nBEGIN\r\n";
643 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
645 const char *auth_suffix;
649 b->state = BUS_AUTHENTICATING;
651 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
655 b->auth_uid = hexmem(text, l);
659 auth_suffix = b->negotiate_fds ? auth_suffix_with_unix_fd : auth_suffix_without_unix_fd;
661 b->auth_iovec[0].iov_base = (void*) auth_prefix;
662 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
663 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
664 b->auth_iovec[1].iov_len = l * 2;
665 b->auth_iovec[2].iov_base = (void*) auth_suffix;
666 b->auth_iovec[2].iov_len = strlen(auth_suffix);
/* auth_suffix is a pointer, so sizeof() on it would yield the pointer
 * size instead of the suffix length; use strlen() so that auth_size
 * equals the sum of the three iovec lengths above. */
667 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + strlen(auth_suffix);
669 return bus_write_auth(b);
/* Tries to connect to the bus.
 *
 * While no socket address is set (family AF_UNSPEC) the next entry of
 * the address string is parsed; when the attempt cannot go on, the last
 * recorded connect error is returned negated, or -ECONNREFUSED if none
 * was recorded. A non-blocking, close-on-exec stream socket is created
 * and connect() is issued on it; EINPROGRESS is handled separately from
 * other errors. Failures store errno in b->last_connect_error. Once
 * connected, authentication is started. The socket is closed again on
 * the failure path. */
672 static int bus_start_connect(sd_bus *b) {
679 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
680 r = bus_parse_next_address(b);
684 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
687 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
689 b->last_connect_error = errno;
695 b->last_connect_error = errno;
699 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
701 if (errno == EINPROGRESS)
704 b->last_connect_error = errno;
708 return bus_start_auth(b);
714 close_nointr_nofail(b->fd);
/* Starts the bus on an already open file descriptor: the descriptor is
 * switched to non-blocking and close-on-exec mode, then authentication
 * is started. */
720 static int bus_start_fd(sd_bus *b) {
725 r = fd_nonblock(b->fd, true);
729 r = fd_cloexec(b->fd, true);
737 return bus_start_auth(b);
/* Starts a configured bus. The state is compared against BUS_UNSET and
 * then set to BUS_OPENING. The connection is started either on an
 * existing descriptor (bus_start_fd()) or, when an address is set, by
 * connecting (bus_start_connect()). The result of bus_send_hello() is
 * returned at the end. */
740 int sd_bus_start(sd_bus *bus) {
745 if (bus->state != BUS_UNSET)
748 bus->state = BUS_OPENING;
751 r = bus_start_fd(bus);
752 else if (bus->address)
753 r = bus_start_connect(bus);
760 return bus_send_hello(bus);
/* Opens a connection to the system bus. The DBUS_SYSTEM_BUS_ADDRESS
 * environment variable is consulted and can be passed to
 * sd_bus_set_address(); the visible lines also fill in the built-in
 * AF_UNIX socket path /run/dbus/system_bus_socket. A hello is always
 * requested.
 * NOTE(review): presumably the built-in path is only used when the
 * variable is unset - confirm against the full body. */
763 int sd_bus_open_system(sd_bus **ret) {
775 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
777 r = sd_bus_set_address(b, e);
781 b->sockaddr.un.sun_family = AF_UNIX;
782 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
783 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
786 b->send_hello = true;
/* Opens a connection to the user bus. DBUS_SESSION_BUS_ADDRESS is
 * consulted and can be passed to sd_bus_set_address(). The visible
 * lines also build an AF_UNIX socket path from XDG_RUNTIME_DIR with
 * "/bus" appended, after checking that the combined length fits into
 * sun_path. A hello is always requested.
 * NOTE(review): presumably the XDG_RUNTIME_DIR path is the fallback
 * when the address variable is unset - confirm against the full body. */
800 int sd_bus_open_user(sd_bus **ret) {
813 e = getenv("DBUS_SESSION_BUS_ADDRESS");
815 r = sd_bus_set_address(b, e);
819 e = getenv("XDG_RUNTIME_DIR");
826 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
831 b->sockaddr.un.sun_family = AF_UNIX;
832 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
833 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
836 b->send_hello = true;
/* Closes the bus connection by closing bus->fd. */
850 void sd_bus_close(sd_bus *bus) {
856 close_nointr_nofail(bus->fd);
/* Reference-counting entry point; asserts that the current count
 * (bus->n_ref) is positive.
 * NOTE(review): presumably increments n_ref and returns bus - confirm. */
860 sd_bus *sd_bus_ref(sd_bus *bus) {
864 assert(bus->n_ref > 0);
/* Reference-counting exit point; asserts that the current count
 * (bus->n_ref) is positive.
 * NOTE(review): presumably decrements n_ref and frees the bus when it
 * drops to zero - confirm. */
870 sd_bus *sd_bus_unref(sd_bus *bus) {
874 assert(bus->n_ref > 0);
/* Returns non-zero when the bus has left the BUS_UNSET state and holds
 * a valid (non-negative) file descriptor. */
883 int sd_bus_is_open(sd_bus *bus) {
887 return bus->state != BUS_UNSET && bus->fd >= 0;
/* Tells whether a value of the given type can be sent over this bus.
 * For SD_BUS_TYPE_UNIX_FD the negotiate_fds setting is checked and the
 * connection is brought into the running state first. Any other type is
 * answered with bus_type_is_valid(). */
890 int sd_bus_can_send(sd_bus *bus, char type) {
898 if (type == SD_BUS_TYPE_UNIX_FD) {
899 if (!bus->negotiate_fds)
902 r = bus_ensure_running(bus);
909 return bus_type_is_valid(type);
/* Queries the peer's 128-bit ID. The connection is brought into the
 * running state with bus_ensure_running() first.
 * NOTE(review): presumably *peer receives bus->peer - confirm. */
912 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
920 r = bus_ensure_running(bus);
/* Seals message m for sending on bus b. The message's header version is
 * compared against the version the bus speaks, then the message is
 * sealed with the bus's serial counter, which is incremented before
 * use. */
928 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
931 if (m->header->version > b->message_version)
937 return bus_message_seal(m, ++b->serial);
/* Writes message m to the bus socket, resuming at byte offset *idx.
 *
 * Only valid in the BUS_RUNNING and BUS_HELLO states. The message's file
 * descriptors are packed into an SCM_RIGHTS control message allocated on
 * the stack with alloca(). A private copy of the message's iovec array
 * is made and advanced by *idx so that already written bytes are
 * skipped, then a non-blocking sendmsg() is issued. On failure EAGAIN is
 * reported as 0 and any other error as -errno. */
940 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
950 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
957 struct cmsghdr *control;
958 control = alloca(CMSG_SPACE(sizeof(int) * m->n_fds));
960 mh.msg_control = control;
961 control->cmsg_level = SOL_SOCKET;
962 control->cmsg_type = SCM_RIGHTS;
963 mh.msg_controllen = control->cmsg_len = CMSG_LEN(sizeof(int) * m->n_fds);
964 memcpy(CMSG_DATA(control), m->fds, sizeof(int) * m->n_fds);
967 n = m->n_iovec * sizeof(struct iovec);
969 memcpy(iov, m->iovec, n);
972 iovec_advance(iov, &j, *idx);
975 mh.msg_iovlen = m->n_iovec;
977 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
979 return errno == EAGAIN ? 0 : -errno;
/* Computes in *need how many bytes must be buffered before the next
 * message is complete.
 *
 * While less than a full struct bus_header is buffered, the answer is
 * the header size plus 8 (see the comment below). Otherwise two 32-bit
 * words (index 1 and index 3) are read from the buffered header and
 * interpreted according to the endianness marker in the first byte. The
 * total is header size + word 3 rounded up to a multiple of 8 + word 1,
 * computed in 64 bit and checked against BUS_MESSAGE_SIZE_MAX.
 * NOTE(review): presumably word 1 is the body length and word 3 the
 * length of the header field array - confirm against struct bus_header. */
985 static int message_read_need(sd_bus *bus, size_t *need) {
992 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
994 if (bus->rbuffer_size < sizeof(struct bus_header)) {
995 *need = sizeof(struct bus_header) + 8;
997 /* Minimum message size:
1001 * Method Call: +2 string headers
1002 * Signal: +3 string headers
1003 * Method Error: +1 string headers
1005 * Method Reply: +1 uint32 headers
1007 * A string header is at least 9 bytes
1008 * A uint32 header is at least 8 bytes
1010 * Hence the minimum message size of a valid message
1011 * is header + 8 bytes */
1016 a = ((const uint32_t*) bus->rbuffer)[1];
1017 b = ((const uint32_t*) bus->rbuffer)[3];
1019 e = ((const uint8_t*) bus->rbuffer)[0];
1020 if (e == SD_BUS_LITTLE_ENDIAN) {
1023 } else if (e == SD_BUS_BIG_ENDIAN) {
1029 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
1030 if (sum >= BUS_MESSAGE_SIZE_MAX)
1033 *need = (size_t) sum;
/* Turns the first size bytes of the read buffer into a message object.
 *
 * At least size bytes must be buffered. Bytes beyond size are duplicated
 * with memdup() before the buffer itself is handed to
 * bus_message_from_malloc() together with the queued file descriptors
 * and, when available, the sender credentials and security label. The
 * buffered size is then reduced by size. */
1037 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
1044 assert(bus->rbuffer_size >= size);
1045 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1047 if (bus->rbuffer_size > size) {
1048 b = memdup((const uint8_t*) bus->rbuffer + size,
1049 bus->rbuffer_size - size);
1055 r = bus_message_from_malloc(bus->rbuffer, size,
1056 bus->fds, bus->n_fds,
1057 bus->ucred_valid ? &bus->ucred : NULL,
1058 bus->label[0] ? bus->label : NULL,
1066 bus->rbuffer_size -= size;
/* Reads from the bus socket and, once a complete message is buffered,
 * hands it out through *m.
 *
 * message_read_need() tells how many bytes are required. If they are
 * already buffered, message_make() is called right away. Otherwise the
 * read buffer is grown to the required size and a non-blocking
 * recvmsg() fills it; EAGAIN is reported as 0 and other errors as
 * -errno. Ancillary data is processed as follows:
 *   SCM_RIGHTS       the received descriptors are appended to bus->fds;
 *                    if the array cannot be grown they are closed
 *   SCM_CREDENTIALS  copied into bus->ucred and marked valid
 *   SCM_SECURITY     the label is copied into bus->label
 * Afterwards the required size is computed again and the message is
 * built if enough data has arrived. */
1075 static int message_read(sd_bus *bus, sd_bus_message **m) {
1083 struct cmsghdr cmsghdr;
1084 uint8_t buf[CMSG_SPACE(sizeof(int) * BUS_FDS_MAX) +
1085 CMSG_SPACE(sizeof(struct ucred)) +
1086 CMSG_SPACE(NAME_MAX)]; /*selinux label */
1088 struct cmsghdr *cmsg;
1092 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1094 r = message_read_need(bus, &need);
1098 if (bus->rbuffer_size >= need)
1099 return message_make(bus, need, m);
1101 b = realloc(bus->rbuffer, need);
1108 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1109 iov.iov_len = need - bus->rbuffer_size;
1114 mh.msg_control = &control;
1115 mh.msg_controllen = sizeof(control);
1117 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1119 return errno == EAGAIN ? 0 : -errno;
1123 bus->rbuffer_size += k;
1125 for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1126 if (cmsg->cmsg_level == SOL_SOCKET &&
1127 cmsg->cmsg_type == SCM_RIGHTS) {
1130 n = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
/* The array must hold n_fds + n descriptors, so the element size has to
 * be multiplied, not added; otherwise the memcpy() below writes past
 * the end of the allocation. */
1132 f = realloc(bus->fds, sizeof(int) * (bus->n_fds + n));
1134 close_many((int*) CMSG_DATA(cmsg), n);
1138 memcpy(f + bus->n_fds, CMSG_DATA(cmsg), n * sizeof(int));
1141 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1142 cmsg->cmsg_type == SCM_CREDENTIALS &&
1143 cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1145 memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1146 bus->ucred_valid = true;
1148 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1149 cmsg->cmsg_type == SCM_SECURITY) {
1152 l = cmsg->cmsg_len - CMSG_LEN(0);
/* NOTE(review): no check of l against sizeof(bus->label) is visible
 * here - confirm the label buffer can hold l bytes plus a NUL. */
1153 memcpy(&bus->label, CMSG_DATA(cmsg), l);
1158 r = message_read_need(bus, &need);
1162 if (bus->rbuffer_size >= need)
1163 return message_make(bus, need, m);
/* Flushes the write queue as far as the socket allows. While messages
 * are queued, the first one is written with message_write(), resuming
 * at bus->windex. A call that made no progress is handled separately.
 * Once windex has reached the message size, the reference on the
 * message is dropped and the remaining entries are shifted down by
 * one. */
1168 static int dispatch_wqueue(sd_bus *bus) {
1172 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1177 while (bus->wqueue_size > 0) {
1179 r = message_write(bus, bus->wqueue[0], &bus->windex);
1184 /* Didn't do anything this time */
1186 else if (bus->windex >= bus->wqueue[0]->size) {
1187 /* Fully written. Let's drop the entry from
1190 * This isn't particularly optimized, but
1191 * well, this is supposed to be our worst-case
1192 * buffer only, and the socket buffer is
1193 * supposed to be our primary buffer, and if
1194 * it got full, then all bets are off
1197 sd_bus_message_unref(bus->wqueue[0]);
1198 bus->wqueue_size --;
1199 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetches the next incoming message into *m. A message already waiting
 * in the read queue is handed out first (the queue is shifted down by
 * one); the other visible path reads a new message from the socket with
 * message_read(). */
1209 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1210 sd_bus_message *z = NULL;
1215 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1220 if (bus->rqueue_size > 0) {
1221 /* Dispatch a queued message */
1223 *m = bus->rqueue[0];
1224 bus->rqueue_size --;
1225 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1229 /* Try to read a new message */
1231 r = message_read(bus, &z);
/* Sends message m on the bus, either right away or through the write
 * queue, and reports its serial number through *serial.
 *
 * Visible steps: the bus state is checked; sd_bus_can_send() is asked
 * whether unix fds may be sent; a message that is not yet sealed gets
 * the NO_REPLY_EXPECTED flag when the caller passes no serial pointer;
 * the message is sealed; m->dont_send is checked together with the
 * serial pointer. If the bus is in BUS_RUNNING or BUS_HELLO and nothing
 * is queued, the message is written directly, and a partially written
 * message is kept in wqueue[0]. Otherwise the message is appended to
 * the write queue, which is limited to BUS_WQUEUE_MAX entries. The
 * queue holds its own reference on the message. */
1246 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1251 if (bus->state == BUS_UNSET)
1259 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1266 /* If the serial number isn't kept, then we know that no reply
1268 if (!serial && !m->sealed)
1269 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1271 r = bus_seal_message(bus, m);
1275 /* If this is a reply and no reply was requested, then let's
1276 * suppress this, if we can */
1277 if (m->dont_send && !serial)
1280 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1283 r = message_write(bus, m, &idx);
1287 } else if (idx < m->size) {
1288 /* Wasn't fully written. So let's remember how
1289 * much was written. Note that the first entry
1290 * of the wqueue array is always allocated so
1291 * that we always can remember how much was
1293 bus->wqueue[0] = sd_bus_message_ref(m);
1294 bus->wqueue_size = 1;
1300 /* Just append it to the queue. */
1302 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1305 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1310 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1314 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout into an absolute CLOCK_MONOTONIC
 * deadline (now + usec). (uint64_t) -1 is special-cased up front, and
 * BUS_DEFAULT_TIMEOUT can be substituted for usec before the addition.
 * NOTE(review): presumably -1 means "no timeout" and 0 selects the
 * default - confirm against the full body. */
1319 static usec_t calc_elapse(uint64_t usec) {
1320 if (usec == (uint64_t) -1)
1324 usec = BUS_DEFAULT_TIMEOUT;
1326 return now(CLOCK_MONOTONIC) + usec;
/* Ordering function for the reply-callback priority queue (passed to
 * prioq_ensure_allocated()). Compares two struct reply_callback by
 * their timeout: entries with a zero timeout are told apart from those
 * with a non-zero one first, then the timeout values are compared.
 * NOTE(review): presumably a zero timeout sorts last - confirm the
 * return values. */
1329 static int timeout_compare(const void *a, const void *b) {
1330 const struct reply_callback *x = a, *y = b;
1332 if (x->timeout != 0 && y->timeout == 0)
1335 if (x->timeout == 0 && y->timeout != 0)
1338 if (x->timeout < y->timeout)
1341 if (x->timeout > y->timeout)
/* Sends a method call and registers callback to be invoked with the
 * reply.
 *
 * Visible steps: the bus state, the message type (must be a method
 * call) and the NO_REPLY_EXPECTED flag are checked; the reply-callback
 * hashmap and, unless usec is (uint64_t) -1, the timeout priority queue
 * are allocated on demand; the message is sealed so that its serial is
 * known. A struct reply_callback with callback, userdata, serial and
 * deadline is stored in the hashmap under the serial and, if it has a
 * deadline, in the priority queue. The message is then sent with
 * sd_bus_send(). The registration is cancelled again on the visible
 * failure paths. */
1347 int sd_bus_send_with_reply(
1350 sd_message_handler_t callback,
1355 struct reply_callback *c;
1360 if (bus->state == BUS_UNSET)
1368 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1370 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1373 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1377 if (usec != (uint64_t) -1) {
1378 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1383 r = bus_seal_message(bus, m);
1387 c = new(struct reply_callback, 1);
1391 c->callback = callback;
1392 c->userdata = userdata;
1393 c->serial = BUS_MESSAGE_SERIAL(m);
1394 c->timeout = calc_elapse(usec);
1396 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1402 if (c->timeout != 0) {
1403 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1406 sd_bus_send_with_reply_cancel(bus, c->serial);
1411 r = sd_bus_send(bus, m, serial);
1413 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply callback: the entry registered under serial
 * is removed from the reply-callback hashmap and, if it carries a
 * deadline, from the timeout priority queue as well. */
1420 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1421 struct reply_callback *c;
1428 c = hashmap_remove(bus->reply_callbacks, &serial);
1432 if (c->timeout != 0)
1433 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the connection until it reaches BUS_RUNNING. The BUS_UNSET and
 * BUS_RUNNING states are checked up front; otherwise the bus is
 * processed with sd_bus_process(), the state is checked again, and
 * sd_bus_wait() is called without a timeout. */
1439 int bus_ensure_running(sd_bus *bus) {
1446 if (bus->state == BUS_UNSET)
1449 if (bus->state == BUS_RUNNING)
1453 r = sd_bus_process(bus, NULL);
1456 if (bus->state == BUS_RUNNING)
1461 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and waits synchronously for its reply.
 *
 * The bus state, the message type (must be a method call), the
 * NO_REPLY_EXPECTED flag and the state of the error object are checked.
 * The connection is brought into the running state, the message is sent
 * and a deadline is derived from usec.
 *
 * In the wait loop, room for one more entry is reserved in the read
 * queue (limited to BUS_RQUEUE_MAX) before a message is read. A message
 * whose reply_serial matches is the answer: for a method return and for
 * a method error the visible lines differ; an error is copied into
 * *error and also converted with bus_error_to_errno(). Any other
 * message is appended to the read queue for later dispatch. When
 * nothing could be read, the remaining time is computed, the socket is
 * polled and pending writes are dispatched. */
1467 int sd_bus_send_with_reply_and_block(
1471 sd_bus_error *error,
1472 sd_bus_message **reply) {
1483 if (bus->state == BUS_UNSET)
1487 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1489 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1491 if (bus_error_is_dirty(error))
1494 r = bus_ensure_running(bus);
1498 r = sd_bus_send(bus, m, &serial);
1502 timeout = calc_elapse(usec);
1506 sd_bus_message *incoming = NULL;
1511 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1514 /* Make sure there's room for queuing this
1515 * locally, before we read the message */
1517 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1525 r = message_read(bus, &incoming);
1530 if (incoming->reply_serial == serial) {
1531 /* Found a match! */
1533 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1538 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1541 r = sd_bus_error_copy(error, &incoming->error);
1543 sd_bus_message_unref(incoming);
1547 k = bus_error_to_errno(&incoming->error);
1548 sd_bus_message_unref(incoming);
1552 sd_bus_message_unref(incoming);
1556 /* There's already guaranteed to be room for
1557 * this, so no need to resize things here */
1558 bus->rqueue[bus->rqueue_size ++] = incoming;
1561 /* Try to read more, right-away */
1570 n = now(CLOCK_MONOTONIC);
1576 left = (uint64_t) -1;
1578 r = bus_poll(bus, true, left);
1582 r = dispatch_wqueue(bus);
/* NOTE(review): presumably hands out the bus's socket descriptor for
 * integration into an external event loop - confirm against the body. */
1588 int sd_bus_get_fd(sd_bus *bus) {
/* Determines which poll events the bus is currently interested in,
 * depending on its state: BUS_OPENING; BUS_AUTHENTICATING, where it
 * matters whether parts of the authentication request are still
 * unsent; and BUS_RUNNING / BUS_HELLO, where it matters whether the
 * read queue is empty and whether the write queue has pending entries. */
1598 int sd_bus_get_events(sd_bus *bus) {
1603 if (bus->state == BUS_UNSET)
1608 if (bus->state == BUS_OPENING)
1610 else if (bus->state == BUS_AUTHENTICATING) {
1612 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1617 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1618 if (bus->rqueue_size <= 0)
1620 if (bus->wqueue_size > 0)
/* Reports through *timeout_usec the next point in time at which the bus
 * needs attention. While authenticating this is the authentication
 * deadline; in the BUS_RUNNING and BUS_HELLO states it is the timeout
 * of the entry at the head of the reply-callback priority queue. */
1627 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1628 struct reply_callback *c;
1634 if (bus->state == BUS_UNSET)
1639 if (bus->state == BUS_AUTHENTICATING) {
1640 *timeout_usec = bus->auth_timeout;
1644 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1647 c = prioq_peek(bus->reply_callbacks_prioq);
1651 *timeout_usec = c->timeout;
/* Handles the reply callback at the head of the timeout priority
 * queue: the entry is peeked and the current time is read; the entry is
 * then popped from the queue, removed from the reply-callback hashmap
 * and its callback is invoked with ETIMEDOUT and no message. Returns
 * the callback's negative result, or 1 otherwise. */
1655 static int process_timeout(sd_bus *bus) {
1656 struct reply_callback *c;
1662 c = prioq_peek(bus->reply_callbacks_prioq);
1666 n = now(CLOCK_MONOTONIC);
1670 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1671 hashmap_remove(bus->reply_callbacks, &c->serial);
1673 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1676 return r < 0 ? r : 1;
/* Dispatches a method return or method error to the callback that was
 * registered for it. The callback entry is looked up, and removed, by
 * the message's reply_serial; it is also taken out of the timeout
 * priority queue if it had a deadline; then its callback is invoked
 * with the message. */
1679 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1680 struct reply_callback *c;
1686 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1687 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1690 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1694 if (c->timeout != 0)
1695 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1697 r = c->callback(bus, 0, m, c->userdata);
/* Offers message m to the registered filter callbacks by walking the
 * bus's filter list and invoking each callback with the message and its
 * userdata. */
1703 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1704 struct filter_callback *l;
1707 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1708 r = l->callback(bus, 0, m, l->userdata);
/* Answers method calls on the "org.freedesktop.DBus.Peer" interface
 * locally.
 *
 * The leading checks look at the message type (method call), the
 * interface name and the NO_REPLY_EXPECTED flag. "Ping" is answered
 * with a method return, "GetMachineId" with a method return carrying
 * the machine ID as a string, and any other member with an
 * "org.freedesktop.DBus.Error.UnknownMethod" error. The reply is sent
 * with sd_bus_send(). */
1716 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1717 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1723 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1726 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1729 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1732 if (streq_ptr(m->member, "Ping"))
1733 r = sd_bus_message_new_method_return(bus, m, &reply);
1734 else if (streq_ptr(m->member, "GetMachineId")) {
1738 r = sd_id128_get_machine(&id);
1742 r = sd_bus_message_new_method_return(bus, m, &reply);
1746 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1748 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1750 sd_bus_error_set(&error,
1751 "org.freedesktop.DBus.Error.UnknownMethod",
1752 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1754 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1760 r = sd_bus_send(bus, reply, NULL);
/* Dispatches a method call to the object callbacks registered on the
 * bus.
 *
 * Messages that are not method calls are checked for, as is an empty
 * callback table. The callback registered for the exact object path is
 * looked up and invoked first. Then a stack copy of the path is
 * searched for its last '/' and the resulting prefix is looked up;
 * only callbacks marked is_fallback are invoked on this path. The
 * visible tail builds an "org.freedesktop.DBus.Error.UnknownMethod"
 * error reply and sends it. */
1767 static int process_object(sd_bus *bus, sd_bus_message *m) {
1768 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1769 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1770 struct object_callback *c;
1778 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1781 if (hashmap_isempty(bus->object_callbacks))
1784 c = hashmap_get(bus->object_callbacks, m->path);
1786 r = c->callback(bus, 0, m, c->userdata);
1793 /* Look for fallback prefixes */
1794 p = strdupa(m->path);
1798 e = strrchr(p, '/');
1804 c = hashmap_get(bus->object_callbacks, p);
1805 if (c && c->is_fallback) {
1806 r = c->callback(bus, 0, m, c->userdata);
1817 sd_bus_error_set(&error,
1818 "org.freedesktop.DBus.Error.UnknownMethod",
1819 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1821 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1825 r = sd_bus_send(bus, reply, NULL);
/* Runs an incoming message through the processing stages in order:
 * reply callbacks, filters, the built-in interface handler, and finally
 * the object callbacks, whose result is returned. */
1832 static int process_message(sd_bus *bus, sd_bus_message *m) {
1838 r = process_reply(bus, m);
1842 r = process_filter(bus, m);
1846 r = process_builtin(bus, m);
1850 return process_object(bus, m);
/* Advances the bus state machine by one step.
 *
 * BUS_OPENING: when the socket reports POLLOUT, POLLERR or POLLHUP, the
 *   pending socket error is fetched with SO_ERROR and recorded in
 *   last_connect_error (ECONNREFUSED is used when only POLLERR/POLLHUP
 *   was seen). Authentication is started, or the next address is tried
 *   with bus_start_connect().
 * BUS_AUTHENTICATING: the authentication deadline is checked, then the
 *   request is written further and the reply is read.
 * BUS_RUNNING / BUS_HELLO: reply timeouts are processed, the write
 *   queue is flushed, one message is fetched and run through
 *   process_message(). The visible tail builds an
 *   "org.freedesktop.DBus.Error.UnknownObject" error reply for a method
 *   call and sends it.
 * Any other state is treated as unreachable. */
1853 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1856 /* Returns 0 when we didn't do anything. This should cause the
1857 * caller to invoke sd_bus_wait() before returning the next
1858 * time. Returns > 0 when we did something, which possibly
1859 * means *ret is filled in with an unprocessed message. */
1863 if (bus->state == BUS_UNSET)
1868 if (bus->state == BUS_OPENING) {
1879 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1881 socklen_t slen = sizeof(error);
1883 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1885 bus->last_connect_error = errno;
1886 else if (error != 0)
1887 bus->last_connect_error = error;
1888 else if (p.revents & (POLLERR|POLLHUP))
1889 bus->last_connect_error = ECONNREFUSED;
1891 r = bus_start_auth(bus);
1895 /* Try next address */
1896 r = bus_start_connect(bus);
1903 } else if (bus->state == BUS_AUTHENTICATING) {
1905 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1908 r = bus_write_auth(bus);
1912 r = bus_read_auth(bus);
1915 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1916 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1919 r = process_timeout(bus);
1923 r = dispatch_wqueue(bus);
1928 r = dispatch_rqueue(bus, &m);
1937 r = process_message(bus, m);
1947 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1948 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1949 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1951 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1953 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1957 r = sd_bus_send(bus, reply, NULL);
1965 assert_not_reached("Unknown state");
/* Waits on the bus socket with ppoll().
 *
 * The event mask comes from sd_bus_get_events(). The bus's own absolute
 * deadline from sd_bus_get_timeout() is converted into a relative time,
 * clamped at 0 when it already passed. A caller-supplied timeout_usec
 * other than (uint64_t) -1 takes effect when there is no bus deadline
 * or when it is shorter. A resulting timeout of (uint64_t) -1 makes
 * ppoll() block without a time limit. Returns 1 when ppoll() reported
 * at least one ready descriptor and 0 otherwise. */
1974 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1985 e = sd_bus_get_events(bus);
1992 r = sd_bus_get_timeout(bus, &until);
1999 n = now(CLOCK_MONOTONIC);
2000 m = until > n ? until - n : 0;
2003 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2010 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2014 return r > 0 ? 1 : 0;
/* Waits until the bus needs processing or timeout_usec elapses. The bus
 * state and a non-empty read queue are checked first; otherwise the
 * call is forwarded to bus_poll() with need_more set to false. */
2017 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2021 if (bus->state == BUS_UNSET)
2025 if (bus->rqueue_size > 0)
2028 return bus_poll(bus, false, timeout_usec);
/* Writes out everything that is pending in the write queue. The
 * connection is brought into the running state first. The write queue
 * is then dispatched, with the queue size checked before and after,
 * and the socket is polled without a time limit between attempts. */
2031 int sd_bus_flush(sd_bus *bus) {
2036 if (bus->state == BUS_UNSET)
2041 r = bus_ensure_running(bus);
2045 if (bus->wqueue_size <= 0)
2049 r = dispatch_wqueue(bus);
2053 if (bus->wqueue_size <= 0)
2056 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter: a new struct filter_callback holding callback and
 * userdata is allocated and prepended to the bus's filter list. */
2062 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
2063 struct filter_callback *f;
2070 f = new(struct filter_callback, 1);
2073 f->callback = callback;
2074 f->userdata = userdata;
2076 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregisters a filter: the filter list is searched for an entry whose
 * callback and userdata both match, and that entry is unlinked. */
2080 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
2081 struct filter_callback *f;
2088 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2089 if (f->callback == callback && f->userdata == userdata) {
2090 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Registers an object callback for a path. The object-callback hashmap
 * (keyed by string) is allocated on demand. A new struct
 * object_callback gets its own copy of the path plus callback,
 * userdata and the fallback flag, and is inserted into the hashmap
 * under its path. */
2099 static int bus_add_object(
2103 sd_message_handler_t callback,
2106 struct object_callback *c;
2116 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2120 c = new(struct object_callback, 1);
2124 c->path = strdup(path);
2130 c->callback = callback;
2131 c->userdata = userdata;
2132 c->is_fallback = fallback;
2134 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Unregisters an object callback. The entry for path is looked up and
 * only removed from the hashmap when its callback, userdata and
 * fallback flag all match the arguments. */
2144 static int bus_remove_object(
2148 sd_message_handler_t callback,
2151 struct object_callback *c;
2160 c = hashmap_get(bus->object_callbacks, path);
2164 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2167 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Registers callback for exactly the given object path; forwards to
 * bus_add_object() with the fallback flag cleared. */
2175 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2176 return bus_add_object(bus, false, path, callback, userdata);
/* Removes a callback registered for exactly the given object path;
 * forwards to bus_remove_object() with the fallback flag cleared. */
2179 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2180 return bus_remove_object(bus, false, path, callback, userdata);
/* Registers callback as a fallback handler for the given path prefix;
 * forwards to bus_add_object() with the fallback flag set. */
2183 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2184 return bus_add_object(bus, true, prefix, callback, userdata);
/* Removes a fallback handler registered for the given path prefix;
 * forwards to bus_remove_object() with the fallback flag set. */
2187 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2188 return bus_remove_object(bus, true, prefix, callback, userdata);