1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
35 #include "bus-internal.h"
36 #include "bus-message.h"
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
41 static void bus_free(sd_bus *b) {
42 struct filter_callback *f;
48 close_nointr_nofail(b->fd);
55 for (i = 0; i < b->rqueue_size; i++)
56 sd_bus_message_unref(b->rqueue[i]);
59 for (i = 0; i < b->wqueue_size; i++)
60 sd_bus_message_unref(b->wqueue[i]);
63 hashmap_free_free(b->reply_callbacks);
64 prioq_free(b->reply_callbacks_prioq);
66 while ((f = b->filter_callbacks)) {
67 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
74 static sd_bus* bus_new(void) {
83 r->message_version = 1;
85 /* We guarantee that wqueue always has space for at least one
87 r->wqueue = new(sd_bus_message*, 1);
96 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
107 r = sd_bus_message_read(reply, "s", &s);
111 if (!service_name_is_valid(s) || s[0] != ':')
114 bus->unique_name = strdup(s);
115 if (!bus->unique_name)
118 bus->state = BUS_RUNNING;
123 static int bus_send_hello(sd_bus *bus) {
124 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
129 r = sd_bus_message_new_method_call(
131 "org.freedesktop.DBus",
133 "org.freedesktop.DBus",
139 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
143 bus->sent_hello = true;
147 static int bus_start_running(sd_bus *bus) {
150 if (bus->sent_hello) {
151 bus->state = BUS_HELLO;
155 bus->state = BUS_RUNNING;
159 static int parse_address_key(const char **p, const char *key, char **value) {
170 if (strncmp(*p, key, l) != 0)
180 while (*a != ',' && *a != 0) {
198 c = (char) ((x << 4) | y);
205 t = realloc(r, n + 2);
230 static void skip_address_key(const char **p) {
234 *p += strcspn(*p, ",");
240 static int bus_parse_next_address(sd_bus *b) {
242 _cleanup_free_ char *guid = NULL;
249 if (b->address[b->address_index] == 0)
252 a = b->address + b->address_index;
255 b->sockaddr_size = 0;
256 b->peer = SD_ID128_NULL;
258 if (startswith(a, "unix:")) {
259 _cleanup_free_ char *path = NULL, *abstract = NULL;
263 r = parse_address_key(&p, "guid", &guid);
269 r = parse_address_key(&p, "path", &path);
275 r = parse_address_key(&p, "abstract", &abstract);
281 skip_address_key(&p);
284 if (!path && !abstract)
287 if (path && abstract)
294 if (l > sizeof(b->sockaddr.un.sun_path))
297 b->sockaddr.un.sun_family = AF_UNIX;
298 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
299 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
300 } else if (abstract) {
303 l = strlen(abstract);
304 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
307 b->sockaddr.un.sun_family = AF_UNIX;
308 b->sockaddr.un.sun_path[0] = 0;
309 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
310 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
313 } else if (startswith(a, "tcp:")) {
314 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
315 struct addrinfo hints, *result;
319 r = parse_address_key(&p, "guid", &guid);
325 r = parse_address_key(&p, "host", &host);
331 r = parse_address_key(&p, "port", &port);
337 r = parse_address_key(&p, "family", &family);
343 skip_address_key(&p);
350 hints.ai_socktype = SOCK_STREAM;
351 hints.ai_flags = AI_ADDRCONFIG;
354 if (streq(family, "ipv4"))
355 hints.ai_family = AF_INET;
356 else if (streq(family, "ipv6"))
357 hints.ai_family = AF_INET6;
362 r = getaddrinfo(host, port, &hints, &result);
366 return -EADDRNOTAVAIL;
368 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
369 b->sockaddr_size = result->ai_addrlen;
371 freeaddrinfo(result);
375 r = sd_id128_from_string(guid, &b->peer);
380 b->address_index = p - b->address;
384 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
387 struct iovec *i = iov + *idx;
389 if (i->iov_len > size) {
390 i->iov_base = (uint8_t*) i->iov_base + size;
404 static int bus_write_auth(sd_bus *b) {
409 assert(b->state == BUS_AUTHENTICATING);
411 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
414 if (b->auth_timeout == 0)
415 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
418 mh.msg_iov = b->auth_iovec + b->auth_index;
419 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
421 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
423 return errno == EAGAIN ? 0 : -errno;
425 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
430 static int bus_auth_verify(sd_bus *b) {
436 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
439 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
443 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
447 if (e - (char*) b->rbuffer != 3 + 32)
450 if (memcmp(b->rbuffer, "OK ", 3))
453 for (i = 0; i < 32; i += 2) {
456 x = unhexchar(((char*) b->rbuffer)[3 + i]);
457 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
462 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
465 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
466 !sd_id128_equal(b->peer, peer))
472 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
473 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
475 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
476 memmove(b->rbuffer, f + 2, b->rbuffer_size);
478 r = bus_start_running(b);
485 static int bus_read_auth(sd_bus *b) {
495 r = bus_auth_verify(b);
499 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
501 if (n > BUS_AUTH_SIZE_MAX)
502 n = BUS_AUTH_SIZE_MAX;
504 if (b->rbuffer_size >= n)
507 p = realloc(b->rbuffer, n);
514 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
515 iov.iov_len = n - b->rbuffer_size;
521 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
523 return errno == EAGAIN ? 0 : -errno;
525 b->rbuffer_size += k;
527 r = bus_auth_verify(b);
534 static int bus_setup_fd(sd_bus *b) {
539 /* Enable SO_PASSCRED + SO_PASSEC. We try this on any socket,
540 * just in case. This is actually irrelavant for */
542 setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
543 setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
545 /* Increase the buffers to a MB */
546 fd_inc_rcvbuf(b->fd, 1024*1024);
547 fd_inc_sndbuf(b->fd, 1024*1024);
552 static int bus_start_auth(sd_bus *b) {
553 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
554 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
556 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
561 b->state = BUS_AUTHENTICATING;
563 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
567 b->auth_uid = hexmem(text, l);
571 b->auth_iovec[0].iov_base = (void*) auth_prefix;
572 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
573 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
574 b->auth_iovec[1].iov_len = l * 2;
575 b->auth_iovec[2].iov_base = (void*) auth_suffix;
576 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
577 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
579 return bus_write_auth(b);
582 static int bus_start_connect(sd_bus *b) {
589 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
590 r = bus_parse_next_address(b);
594 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
597 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
599 b->last_connect_error = errno;
605 b->last_connect_error = errno;
609 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
611 if (errno == EINPROGRESS)
614 b->last_connect_error = errno;
618 return bus_start_auth(b);
624 close_nointr_nofail(b->fd);
630 int sd_bus_open_system(sd_bus **ret) {
638 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
640 r = sd_bus_open_address(e, &b);
648 b->sockaddr.un.sun_family = AF_UNIX;
649 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
650 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
652 r = bus_start_connect(b);
659 r = bus_send_hello(b);
669 int sd_bus_open_user(sd_bus **ret) {
678 e = getenv("DBUS_SESSION_BUS_ADDRESS");
680 r = sd_bus_open_address(e, &b);
684 e = getenv("XDG_RUNTIME_DIR");
689 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
696 b->sockaddr.un.sun_family = AF_UNIX;
697 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
698 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
700 r = bus_start_connect(b);
707 r = bus_send_hello(b);
717 int sd_bus_open_address(const char *address, sd_bus **ret) {
730 b->address = strdup(address);
736 r = bus_start_connect(b);
746 int sd_bus_open_fd(int fd, sd_bus **ret) {
761 r = fd_nonblock(b->fd, true);
765 fd_cloexec(b->fd, true);
773 r = bus_start_auth(b);
785 void sd_bus_close(sd_bus *bus) {
791 close_nointr_nofail(bus->fd);
795 sd_bus *sd_bus_ref(sd_bus *bus) {
799 assert(bus->n_ref > 0);
805 sd_bus *sd_bus_unref(sd_bus *bus) {
809 assert(bus->n_ref > 0);
818 int sd_bus_is_open(sd_bus *bus) {
825 int sd_bus_can_send(sd_bus *bus, char type) {
831 if (type == SD_BUS_TYPE_UNIX_FD) {
832 r = bus_ensure_running(bus);
839 return bus_type_is_valid(type);
842 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
850 r = bus_ensure_running(bus);
858 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
861 if (m->header->version > b->message_version)
867 return bus_message_seal(m, ++b->serial);
870 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
880 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
885 n = m->n_iovec * sizeof(struct iovec);
887 memcpy(iov, m->iovec, n);
890 iovec_advance(iov, &j, *idx);
894 mh.msg_iovlen = m->n_iovec;
896 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
898 return errno == EAGAIN ? 0 : -errno;
904 static int message_read_need(sd_bus *bus, size_t *need) {
911 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
913 if (bus->rbuffer_size < sizeof(struct bus_header)) {
914 *need = sizeof(struct bus_header) + 8;
916 /* Minimum message size:
920 * Method Call: +2 string headers
921 * Signal: +3 string headers
922 * Method Error: +1 string headers
924 * Method Reply: +1 uint32 headers
926 * A string header is at least 9 bytes
927 * A uint32 header is at least 8 bytes
929 * Hence the minimum message size of a valid message
930 * is header + 8 bytes */
935 a = ((const uint32_t*) bus->rbuffer)[1];
936 b = ((const uint32_t*) bus->rbuffer)[3];
938 e = ((const uint8_t*) bus->rbuffer)[0];
939 if (e == SD_BUS_LITTLE_ENDIAN) {
942 } else if (e == SD_BUS_BIG_ENDIAN) {
948 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
949 if (sum >= BUS_MESSAGE_SIZE_MAX)
952 *need = (size_t) sum;
956 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
963 assert(bus->rbuffer_size >= size);
964 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
966 if (bus->rbuffer_size > size) {
967 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
974 r = bus_message_from_malloc(bus->rbuffer, size,
975 bus->ucred_valid ? &bus->ucred : NULL,
976 bus->label[0] ? bus->label : NULL, &t);
983 bus->rbuffer_size -= size;
989 static int message_read(sd_bus *bus, sd_bus_message **m) {
997 struct cmsghdr cmsghdr;
998 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
999 CMSG_SPACE(NAME_MAX)]; /*selinux label */
1001 struct cmsghdr *cmsg;
1005 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1007 r = message_read_need(bus, &need);
1011 if (bus->rbuffer_size >= need)
1012 return message_make(bus, need, m);
1014 b = realloc(bus->rbuffer, need);
1021 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1022 iov.iov_len = need - bus->rbuffer_size;
1027 mh.msg_control = &control;
1028 mh.msg_controllen = sizeof(control);
1030 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1032 return errno == EAGAIN ? 0 : -errno;
1034 bus->rbuffer_size += k;
1035 bus->ucred_valid = false;
1038 for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1039 if (cmsg->cmsg_level == SOL_SOCKET &&
1040 cmsg->cmsg_type == SCM_CREDENTIALS &&
1041 cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1043 memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1044 bus->ucred_valid = true;
1046 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1047 cmsg->cmsg_type == SCM_SECURITY) {
1050 l = cmsg->cmsg_len - CMSG_LEN(0);
1051 memcpy(&bus->label, CMSG_DATA(cmsg), l);
1056 r = message_read_need(bus, &need);
1060 if (bus->rbuffer_size >= need)
1061 return message_make(bus, need, m);
1066 static int dispatch_wqueue(sd_bus *bus) {
1070 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1075 while (bus->wqueue_size > 0) {
1077 r = message_write(bus, bus->wqueue[0], &bus->windex);
1082 /* Didn't do anything this time */
1084 else if (bus->windex >= bus->wqueue[0]->size) {
1085 /* Fully written. Let's drop the entry from
1088 * This isn't particularly optimized, but
1089 * well, this is supposed to be our worst-case
1090 * buffer only, and the socket buffer is
1091 * supposed to be our primary buffer, and if
1092 * it got full, then all bets are off
1095 sd_bus_message_unref(bus->wqueue[0]);
1096 bus->wqueue_size --;
1097 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1107 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1108 sd_bus_message *z = NULL;
1113 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1118 if (bus->rqueue_size > 0) {
1119 /* Dispatch a queued message */
1121 *m = bus->rqueue[0];
1122 bus->rqueue_size --;
1123 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1127 /* Try to read a new message */
1129 r = message_read(bus, &z);
1144 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1154 /* If the serial number isn't kept, then we know that no reply
1156 if (!serial && !m->sealed)
1157 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1159 r = bus_seal_message(bus, m);
1163 /* If this is a reply and no reply was requested, then let's
1164 * suppress this, if we can */
1165 if (m->dont_send && !serial)
1168 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1171 r = message_write(bus, m, &idx);
1175 } else if (idx < m->size) {
1176 /* Wasn't fully written. So let's remember how
1177 * much was written. Note that the first entry
1178 * of the wqueue array is always allocated so
1179 * that we always can remember how much was
1181 bus->wqueue[0] = sd_bus_message_ref(m);
1182 bus->wqueue_size = 1;
1188 /* Just append it to the queue. */
1190 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1193 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1198 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1202 *serial = BUS_MESSAGE_SERIAL(m);
1207 static usec_t calc_elapse(uint64_t usec) {
1208 if (usec == (uint64_t) -1)
1212 usec = BUS_DEFAULT_TIMEOUT;
1214 return now(CLOCK_MONOTONIC) + usec;
1217 static int timeout_compare(const void *a, const void *b) {
1218 const struct reply_callback *x = a, *y = b;
1220 if (x->timeout != 0 && y->timeout == 0)
1223 if (x->timeout == 0 && y->timeout != 0)
1226 if (x->timeout < y->timeout)
1229 if (x->timeout > y->timeout)
1235 int sd_bus_send_with_reply(
1238 sd_message_handler_t callback,
1243 struct reply_callback *c;
1254 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1256 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1259 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1263 if (usec != (uint64_t) -1) {
1264 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1269 r = bus_seal_message(bus, m);
1273 c = new(struct reply_callback, 1);
1277 c->callback = callback;
1278 c->userdata = userdata;
1279 c->serial = BUS_MESSAGE_SERIAL(m);
1280 c->timeout = calc_elapse(usec);
1282 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1288 if (c->timeout != 0) {
1289 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1292 sd_bus_send_with_reply_cancel(bus, c->serial);
1297 r = sd_bus_send(bus, m, serial);
1299 sd_bus_send_with_reply_cancel(bus, c->serial);
1306 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1307 struct reply_callback *c;
1314 c = hashmap_remove(bus->reply_callbacks, &serial);
1318 if (c->timeout != 0)
1319 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1325 int bus_ensure_running(sd_bus *bus) {
1330 if (bus->state == BUS_RUNNING)
1334 r = sd_bus_process(bus, NULL);
1337 if (bus->state == BUS_RUNNING)
1342 r = sd_bus_wait(bus, (uint64_t) -1);
1348 int sd_bus_send_with_reply_and_block(
1352 sd_bus_error *error,
1353 sd_bus_message **reply) {
1366 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1368 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1370 if (bus_error_is_dirty(error))
1373 r = bus_ensure_running(bus);
1377 r = sd_bus_send(bus, m, &serial);
1381 timeout = calc_elapse(usec);
1385 sd_bus_message *incoming = NULL;
1390 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1393 /* Make sure there's room for queuing this
1394 * locally, before we read the message */
1396 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1404 r = message_read(bus, &incoming);
1409 if (incoming->reply_serial == serial) {
1410 /* Found a match! */
1412 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1417 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1420 r = sd_bus_error_copy(error, &incoming->error);
1422 sd_bus_message_unref(incoming);
1426 k = bus_error_to_errno(&incoming->error);
1427 sd_bus_message_unref(incoming);
1431 sd_bus_message_unref(incoming);
1435 /* There's already guaranteed to be room for
1436 * this, so need to resize things here */
1437 bus->rqueue[bus->rqueue_size ++] = incoming;
1440 /* Try to read more, right-away */
1449 n = now(CLOCK_MONOTONIC);
1455 left = (uint64_t) -1;
1457 r = bus_poll(bus, true, left);
1461 r = dispatch_wqueue(bus);
1467 int sd_bus_get_fd(sd_bus *bus) {
1477 int sd_bus_get_events(sd_bus *bus) {
1485 if (bus->state == BUS_OPENING)
1487 else if (bus->state == BUS_AUTHENTICATING) {
1489 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1494 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1495 if (bus->rqueue_size <= 0)
1497 if (bus->wqueue_size > 0)
1504 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1505 struct reply_callback *c;
1514 if (bus->state == BUS_AUTHENTICATING) {
1515 *timeout_usec = bus->auth_timeout;
1519 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1522 c = prioq_peek(bus->reply_callbacks_prioq);
1526 *timeout_usec = c->timeout;
1530 static int process_timeout(sd_bus *bus) {
1531 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1532 struct reply_callback *c;
1538 c = prioq_peek(bus->reply_callbacks_prioq);
1542 n = now(CLOCK_MONOTONIC);
1546 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1547 hashmap_remove(bus->reply_callbacks, &c->serial);
1549 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1552 return r < 0 ? r : 1;
1555 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1556 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1562 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1565 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1568 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1571 if (streq_ptr(m->member, "Ping"))
1572 r = sd_bus_message_new_method_return(bus, m, &reply);
1573 else if (streq_ptr(m->member, "GetMachineId")) {
1577 r = sd_id128_get_machine(&id);
1581 r = sd_bus_message_new_method_return(bus, m, &reply);
1585 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1587 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1589 sd_bus_error_set(&error,
1590 "org.freedesktop.DBus.Error.UnknownMethod",
1591 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1593 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1599 r = sd_bus_send(bus, reply, NULL);
1606 static int process_message(sd_bus *bus, sd_bus_message *m) {
1607 struct filter_callback *l;
1613 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1614 struct reply_callback *c;
1616 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1618 if (c->timeout != 0)
1619 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1621 r = c->callback(bus, 0, m, c->userdata);
1629 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1630 r = l->callback(bus, 0, m, l->userdata);
1635 return process_builtin(bus, m);
1638 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1641 /* Returns 0 when we didn't do anything. This should cause the
1642 * caller to invoke sd_bus_wait() before returning the next
1643 * time. Returns > 0 when we did something, which possibly
1644 * means *ret is filled in with an unprocessed message. */
1651 if (bus->state == BUS_OPENING) {
1662 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1664 socklen_t slen = sizeof(error);
1666 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1668 bus->last_connect_error = errno;
1669 else if (error != 0)
1670 bus->last_connect_error = error;
1671 else if (p.revents & (POLLERR|POLLHUP))
1672 bus->last_connect_error = ECONNREFUSED;
1674 r = bus_start_auth(bus);
1678 /* Try next address */
1679 r = bus_start_connect(bus);
1686 } else if (bus->state == BUS_AUTHENTICATING) {
1688 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1691 r = bus_write_auth(bus);
1695 r = bus_read_auth(bus);
1698 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1699 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1702 r = process_timeout(bus);
1706 r = dispatch_wqueue(bus);
1711 r = dispatch_rqueue(bus, &m);
1720 r = process_message(bus, m);
1730 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1731 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1732 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1734 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1736 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1740 r = sd_bus_send(bus, reply, NULL);
1748 assert_not_reached("Unknown state");
1757 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1768 e = sd_bus_get_events(bus);
1775 r = sd_bus_get_timeout(bus, &until);
1782 n = now(CLOCK_MONOTONIC);
1783 m = until > n ? until - n : 0;
1786 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1793 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1797 return r > 0 ? 1 : 0;
1800 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1806 if (bus->rqueue_size > 0)
1809 return bus_poll(bus, false, timeout_usec);
1812 int sd_bus_flush(sd_bus *bus) {
1820 r = bus_ensure_running(bus);
1824 if (bus->wqueue_size <= 0)
1828 r = dispatch_wqueue(bus);
1832 if (bus->wqueue_size <= 0)
1835 r = bus_poll(bus, false, (uint64_t) -1);
1841 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1842 struct filter_callback *f;
1849 f = new(struct filter_callback, 1);
1852 f->callback = callback;
1853 f->userdata = userdata;
1855 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1859 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1860 struct filter_callback *f;
1867 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1868 if (f->callback == callback && f->userdata == userdata) {
1869 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);