1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
35 #include "bus-internal.h"
36 #include "bus-message.h"
/* Forward declaration: bus_poll() is defined near the end of this file
 * but is already needed earlier, e.g. by
 * sd_bus_send_with_reply_and_block(). */
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Releases the resources held by a bus object. The steps below close the
 * socket, drop the reference held on every message still queued for
 * reading or writing, free the reply-callback hashmap together with its
 * timeout priority queue, and empty the filter-callback list and the
 * object-callback hashmap. */
41 static void bus_free(sd_bus *b) {
42 struct filter_callback *f;
43 struct object_callback *c;
49 close_nointr_nofail(b->fd);
/* Messages parked in either queue are unreferenced one by one. */
56 for (i = 0; i < b->rqueue_size; i++)
57 sd_bus_message_unref(b->rqueue[i]);
60 for (i = 0; i < b->wqueue_size; i++)
61 sd_bus_message_unref(b->wqueue[i]);
64 hashmap_free_free(b->reply_callbacks);
65 prioq_free(b->reply_callbacks_prioq);
/* Unlink filter callbacks from the head of the list until it is empty. */
67 while ((f = b->filter_callbacks)) {
68 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
/* Drain the object-callback map entry by entry before the map itself is
 * freed. */
72 while ((c = hashmap_steal_first(b->object_callbacks))) {
77 hashmap_free(b->object_callbacks);
/* Allocates a new bus object. The initialization shown here sets the
 * message version to 1 and pre-allocates a one-element write queue;
 * sd_bus_send() relies on that slot to park a partially written
 * message. */
82 static sd_bus* bus_new(void) {
91 r->message_version = 1;
93 /* We guarantee that wqueue always has space for at least one
95 r->wqueue = new(sd_bus_message*, 1);
/* Reply handler registered by bus_send_hello(). Reads a single string
 * from the reply, checks that it is a valid service name starting with
 * ':', stores a copy in bus->unique_name and moves the bus to
 * BUS_RUNNING. */
104 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
115 r = sd_bus_message_read(reply, "s", &s);
/* The name must be well-formed and begin with ':'. */
119 if (!service_name_is_valid(s) || s[0] != ':')
122 bus->unique_name = strdup(s);
123 if (!bus->unique_name)
126 bus->state = BUS_RUNNING;
/* Sends the initial method call to org.freedesktop.DBus with
 * hello_callback() registered as the reply handler (timeout argument 0,
 * no serial requested) and records in bus->sent_hello that it was
 * issued. The message reference is dropped automatically when m goes out
 * of scope. */
131 static int bus_send_hello(sd_bus *bus) {
132 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
137 r = sd_bus_message_new_method_call(
139 "org.freedesktop.DBus",
141 "org.freedesktop.DBus",
147 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
151 bus->sent_hello = true;
/* Chooses the state entered after authentication (called from
 * bus_auth_verify()): BUS_HELLO if a hello call has been sent, in which
 * case hello_callback() later switches to BUS_RUNNING; BUS_RUNNING
 * otherwise. */
155 static int bus_start_running(sd_bus *bus) {
158 if (bus->sent_hello) {
159 bus->state = BUS_HELLO;
163 bus->state = BUS_RUNNING;
/* Attempts to parse the address element named key at *p. The text at *p
 * is first compared against key; the value is then scanned up to the next
 * ',' or the end of the string, with pairs of hex digits decoded into a
 * single byte, and accumulated in a buffer that grows via realloc().
 * Presumably the decoded string is handed back through *value and *p is
 * advanced past it — confirm against the full body. */
167 static int parse_address_key(const char **p, const char *key, char **value) {
178 if (strncmp(*p, key, l) != 0)
188 while (*a != ',' && *a != 0) {
/* x and y are the two decoded hex nibbles. */
206 c = (char) ((x << 4) | y);
/* Room for one more character plus one spare byte. */
213 t = realloc(r, n + 2);
/* Advances *p to the next ',' (or to the end of the string if there is
 * none) so that the remainder of the current address element is
 * ignored. */
238 static void skip_address_key(const char **p) {
242 *p += strcspn(*p, ",");
/* Parses the next entry of the address string b->address, starting at
 * b->address_index, into b->sockaddr / b->sockaddr_size.
 *
 * Two transports are recognized:
 *   unix:  keys guid, path, abstract
 *   tcp:   keys guid, host, port, family (resolved via getaddrinfo())
 * skip_address_key() is used to step over the rest of an element. The
 * guid is converted with sd_id128_from_string() into b->peer, which
 * bus_auth_verify() compares against the ID reported by the server.
 * At the end b->address_index is moved to where parsing stopped. */
248 static int bus_parse_next_address(sd_bus *b) {
250 _cleanup_free_ char *guid = NULL;
/* A NUL at the current index means there is nothing left to parse. */
257 if (b->address[b->address_index] == 0)
260 a = b->address + b->address_index;
/* Reset what a previously parsed entry may have left behind. */
263 b->sockaddr_size = 0;
264 b->peer = SD_ID128_NULL;
266 if (startswith(a, "unix:")) {
267 _cleanup_free_ char *path = NULL, *abstract = NULL;
271 r = parse_address_key(&p, "guid", &guid);
277 r = parse_address_key(&p, "path", &path);
283 r = parse_address_key(&p, "abstract", &abstract);
289 skip_address_key(&p);
/* path and abstract are alternatives: the two checks below detect
 * neither and both being present. */
292 if (!path && !abstract)
295 if (path && abstract)
302 if (l > sizeof(b->sockaddr.un.sun_path))
305 b->sockaddr.un.sun_family = AF_UNIX;
306 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
307 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
308 } else if (abstract) {
/* One byte of sun_path is reserved for the leading NUL set below. */
311 l = strlen(abstract);
312 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
315 b->sockaddr.un.sun_family = AF_UNIX;
/* A zero first byte marks an abstract socket name; the name itself
 * follows at offset 1. */
316 b->sockaddr.un.sun_path[0] = 0;
317 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
318 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
321 } else if (startswith(a, "tcp:")) {
322 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
323 struct addrinfo hints, *result;
327 r = parse_address_key(&p, "guid", &guid);
333 r = parse_address_key(&p, "host", &host);
339 r = parse_address_key(&p, "port", &port);
345 r = parse_address_key(&p, "family", &family);
351 skip_address_key(&p);
358 hints.ai_socktype = SOCK_STREAM;
359 hints.ai_flags = AI_ADDRCONFIG;
/* Optional restriction of the lookup to IPv4 or IPv6. */
362 if (streq(family, "ipv4"))
363 hints.ai_family = AF_INET;
364 else if (streq(family, "ipv6"))
365 hints.ai_family = AF_INET6;
370 r = getaddrinfo(host, port, &hints, &result);
374 return -EADDRNOTAVAIL;
/* The first address of the result list is copied into b->sockaddr.
 * NOTE(review): confirm ai_addrlen can never exceed sizeof(b->sockaddr). */
376 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
377 b->sockaddr_size = result->ai_addrlen;
379 freeaddrinfo(result);
383 r = sd_id128_from_string(guid, &b->peer);
/* Remember how far parsing got, as an offset into b->address. */
388 b->address_index = p - b->address;
/* Accounts for size bytes having been consumed from the iovec array iov,
 * whose current entry is *idx. When the current entry is longer than
 * size, its base pointer is moved forward by size. Callers are
 * bus_write_auth() and message_write(), which pass the number of bytes
 * already sent. */
392 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
395 struct iovec *i = iov + *idx;
397 if (i->iov_len > size) {
398 i->iov_base = (uint8_t*) i->iov_base + size;
/* Sends as much of the pending authentication request as the socket
 * accepts without blocking. The request is b->auth_iovec from entry
 * b->auth_index onwards. b->auth_timeout is armed to now +
 * BUS_DEFAULT_TIMEOUT while it is still 0. EAGAIN from sendmsg() is
 * reported as 0, any other errno as its negative value. The iovec
 * position is then advanced by the number of bytes written. */
412 static int bus_write_auth(sd_bus *b) {
417 assert(b->state == BUS_AUTHENTICATING);
/* All iovec entries already consumed. */
419 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
422 if (b->auth_timeout == 0)
423 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
/* Only the entries not yet fully sent are handed to sendmsg(). */
426 mh.msg_iov = b->auth_iovec + b->auth_index;
427 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
429 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
431 return errno == EAGAIN ? 0 : -errno;
433 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
/* Checks whether b->rbuffer already holds the complete authentication
 * reply and validates it.
 *
 * Two "\r\n"-terminated lines are looked for. The first must be exactly
 * "OK " followed by 32 hex digits; these are decoded into a 128-bit peer
 * ID, which is compared with b->peer unless b->peer is SD_ID128_NULL.
 * The second line is compared with "AGREE_UNIX_FD". Both lines are then
 * removed from the front of the buffer with memmove(), and
 * bus_start_running() selects the next state. */
438 static int bus_auth_verify(sd_bus *b) {
444 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
447 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
451 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
455 if (e - (char*) b->rbuffer != 3 + 32)
458 if (memcmp(b->rbuffer, "OK ", 3))
461 for (i = 0; i < 32; i += 2) {
464 x = unhexchar(((char*) b->rbuffer)[3 + i]);
465 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
470 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
473 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
474 !sd_id128_equal(b->peer, peer))
480 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
481 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
483 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
484 memmove(b->rbuffer, f + 2, b->rbuffer_size);
486 r = bus_start_running(b);
/* Reads more of the authentication reply into b->rbuffer and re-checks
 * it. bus_auth_verify() is tried on the data already buffered; the
 * buffer is then grown, a non-blocking recvmsg() appends to its tail,
 * and bus_auth_verify() runs again. EAGAIN is reported as 0, other
 * recvmsg() errors as -errno. */
493 static int bus_read_auth(sd_bus *b) {
503 r = bus_auth_verify(b);
/* Target size: at least one full expected reply ("OK " + 32 hex digits +
 * CRLF + "AGREE_UNIX_FD" + CRLF), otherwise double what is buffered,
 * capped at BUS_AUTH_SIZE_MAX. */
507 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
509 if (n > BUS_AUTH_SIZE_MAX)
510 n = BUS_AUTH_SIZE_MAX;
/* The buffer cannot grow any further. */
512 if (b->rbuffer_size >= n)
515 p = realloc(b->rbuffer, n);
/* Receive into the unused tail of the buffer. */
522 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
523 iov.iov_len = n - b->rbuffer_size;
529 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
531 return errno == EAGAIN ? 0 : -errno;
533 b->rbuffer_size += k;
535 r = bus_auth_verify(b);
/* Applies socket options to b->fd: enables SO_PASSCRED and SO_PASSSEC
 * and raises the receive and send buffers to 1 MiB. The setsockopt()
 * results are not checked, i.e. these settings are best effort. */
542 static int bus_setup_fd(sd_bus *b) {
547 /* Enable SO_PASSCRED + SO_PASSSEC. We try this on any socket,
548 * just in case. This is actually irrelevant for ... (sentence left unfinished; TODO complete) */
550 setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
551 setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
553 /* Increase the buffers to a MB */
554 fd_inc_rcvbuf(b->fd, 1024*1024);
555 fd_inc_sndbuf(b->fd, 1024*1024);
/* Prepares the authentication request and starts sending it. The bus
 * enters BUS_AUTHENTICATING. The request consists of three iovec
 * entries: the fixed prefix, the hex encoding (hexmem()) of the effective
 * UID formatted as a decimal string, and the fixed suffix. The first
 * write attempt is made via bus_write_auth(). */
560 static int bus_start_auth(sd_bus *b) {
/* The prefix literal starts with an explicit NUL byte, which is part of
 * the data sent (only the literal's implicit terminator is excluded by
 * the "sizeof - 1" below). */
561 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
562 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
564 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
569 b->state = BUS_AUTHENTICATING;
571 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
575 b->auth_uid = hexmem(text, l);
579 b->auth_iovec[0].iov_base = (void*) auth_prefix;
580 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
/* Hex encoding doubles the length of the l input characters. */
581 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
582 b->auth_iovec[1].iov_len = l * 2;
583 b->auth_iovec[2].iov_base = (void*) auth_suffix;
584 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
/* Total request size: the sum of the three iovec lengths. */
585 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
587 return bus_write_auth(b);
/* Starts connecting to the address currently held in b->sockaddr. If no
 * address is loaded (family AF_UNSPEC), the next one is obtained from
 * bus_parse_next_address(). When the function gives up it returns the
 * negated b->last_connect_error, or -ECONNREFUSED if none was recorded.
 * The socket is created non-blocking and close-on-exec. EINPROGRESS from
 * connect() is handled separately from other errors, which are stored in
 * b->last_connect_error. A connection that succeeds immediately proceeds
 * to bus_start_auth(). */
590 static int bus_start_connect(sd_bus *b) {
597 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
598 r = bus_parse_next_address(b);
602 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
605 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
607 b->last_connect_error = errno;
613 b->last_connect_error = errno;
617 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
/* Non-blocking connect still in progress. */
619 if (errno == EINPROGRESS)
622 b->last_connect_error = errno;
626 return bus_start_auth(b);
632 close_nointr_nofail(b->fd);
/* Opens a connection to the system bus and returns it in *ret. The
 * address is taken from $DBUS_SYSTEM_BUS_ADDRESS (via
 * sd_bus_open_address()); otherwise the UNIX socket
 * /run/dbus/system_bus_socket is used. After the connection attempt has
 * been started, the hello call is sent with bus_send_hello(). */
638 int sd_bus_open_system(sd_bus **ret) {
646 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
648 r = sd_bus_open_address(e, &b);
656 b->sockaddr.un.sun_family = AF_UNIX;
657 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
/* Address length covers the path without its terminating NUL. */
658 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
660 r = bus_start_connect(b);
667 r = bus_send_hello(b);
/* Opens a connection to the user (session) bus and returns it in *ret.
 * The address is taken from $DBUS_SESSION_BUS_ADDRESS (via
 * sd_bus_open_address()); otherwise the UNIX socket "$XDG_RUNTIME_DIR/bus"
 * is used. After the connection attempt has been started, the hello call
 * is sent with bus_send_hello(). */
677 int sd_bus_open_user(sd_bus **ret) {
686 e = getenv("DBUS_SESSION_BUS_ADDRESS");
688 r = sd_bus_open_address(e, &b);
692 e = getenv("XDG_RUNTIME_DIR");
/* The directory plus the 4 bytes of "/bus" must fit into sun_path. */
697 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
704 b->sockaddr.un.sun_family = AF_UNIX;
/* Concatenate directory and "/bus"; no terminating NUL is written here. */
705 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
706 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
708 r = bus_start_connect(b);
715 r = bus_send_hello(b);
/* Opens a connection to the bus at the given address string. A private
 * copy of the address is stored in b->address and the connection attempt
 * is started with bus_start_connect(), which parses the address. */
725 int sd_bus_open_address(const char *address, sd_bus **ret) {
738 b->address = strdup(address);
744 r = bus_start_connect(b);
/* Creates a bus connection on top of an existing file descriptor. The
 * descriptor is switched to non-blocking mode and marked close-on-exec
 * (the latter without checking the result), then authentication is
 * started with bus_start_auth(). */
754 int sd_bus_open_fd(int fd, sd_bus **ret) {
769 r = fd_nonblock(b->fd, true);
773 fd_cloexec(b->fd, true);
781 r = bus_start_auth(b);
/* Closes the bus connection's file descriptor. */
793 void sd_bus_close(sd_bus *bus) {
799 close_nointr_nofail(bus->fd);
/* Reference-counting entry point; asserts that the object is still
 * referenced (n_ref > 0). Presumably increments n_ref and returns bus —
 * TODO confirm. */
803 sd_bus *sd_bus_ref(sd_bus *bus) {
807 assert(bus->n_ref > 0);
/* Counterpart of sd_bus_ref(); asserts that the object is still
 * referenced (n_ref > 0). Presumably decrements n_ref and frees the
 * object via bus_free() once it reaches zero — TODO confirm. */
813 sd_bus *sd_bus_unref(sd_bus *bus) {
817 assert(bus->n_ref > 0);
/* NOTE(review): presumably reports whether the connection is still open
 * (e.g. bus->fd valid) — TODO confirm against the body. */
826 int sd_bus_is_open(sd_bus *bus) {
/* Tells whether a value of the given D-Bus type can be sent on this bus.
 * For SD_BUS_TYPE_UNIX_FD the connection is first brought to the running
 * state with bus_ensure_running(); for the remaining types the result is
 * bus_type_is_valid(type). */
833 int sd_bus_can_send(sd_bus *bus, char type) {
839 if (type == SD_BUS_TYPE_UNIX_FD) {
840 r = bus_ensure_running(bus);
847 return bus_type_is_valid(type);
/* Retrieves the peer ID of the connection. The bus is first brought to
 * the running state with bus_ensure_running(). Presumably *peer then
 * receives bus->peer — TODO confirm. */
850 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
858 r = bus_ensure_running(bus);
/* Seals message m for sending on bus b. The message's header version is
 * checked against b->message_version, then bus_message_seal() is called
 * with the next serial number (b->serial is pre-incremented). */
866 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
869 if (m->header->version > b->message_version)
875 return bus_message_seal(m, ++b->serial);
/* Writes message m to the socket without blocking, continuing at byte
 * offset *idx. The message's iovec array is copied into a scratch array
 * so that iovec_advance() can skip the *idx bytes already sent without
 * modifying the message. EAGAIN from sendmsg() is reported as 0, other
 * errors as -errno. */
878 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
888 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
893 n = m->n_iovec * sizeof(struct iovec);
895 memcpy(iov, m->iovec, n);
898 iovec_advance(iov, &j, *idx);
902 mh.msg_iovlen = m->n_iovec;
904 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
906 return errno == EAGAIN ? 0 : -errno;
/* Computes in *need how many bytes must be buffered before the next
 * message can be parsed.
 *
 * While fewer than sizeof(struct bus_header) bytes are buffered, the
 * answer is the header size plus 8 (see the comment below). Otherwise
 * two 32-bit words are read from the buffer (index 1 -> a, index 3 -> b),
 * the first byte selects little or big endian interpretation, and the
 * total is header size + b rounded up to 8 + a. Totals of
 * BUS_MESSAGE_SIZE_MAX or more are treated specially.
 * NOTE(review): a and b are presumably the body length and the header
 * field array length of the D-Bus wire format — confirm. */
912 static int message_read_need(sd_bus *bus, size_t *need) {
919 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
921 if (bus->rbuffer_size < sizeof(struct bus_header)) {
922 *need = sizeof(struct bus_header) + 8;
924 /* Minimum message size:
928 * Method Call: +2 string headers
929 * Signal: +3 string headers
930 * Method Error: +1 string headers
932 * Method Reply: +1 uint32 headers
934 * A string header is at least 9 bytes
935 * A uint32 header is at least 8 bytes
937 * Hence the minimum message size of a valid message
938 * is header + 8 bytes */
943 a = ((const uint32_t*) bus->rbuffer)[1];
944 b = ((const uint32_t*) bus->rbuffer)[3];
946 e = ((const uint8_t*) bus->rbuffer)[0];
947 if (e == SD_BUS_LITTLE_ENDIAN) {
950 } else if (e == SD_BUS_BIG_ENDIAN) {
/* The sum is computed in 64 bit before it is narrowed to size_t below. */
956 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
957 if (sum >= BUS_MESSAGE_SIZE_MAX)
960 *need = (size_t) sum;
/* Turns the first size bytes of bus->rbuffer into a message object. Any
 * bytes buffered beyond size are duplicated first (memdup) so they
 * survive the hand-over. bus_message_from_malloc() receives the buffer
 * together with the peer credentials and security label when those are
 * set. Afterwards bus->rbuffer_size is reduced by size. */
964 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
971 assert(bus->rbuffer_size >= size);
972 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
974 if (bus->rbuffer_size > size) {
975 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
/* Credentials are passed only if valid, the label only if non-empty. */
982 r = bus_message_from_malloc(bus->rbuffer, size,
983 bus->ucred_valid ? &bus->ucred : NULL,
984 bus->label[0] ? bus->label : NULL, &t);
991 bus->rbuffer_size -= size;
/* Tries to read one complete message from the socket without blocking.
 *
 * message_read_need() says how many bytes are required. If they are
 * already buffered the message is produced right away with
 * message_make(). Otherwise the buffer is grown to that size and a single
 * recvmsg() fills its tail. Ancillary data is inspected for the sender's
 * credentials (SCM_CREDENTIALS) and security label (SCM_SECURITY). The
 * requirement is then recomputed and the message is produced if it is
 * complete. EAGAIN is reported as 0, other recvmsg() errors as -errno. */
997 static int message_read(sd_bus *bus, sd_bus_message **m) {
/* Control buffer sized for one ucred plus a label of up to NAME_MAX
 * bytes. */
1005 struct cmsghdr cmsghdr;
1006 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1007 CMSG_SPACE(NAME_MAX)]; /*selinux label */
1009 struct cmsghdr *cmsg;
1013 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1015 r = message_read_need(bus, &need);
1019 if (bus->rbuffer_size >= need)
1020 return message_make(bus, need, m);
1022 b = realloc(bus->rbuffer, need);
/* Receive at most the bytes still missing, into the tail of the buffer. */
1029 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1030 iov.iov_len = need - bus->rbuffer_size;
1035 mh.msg_control = &control;
1036 mh.msg_controllen = sizeof(control);
1038 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1040 return errno == EAGAIN ? 0 : -errno;
1042 bus->rbuffer_size += k;
/* Credentials from an earlier read are invalidated; they are set again
 * only if this read delivered them. */
1043 bus->ucred_valid = false;
1046 for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1047 if (cmsg->cmsg_level == SOL_SOCKET &&
1048 cmsg->cmsg_type == SCM_CREDENTIALS &&
1049 cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1051 memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1052 bus->ucred_valid = true;
1054 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1055 cmsg->cmsg_type == SCM_SECURITY) {
/* l is the payload length of the control message.
 * NOTE(review): no check of l against sizeof(bus->label) is apparent
 * here — confirm the copy (and any terminator written after it) cannot
 * overflow the label buffer. */
1058 l = cmsg->cmsg_len - CMSG_LEN(0);
1059 memcpy(&bus->label, CMSG_DATA(cmsg), l);
/* Recompute the requirement now that more data is buffered. */
1064 r = message_read_need(bus, &need);
1068 if (bus->rbuffer_size >= need)
1069 return message_make(bus, need, m);
/* Flushes the write queue as far as the socket allows. While messages
 * are queued, the head message is written with message_write(), resuming
 * at bus->windex. Once bus->windex has reached the message's size, the
 * message is unreferenced and the remaining entries are shifted down by
 * one with memmove(). */
1074 static int dispatch_wqueue(sd_bus *bus) {
1078 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1083 while (bus->wqueue_size > 0) {
1085 r = message_write(bus, bus->wqueue[0], &bus->windex);
1090 /* Didn't do anything this time */
1092 else if (bus->windex >= bus->wqueue[0]->size) {
1093 /* Fully written. Let's drop the entry from
1096 * This isn't particularly optimized, but
1097 * well, this is supposed to be our worst-case
1098 * buffer only, and the socket buffer is
1099 * supposed to be our primary buffer, and if
1100 * it got full, then all bets are off
1103 sd_bus_message_unref(bus->wqueue[0]);
1104 bus->wqueue_size --;
1105 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Produces the next incoming message in *m. A message already waiting in
 * the read queue is handed out first, with the queue shifted down by one
 * entry. Otherwise a new message is read from the socket with
 * message_read(). */
1115 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1116 sd_bus_message *z = NULL;
1121 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1126 if (bus->rqueue_size > 0) {
1127 /* Dispatch a queued message */
1129 *m = bus->rqueue[0];
1130 bus->rqueue_size --;
1131 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1135 /* Try to read a new message */
1137 r = message_read(bus, &z);
/* Sends message m on the bus; if serial is non-NULL it receives the
 * message's serial number.
 *
 * An unsealed message sent without a serial pointer is flagged
 * SD_BUS_MESSAGE_NO_REPLY_EXPECTED before bus_seal_message() seals it.
 * Messages marked dont_send are handled specially when no serial is
 * requested. If the bus is in BUS_RUNNING or BUS_HELLO and nothing is
 * queued, the message is written directly with message_write(); a
 * partially written message is referenced and parked in the
 * pre-allocated first write-queue slot. In all other cases the message is
 * appended to the write queue, which is grown by one entry with
 * realloc() and limited by BUS_WQUEUE_MAX. */
1152 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1162 /* If the serial number isn't kept, then we know that no reply
1164 if (!serial && !m->sealed)
1165 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1167 r = bus_seal_message(bus, m);
1171 /* If this is a reply and no reply was requested, then let's
1172 * suppress this, if we can */
1173 if (m->dont_send && !serial)
1176 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1179 r = message_write(bus, m, &idx);
1183 } else if (idx < m->size) {
1184 /* Wasn't fully written. So let's remember how
1185 * much was written. Note that the first entry
1186 * of the wqueue array is always allocated so
1187 * that we always can remember how much was
1189 bus->wqueue[0] = sd_bus_message_ref(m);
1190 bus->wqueue_size = 1;
1196 /* Just append it to the queue. */
1198 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1201 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1206 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1210 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout into an absolute CLOCK_MONOTONIC deadline
 * (now + usec). (uint64_t) -1 is special-cased, and BUS_DEFAULT_TIMEOUT
 * is substituted in one branch. Presumably -1 yields 0 ("no timeout",
 * matching the "timeout != 0" checks elsewhere) and the default applies
 * to usec == 0, which is what bus_send_hello() passes — TODO confirm. */
1215 static usec_t calc_elapse(uint64_t usec) {
1216 if (usec == (uint64_t) -1)
1220 usec = BUS_DEFAULT_TIMEOUT;
1222 return now(CLOCK_MONOTONIC) + usec;
/* Comparison function for the reply-callback priority queue (see
 * prioq_ensure_allocated() in sd_bus_send_with_reply()). Entries with a
 * deadline (timeout != 0) are distinguished from entries without one, and
 * two deadlines are compared by value. Presumably earlier deadlines sort
 * first and entries without a deadline last — confirm. */
1225 static int timeout_compare(const void *a, const void *b) {
1226 const struct reply_callback *x = a, *y = b;
1228 if (x->timeout != 0 && y->timeout == 0)
1231 if (x->timeout == 0 && y->timeout != 0)
1234 if (x->timeout < y->timeout)
1237 if (x->timeout > y->timeout)
/* Sends method call m and registers callback/userdata to be invoked when
 * the reply arrives or the timeout passes.
 *
 * The message type (must be a method call) and its
 * NO_REPLY_EXPECTED flag are checked up front. The reply-callback hashmap
 * is allocated on demand, and so is the timeout priority queue unless
 * usec is (uint64_t) -1. The message is sealed so that its serial is
 * known; a reply_callback record keyed by that serial goes into the
 * hashmap and, if it has a deadline, into the priority queue. The message
 * is then sent with sd_bus_send(). The registration is undone with
 * sd_bus_send_with_reply_cancel() on the failure paths shown below. */
1243 int sd_bus_send_with_reply(
1246 sd_message_handler_t callback,
1251 struct reply_callback *c;
1262 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1264 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1267 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1271 if (usec != (uint64_t) -1) {
1272 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
/* Sealing assigns the serial used as the lookup key below. */
1277 r = bus_seal_message(bus, m);
1281 c = new(struct reply_callback, 1);
1285 c->callback = callback;
1286 c->userdata = userdata;
1287 c->serial = BUS_MESSAGE_SERIAL(m);
1288 c->timeout = calc_elapse(usec);
/* The key points into the record itself. */
1290 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1296 if (c->timeout != 0) {
1297 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1300 sd_bus_send_with_reply_cancel(bus, c->serial);
1305 r = sd_bus_send(bus, m, serial);
1307 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply callback identified by the serial of the
 * method call it belongs to. The record is removed from the
 * reply-callback hashmap and, if it carries a deadline, from the timeout
 * priority queue. */
1314 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1315 struct reply_callback *c;
1322 c = hashmap_remove(bus->reply_callbacks, &serial);
1326 if (c->timeout != 0)
1327 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the connection until it reaches BUS_RUNNING. The state is
 * checked, sd_bus_process() is given a chance to make progress, the
 * state is checked again, and sd_bus_wait() is called with no timeout
 * ((uint64_t) -1). Presumably these steps repeat in a loop — confirm. */
1333 int bus_ensure_running(sd_bus *bus) {
1338 if (bus->state == BUS_RUNNING)
1342 r = sd_bus_process(bus, NULL);
1345 if (bus->state == BUS_RUNNING)
1350 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends method call m and waits synchronously for its reply.
 *
 * After the argument checks (method call, reply expected, error object
 * not already in use) the bus is brought to BUS_RUNNING and the message
 * is sent, yielding its serial. The function then reads incoming messages
 * directly with message_read(). A message whose reply_serial matches is
 * the answer: a method return and a method error are handled separately,
 * the latter being copied into *error and mapped to an errno-style code
 * with bus_error_to_errno(). Every other message is appended to the read
 * queue for later dispatching; space for it is reserved before reading.
 * Between reads the function polls via bus_poll() with the time left
 * until the deadline and flushes pending writes with dispatch_wqueue(). */
1356 int sd_bus_send_with_reply_and_block(
1360 sd_bus_error *error,
1361 sd_bus_message **reply) {
1374 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1376 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1378 if (bus_error_is_dirty(error))
1381 r = bus_ensure_running(bus);
1385 r = sd_bus_send(bus, m, &serial);
1389 timeout = calc_elapse(usec);
1393 sd_bus_message *incoming = NULL;
1398 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1401 /* Make sure there's room for queuing this
1402 * locally, before we read the message */
1404 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1412 r = message_read(bus, &incoming);
1417 if (incoming->reply_serial == serial) {
1418 /* Found a match! */
1420 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1425 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1428 r = sd_bus_error_copy(error, &incoming->error);
1430 sd_bus_message_unref(incoming);
1434 k = bus_error_to_errno(&incoming->error);
1435 sd_bus_message_unref(incoming);
1439 sd_bus_message_unref(incoming);
1443 /* There's already guaranteed to be room for
1444 * this, so no need to resize things here */
1445 bus->rqueue[bus->rqueue_size ++] = incoming;
1448 /* Try to read more, right-away */
1457 n = now(CLOCK_MONOTONIC);
1463 left = (uint64_t) -1;
1465 r = bus_poll(bus, true, left);
1469 r = dispatch_wqueue(bus);
/* NOTE(review): presumably returns the connection's file descriptor
 * (bus->fd) for use in an external poll loop — TODO confirm against the
 * body. */
1475 int sd_bus_get_fd(sd_bus *bus) {
/* Computes which poll events the bus is currently interested in; the
 * result is consumed by bus_poll(). The decision depends on the state:
 *   BUS_OPENING         - connection still being established
 *   BUS_AUTHENTICATING  - depends on whether unsent auth data remains
 *                         (auth_index below the iovec count)
 *   BUS_RUNNING / HELLO - depends on the read queue being empty and the
 *                         write queue being non-empty
 * Presumably these map to POLLIN/POLLOUT flags — confirm. */
1485 int sd_bus_get_events(sd_bus *bus) {
1493 if (bus->state == BUS_OPENING)
1495 else if (bus->state == BUS_AUTHENTICATING) {
1497 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1502 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1503 if (bus->rqueue_size <= 0)
1505 if (bus->wqueue_size > 0)
/* Reports in *timeout_usec the absolute time at which the bus next needs
 * attention. While authenticating this is bus->auth_timeout. In
 * BUS_RUNNING / BUS_HELLO it is the deadline of the entry at the head of
 * the reply-callback priority queue. Other states are handled
 * separately. */
1512 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1513 struct reply_callback *c;
1522 if (bus->state == BUS_AUTHENTICATING) {
1523 *timeout_usec = bus->auth_timeout;
1527 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1530 c = prioq_peek(bus->reply_callbacks_prioq);
1534 *timeout_usec = c->timeout;
/* Handles the reply callback at the head of the timeout priority queue.
 * The head entry is inspected and the current time taken; the entry is
 * then popped from the queue, removed from the reply-callback hashmap,
 * and its callback is invoked with error ETIMEDOUT and no message.
 * Returns the callback's negative result on failure, 1 otherwise. */
1538 static int process_timeout(sd_bus *bus) {
1539 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1540 struct reply_callback *c;
1546 c = prioq_peek(bus->reply_callbacks_prioq);
1550 n = now(CLOCK_MONOTONIC);
/* The popped entry must be the one peeked at above. */
1554 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1555 hashmap_remove(bus->reply_callbacks, &c->serial);
1557 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1560 return r < 0 ? r : 1;
/* Dispatches message m to a registered reply callback. Only method
 * returns and method errors are considered. The callback record is looked
 * up, and removed, by the message's reply_serial; it is also taken out of
 * the timeout priority queue if it had a deadline. The callback is then
 * invoked with error 0 and the message. */
1563 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1564 struct reply_callback *c;
1570 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1571 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1574 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1578 if (c->timeout != 0)
1579 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1581 r = c->callback(bus, 0, m, c->userdata);
/* Passes message m to the registered filter callbacks, walking
 * bus->filter_callbacks in list order and invoking each with error 0. */
1587 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1588 struct filter_callback *l;
1591 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1592 r = l->callback(bus, 0, m, l->userdata);
/* Answers method calls on the org.freedesktop.DBus.Peer interface.
 *   Ping          - an empty method return
 *   GetMachineId  - a method return carrying the machine ID
 *                   (sd_id128_get_machine()) as a string
 *   anything else - an org.freedesktop.DBus.Error.UnknownMethod error
 * Messages of other types or for other interfaces are not handled here.
 * The NO_REPLY_EXPECTED flag is checked before a reply is built, and the
 * reply is sent with sd_bus_send(). */
1600 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1601 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1607 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1610 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1613 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1616 if (streq_ptr(m->member, "Ping"))
1617 r = sd_bus_message_new_method_return(bus, m, &reply);
1618 else if (streq_ptr(m->member, "GetMachineId")) {
1622 r = sd_id128_get_machine(&id);
1626 r = sd_bus_message_new_method_return(bus, m, &reply);
1630 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1632 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1634 sd_bus_error_set(&error,
1635 "org.freedesktop.DBus.Error.UnknownMethod",
1636 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1638 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
/* No serial is requested for the reply. */
1644 r = sd_bus_send(bus, reply, NULL);
/* Dispatches a method call to the registered object callbacks. The
 * callback registered for exactly m->path is tried first. After that the
 * path is shortened at its last '/' and callbacks registered as fallback
 * for the resulting prefix are tried. An
 * org.freedesktop.DBus.Error.UnknownMethod reply is built and sent on
 * the path shown at the end. */
1651 static int process_object(sd_bus *bus, sd_bus_message *m) {
1652 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1653 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1654 struct object_callback *c;
1662 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1665 if (hashmap_isempty(bus->object_callbacks))
1668 c = hashmap_get(bus->object_callbacks, m->path);
1670 r = c->callback(bus, 0, m, c->userdata);
1677 /* Look for fallback prefixes */
/* Work on a stack copy of the path so the message's own path is left
 * untouched. */
1678 p = strdupa(m->path);
1682 e = strrchr(p, '/');
1688 c = hashmap_get(bus->object_callbacks, p);
/* Only entries registered as fallback apply to a prefix. */
1689 if (c && c->is_fallback) {
1690 r = c->callback(bus, 0, m, c->userdata);
1701 sd_bus_error_set(&error,
1702 "org.freedesktop.DBus.Error.UnknownMethod",
1703 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1705 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1709 r = sd_bus_send(bus, reply, NULL);
/* Runs an incoming message through the dispatch stages in this order:
 * reply callbacks, filter callbacks, built-in interface handling, and
 * finally object callbacks. */
1716 static int process_message(sd_bus *bus, sd_bus_message *m) {
1722 r = process_reply(bus, m);
1726 r = process_filter(bus, m);
1730 r = process_builtin(bus, m);
1734 return process_object(bus, m);
/* Performs one step of work on the connection, depending on its state.
 *
 *   BUS_OPENING         The pending connect() is checked. On
 *                       POLLOUT/POLLERR/POLLHUP the result is fetched
 *                       with SO_ERROR; failures are recorded in
 *                       bus->last_connect_error. Success leads to
 *                       bus_start_auth(); otherwise the next address is
 *                       tried with bus_start_connect().
 *   BUS_AUTHENTICATING  The auth deadline is checked, then pending auth
 *                       data is written and the reply is read.
 *   BUS_RUNNING/HELLO   Reply timeouts are processed, the write queue is
 *                       flushed, one message is fetched and run through
 *                       process_message(). For a method call an
 *                       org.freedesktop.DBus.Error.UnknownObject error
 *                       reply is built and sent on the path shown at the
 *                       end.
 * See the comment inside the function for the meaning of the return
 * value. */
1737 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1740 /* Returns 0 when we didn't do anything. This should cause the
1741 * caller to invoke sd_bus_wait() before returning the next
1742 * time. Returns > 0 when we did something, which possibly
1743 * means *ret is filled in with an unprocessed message. */
1750 if (bus->state == BUS_OPENING) {
1761 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1763 socklen_t slen = sizeof(error);
/* Fetch the outcome of the non-blocking connect(). */
1765 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1767 bus->last_connect_error = errno;
1768 else if (error != 0)
1769 bus->last_connect_error = error;
/* Error/hangup signalled without a socket error code. */
1770 else if (p.revents & (POLLERR|POLLHUP))
1771 bus->last_connect_error = ECONNREFUSED;
1773 r = bus_start_auth(bus);
1777 /* Try next address */
1778 r = bus_start_connect(bus);
1785 } else if (bus->state == BUS_AUTHENTICATING) {
1787 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1790 r = bus_write_auth(bus);
1794 r = bus_read_auth(bus);
1797 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1798 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1801 r = process_timeout(bus);
1805 r = dispatch_wqueue(bus);
1810 r = dispatch_rqueue(bus, &m);
1819 r = process_message(bus, m);
1829 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1830 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1831 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1833 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1835 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1839 r = sd_bus_send(bus, reply, NULL);
1847 assert_not_reached("Unknown state");
/* Waits with ppoll() on the bus for the events reported by
 * sd_bus_get_events(). The wait is bounded by the earlier of the bus's
 * own deadline (sd_bus_get_timeout(), converted to a relative time, 0 if
 * already past) and the caller's timeout_usec; (uint64_t) -1 means no
 * limit. Returns 1 if ppoll() reported activity, 0 otherwise. */
1856 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1867 e = sd_bus_get_events(bus);
1874 r = sd_bus_get_timeout(bus, &until);
1881 n = now(CLOCK_MONOTONIC);
/* Absolute deadline -> relative wait, clamped at zero. */
1882 m = until > n ? until - n : 0;
/* Use the caller's timeout if it is the tighter bound. */
1885 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
/* A NULL timespec makes ppoll() wait indefinitely. */
1892 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1896 return r > 0 ? 1 : 0;
/* Waits until the bus needs processing or timeout_usec has passed.
 * Messages already sitting in the read queue are taken into account
 * before polling; otherwise the call is forwarded to bus_poll() with
 * need_more = false. */
1899 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1905 if (bus->rqueue_size > 0)
1908 return bus_poll(bus, false, timeout_usec);
/* Writes out all queued outgoing messages. The bus is first brought to
 * BUS_RUNNING. While the write queue is non-empty, dispatch_wqueue() is
 * called and, if messages remain, bus_poll() waits without a timeout
 * before the next attempt. */
1911 int sd_bus_flush(sd_bus *bus) {
1919 r = bus_ensure_running(bus);
1923 if (bus->wqueue_size <= 0)
1927 r = dispatch_wqueue(bus);
1931 if (bus->wqueue_size <= 0)
1934 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback. A new filter_callback record holding
 * callback and userdata is allocated and prepended to
 * bus->filter_callbacks, so the most recently added filter is visited
 * first by process_filter(). */
1940 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1941 struct filter_callback *f;
1948 f = new(struct filter_callback, 1);
1951 f->callback = callback;
1952 f->userdata = userdata;
1954 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregisters a filter callback. The list is searched for an entry whose
 * callback and userdata both match; that entry is unlinked from
 * bus->filter_callbacks. */
1958 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1959 struct filter_callback *f;
1966 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1967 if (f->callback == callback && f->userdata == userdata) {
1968 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Common implementation of sd_bus_add_object() and
 * sd_bus_add_fallback(). The object-callback hashmap is allocated on
 * demand (string keys). A new object_callback record receives a private
 * copy of the path, the callback, its userdata and the fallback flag, and
 * is stored in the hashmap under its own path string. */
1977 static int bus_add_object(
1981 sd_message_handler_t callback,
1984 struct object_callback *c;
1994 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1998 c = new(struct object_callback, 1);
2002 c->path = strdup(path);
2008 c->callback = callback;
2009 c->userdata = userdata;
2010 c->is_fallback = fallback;
/* The key is the record's own copy of the path. */
2012 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Common implementation of sd_bus_remove_object() and
 * sd_bus_remove_fallback(). The record registered for path is looked up
 * and is only removed if its callback, userdata and fallback flag all
 * match the arguments. */
2022 static int bus_remove_object(
2026 sd_message_handler_t callback,
2029 struct object_callback *c;
2038 c = hashmap_get(bus->object_callbacks, path);
2042 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
/* Removal must yield the very record found above. */
2045 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Registers callback for method calls on exactly the given object path
 * (bus_add_object() with fallback = false). */
2053 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2054 return bus_add_object(bus, false, path, callback, userdata);
/* Removes a registration made with sd_bus_add_object()
 * (bus_remove_object() with fallback = false). */
2057 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2058 return bus_remove_object(bus, false, path, callback, userdata);
/* Registers callback as fallback handler for the given path prefix
 * (bus_add_object() with fallback = true); process_object() consults such
 * entries when walking up a message's path. */
2061 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2062 return bus_add_object(bus, true, prefix, callback, userdata);
/* Removes a registration made with sd_bus_add_fallback()
 * (bus_remove_object() with fallback = true). */
2065 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2066 return bus_remove_object(bus, true, prefix, callback, userdata);