1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
35 #include "bus-internal.h"
36 #include "bus-message.h"
/* Forward declaration: bus_poll() is defined further down in this file but
 * is already called by functions that appear before its definition. */
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Tears down a bus object: closes the connection fd and any queued file
 * descriptors, drops the references held on queued messages, and releases
 * the reply, filter and object callback registries. */
41 static void bus_free(sd_bus *b) {
42 struct filter_callback *f;
43 struct object_callback *c;
49 close_nointr_nofail(b->fd);
56 close_many(b->fds, b->n_fds);
/* Drop the references held by the read and write queues. */
59 for (i = 0; i < b->rqueue_size; i++)
60 sd_bus_message_unref(b->rqueue[i]);
63 for (i = 0; i < b->wqueue_size; i++)
64 sd_bus_message_unref(b->wqueue[i]);
67 hashmap_free_free(b->reply_callbacks);
68 prioq_free(b->reply_callbacks_prioq);
/* Unlink filter callbacks one at a time until the list is empty. */
70 while ((f = b->filter_callbacks)) {
71 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
/* Drain the object callback map before freeing the map itself. */
75 while ((c = hashmap_steal_first(b->object_callbacks))) {
80 hashmap_free(b->object_callbacks);
/* Creates a new bus object. The visible initialisation sets the message
 * version to 1 and allocates a write queue with room for one message
 * pointer. NOTE(review): the allocation of the object itself and the
 * failure handling are not visible here, confirm the NULL-on-OOM contract. */
85 static sd_bus* bus_new(void) {
94 r->message_version = 1;
96 /* We guarantee that wqueue always has space for at least one
98 r->wqueue = new(sd_bus_message*, 1);
/* Reply handler installed by bus_send_hello(). Reads a single string from
 * the reply, requires it to be a valid service name beginning with ':',
 * stores a copy as the bus' unique name and switches the bus to
 * BUS_RUNNING. */
107 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
118 r = sd_bus_message_read(reply, "s", &s);
/* Only names that are valid and start with ':' are accepted. */
122 if (!service_name_is_valid(s) || s[0] != ':')
125 bus->unique_name = strdup(s);
126 if (!bus->unique_name)
129 bus->state = BUS_RUNNING;
/* Builds a method call addressed to "org.freedesktop.DBus" and sends it
 * with hello_callback() registered as the reply handler, then records that
 * the hello was sent. NOTE(review): the object path and member name are not
 * visible here, presumably this is the bus driver's Hello method. */
134 static int bus_send_hello(sd_bus *bus) {
135 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
140 r = sd_bus_message_new_method_call(
142 "org.freedesktop.DBus",
144 "org.freedesktop.DBus",
/* No userdata, timeout argument 0, and the serial is not requested. */
150 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
/* Consulted by bus_start_running() to pick the state after authentication. */
154 bus->sent_hello = true;
/* Picks the state the bus enters once authentication is done: BUS_HELLO if
 * a hello call has been sent (hello_callback() later moves the bus on to
 * BUS_RUNNING), BUS_RUNNING otherwise. */
158 static int bus_start_running(sd_bus *bus) {
161 if (bus->sent_hello) {
162 bus->state = BUS_HELLO;
166 bus->state = BUS_RUNNING;
/* Tries to parse one "key=value" element of a bus address at *p.
 * The text at *p is compared against key; the value then runs up to the
 * next ',' or the end of the string, with two-hex-digit escapes decoded
 * into single bytes. The decoded value is accumulated in a buffer that is
 * grown with realloc() as characters are appended.
 * NOTE(review): the return values and the handling of *value when the key
 * does not match are not visible here. */
170 static int parse_address_key(const char **p, const char *key, char **value) {
181 if (strncmp(*p, key, l) != 0)
191 while (*a != ',' && *a != 0) {
/* Combine the two decoded hex nibbles into one byte. */
209 c = (char) ((x << 4) | y);
/* Room for the new character plus one more byte. */
216 t = realloc(r, n + 2);
/* Skips over an address element the caller does not understand by moving
 * *p forward to the next ',' (or to the end of the string). */
241 static void skip_address_key(const char **p) {
245 *p += strcspn(*p, ",");
/* Parses the next entry of b->address, starting at b->address_index, into
 * b->sockaddr / b->sockaddr_size.
 *
 * Two transports are recognised:
 *   "unix:" with a "path" or an "abstract" key (exactly one of the two)
 *   "tcp:"  with "host", "port" and optional "family" keys
 * Both accept an optional "guid" key, which is parsed into b->peer.
 * Unknown keys are skipped. On completion b->address_index is advanced to
 * where parsing stopped. */
251 static int bus_parse_next_address(sd_bus *b) {
253 _cleanup_free_ char *guid = NULL;
/* Nothing left to parse. */
260 if (b->address[b->address_index] == 0)
263 a = b->address + b->address_index;
/* Reset the results of any previously parsed entry. */
266 b->sockaddr_size = 0;
267 b->peer = SD_ID128_NULL;
269 if (startswith(a, "unix:")) {
270 _cleanup_free_ char *path = NULL, *abstract = NULL;
274 r = parse_address_key(&p, "guid", &guid);
280 r = parse_address_key(&p, "path", &path);
286 r = parse_address_key(&p, "abstract", &abstract);
292 skip_address_key(&p);
/* Exactly one of path/abstract must be given. */
295 if (!path && !abstract)
298 if (path && abstract)
/* A path that fills sun_path completely is allowed: the address length is
 * carried in sockaddr_size, so no terminating NUL is required. */
305 if (l > sizeof(b->sockaddr.un.sun_path))
308 b->sockaddr.un.sun_family = AF_UNIX;
309 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
310 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
311 } else if (abstract) {
/* Abstract names are stored after a leading NUL byte, which costs one
 * byte of sun_path. */
314 l = strlen(abstract);
315 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
318 b->sockaddr.un.sun_family = AF_UNIX;
319 b->sockaddr.un.sun_path[0] = 0;
320 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
321 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
324 } else if (startswith(a, "tcp:")) {
325 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
326 struct addrinfo hints, *result;
330 r = parse_address_key(&p, "guid", &guid);
336 r = parse_address_key(&p, "host", &host);
342 r = parse_address_key(&p, "port", &port);
348 r = parse_address_key(&p, "family", &family);
354 skip_address_key(&p);
361 hints.ai_socktype = SOCK_STREAM;
362 hints.ai_flags = AI_ADDRCONFIG;
/* Restrict the lookup to the requested address family, if any. */
365 if (streq(family, "ipv4"))
366 hints.ai_family = AF_INET;
367 else if (streq(family, "ipv6"))
368 hints.ai_family = AF_INET6;
373 r = getaddrinfo(host, port, &hints, &result);
377 return -EADDRNOTAVAIL;
/* Only the first lookup result is used. */
379 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
380 b->sockaddr_size = result->ai_addrlen;
382 freeaddrinfo(result);
/* Remember the server GUID we expect; checked in bus_auth_verify(). */
386 r = sd_id128_from_string(guid, &b->peer);
/* Continue from here on the next call. */
391 b->address_index = p - b->address;
/* Consumes size bytes from the iovec array iov, starting at entry *idx.
 * When the current entry is longer than the remaining size, its base
 * pointer is moved forward by that amount.
 * NOTE(review): the handling of fully consumed entries (and the update of
 * *idx and iov_len) is not visible here. */
395 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
398 struct iovec *i = iov + *idx;
400 if (i->iov_len > size) {
401 i->iov_base = (uint8_t*) i->iov_base + size;
/* Writes as much of the pending authentication request as the socket
 * accepts without blocking. b->auth_index tracks the first iovec entry not
 * yet fully sent. A send failure yields 0 for EAGAIN and -errno for
 * anything else. */
415 static int bus_write_auth(sd_bus *b) {
420 assert(b->state == BUS_AUTHENTICATING);
/* Every iovec entry has already been sent. */
422 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
/* Arm the authentication deadline the first time we get here. */
425 if (b->auth_timeout == 0)
426 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
/* Send only the entries that are still outstanding. */
429 mh.msg_iov = b->auth_iovec + b->auth_index;
430 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
432 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
434 return errno == EAGAIN ? 0 : -errno;
/* Account for a possibly partial write. */
436 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
/* Checks whether the read buffer holds the complete authentication reply
 * and, if so, validates it.
 *
 * Two CRLF-terminated lines are located in b->rbuffer. The first must be
 * exactly "OK " followed by 32 hex digits; the digits are decoded into a
 * 128-bit id which, if b->peer was set from the address, must equal it.
 * The second line is compared against "AGREE_UNIX_FD". Both lines are then
 * removed from the front of the read buffer and bus_start_running() is
 * called.
 * NOTE(review): where the result of the AGREE_UNIX_FD comparison is stored
 * and the exact error codes are not visible here. */
441 static int bus_auth_verify(sd_bus *b) {
447 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
450 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
454 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
458 if (e - (char*) b->rbuffer != 3 + 32)
461 if (memcmp(b->rbuffer, "OK ", 3))
464 for (i = 0; i < 32; i += 2) {
467 x = unhexchar(((char*) b->rbuffer)[3 + i]);
468 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
473 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
476 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
477 !sd_id128_equal(b->peer, peer))
483 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
484 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
486 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
487 memmove(b->rbuffer, f + 2, b->rbuffer_size);
489 r = bus_start_running(b);
/* Reads more of the authentication reply from the socket without blocking
 * and re-runs bus_auth_verify() on the accumulated data. The read buffer
 * grows by doubling, capped at BUS_AUTH_SIZE_MAX. A receive failure yields
 * 0 for EAGAIN and -errno otherwise. */
496 static int bus_read_auth(sd_bus *b) {
/* The buffer may already contain a complete reply. */
506 r = bus_auth_verify(b);
/* At least enough for "OK " + 32 hex digits + CRLF + "AGREE_UNIX_FD" +
 * CRLF, otherwise twice the current fill level. */
510 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
512 if (n > BUS_AUTH_SIZE_MAX)
513 n = BUS_AUTH_SIZE_MAX;
/* Already at the cap: the buffer cannot grow any further. */
515 if (b->rbuffer_size >= n)
518 p = realloc(b->rbuffer, n);
/* Receive into the unused tail of the buffer. */
525 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
526 iov.iov_len = n - b->rbuffer_size;
532 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
534 return errno == EAGAIN ? 0 : -errno;
536 b->rbuffer_size += k;
538 r = bus_auth_verify(b);
/* Applies socket options to the connection fd: requests credential and
 * security-label ancillary data and enlarges the receive and send buffers.
 * The setsockopt() results are not checked. */
545 static int bus_setup_fd(sd_bus *b) {
550 /* Enable SO_PASSCRED + SO_PASSSEC. We try this on any socket,
551 * just in case; the results of these calls are not checked. */
553 setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
554 setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
556 /* Increase the buffers to a MB */
557 fd_inc_rcvbuf(b->fd, 1024*1024);
558 fd_inc_sndbuf(b->fd, 1024*1024);
/* Switches the bus to BUS_AUTHENTICATING, assembles the authentication
 * request and starts sending it.
 *
 * The request consists of three iovec entries:
 *   [0] a NUL byte followed by "AUTH EXTERNAL "
 *   [1] the effective UID, printed in decimal and then hex-encoded
 *   [2] CRLF, "NEGOTIATE_UNIX_FD", CRLF, "BEGIN", CRLF
 * Returns the result of bus_write_auth(). */
563 static int bus_start_auth(sd_bus *b) {
564 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
565 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
567 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
572 b->state = BUS_AUTHENTICATING;
574 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
/* Hex-encode the decimal UID string; each input byte becomes two hex
 * digits, hence the "l * 2" lengths below. */
578 b->auth_uid = hexmem(text, l);
/* sizeof() - 1 drops the implicit trailing NUL of the string literals
 * (the leading NUL of auth_prefix is kept and sent). */
582 b->auth_iovec[0].iov_base = (void*) auth_prefix;
583 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
584 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
585 b->auth_iovec[1].iov_len = l * 2;
586 b->auth_iovec[2].iov_base = (void*) auth_suffix;
587 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
588 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
590 return bus_write_auth(b);
/* Tries to establish the socket connection. When no socket address is set
 * yet, the next entry of the address string is parsed first. A
 * non-blocking, close-on-exec stream socket is created and connected;
 * errno of a failed step is remembered in b->last_connect_error. On a
 * successful connect the authentication handshake is started.
 * NOTE(review): the retry loop and the state set for EINPROGRESS are not
 * visible here. */
593 static int bus_start_connect(sd_bus *b) {
/* No address chosen yet: take the next one from the address string. */
600 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
601 r = bus_parse_next_address(b);
/* Report the most recent connect failure, or ECONNREFUSED if none. */
605 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
608 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
610 b->last_connect_error = errno;
616 b->last_connect_error = errno;
620 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
/* A non-blocking connect that is still in progress is not a failure. */
622 if (errno == EINPROGRESS)
625 b->last_connect_error = errno;
629 return bus_start_auth(b);
635 close_nointr_nofail(b->fd);
/* Opens a connection to the system bus. The address in
 * $DBUS_SYSTEM_BUS_ADDRESS is used via sd_bus_open_address() when that
 * variable is consulted successfully; otherwise the AF_UNIX socket
 * /run/dbus/system_bus_socket is used. A hello call is then queued.
 * NOTE(review): how *ret is filled in and the error paths are not visible
 * here. */
641 int sd_bus_open_system(sd_bus **ret) {
649 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
651 r = sd_bus_open_address(e, &b);
/* Fallback: the fixed system bus socket path. */
659 b->sockaddr.un.sun_family = AF_UNIX;
660 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
661 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
663 r = bus_start_connect(b);
670 r = bus_send_hello(b);
/* Opens a connection to the user (session) bus. The address in
 * $DBUS_SESSION_BUS_ADDRESS is used via sd_bus_open_address() when
 * available; otherwise the AF_UNIX socket "$XDG_RUNTIME_DIR/bus" is used.
 * A hello call is then queued.
 * NOTE(review): how *ret is filled in and the error paths are not visible
 * here. */
680 int sd_bus_open_user(sd_bus **ret) {
689 e = getenv("DBUS_SESSION_BUS_ADDRESS");
691 r = sd_bus_open_address(e, &b);
695 e = getenv("XDG_RUNTIME_DIR");
/* The directory plus the 4 bytes of "/bus" must fit into sun_path. */
700 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
707 b->sockaddr.un.sun_family = AF_UNIX;
/* Copy the directory, then append "/bus" right behind it (no NUL). */
708 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
709 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
711 r = bus_start_connect(b);
718 r = bus_send_hello(b);
/* Opens a connection to the bus at the given address string. The string is
 * duplicated into the bus object and the connection attempt is started
 * with bus_start_connect(). Unlike sd_bus_open_system()/_user(), no hello
 * call is visible in this function. */
728 int sd_bus_open_address(const char *address, sd_bus **ret) {
741 b->address = strdup(address);
747 r = bus_start_connect(b);
/* Creates a bus connection on top of an already connected file descriptor.
 * The fd is switched to non-blocking mode and marked close-on-exec (the
 * result of the latter is not checked), then authentication is started. */
757 int sd_bus_open_fd(int fd, sd_bus **ret) {
772 r = fd_nonblock(b->fd, true);
776 fd_cloexec(b->fd, true);
784 r = bus_start_auth(b);
/* Closes the connection's file descriptor.
 * NOTE(review): guards and the resetting of bus->fd are not visible here. */
796 void sd_bus_close(sd_bus *bus) {
802 close_nointr_nofail(bus->fd);
/* Takes a reference on the bus object. The object must currently hold at
 * least one reference (asserted). */
806 sd_bus *sd_bus_ref(sd_bus *bus) {
810 assert(bus->n_ref > 0);
/* Drops a reference on the bus object. The object must currently hold at
 * least one reference (asserted).
 * NOTE(review): presumably the object is freed when the count reaches
 * zero, that part is not visible here. */
816 sd_bus *sd_bus_unref(sd_bus *bus) {
820 assert(bus->n_ref > 0);
/* Queries whether the bus connection is open.
 * NOTE(review): only the signature is visible here, confirm the exact
 * return convention against the body. */
829 int sd_bus_is_open(sd_bus *bus) {
/* Reports whether values of the given type can be sent over this bus.
 * For SD_BUS_TYPE_UNIX_FD the connection is first driven to the running
 * state, since fd passing is negotiated during authentication; for all
 * other types the answer is bus_type_is_valid(type). */
836 int sd_bus_can_send(sd_bus *bus, char type) {
842 if (type == SD_BUS_TYPE_UNIX_FD) {
843 r = bus_ensure_running(bus);
850 return bus_type_is_valid(type);
/* Retrieves the peer's 128-bit id. The connection is first driven to the
 * running state.
 * NOTE(review): the copy into *peer is not visible here, presumably it
 * returns the id validated in bus_auth_verify(). */
853 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
861 r = bus_ensure_running(bus);
/* Seals message m for sending on bus b, assigning it the next serial
 * number of the bus. Messages whose header version is newer than the
 * version the bus speaks are treated specially (the consequence is not
 * visible here, presumably an error). */
869 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
872 if (m->header->version > b->message_version)
/* Pre-increment: the bus-wide serial counter is advanced first. */
878 return bus_message_seal(m, ++b->serial);
/* Writes message m to the socket without blocking, starting *idx bytes
 * into the message. File descriptors attached to the message are passed
 * along as SCM_RIGHTS ancillary data. A send failure yields 0 for EAGAIN
 * and -errno otherwise.
 * NOTE(review): the update of *idx after a successful write and the
 * condition guarding the fd block are not visible here. */
881 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
891 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
/* Build a single SCM_RIGHTS control message carrying all fds. */
898 struct cmsghdr *control;
899 control = alloca(CMSG_SPACE(sizeof(int) * m->n_fds));
901 mh.msg_control = control;
902 control->cmsg_level = SOL_SOCKET;
903 control->cmsg_type = SCM_RIGHTS;
904 mh.msg_controllen = control->cmsg_len = CMSG_LEN(sizeof(int) * m->n_fds);
905 memcpy(CMSG_DATA(control), m->fds, sizeof(int) * m->n_fds);
/* Work on a private copy of the iovec array so that skipping the bytes
 * that were already written does not modify the message itself. */
908 n = m->n_iovec * sizeof(struct iovec);
910 memcpy(iov, m->iovec, n);
913 iovec_advance(iov, &j, *idx);
916 mh.msg_iovlen = m->n_iovec;
918 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
920 return errno == EAGAIN ? 0 : -errno;
/* Computes in *need how many bytes must be in the read buffer before the
 * next message can be extracted. While less than a full fixed header is
 * buffered, the answer is the minimum valid message size; afterwards it is
 * derived from two 32-bit length fields of the header. */
926 static int message_read_need(sd_bus *bus, size_t *need) {
933 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
935 if (bus->rbuffer_size < sizeof(struct bus_header)) {
936 *need = sizeof(struct bus_header) + 8;
938 /* Minimum message size:
942 * Method Call: +2 string headers
943 * Signal: +3 string headers
944 * Method Error: +1 string headers
946 * Method Reply: +1 uint32 headers
948 * A string header is at least 9 bytes
949 * A uint32 header is at least 8 bytes
951 * Hence the minimum message size of a valid message
952 * is header + 8 bytes */
/* 32-bit words 1 and 3 of the header; presumably the body size and the
 * size of the header field array, confirm against struct bus_header. */
957 a = ((const uint32_t*) bus->rbuffer)[1];
958 b = ((const uint32_t*) bus->rbuffer)[3];
/* The first byte of the message is its endianness marker. */
960 e = ((const uint8_t*) bus->rbuffer)[0];
961 if (e == SD_BUS_LITTLE_ENDIAN) {
964 } else if (e == SD_BUS_BIG_ENDIAN) {
/* Summed in 64 bit so the addition of two 32-bit sizes cannot wrap;
 * b is rounded up to a multiple of 8 before being added. */
970 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
971 if (sum >= BUS_MESSAGE_SIZE_MAX)
974 *need = (size_t) sum;
/* Turns the first size bytes of the read buffer into a message object.
 * Any bytes buffered beyond the message are duplicated first so they can
 * be kept. The buffer, the queued fds and, when available, the peer
 * credentials and security label are passed to bus_message_from_malloc().
 * NOTE(review): the installation of the duplicated remainder as the new
 * read buffer and the resetting of fds are not visible here. */
978 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
985 assert(bus->rbuffer_size >= size);
986 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
/* Save the bytes that belong to the following message(s). */
988 if (bus->rbuffer_size > size) {
989 b = memdup((const uint8_t*) bus->rbuffer + size,
990 bus->rbuffer_size - size);
/* Credentials and label are passed only if they have been received. */
996 r = bus_message_from_malloc(bus->rbuffer, size,
997 bus->fds, bus->n_fds,
998 bus->ucred_valid ? &bus->ucred : NULL,
999 bus->label[0] ? bus->label : NULL,
1007 bus->rbuffer_size -= size;
/* Tries to read the next incoming message from the socket without
 * blocking.
 *
 * If the read buffer already holds a complete message it is returned via
 * message_make(). Otherwise the buffer is grown to the required size, one
 * recvmsg() is issued, and any ancillary data is collected:
 *   SCM_RIGHTS       received fds are appended to bus->fds
 *   SCM_CREDENTIALS  peer credentials are stored in bus->ucred
 *   SCM_SECURITY     the security label is copied into bus->label
 * Afterwards the buffer is checked again for a complete message.
 * A receive failure yields 0 for EAGAIN and -errno otherwise. */
1016 static int message_read(sd_bus *bus, sd_bus_message **m) {
/* Control buffer sized for the maximum number of fds, one ucred and a
 * security label. */
1024 struct cmsghdr cmsghdr;
1025 uint8_t buf[CMSG_SPACE(sizeof(int) * BUS_FDS_MAX) +
1026 CMSG_SPACE(sizeof(struct ucred)) +
1027 CMSG_SPACE(NAME_MAX)]; /*selinux label */
1029 struct cmsghdr *cmsg;
1033 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1035 r = message_read_need(bus, &need);
/* A full message may already be buffered from an earlier read. */
1039 if (bus->rbuffer_size >= need)
1040 return message_make(bus, need, m);
1042 b = realloc(bus->rbuffer, need);
/* Receive into the unused tail of the buffer, never past "need". */
1049 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1050 iov.iov_len = need - bus->rbuffer_size;
1055 mh.msg_control = &control;
1056 mh.msg_controllen = sizeof(control);
/* MSG_CMSG_CLOEXEC: received fds are created close-on-exec. */
1058 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1060 return errno == EAGAIN ? 0 : -errno;
1062 bus->rbuffer_size += k;
1064 for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1065 if (cmsg->cmsg_level == SOL_SOCKET &&
1066 cmsg->cmsg_type == SCM_RIGHTS) {
/* Number of fds carried by this control message. */
1069 n = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
/* The array must hold (n_fds + n) ints. The size is therefore a product;
 * the previous "sizeof(int) + count" under-allocated the array and let the
 * memcpy() below write past the end of the heap block. */
1071 f = realloc(bus->fds, sizeof(int) * (bus->n_fds + n));
/* On allocation failure the freshly received fds are closed so that they
 * do not leak. */
1073 close_many((int*) CMSG_DATA(cmsg), n);
1077 memcpy(f + bus->n_fds, CMSG_DATA(cmsg), n * sizeof(int));
1080 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1081 cmsg->cmsg_type == SCM_CREDENTIALS &&
1082 cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1084 memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1085 bus->ucred_valid = true;
1087 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1088 cmsg->cmsg_type == SCM_SECURITY) {
/* NOTE(review): l comes straight from the control message and is not
 * visibly bounded by the size of bus->label; confirm that the label
 * buffer is large enough for this copy plus a terminating NUL. */
1091 l = cmsg->cmsg_len - CMSG_LEN(0);
1092 memcpy(&bus->label, CMSG_DATA(cmsg), l);
/* Check again whether the data read completes a message. */
1097 r = message_read_need(bus, &need);
1101 if (bus->rbuffer_size >= need)
1102 return message_make(bus, need, m);
/* Flushes the write queue: repeatedly tries to write the message at the
 * head of the queue, continuing from bus->windex bytes into it. Once the
 * head message has been written completely its reference is dropped and
 * the remaining entries are shifted down by one.
 * NOTE(review): the return convention and the resetting of bus->windex are
 * not visible here. */
1107 static int dispatch_wqueue(sd_bus *bus) {
1111 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1116 while (bus->wqueue_size > 0) {
1118 r = message_write(bus, bus->wqueue[0], &bus->windex);
1123 /* Didn't do anything this time */
1125 else if (bus->windex >= bus->wqueue[0]->size) {
1126 /* Fully written. Let's drop the entry from
1129 * This isn't particularly optimized, but
1130 * well, this is supposed to be our worst-case
1131 * buffer only, and the socket buffer is
1132 * supposed to be our primary buffer, and if
1133 * it got full, then all bets are off
1136 sd_bus_message_unref(bus->wqueue[0]);
1137 bus->wqueue_size --;
1138 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetches the next incoming message. A message already waiting in the read
 * queue is handed out first (the remaining entries are shifted down by
 * one); only when the queue is empty is a new message read from the
 * socket.
 * NOTE(review): the return convention and how the freshly read message is
 * passed to *m are not visible here. */
1148 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1149 sd_bus_message *z = NULL;
1154 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1159 if (bus->rqueue_size > 0) {
1160 /* Dispatch a queued message */
1162 *m = bus->rqueue[0];
1163 bus->rqueue_size --;
1164 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1168 /* Try to read a new message */
1170 r = message_read(bus, &z);
/* Sends message m on the bus, sealing it first. If serial is non-NULL it
 * receives the message's serial number.
 *
 * When the connection is up and nothing is queued, the message is written
 * to the socket directly and only an unwritten remainder is queued;
 * otherwise the message is appended to the write queue, which is limited
 * to BUS_WQUEUE_MAX entries. Messages carrying fds are checked against
 * bus->can_fds. */
1185 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1194 if (m->n_fds > 0 && !bus->can_fds)
1197 /* If the serial number isn't kept, then we know that no reply
1199 if (!serial && !m->sealed)
1200 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1202 r = bus_seal_message(bus, m);
1206 /* If this is a reply and no reply was requested, then let's
1207 * suppress this, if we can */
1208 if (m->dont_send && !serial)
1211 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1214 r = message_write(bus, m, &idx);
1218 } else if (idx < m->size) {
1219 /* Wasn't fully written. So let's remember how
1220 * much was written. Note that the first entry
1221 * of the wqueue array is always allocated so
1222 * that we always can remember how much was
1224 bus->wqueue[0] = sd_bus_message_ref(m);
1225 bus->wqueue_size = 1;
1231 /* Just append it to the queue. */
1233 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
/* Grow the queue by one slot; the queue takes its own reference. */
1236 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1241 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1245 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC deadline. (uint64_t) -1 is handled as a special case and
 * BUS_DEFAULT_TIMEOUT can be substituted for the value.
 * NOTE(review): the value returned for (uint64_t) -1 and the condition
 * selecting the default are not visible here; callers in this file treat a
 * timeout of 0 as "no timeout". */
1250 static usec_t calc_elapse(uint64_t usec) {
1251 if (usec == (uint64_t) -1)
1255 usec = BUS_DEFAULT_TIMEOUT;
1257 return now(CLOCK_MONOTONIC) + usec;
/* Ordering function for the reply callback priority queue (see
 * prioq_ensure_allocated() in sd_bus_send_with_reply()). Entries are
 * compared by their timeout field, with a timeout of 0 distinguished from
 * non-zero timeouts before the numeric comparison.
 * NOTE(review): the returned values are not visible here, presumably
 * entries with a deadline sort before those without, earliest first. */
1260 static int timeout_compare(const void *a, const void *b) {
1261 const struct reply_callback *x = a, *y = b;
1263 if (x->timeout != 0 && y->timeout == 0)
1266 if (x->timeout == 0 && y->timeout != 0)
1269 if (x->timeout < y->timeout)
1272 if (x->timeout > y->timeout)
/* Sends method call m and registers callback to be invoked with the reply
 * (or on timeout, see process_timeout()).
 *
 * The message is sealed so that its serial is known, a reply_callback
 * record keyed by that serial is stored in bus->reply_callbacks and, if it
 * has a deadline, in the timeout priority queue. If queueing the timeout
 * or sending fails, the registration is cancelled again. Only method calls
 * that expect a reply are checked for at the top. */
1278 int sd_bus_send_with_reply(
1281 sd_message_handler_t callback,
1286 struct reply_callback *c;
1297 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1299 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1302 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
/* The priority queue is only needed when a timeout is in play. */
1306 if (usec != (uint64_t) -1) {
1307 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
/* Seal now so that the serial used as hashmap key is final. */
1312 r = bus_seal_message(bus, m);
1316 c = new(struct reply_callback, 1);
1320 c->callback = callback;
1321 c->userdata = userdata;
1322 c->serial = BUS_MESSAGE_SERIAL(m);
1323 c->timeout = calc_elapse(usec);
/* The key points into the record itself. */
1325 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1331 if (c->timeout != 0) {
1332 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1335 sd_bus_send_with_reply_cancel(bus, c->serial);
1340 r = sd_bus_send(bus, m, serial);
1342 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply callback identified by the serial of the method
 * call. The record is removed from the reply hashmap and, if it has a
 * deadline, from the timeout priority queue.
 * NOTE(review): the freeing of the record and the return values are not
 * visible here. */
1349 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1350 struct reply_callback *c;
1357 c = hashmap_remove(bus->reply_callbacks, &serial);
1361 if (c->timeout != 0)
1362 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the connection until it reaches BUS_RUNNING by alternating
 * between sd_bus_process() and, when there is nothing to do, waiting
 * without a timeout in sd_bus_wait().
 * NOTE(review): the loop structure and the error returns are not visible
 * here. */
1368 int bus_ensure_running(sd_bus *bus) {
1373 if (bus->state == BUS_RUNNING)
1377 r = sd_bus_process(bus, NULL);
1380 if (bus->state == BUS_RUNNING)
1385 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends method call m and blocks until the matching reply arrives or the
 * timeout elapses.
 *
 * The connection is first driven to the running state and the message is
 * sent. Incoming messages are then read one at a time: the one whose
 * reply_serial matches is consumed here (a method return is the result, a
 * method error is copied into *error and mapped to an errno-style code),
 * while unrelated messages are appended to the read queue for later
 * dispatching. Between reads the function polls the socket and flushes the
 * write queue. */
1391 int sd_bus_send_with_reply_and_block(
1395 sd_bus_error *error,
1396 sd_bus_message **reply) {
1409 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1411 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1413 if (bus_error_is_dirty(error))
1416 r = bus_ensure_running(bus);
1420 r = sd_bus_send(bus, m, &serial);
/* Absolute deadline for the whole wait. */
1424 timeout = calc_elapse(usec);
1428 sd_bus_message *incoming = NULL;
1433 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1436 /* Make sure there's room for queuing this
1437 * locally, before we read the message */
1439 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1447 r = message_read(bus, &incoming);
1452 if (incoming->reply_serial == serial) {
1453 /* Found a match! */
1455 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1460 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1463 r = sd_bus_error_copy(error, &incoming->error);
1465 sd_bus_message_unref(incoming);
1469 k = bus_error_to_errno(&incoming->error);
1470 sd_bus_message_unref(incoming);
1474 sd_bus_message_unref(incoming);
1478 /* There's already guaranteed to be room for
1479 * this, so no need to resize things here */
1480 bus->rqueue[bus->rqueue_size ++] = incoming;
1483 /* Try to read more, right-away */
1492 n = now(CLOCK_MONOTONIC);
/* (uint64_t) -1 means "wait without a time limit" for bus_poll(). */
1498 left = (uint64_t) -1;
1500 r = bus_poll(bus, true, left);
1504 r = dispatch_wqueue(bus);
/* Accessor for the connection's file descriptor.
 * NOTE(review): only the signature is visible here, confirm the
 * preconditions and error returns against the body. */
1510 int sd_bus_get_fd(sd_bus *bus) {
/* Computes the poll events the caller should wait for on the bus fd, based
 * on the connection state: while connecting, while authenticating
 * (depending on whether parts of the request are still unsent), and while
 * running (depending on whether the read queue is empty and whether writes
 * are pending).
 * NOTE(review): the concrete POLLIN/POLLOUT flags chosen in each branch
 * are not visible here. */
1520 int sd_bus_get_events(sd_bus *bus) {
1528 if (bus->state == BUS_OPENING)
1530 else if (bus->state == BUS_AUTHENTICATING) {
/* Part of the authentication request is still unsent. */
1532 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1537 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1538 if (bus->rqueue_size <= 0)
1540 if (bus->wqueue_size > 0)
/* Reports in *timeout_usec the absolute time at which the bus next needs
 * attention: the authentication deadline while authenticating, otherwise
 * the deadline of the reply callback at the head of the timeout priority
 * queue.
 * NOTE(review): the return values and the result when no timeout is
 * pending are not visible here. */
1547 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1548 struct reply_callback *c;
1557 if (bus->state == BUS_AUTHENTICATING) {
1558 *timeout_usec = bus->auth_timeout;
1562 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
/* The head of the queue is the entry timeout_compare() sorts first. */
1565 c = prioq_peek(bus->reply_callbacks_prioq);
1569 *timeout_usec = c->timeout;
/* Handles an expired reply timeout. The callback record at the head of
 * the timeout priority queue is removed from both the queue and the reply
 * hashmap and its callback is invoked with ETIMEDOUT and no message.
 * Returns the callback's negative result on failure and 1 otherwise.
 * NOTE(review): the expiry check against the current time and the freeing
 * of the record are not visible here. */
1573 static int process_timeout(sd_bus *bus) {
1574 struct reply_callback *c;
1580 c = prioq_peek(bus->reply_callbacks_prioq);
1584 n = now(CLOCK_MONOTONIC);
/* The popped entry must be the one peeked above. */
1588 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1589 hashmap_remove(bus->reply_callbacks, &c->serial);
1591 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1594 return r < 0 ? r : 1;
/* Dispatches a method return or method error to the reply callback that
 * was registered for its reply_serial. The callback record is removed from
 * the reply hashmap (and from the timeout queue if it had a deadline)
 * before the callback is invoked with the message.
 * NOTE(review): return values and the freeing of the record are not
 * visible here. */
1597 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1598 struct reply_callback *c;
/* Only replies are of interest here. */
1604 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1605 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1608 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1612 if (c->timeout != 0)
1613 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1615 r = c->callback(bus, 0, m, c->userdata);
/* Passes message m to the registered filter callbacks in list order.
 * NOTE(review): how a callback's result stops the iteration is not
 * visible here. */
1621 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1622 struct filter_callback *l;
1625 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1626 r = l->callback(bus, 0, m, l->userdata);
/* Answers method calls on the "org.freedesktop.DBus.Peer" interface:
 *   Ping          replies with a method return
 *   GetMachineId  replies with the machine id formatted as a string
 *   anything else replies with an
 *                 "org.freedesktop.DBus.Error.UnknownMethod" error
 * The reply is sent with sd_bus_send(). Messages that are not method
 * calls, that target another interface, or that carry the
 * NO_REPLY_EXPECTED flag are checked for before any reply is built. */
1634 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1635 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1641 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1644 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1647 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1650 if (streq_ptr(m->member, "Ping"))
1651 r = sd_bus_message_new_method_return(bus, m, &reply);
1652 else if (streq_ptr(m->member, "GetMachineId")) {
1656 r = sd_id128_get_machine(&id);
1660 r = sd_bus_message_new_method_return(bus, m, &reply);
1664 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1666 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1668 sd_bus_error_set(&error,
1669 "org.freedesktop.DBus.Error.UnknownMethod",
1670 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1672 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
/* No serial requested: nobody replies to a reply. */
1678 r = sd_bus_send(bus, reply, NULL);
/* Dispatches a method call to the object callback registered for its
 * path. If the exact path yields nothing, progressively shorter prefixes
 * of the path (cut at the last '/') are looked up and used when the entry
 * found is registered as a fallback. If no callback takes the call, an
 * "org.freedesktop.DBus.Error.UnknownMethod" error reply is sent.
 * NOTE(review): how callback results end the search is not visible
 * here. */
1685 static int process_object(sd_bus *bus, sd_bus_message *m) {
1686 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1687 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1688 struct object_callback *c;
1696 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1699 if (hashmap_isempty(bus->object_callbacks))
/* Exact path match first. */
1702 c = hashmap_get(bus->object_callbacks, m->path);
1704 r = c->callback(bus, 0, m, c->userdata);
1711 /* Look for fallback prefixes */
/* Work on a stack copy of the path, since it is shortened in place. */
1712 p = strdupa(m->path);
1716 e = strrchr(p, '/');
1722 c = hashmap_get(bus->object_callbacks, p);
1723 if (c && c->is_fallback) {
1724 r = c->callback(bus, 0, m, c->userdata);
1735 sd_bus_error_set(&error,
1736 "org.freedesktop.DBus.Error.UnknownMethod",
1737 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1739 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1743 r = sd_bus_send(bus, reply, NULL);
/* Runs an incoming message through the dispatch stages in fixed order:
 * reply callbacks, filter callbacks, the built-in interface handler and
 * finally the object callbacks, whose result is returned.
 * NOTE(review): the conditions under which an earlier stage ends the
 * chain are not visible here. */
1750 static int process_message(sd_bus *bus, sd_bus_message *m) {
1756 r = process_reply(bus, m);
1760 r = process_filter(bus, m);
1764 r = process_builtin(bus, m);
1768 return process_object(bus, m);
/* Performs one step of work on the bus, depending on its state:
 *   BUS_OPENING         checks the outcome of the pending connect; on
 *                       success starts authentication, on failure moves
 *                       on to the next address
 *   BUS_AUTHENTICATING  enforces the authentication deadline, then writes
 *                       and reads handshake data
 *   BUS_RUNNING/HELLO   handles reply timeouts, flushes the write queue,
 *                       fetches one incoming message and dispatches it;
 *                       a method call that remains unhandled is answered
 *                       with an "UnknownObject" error
 * Any other state trips assert_not_reached(). */
1771 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1774 /* Returns 0 when we didn't do anything. This should cause the
1775 * caller to invoke sd_bus_wait() before returning the next
1776 * time. Returns > 0 when we did something, which possibly
1777 * means *ret is filled in with an unprocessed message. */
1784 if (bus->state == BUS_OPENING) {
1795 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1797 socklen_t slen = sizeof(error);
/* SO_ERROR yields the result of the non-blocking connect. */
1799 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1801 bus->last_connect_error = errno;
1802 else if (error != 0)
1803 bus->last_connect_error = error;
/* Error condition signalled without a specific error code. */
1804 else if (p.revents & (POLLERR|POLLHUP))
1805 bus->last_connect_error = ECONNREFUSED;
1807 r = bus_start_auth(bus);
1811 /* Try next address */
1812 r = bus_start_connect(bus);
1819 } else if (bus->state == BUS_AUTHENTICATING) {
1821 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1824 r = bus_write_auth(bus);
1828 r = bus_read_auth(bus);
1831 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1832 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1835 r = process_timeout(bus);
1839 r = dispatch_wqueue(bus);
1844 r = dispatch_rqueue(bus, &m);
1853 r = process_message(bus, m);
/* Method calls must always be answered, so send a generic error. */
1863 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1864 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1865 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1867 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1869 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1873 r = sd_bus_send(bus, reply, NULL);
1881 assert_not_reached("Unknown state");
/* Waits on the bus fd with ppoll() for the events reported by
 * sd_bus_get_events(). The wait is limited by the earlier of the bus' own
 * next deadline (sd_bus_get_timeout()) and timeout_usec, where
 * (uint64_t) -1 means no limit. Returns 1 if ppoll() reported activity and
 * 0 otherwise (error paths are not visible here).
 * NOTE(review): the use of need_more is not visible here. */
1890 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1901 e = sd_bus_get_events(bus);
1908 r = sd_bus_get_timeout(bus, &until);
/* Convert the absolute deadline to a relative wait, clamped at 0. */
1915 n = now(CLOCK_MONOTONIC);
1916 m = until > n ? until - n : 0;
/* Use the caller's timeout when it is the shorter one. */
1919 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
/* A NULL timespec makes ppoll() wait without a time limit. */
1926 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1930 return r > 0 ? 1 : 0;
/* Waits up to timeout_usec for the bus to need processing. Messages that
 * are already queued for reading are checked for first; otherwise the
 * call polls the socket via bus_poll(). */
1933 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1939 if (bus->rqueue_size > 0)
1942 return bus_poll(bus, false, timeout_usec);
/* Blocks until the write queue is empty. The connection is first driven
 * to the running state; then queued messages are written and, while some
 * remain, the socket is polled without a time limit.
 * NOTE(review): the loop structure and return values are not visible
 * here. */
1945 int sd_bus_flush(sd_bus *bus) {
1953 r = bus_ensure_running(bus);
1957 if (bus->wqueue_size <= 0)
1961 r = dispatch_wqueue(bus);
1965 if (bus->wqueue_size <= 0)
1968 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback with its userdata. The new entry is
 * prepended, so it is visited before previously registered filters in
 * process_filter(). */
1974 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1975 struct filter_callback *f;
1982 f = new(struct filter_callback, 1);
1985 f->callback = callback;
1986 f->userdata = userdata;
1988 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregisters a filter. The entry whose callback and userdata both match
 * is unlinked from the filter list.
 * NOTE(review): the freeing of the entry and the return values are not
 * visible here. */
1992 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1993 struct filter_callback *f;
2000 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2001 if (f->callback == callback && f->userdata == userdata) {
2002 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Shared implementation of sd_bus_add_object() and sd_bus_add_fallback().
 * Allocates an object_callback record holding a copy of the path, the
 * callback, its userdata and the fallback flag, and stores it in
 * bus->object_callbacks keyed by the copied path string.
 * NOTE(review): clean-up on failure and the return values are not visible
 * here. */
2011 static int bus_add_object(
2015 sd_message_handler_t callback,
2018 struct object_callback *c;
2028 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2032 c = new(struct object_callback, 1);
2036 c->path = strdup(path);
2042 c->callback = callback;
2043 c->userdata = userdata;
2044 c->is_fallback = fallback;
/* The key is the record's own copy of the path. */
2046 r = hashmap_put(bus->object_callbacks, c->path, c);
/* Shared implementation of sd_bus_remove_object() and
 * sd_bus_remove_fallback(). The record registered for path is looked up
 * and removed from bus->object_callbacks; callback, userdata and the
 * fallback flag are compared against the registration first.
 * NOTE(review): the freeing of the record and the return values are not
 * visible here. */
2056 static int bus_remove_object(
2060 sd_message_handler_t callback,
2063 struct object_callback *c;
2072 c = hashmap_get(bus->object_callbacks, path);
2076 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
/* Removing by the stored key must yield the record found above. */
2079 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
/* Registers callback for method calls on exactly the given object path
 * (non-fallback registration via bus_add_object()). */
2087 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2088 return bus_add_object(bus, false, path, callback, userdata);
/* Removes a registration made with sd_bus_add_object(); path, callback
 * and userdata are passed on to bus_remove_object() with the fallback
 * flag cleared. */
2091 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2092 return bus_remove_object(bus, false, path, callback, userdata);
/* Registers callback as a fallback handler for the given path prefix
 * (fallback registration via bus_add_object()); see process_object() for
 * how prefixes are matched. */
2095 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2096 return bus_add_object(bus, true, prefix, callback, userdata);
/* Removes a registration made with sd_bus_add_fallback(); prefix,
 * callback and userdata are passed on to bus_remove_object() with the
 * fallback flag set. */
2099 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2100 return bus_remove_object(bus, true, prefix, callback, userdata);