1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
35 #include "bus-internal.h"
36 #include "bus-message.h"
/* Forward declaration: bus_poll() is defined near the end of this file
 * but is already called by the blocking send path further up. */
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Tears down a bus object: closes its socket fd, drops the reference
 * held on every message still sitting in the read and write queues,
 * and releases the reply, filter and object callback containers. */
41 static void bus_free(sd_bus *b) {
42 struct filter_callback *f;
43 struct object_callback *c;
49 close_nointr_nofail(b->fd);
/* Unreference whatever is still queued in either direction */
56 for (i = 0; i < b->rqueue_size; i++)
57 sd_bus_message_unref(b->rqueue[i]);
60 for (i = 0; i < b->wqueue_size; i++)
61 sd_bus_message_unref(b->wqueue[i]);
/* Reply callbacks are indexed both by a hashmap and by a priority queue */
64 hashmap_free_free(b->reply_callbacks);
65 prioq_free(b->reply_callbacks_prioq);
/* Unlink filter callbacks one at a time until the list is empty */
67 while ((f = b->filter_callbacks)) {
68 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
/* Drain the object callback map entry by entry before freeing the map itself */
72 while ((c = hashmap_steal_first(b->object_callbacks))) {
77 hashmap_free(b->object_callbacks);
/* Sets up a fresh bus object: message_version starts out as 1 and a
 * write queue with room for a single message pointer is allocated up
 * front. */
82 static sd_bus* bus_new(void) {
91 r->message_version = 1;
93 /* We guarantee that wqueue always has space for at least one
95 r->wqueue = new(sd_bus_message*, 1);
/* Reply handler that bus_send_hello() registers for its request. Reads
 * one string from the reply, checks that it is a valid service name
 * beginning with ':', stores a copy of it as the unique name of the
 * bus and switches the bus to BUS_RUNNING. */
104 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
115 r = sd_bus_message_read(reply, "s", &s);
/* Only names with a leading ':' are accepted here */
119 if (!service_name_is_valid(s) || s[0] != ':')
122 bus->unique_name = strdup(s);
123 if (!bus->unique_name)
126 bus->state = BUS_RUNNING;
/* Builds a method call addressed to org.freedesktop.DBus and sends it
 * with hello_callback() installed as the reply handler, then records in
 * bus->sent_hello that the request has gone out. */
131 static int bus_send_hello(sd_bus *bus) {
132 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
137 r = sd_bus_message_new_method_call(
139 "org.freedesktop.DBus",
141 "org.freedesktop.DBus",
/* No userdata, timeout argument 0, and the serial is not requested back */
147 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
151 bus->sent_hello = true;
/* Chooses the state to enter after authentication (it is called from
 * bus_auth_verify()): BUS_HELLO when a hello request has been sent,
 * BUS_RUNNING on the other path (presumably when none was sent). */
155 static int bus_start_running(sd_bus *bus) {
158 if (bus->sent_hello) {
159 bus->state = BUS_HELLO;
163 bus->state = BUS_RUNNING;
/* Tries to consume the address item named `key` at *p and hand its
 * value back through `value`. The key is compared against the start of
 * *p; the value extends up to the next ',' or the end of the string. A
 * byte can be assembled from two hex digits (x, y), and the result
 * buffer is grown with realloc(). NOTE(review): the escape syntax that
 * triggers hex decoding is presumably '%XX' -- confirm. */
167 static int parse_address_key(const char **p, const char *key, char **value) {
178 if (strncmp(*p, key, l) != 0)
/* Walk the value until the item separator or the terminating NUL */
188 while (*a != ',' && *a != 0) {
/* High nibble from x, low nibble from y */
206 c = (char) ((x << 4) | y);
213 t = realloc(r, n + 2);
/* Skips over the current address item: *p is advanced to the next ','
 * or, if there is none, to the terminating NUL. */
238 static void skip_address_key(const char **p) {
242 *p += strcspn(*p, ",");
/* Parses the next entry of b->address, beginning at b->address_index,
 * into b->sockaddr and b->sockaddr_size. Two transports are
 * recognised: "unix:" (keys guid, path, abstract) and "tcp:" (keys
 * guid, host, port, family). The guid is converted into b->peer with
 * sd_id128_from_string(). Afterwards b->address_index is set to the
 * position parsing stopped at. */
248 static int bus_parse_next_address(sd_bus *b) {
250 _cleanup_free_ char *guid = NULL;
/* A NUL at the current index means there is nothing left to parse */
257 if (b->address[b->address_index] == 0)
260 a = b->address + b->address_index;
/* Forget what a previous entry may have left behind */
263 b->sockaddr_size = 0;
264 b->peer = SD_ID128_NULL;
266 if (startswith(a, "unix:")) {
267 _cleanup_free_ char *path = NULL, *abstract = NULL;
271 r = parse_address_key(&p, "guid", &guid);
277 r = parse_address_key(&p, "path", &path);
283 r = parse_address_key(&p, "abstract", &abstract);
/* Items that matched none of the known keys are stepped over */
289 skip_address_key(&p);
/* Having neither or both of path/abstract is singled out (presumably
 * rejected as invalid) */
292 if (!path && !abstract)
295 if (path && abstract)
302 if (l > sizeof(b->sockaddr.un.sun_path))
305 b->sockaddr.un.sun_family = AF_UNIX;
306 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
307 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
308 } else if (abstract) {
311 l = strlen(abstract);
/* One byte of sun_path is taken by the leading NUL written below */
312 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
315 b->sockaddr.un.sun_family = AF_UNIX;
/* A leading NUL byte in sun_path selects the abstract socket namespace */
316 b->sockaddr.un.sun_path[0] = 0;
317 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
318 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
321 } else if (startswith(a, "tcp:")) {
322 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
323 struct addrinfo hints, *result;
327 r = parse_address_key(&p, "guid", &guid);
333 r = parse_address_key(&p, "host", &host);
339 r = parse_address_key(&p, "port", &port);
345 r = parse_address_key(&p, "family", &family);
351 skip_address_key(&p);
358 hints.ai_socktype = SOCK_STREAM;
359 hints.ai_flags = AI_ADDRCONFIG;
/* NOTE(review): family starts out NULL; confirm it is checked for
 * NULL before these streq() calls */
362 if (streq(family, "ipv4"))
363 hints.ai_family = AF_INET;
364 else if (streq(family, "ipv6"))
365 hints.ai_family = AF_INET6;
370 r = getaddrinfo(host, port, &hints, &result);
374 return -EADDRNOTAVAIL;
/* Only the first entry of the result list is used */
376 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
377 b->sockaddr_size = result->ai_addrlen;
379 freeaddrinfo(result);
383 r = sd_id128_from_string(guid, &b->peer);
/* Remember where parsing stopped, as an offset into b->address */
388 b->address_index = p - b->address;
/* Accounts for `size` bytes having been consumed from the iovec array,
 * starting at entry *idx. When the current entry holds more than the
 * consumed amount, its base pointer is moved forward by that amount. */
392 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
395 struct iovec *i = iov + *idx;
/* Entry only partially consumed */
397 if (i->iov_len > size) {
398 i->iov_base = (uint8_t*) i->iov_base + size;
/* Writes the not yet transmitted part of the authentication request
 * (b->auth_iovec from entry b->auth_index on) with a non-blocking
 * sendmsg(). The authentication deadline is armed the first time
 * through. EAGAIN is reported as 0, any other send error as -errno;
 * after a send the iovec cursor is advanced by the byte count. */
412 static int bus_write_auth(sd_bus *b) {
417 assert(b->state == BUS_AUTHENTICATING);
/* All entries already written? */
419 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
/* A zero auth_timeout means the deadline has not been set yet */
422 if (b->auth_timeout == 0)
423 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
426 mh.msg_iov = b->auth_iovec + b->auth_index;
427 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
429 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
431 return errno == EAGAIN ? 0 : -errno;
433 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
/* Checks whether the read buffer already contains the server's full
 * answer to the authentication request, i.e. two lines that each end
 * in "\r\n". The first line must be "OK " followed by exactly 32 hex
 * digits; these are decoded into a 128-bit id that has to match
 * b->peer unless b->peer is the null id. The second line is compared
 * against "AGREE_UNIX_FD". The consumed bytes are then removed from the
 * buffer and bus_start_running() is called. */
438 static int bus_auth_verify(sd_bus *b) {
444 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
447 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
451 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
455 if (e - (char*) b->rbuffer != 3 + 32)
458 if (memcmp(b->rbuffer, "OK ", 3))
461 for (i = 0; i < 32; i += 2) {
464 x = unhexchar(((char*) b->rbuffer)[3 + i]);
465 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
470 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
473 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
474 !sd_id128_equal(b->peer, peer))
480 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
481 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
483 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
484 memmove(b->rbuffer, f + 2, b->rbuffer_size);
486 r = bus_start_running(b);
/* Reads more of the server's answer to the authentication request.
 * bus_auth_verify() is tried on the data already buffered; the buffer
 * is then sized to the larger of one complete expected answer and
 * twice the current fill level (capped at BUS_AUTH_SIZE_MAX), new
 * data is appended with a non-blocking recvmsg(), and
 * bus_auth_verify() is run again. EAGAIN is reported as 0. */
493 static int bus_read_auth(sd_bus *b) {
503 r = bus_auth_verify(b);
/* "OK " + 32 hex digits + CRLF + "AGREE_UNIX_FD" + CRLF */
507 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
509 if (n > BUS_AUTH_SIZE_MAX)
510 n = BUS_AUTH_SIZE_MAX;
/* The buffer is already at the size computed above */
512 if (b->rbuffer_size >= n)
515 p = realloc(b->rbuffer, n);
/* Receive into the free tail of the buffer */
522 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
523 iov.iov_len = n - b->rbuffer_size;
529 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
531 return errno == EAGAIN ? 0 : -errno;
533 b->rbuffer_size += k;
535 r = bus_auth_verify(b);
/* Applies socket options to b->fd: credential and security label
 * passing are switched on and both kernel buffers are enlarged. The
 * results of the two setsockopt() calls are not checked. */
542 static int bus_setup_fd(sd_bus *b) {
547 /* Enable SO_PASSCRED + SO_PASSSEC. We try this on any socket,
548 * just in case. This is actually irrelevant for */
550 setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
551 setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
553 /* Increase the buffers to a MB */
554 fd_inc_rcvbuf(b->fd, 1024*1024);
555 fd_inc_sndbuf(b->fd, 1024*1024);
/* Enters BUS_AUTHENTICATING and prepares the request as three pieces:
 * a NUL byte followed by "AUTH EXTERNAL ", the effective uid printed
 * in decimal and then hex-encoded, and the NEGOTIATE_UNIX_FD/BEGIN
 * suffix. The pieces are stored in b->auth_iovec, their total length
 * in b->auth_size, and sending is started with bus_write_auth(). */
560 static int bus_start_auth(sd_bus *b) {
/* sizeof() - 1 below leaves out the terminating NUL of each literal
 * while keeping the leading "\0" of the prefix */
561 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
562 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
564 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
569 b->state = BUS_AUTHENTICATING;
571 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
575 b->auth_uid = hexmem(text, l);
579 b->auth_iovec[0].iov_base = (void*) auth_prefix;
580 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
/* The hex encoding is twice as long as the l input bytes */
581 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
582 b->auth_iovec[1].iov_len = l * 2;
583 b->auth_iovec[2].iov_base = (void*) auth_suffix;
584 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
585 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
587 return bus_write_auth(b);
/* Creates a non-blocking, close-on-exec stream socket for the address
 * in b->sockaddr and connects it. While the address family is still
 * AF_UNSPEC the next address is parsed first; one return path reports
 * the last recorded connect error, or -ECONNREFUSED if none was
 * recorded. Failures are stored in b->last_connect_error. EINPROGRESS
 * from connect() is treated separately from other errors, and a
 * completed connect proceeds to bus_start_auth(). */
590 static int bus_start_connect(sd_bus *b) {
597 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
598 r = bus_parse_next_address(b);
602 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
605 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
607 b->last_connect_error = errno;
613 b->last_connect_error = errno;
617 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
/* A non-blocking connect may still be in flight */
619 if (errno == EINPROGRESS)
622 b->last_connect_error = errno;
626 return bus_start_auth(b);
632 close_nointr_nofail(b->fd);
/* Opens a connection to the system bus and returns it through `ret`.
 * The address can come from $DBUS_SYSTEM_BUS_ADDRESS, in which case
 * sd_bus_open_address() is used; the other path (presumably taken when
 * the variable is unset) connects to the fixed AF_UNIX socket
 * /run/dbus/system_bus_socket. The hello request is sent afterwards. */
638 int sd_bus_open_system(sd_bus **ret) {
646 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
648 r = sd_bus_open_address(e, &b);
656 b->sockaddr.un.sun_family = AF_UNIX;
657 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
/* The address length counts the path without its terminating NUL */
658 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
660 r = bus_start_connect(b);
667 r = bus_send_hello(b);
/* Opens a connection to the user (session) bus and returns it through
 * `ret`. The address can come from $DBUS_SESSION_BUS_ADDRESS, in which
 * case sd_bus_open_address() is used; the other path builds the socket
 * path "$XDG_RUNTIME_DIR/bus". The hello request is sent afterwards. */
677 int sd_bus_open_user(sd_bus **ret) {
686 e = getenv("DBUS_SESSION_BUS_ADDRESS");
688 r = sd_bus_open_address(e, &b);
692 e = getenv("XDG_RUNTIME_DIR");
/* 4 is the length of the "/bus" suffix appended below.
 * NOTE(review): this bound leaves no room for a terminating NUL when
 * the path fills sun_path exactly -- confirm that is intended. */
697 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
704 b->sockaddr.un.sun_family = AF_UNIX;
/* mempcpy() returns the position right after the copied directory */
705 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
706 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
708 r = bus_start_connect(b);
715 r = bus_send_hello(b);
/* Opens a connection described by an address string. The bus keeps
 * its own copy of `address` and the connect sequence is started with
 * bus_start_connect(). */
725 int sd_bus_open_address(const char *address, sd_bus **ret) {
738 b->address = strdup(address);
744 r = bus_start_connect(b);
/* Sets up a bus on an fd supplied by the caller. The fd is switched
 * to non-blocking mode and marked close-on-exec (the result of the
 * latter is not checked), then authentication is started. */
754 int sd_bus_open_fd(int fd, sd_bus **ret) {
769 r = fd_nonblock(b->fd, true);
773 fd_cloexec(b->fd, true);
781 r = bus_start_auth(b);
/* Closes the socket of the bus. */
793 void sd_bus_close(sd_bus *bus) {
799 close_nointr_nofail(bus->fd);
/* Reference counting entry point; asserts that the object is still
 * referenced (n_ref > 0). NOTE(review): presumably increments n_ref
 * and returns the bus -- confirm. */
803 sd_bus *sd_bus_ref(sd_bus *bus) {
807 assert(bus->n_ref > 0);
/* Counterpart of sd_bus_ref(); asserts that the object is still
 * referenced (n_ref > 0). NOTE(review): presumably decrements n_ref
 * and frees the bus when it reaches zero -- confirm. */
813 sd_bus *sd_bus_unref(sd_bus *bus) {
817 assert(bus->n_ref > 0);
/* NOTE(review): judging by the name this reports whether the
 * connection of the bus is open -- confirm the exact return values. */
826 int sd_bus_is_open(sd_bus *bus) {
/* Tells whether a value of the given type can be sent on this bus.
 * Unix fds get special treatment that first calls
 * bus_ensure_running(); for the remaining types the answer is
 * bus_type_is_valid(type). */
833 int sd_bus_can_send(sd_bus *bus, char type) {
839 if (type == SD_BUS_TYPE_UNIX_FD) {
840 r = bus_ensure_running(bus);
847 return bus_type_is_valid(type);
/* Hands out the 128-bit id of the peer through `peer`. Calls
 * bus_ensure_running() first. NOTE(review): presumably copies
 * bus->peer -- confirm. */
850 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
858 r = bus_ensure_running(bus);
/* Seals a message for sending on this bus. The header version of the
 * message is compared against the message version of the bus, and the
 * message is sealed with the next serial number (the serial counter
 * of the bus is incremented before use). */
866 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
869 if (m->header->version > b->message_version)
875 return bus_message_seal(m, ++b->serial);
/* Attempts to write message `m` to the socket, resuming at byte
 * offset *idx. Works on a private copy of the iovec array of the
 * message, so the already written prefix can be skipped without
 * modifying the message. Uses a non-blocking sendmsg(); EAGAIN is
 * reported as 0, other errors as -errno. */
878 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
888 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
893 n = m->n_iovec * sizeof(struct iovec);
895 memcpy(iov, m->iovec, n);
/* Skip what earlier calls have written already */
898 iovec_advance(iov, &j, *idx);
902 mh.msg_iovlen = m->n_iovec;
904 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
906 return errno == EAGAIN ? 0 : -errno;
/* Computes in *need how many bytes the read buffer must hold before
 * the next step can be taken. While less than a full fixed header is
 * buffered this is the header size plus 8. Otherwise two 32-bit
 * length fields are taken from the buffered header (the endianness is
 * selected by its first byte) and the total message size is derived
 * from them. The total is checked against BUS_MESSAGE_SIZE_MAX. */
912 static int message_read_need(sd_bus *bus, size_t *need) {
919 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
921 if (bus->rbuffer_size < sizeof(struct bus_header)) {
922 *need = sizeof(struct bus_header) + 8;
924 /* Minimum message size:
928 * Method Call: +2 string headers
929 * Signal: +3 string headers
930 * Method Error: +1 string headers
932 * Method Reply: +1 uint32 headers
934 * A string header is at least 9 bytes
935 * A uint32 header is at least 8 bytes
937 * Hence the minimum message size of a valid message
938 * is header + 8 bytes */
/* Second and fourth 32-bit word of the header (body length and
 * header field array length in the D-Bus wire format) */
943 a = ((const uint32_t*) bus->rbuffer)[1];
944 b = ((const uint32_t*) bus->rbuffer)[3];
/* The first byte says which byte order the sender used */
946 e = ((const uint8_t*) bus->rbuffer)[0];
947 if (e == SD_BUS_LITTLE_ENDIAN) {
950 } else if (e == SD_BUS_BIG_ENDIAN) {
/* Summed in 64 bit; b is rounded up to a multiple of 8 first */
956 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
957 if (sum >= BUS_MESSAGE_SIZE_MAX)
960 *need = (size_t) sum;
/* Turns the first `size` bytes of the read buffer into a message
 * object via bus_message_from_malloc(). Bytes buffered beyond `size`
 * are duplicated beforehand. The most recently received credentials
 * and security label are passed along when present. */
964 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
971 assert(bus->rbuffer_size >= size);
972 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
/* More buffered than this message needs: copy the surplus */
974 if (bus->rbuffer_size > size) {
975 b = memdup((const uint8_t*) bus->rbuffer + size,
976 bus->rbuffer_size - size);
/* Credentials and label are only passed when actually set */
981 r = bus_message_from_malloc(bus->rbuffer, size,
982 bus->ucred_valid ? &bus->ucred : NULL,
983 bus->label[0] ? bus->label : NULL, &t);
990 bus->rbuffer_size -= size;
/* Tries to obtain one complete message from the socket. If the read
 * buffer already holds enough bytes the message is built right away.
 * Otherwise the buffer is grown to the needed size, data is appended
 * with a non-blocking recvmsg() (EAGAIN is reported as 0), sender
 * credentials and security label are picked up from the control
 * messages, and the need is re-evaluated with the new fill level. */
996 static int message_read(sd_bus *bus, sd_bus_message **m) {
1004 struct cmsghdr cmsghdr;
1005 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1006 CMSG_SPACE(NAME_MAX)]; /*selinux label */
1008 struct cmsghdr *cmsg;
1012 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1014 r = message_read_need(bus, &need);
1018 if (bus->rbuffer_size >= need)
1019 return message_make(bus, need, m);
1021 b = realloc(bus->rbuffer, need);
/* Receive into the free tail of the buffer */
1028 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1029 iov.iov_len = need - bus->rbuffer_size;
1034 mh.msg_control = &control;
1035 mh.msg_controllen = sizeof(control);
1037 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1039 return errno == EAGAIN ? 0 : -errno;
1041 bus->rbuffer_size += k;
/* Credentials from an earlier read are invalidated */
1042 bus->ucred_valid = false;
1045 for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1046 if (cmsg->cmsg_level == SOL_SOCKET &&
1047 cmsg->cmsg_type == SCM_CREDENTIALS &&
1048 cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1050 memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1051 bus->ucred_valid = true;
1053 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1054 cmsg->cmsg_type == SCM_SECURITY) {
/* Payload length of the control message.
 * NOTE(review): confirm l cannot exceed the size of bus->label
 * before the copy below. */
1057 l = cmsg->cmsg_len - CMSG_LEN(0);
1058 memcpy(&bus->label, CMSG_DATA(cmsg), l);
/* The header may be complete now, so ask again how much is needed */
1063 r = message_read_need(bus, &need);
1067 if (bus->rbuffer_size >= need)
1068 return message_make(bus, need, m);
/* Flushes the write queue as far as the socket allows. The message at
 * the head is written starting at offset bus->windex; once the offset
 * has reached the size of the message, the message is unreferenced
 * and the remaining entries are moved up by one slot. */
1073 static int dispatch_wqueue(sd_bus *bus) {
1077 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1082 while (bus->wqueue_size > 0) {
1084 r = message_write(bus, bus->wqueue[0], &bus->windex);
1089 /* Didn't do anything this time */
1091 else if (bus->windex >= bus->wqueue[0]->size) {
1092 /* Fully written. Let's drop the entry from
1095 * This isn't particularly optimized, but
1096 * well, this is supposed to be our worst-case
1097 * buffer only, and the socket buffer is
1098 * supposed to be our primary buffer, and if
1099 * it got full, then all bets are off
1102 sd_bus_message_unref(bus->wqueue[0]);
1103 bus->wqueue_size --;
1104 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Produces the next incoming message in *m. Messages already parked
 * in the read queue are served first, in order; only when that queue
 * is empty is a new message read from the socket. */
1114 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1115 sd_bus_message *z = NULL;
1120 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1125 if (bus->rqueue_size > 0) {
1126 /* Dispatch a queued message */
1128 *m = bus->rqueue[0];
1129 bus->rqueue_size --;
/* Close the gap left by the entry just taken from the head */
1130 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1134 /* Try to read a new message */
1136 r = message_read(bus, &z);
/* Sends message `m` on the bus. The message is sealed first. When the
 * bus is up and nothing is queued, an immediate write is attempted
 * and only a partially written message is stored in the first slot
 * of the write queue; in the other case the message is appended to
 * the write queue, which holds at most BUS_WQUEUE_MAX entries. The
 * serial number of the message is stored in *serial. */
1151 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1161 /* If the serial number isn't kept, then we know that no reply
1163 if (!serial && !m->sealed)
1164 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1166 r = bus_seal_message(bus, m);
1170 /* If this is a reply and no reply was requested, then let's
1171 * suppress this, if we can */
1172 if (m->dont_send && !serial)
/* Fast path: nothing is queued, so try the socket directly */
1175 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1178 r = message_write(bus, m, &idx);
1182 } else if (idx < m->size) {
1183 /* Wasn't fully written. So let's remember how
1184 * much was written. Note that the first entry
1185 * of the wqueue array is always allocated so
1186 * that we always can remember how much was
1188 bus->wqueue[0] = sd_bus_message_ref(m);
1189 bus->wqueue_size = 1;
1195 /* Just append it to the queue. */
1197 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
/* Grow the queue by exactly one slot */
1200 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
/* The queue holds its own reference to the message */
1205 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1209 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout in usec into an absolute
 * CLOCK_MONOTONIC deadline. (uint64_t) -1 is special-cased, and
 * BUS_DEFAULT_TIMEOUT can replace the value given by the caller
 * before the current time is added. NOTE(review): callers test the
 * result against 0, so (uint64_t) -1 presumably maps to 0, meaning
 * "no deadline" -- confirm. */
1214 static usec_t calc_elapse(uint64_t usec) {
1215 if (usec == (uint64_t) -1)
1219 usec = BUS_DEFAULT_TIMEOUT;
1221 return now(CLOCK_MONOTONIC) + usec;
/* Ordering function of the reply callback priority queue (it is the
 * comparator handed to prioq_ensure_allocated()). Entries whose
 * timeout is 0 are told apart from entries with a non-zero timeout
 * first; after that the timeouts are compared numerically.
 * NOTE(review): presumably zero-timeout entries sort last -- confirm
 * the sign of the results. */
1224 static int timeout_compare(const void *a, const void *b) {
1225 const struct reply_callback *x = a, *y = b;
1227 if (x->timeout != 0 && y->timeout == 0)
1230 if (x->timeout == 0 && y->timeout != 0)
1233 if (x->timeout < y->timeout)
1236 if (x->timeout > y->timeout)
/* Sends a method call and registers `callback` to be invoked for its
 * reply. The message type and its NO_REPLY_EXPECTED flag are checked
 * up front. The registration is stored in bus->reply_callbacks under
 * the serial of the message and, when it has a non-zero deadline, in
 * the priority queue as well. If queueing or sending fails the
 * registration is cancelled again. */
1242 int sd_bus_send_with_reply(
1245 sd_message_handler_t callback,
1250 struct reply_callback *c;
1261 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1263 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1266 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
/* The priority queue is only set up when a timeout is requested */
1270 if (usec != (uint64_t) -1) {
1271 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
/* Seal before registering: the serial is the lookup key */
1276 r = bus_seal_message(bus, m);
1280 c = new(struct reply_callback, 1);
1284 c->callback = callback;
1285 c->userdata = userdata;
1286 c->serial = BUS_MESSAGE_SERIAL(m);
1287 c->timeout = calc_elapse(usec);
1289 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1295 if (c->timeout != 0) {
1296 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1299 sd_bus_send_with_reply_cancel(bus, c->serial);
1304 r = sd_bus_send(bus, m, serial);
1306 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply registration: the entry for `serial` is
 * removed from bus->reply_callbacks and, if it carries a non-zero
 * timeout, from the priority queue too. */
1313 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1314 struct reply_callback *c;
1321 c = hashmap_remove(bus->reply_callbacks, &serial);
1325 if (c->timeout != 0)
1326 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the connection until it reaches BUS_RUNNING. The state is
 * checked, the bus is processed, the state is checked again, and the
 * function waits without timeout (presumably repeating these steps
 * in a loop). */
1332 int bus_ensure_running(sd_bus *bus) {
1337 if (bus->state == BUS_RUNNING)
1341 r = sd_bus_process(bus, NULL);
1344 if (bus->state == BUS_RUNNING)
/* (uint64_t) -1 is the "no timeout" value */
1349 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and blocks until its reply has arrived. The
 * message type, its NO_REPLY_EXPECTED flag and the state of `error`
 * are checked first, and the bus is brought to the running state.
 * After sending, incoming messages are read directly: one whose
 * reply_serial matches is handled according to its type (return or
 * error), anything else is appended to the read queue for later
 * dispatching. Between reads the function polls the socket and
 * flushes the write queue. */
1355 int sd_bus_send_with_reply_and_block(
1359 sd_bus_error *error,
1360 sd_bus_message **reply) {
1373 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1375 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1377 if (bus_error_is_dirty(error))
1380 r = bus_ensure_running(bus);
1384 r = sd_bus_send(bus, m, &serial);
1388 timeout = calc_elapse(usec);
1392 sd_bus_message *incoming = NULL;
1397 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1400 /* Make sure there's room for queuing this
1401 * locally, before we read the message */
1403 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1411 r = message_read(bus, &incoming);
1416 if (incoming->reply_serial == serial) {
1417 /* Found a match! */
1419 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1424 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
/* Give the caller a copy of the error carried by the reply */
1427 r = sd_bus_error_copy(error, &incoming->error);
1429 sd_bus_message_unref(incoming);
1433 k = bus_error_to_errno(&incoming->error);
1434 sd_bus_message_unref(incoming);
1438 sd_bus_message_unref(incoming);
1442 /* There's already guaranteed to be room for
1443 * this, so no need to resize things here */
1444 bus->rqueue[bus->rqueue_size ++] = incoming;
1447 /* Try to read more, right-away */
1456 n = now(CLOCK_MONOTONIC);
1462 left = (uint64_t) -1;
1464 r = bus_poll(bus, true, left);
1468 r = dispatch_wqueue(bus);
/* NOTE(review): judging by the name this returns the file descriptor
 * of the bus connection (bus->fd) -- confirm. */
1474 int sd_bus_get_fd(sd_bus *bus) {
/* Computes which poll events the bus is currently interested in. The
 * answer depends on the connection state: while opening, while
 * authenticating (where it matters whether parts of the request are
 * still unsent), and while running (where the fill levels of the read
 * and write queue are considered). */
1484 int sd_bus_get_events(sd_bus *bus) {
1492 if (bus->state == BUS_OPENING)
1494 else if (bus->state == BUS_AUTHENTICATING) {
/* Part of the authentication request is still unsent */
1496 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1501 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1502 if (bus->rqueue_size <= 0)
1504 if (bus->wqueue_size > 0)
/* Reports in *timeout_usec the point in time the bus next needs
 * attention. While authenticating this is the authentication
 * deadline; while running it is the timeout of the reply callback at
 * the head of the priority queue. */
1511 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1512 struct reply_callback *c;
1521 if (bus->state == BUS_AUTHENTICATING) {
1522 *timeout_usec = bus->auth_timeout;
1526 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
/* The head of the priority queue is the entry that elapses first */
1529 c = prioq_peek(bus->reply_callbacks_prioq);
1533 *timeout_usec = c->timeout;
/* Handles the reply callback at the head of the priority queue: the
 * entry is removed from both the queue and the hashmap and its
 * callback is invoked with ETIMEDOUT and no message. A negative
 * callback result is passed on, anything else is reported as 1.
 * NOTE(review): presumably only done once the deadline has passed
 * -- confirm. */
1537 static int process_timeout(sd_bus *bus) {
1538 struct reply_callback *c;
1544 c = prioq_peek(bus->reply_callbacks_prioq);
1548 n = now(CLOCK_MONOTONIC);
/* The popped entry must be the one that was peeked at above */
1552 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1553 hashmap_remove(bus->reply_callbacks, &c->serial);
1555 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1558 return r < 0 ? r : 1;
/* Matches an incoming method return or method error against the
 * registered reply callbacks. The entry stored under the reply serial
 * of the message is removed from the hashmap and, if it has a
 * non-zero timeout, from the priority queue; its callback is then
 * invoked with the message. */
1561 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1562 struct reply_callback *c;
1568 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1569 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1572 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1576 if (c->timeout != 0)
1577 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1579 r = c->callback(bus, 0, m, c->userdata);
/* Offers the message to the registered filter callbacks, walking the
 * filter list from its head. */
1585 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1586 struct filter_callback *l;
1589 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1590 r = l->callback(bus, 0, m, l->userdata);
/* Answers method calls on the interface org.freedesktop.DBus.Peer.
 * "Ping" gets an empty method return, "GetMachineId" a method return
 * carrying the machine id as a string, and the remaining branch
 * builds an UnknownMethod error. The reply is sent with
 * sd_bus_send(). */
1598 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1599 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1605 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1608 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1611 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1614 if (streq_ptr(m->member, "Ping"))
1615 r = sd_bus_message_new_method_return(bus, m, &reply);
1616 else if (streq_ptr(m->member, "GetMachineId")) {
1620 r = sd_id128_get_machine(&id);
1624 r = sd_bus_message_new_method_return(bus, m, &reply);
1628 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1630 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1632 sd_bus_error_set(&error,
1633 "org.freedesktop.DBus.Error.UnknownMethod",
1634 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1636 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
/* No serial is requested for the reply */
1642 r = sd_bus_send(bus, reply, NULL);
/* Routes a method call to a registered object callback. An entry
 * registered under the exact path of the message is tried first.
 * After that, a stack copy of the path is cut back at its last '/'
 * and each resulting prefix is looked up, accepting only entries
 * flagged as fallback. An UnknownMethod error reply can be built and
 * sent as well. */
1649 static int process_object(sd_bus *bus, sd_bus_message *m) {
1650 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1651 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1652 struct object_callback *c;
1660 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1663 if (hashmap_isempty(bus->object_callbacks))
1666 c = hashmap_get(bus->object_callbacks, m->path);
1668 r = c->callback(bus, 0, m, c->userdata);
1675 /* Look for fallback prefixes */
1676 p = strdupa(m->path);
1680 e = strrchr(p, '/');
1686 c = hashmap_get(bus->object_callbacks, p);
1687 if (c && c->is_fallback) {
1688 r = c->callback(bus, 0, m, c->userdata);
1699 sd_bus_error_set(&error,
1700 "org.freedesktop.DBus.Error.UnknownMethod",
1701 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1703 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1707 r = sd_bus_send(bus, reply, NULL);
/* Runs an incoming message through the handler stages in a fixed
 * order: pending reply callbacks, filters, the built-in interface,
 * and finally the registered objects. */
1714 static int process_message(sd_bus *bus, sd_bus_message *m) {
1720 r = process_reply(bus, m);
1724 r = process_filter(bus, m);
1728 r = process_builtin(bus, m);
1732 return process_object(bus, m);
/* Performs one step of work on the bus, depending on its state.
 * BUS_OPENING: the outcome of the pending connect is queried with
 * SO_ERROR and either authentication is started or the next address
 * is tried. BUS_AUTHENTICATING: the deadline is checked, then the
 * authentication exchange is continued. BUS_RUNNING/BUS_HELLO:
 * timeouts are processed, the write queue is flushed, and one
 * incoming message is fetched and run through process_message(); an
 * UnknownObject error reply can be sent for a method call. */
1735 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1738 /* Returns 0 when we didn't do anything. This should cause the
1739 * caller to invoke sd_bus_wait() before returning the next
1740 * time. Returns > 0 when we did something, which possibly
1741 * means *ret is filled in with an unprocessed message. */
1748 if (bus->state == BUS_OPENING) {
1759 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1761 socklen_t slen = sizeof(error);
/* Fetch the result of the asynchronous connect() */
1763 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1765 bus->last_connect_error = errno;
1766 else if (error != 0)
1767 bus->last_connect_error = error;
1768 else if (p.revents & (POLLERR|POLLHUP))
1769 bus->last_connect_error = ECONNREFUSED;
1771 r = bus_start_auth(bus);
1775 /* Try next address */
1776 r = bus_start_connect(bus);
1783 } else if (bus->state == BUS_AUTHENTICATING) {
1785 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1788 r = bus_write_auth(bus);
1792 r = bus_read_auth(bus);
1795 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1796 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1799 r = process_timeout(bus);
1803 r = dispatch_wqueue(bus);
1808 r = dispatch_rqueue(bus, &m);
1817 r = process_message(bus, m);
1827 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1828 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1829 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1831 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1833 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1837 r = sd_bus_send(bus, reply, NULL);
1845 assert_not_reached("Unknown state");
/* Waits with ppoll() on the bus for the events reported by
 * sd_bus_get_events(). The wait time is derived from the deadline of
 * the bus (sd_bus_get_timeout()) and the timeout given by the caller,
 * where (uint64_t) -1 means "no limit" and makes ppoll() wait
 * indefinitely. Returns 1 if ppoll() reported an event, 0 otherwise. */
1854 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1865 e = sd_bus_get_events(bus);
1872 r = sd_bus_get_timeout(bus, &until);
/* Absolute deadline to relative wait time, never negative */
1879 n = now(CLOCK_MONOTONIC);
1880 m = until > n ? until - n : 0;
/* The timeout of the caller wins when it is the shorter of the two */
1883 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1890 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1894 return r > 0 ? 1 : 0;
/* Waits until there is something to do on the bus or the timeout has
 * passed. A non-empty read queue is checked for before polling
 * (presumably to return right away, since queued messages can be
 * dispatched without waiting). */
1897 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1903 if (bus->rqueue_size > 0)
1906 return bus_poll(bus, false, timeout_usec);
/* Writes out everything in the write queue. The bus is brought to
 * the running state first; then the queue is dispatched and the
 * socket polled without timeout, with the queue checked for
 * emptiness before and after dispatching. */
1909 int sd_bus_flush(sd_bus *bus) {
1917 r = bus_ensure_running(bus);
1921 if (bus->wqueue_size <= 0)
1925 r = dispatch_wqueue(bus);
1929 if (bus->wqueue_size <= 0)
1932 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter: a new entry holding `callback` and `userdata`
 * is put at the front of the filter list of the bus. */
1938 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1939 struct filter_callback *f;
1946 f = new(struct filter_callback, 1);
1949 f->callback = callback;
1950 f->userdata = userdata;
1952 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregisters a filter: the list is searched for an entry with the
 * same callback and the same userdata, which is then unlinked. */
1956 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1957 struct filter_callback *f;
1964 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1965 if (f->callback == callback && f->userdata == userdata) {
1966 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Common implementation of sd_bus_add_object() and
 * sd_bus_add_fallback(). Creates a registration holding a private
 * copy of the path, the callback, its userdata and the fallback flag,
 * and stores it in bus->object_callbacks keyed by that path copy. */
1975 static int bus_add_object(
1979 sd_message_handler_t callback,
1982 struct object_callback *c;
1992 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1996 c = new(struct object_callback, 1);
2000 c->path = strdup(path);
2006 c->callback = callback;
2007 c->userdata = userdata;
2008 c->is_fallback = fallback;
/* The key is the copy owned by the entry, not the string of the caller */
2010 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Common implementation of sd_bus_remove_object() and
 * sd_bus_remove_fallback(). The entry registered under `path` is
 * looked up and compared against callback, userdata and fallback
 * flag before it is taken out of the hashmap. */
2020 static int bus_remove_object(
2024 sd_message_handler_t callback,
2027 struct object_callback *c;
2036 c = hashmap_get(bus->object_callbacks, path);
/* All three attributes are compared against the stored entry */
2040 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2043 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Registers a callback for an object path; forwards to
 * bus_add_object() with the fallback flag cleared. */
2051 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2052 return bus_add_object(bus, false, path, callback, userdata);
/* Removes an object path registration; forwards to
 * bus_remove_object() with the fallback flag cleared. */
2055 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2056 return bus_remove_object(bus, false, path, callback, userdata);
/* Registers a callback for a path prefix; forwards to
 * bus_add_object() with the fallback flag set. */
2059 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2060 return bus_add_object(bus, true, prefix, callback, userdata);
/* Removes a path prefix registration; forwards to
 * bus_remove_object() with the fallback flag set. */
2063 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2064 return bus_remove_object(bus, true, prefix, callback, userdata);