1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
34 #include "bus-internal.h"
35 #include "bus-message.h"
38 #define WQUEUE_MAX 128
40 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
42 static void bus_free(sd_bus *b) {
43 struct filter_callback *f;
49 close_nointr_nofail(b->fd);
56 for (i = 0; i < b->rqueue_size; i++)
57 sd_bus_message_unref(b->rqueue[i]);
60 for (i = 0; i < b->wqueue_size; i++)
61 sd_bus_message_unref(b->wqueue[i]);
64 hashmap_free_free(b->reply_callbacks);
65 prioq_free(b->reply_callbacks_prioq);
67 while ((f = b->filter_callbacks)) {
68 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
75 static sd_bus* bus_new(void) {
84 r->message_version = 1;
86 /* We guarantee that wqueue always has space for at least one
88 r->wqueue = new(sd_bus_message*, 1);
97 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
108 bus->state = BUS_RUNNING;
110 r = sd_bus_message_read(reply, "s", &s);
114 bus->unique_name = strdup(s);
115 if (!bus->unique_name)
121 static int bus_send_hello(sd_bus *bus) {
122 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
127 r = sd_bus_message_new_method_call(
129 "org.freedesktop.DBus",
131 "org.freedesktop.DBus",
137 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
141 bus->sent_hello = true;
145 static int bus_start_running(sd_bus *bus) {
148 if (bus->sent_hello) {
149 bus->state = BUS_HELLO;
153 bus->state = BUS_RUNNING;
157 static int parse_address_key(const char **p, const char *key, char **value) {
168 if (strncmp(*p, key, l) != 0)
178 while (*a != ',' && *a != 0) {
196 c = (char) ((x << 4) | y);
203 t = realloc(r, n + 2);
228 static void skip_address_key(const char **p) {
232 *p += strcspn(*p, ",");
238 static int bus_parse_next_address(sd_bus *b) {
240 _cleanup_free_ char *guid = NULL;
247 if (b->address[b->address_index] == 0)
250 a = b->address + b->address_index;
253 b->sockaddr_size = 0;
254 b->peer = SD_ID128_NULL;
256 if (startswith(a, "unix:")) {
257 _cleanup_free_ char *path = NULL, *abstract = NULL;
261 r = parse_address_key(&p, "guid", &guid);
267 r = parse_address_key(&p, "path", &path);
273 r = parse_address_key(&p, "abstract", &abstract);
279 skip_address_key(&p);
282 if (!path && !abstract)
285 if (path && abstract)
292 if (l > sizeof(b->sockaddr.un.sun_path))
295 b->sockaddr.un.sun_family = AF_UNIX;
296 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
297 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
298 } else if (abstract) {
301 l = strlen(abstract);
302 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
305 b->sockaddr.un.sun_family = AF_UNIX;
306 b->sockaddr.un.sun_path[0] = 0;
307 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
308 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
311 } else if (startswith(a, "tcp:")) {
312 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
313 struct addrinfo hints, *result;
317 r = parse_address_key(&p, "guid", &guid);
323 r = parse_address_key(&p, "host", &host);
329 r = parse_address_key(&p, "port", &port);
335 r = parse_address_key(&p, "family", &family);
341 skip_address_key(&p);
348 hints.ai_socktype = SOCK_STREAM;
349 hints.ai_flags = AI_ADDRCONFIG;
352 if (streq(family, "ipv4"))
353 hints.ai_family = AF_INET;
354 else if (streq(family, "ipv6"))
355 hints.ai_family = AF_INET6;
360 r = getaddrinfo(host, port, &hints, &result);
364 return -EADDRNOTAVAIL;
366 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
367 b->sockaddr_size = result->ai_addrlen;
369 freeaddrinfo(result);
373 r = sd_id128_from_string(guid, &b->peer);
378 b->address_index = p - b->address;
382 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
385 struct iovec *i = iov + *idx;
387 if (i->iov_len > size) {
388 i->iov_base = (uint8_t*) i->iov_base + size;
402 static int bus_write_auth(sd_bus *b) {
407 assert(b->state == BUS_AUTHENTICATING);
409 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
412 if (b->auth_timeout == 0)
413 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
416 mh.msg_iov = b->auth_iovec + b->auth_index;
417 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
419 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
421 return errno == EAGAIN ? 0 : -errno;
423 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
428 static int bus_auth_verify(sd_bus *b) {
434 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
437 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
441 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
445 if (e - (char*) b->rbuffer != 3 + 32)
448 if (memcmp(b->rbuffer, "OK ", 3))
451 for (i = 0; i < 32; i += 2) {
454 x = unhexchar(((char*) b->rbuffer)[3 + i]);
455 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
460 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
463 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
464 !sd_id128_equal(b->peer, peer))
470 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
471 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
473 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
474 memmove(b->rbuffer, f + 2, b->rbuffer_size);
476 r = bus_start_running(b);
483 static int bus_read_auth(sd_bus *b) {
493 r = bus_auth_verify(b);
497 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
498 p = realloc(b->rbuffer, n);
505 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
506 iov.iov_len = n - b->rbuffer_size;
512 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
514 return errno == EAGAIN ? 0 : -errno;
516 b->rbuffer_size += k;
518 r = bus_auth_verify(b);
525 static int bus_start_auth(sd_bus *b) {
526 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
527 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
529 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
534 b->state = BUS_AUTHENTICATING;
536 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
540 b->auth_uid = hexmem(text, l);
544 b->auth_iovec[0].iov_base = (void*) auth_prefix;
545 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
546 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
547 b->auth_iovec[1].iov_len = l * 2;
548 b->auth_iovec[2].iov_base = (void*) auth_suffix;
549 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
550 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
552 return bus_write_auth(b);
555 static int bus_start_connect(sd_bus *b) {
562 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
563 r = bus_parse_next_address(b);
567 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
570 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
572 b->last_connect_error = errno;
577 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
579 if (errno == EINPROGRESS)
582 b->last_connect_error = errno;
583 close_nointr_nofail(b->fd);
589 return bus_start_auth(b);
593 int sd_bus_open_system(sd_bus **ret) {
601 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
603 r = sd_bus_open_address(e, &b);
611 b->sockaddr.un.sun_family = AF_UNIX;
612 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
613 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
615 r = bus_start_connect(b);
622 r = bus_send_hello(b);
632 int sd_bus_open_user(sd_bus **ret) {
641 e = getenv("DBUS_SESSION_BUS_ADDRESS");
643 r = sd_bus_open_address(e, &b);
647 e = getenv("XDG_RUNTIME_DIR");
652 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
659 b->sockaddr.un.sun_family = AF_UNIX;
660 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
661 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
663 r = bus_start_connect(b);
670 r = bus_send_hello(b);
680 int sd_bus_open_address(const char *address, sd_bus **ret) {
693 b->address = strdup(address);
699 r = bus_start_connect(b);
709 int sd_bus_open_fd(int fd, sd_bus **ret) {
723 fd_nonblock(b->fd, true);
724 fd_cloexec(b->fd, true);
726 r = bus_start_auth(b);
736 void sd_bus_close(sd_bus *bus) {
742 close_nointr_nofail(bus->fd);
746 sd_bus *sd_bus_ref(sd_bus *bus) {
750 assert(bus->n_ref > 0);
756 sd_bus *sd_bus_unref(sd_bus *bus) {
760 assert(bus->n_ref > 0);
769 int sd_bus_is_open(sd_bus *bus) {
776 int sd_bus_is_running(sd_bus *bus) {
783 return bus->state == BUS_RUNNING;
786 int sd_bus_can_send(sd_bus *bus, char type) {
790 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
793 if (type == SD_BUS_TYPE_UNIX_FD)
796 return bus_type_is_valid(type);
799 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
802 if (m->header->version > b->message_version)
808 return bus_message_seal(m, ++b->serial);
811 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
821 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
826 n = m->n_iovec * sizeof(struct iovec);
828 memcpy(iov, m->iovec, n);
831 iovec_advance(iov, &j, *idx);
835 mh.msg_iovlen = m->n_iovec;
837 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
839 return errno == EAGAIN ? 0 : -errno;
845 static int message_read_need(sd_bus *bus, size_t *need) {
851 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
853 if (bus->rbuffer_size < sizeof(struct bus_header)) {
854 *need = sizeof(struct bus_header) + 8;
856 /* Minimum message size:
860 * Method Call: +2 string headers
861 * Signal: +3 string headers
862 * Method Error: +1 string headers
864 * Method Reply: +1 uint32 headers
866 * A string header is at least 9 bytes
867 * A uint32 header is at least 8 bytes
869 * Hence the minimum message size of a valid message
870 * is header + 8 bytes */
875 a = ((const uint32_t*) bus->rbuffer)[1];
876 b = ((const uint32_t*) bus->rbuffer)[3];
878 e = ((const uint8_t*) bus->rbuffer)[0];
879 if (e == SD_BUS_LITTLE_ENDIAN) {
882 } else if (e == SD_BUS_BIG_ENDIAN) {
888 *need = sizeof(struct bus_header) + ALIGN_TO(b, 8) + a;
892 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
899 assert(bus->rbuffer_size >= size);
900 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
902 if (bus->rbuffer_size > size) {
903 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
910 r = bus_message_from_malloc(bus->rbuffer, size, &t);
917 bus->rbuffer_size -= size;
923 static int message_read(sd_bus *bus, sd_bus_message **m) {
933 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
935 r = message_read_need(bus, &need);
939 if (bus->rbuffer_size >= need)
940 return message_make(bus, need, m);
942 b = realloc(bus->rbuffer, need);
949 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
950 iov.iov_len = need - bus->rbuffer_size;
956 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
958 return errno == EAGAIN ? 0 : -errno;
960 bus->rbuffer_size += k;
962 r = message_read_need(bus, &need);
966 if (bus->rbuffer_size >= need)
967 return message_make(bus, need, m);
972 static int dispatch_wqueue(sd_bus *bus) {
976 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
981 while (bus->wqueue_size > 0) {
983 r = message_write(bus, bus->wqueue[0], &bus->windex);
988 /* Didn't do anything this time */
990 else if (bus->windex >= bus->wqueue[0]->size) {
991 /* Fully written. Let's drop the entry from
994 * This isn't particularly optimized, but
995 * well, this is supposed to be our worst-case
996 * buffer only, and the socket buffer is
997 * supposed to be our primary buffer, and if
998 * it got full, then all bets are off
1001 sd_bus_message_unref(bus->wqueue[0]);
1002 bus->wqueue_size --;
1003 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1013 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1019 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1024 if (bus->rqueue_size > 0) {
1025 /* Dispatch a queued message */
1027 *m = bus->rqueue[0];
1028 bus->rqueue_size --;
1029 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1033 /* Try to read a new message */
1035 r = message_read(bus, &z);
1050 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1060 r = bus_seal_message(bus, m);
1064 /* If this is a reply and no reply was requested, then let's
1065 * suppress this, if we can */
1066 if (m->dont_send && !serial)
1069 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1072 r = message_write(bus, m, &idx);
1076 } else if (idx < m->size) {
1077 /* Wasn't fully written. So let's remember how
1078 * much was written. Note that the first entry
1079 * of the wqueue array is always allocated so
1080 * that we always can remember how much was
1082 bus->wqueue[0] = sd_bus_message_ref(m);
1083 bus->wqueue_size = 1;
1089 /* Just append it to the queue. */
1091 if (bus->wqueue_size >= WQUEUE_MAX)
1094 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1099 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1103 *serial = BUS_MESSAGE_SERIAL(m);
1108 static usec_t calc_elapse(uint64_t usec) {
1109 if (usec == (uint64_t) -1)
1113 usec = BUS_DEFAULT_TIMEOUT;
1115 return now(CLOCK_MONOTONIC) + usec;
1118 static int timeout_compare(const void *a, const void *b) {
1119 const struct reply_callback *x = a, *y = b;
1121 if (x->timeout != 0 && y->timeout == 0)
1124 if (x->timeout == 0 && y->timeout != 0)
1127 if (x->timeout < y->timeout)
1130 if (x->timeout > y->timeout)
1136 int sd_bus_send_with_reply(
1139 sd_message_handler_t callback,
1144 struct reply_callback *c;
1155 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1157 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1160 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1164 if (usec != (uint64_t) -1) {
1165 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1170 r = bus_seal_message(bus, m);
1174 c = new(struct reply_callback, 1);
1178 c->callback = callback;
1179 c->userdata = userdata;
1180 c->serial = BUS_MESSAGE_SERIAL(m);
1181 c->timeout = calc_elapse(usec);
1183 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1189 if (c->timeout != 0) {
1190 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1193 sd_bus_send_with_reply_cancel(bus, c->serial);
1198 r = sd_bus_send(bus, m, serial);
1200 sd_bus_send_with_reply_cancel(bus, c->serial);
1207 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1208 struct reply_callback *c;
1215 c = hashmap_remove(bus->reply_callbacks, &serial);
1219 if (c->timeout != 0)
1220 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1226 static int ensure_running(sd_bus *bus) {
1231 r = sd_bus_is_running(bus);
1238 r = sd_bus_process(bus, NULL);
1243 k = sd_bus_is_running(bus);
1250 r = sd_bus_wait(bus, (uint64_t) -1);
1256 int sd_bus_send_with_reply_and_block(
1260 sd_bus_error *error,
1261 sd_bus_message **reply) {
1274 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1276 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1278 if (bus_error_is_dirty(error))
1281 r = ensure_running(bus);
1285 r = sd_bus_send(bus, m, &serial);
1289 timeout = calc_elapse(usec);
1293 sd_bus_message *incoming = NULL;
1298 /* Make sure there's room for queuing this
1299 * locally, before we read the message */
1301 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1309 r = message_read(bus, &incoming);
1314 if (incoming->reply_serial == serial) {
1315 /* Found a match! */
1317 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1322 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1325 r = sd_bus_error_copy(error, &incoming->error);
1327 sd_bus_message_unref(incoming);
1331 k = bus_error_to_errno(&incoming->error);
1332 sd_bus_message_unref(incoming);
1336 sd_bus_message_unref(incoming);
1340 /* There's already guaranteed to be room for
1341 * this, so need to resize things here */
1342 bus->rqueue[bus->rqueue_size ++] = incoming;
1345 /* Try to read more, right-away */
1354 n = now(CLOCK_MONOTONIC);
1360 left = (uint64_t) -1;
1362 r = bus_poll(bus, true, left);
1366 r = dispatch_wqueue(bus);
1372 int sd_bus_get_fd(sd_bus *bus) {
1382 int sd_bus_get_events(sd_bus *bus) {
1390 if (bus->state == BUS_OPENING)
1392 else if (bus->state == BUS_AUTHENTICATING) {
1394 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1399 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1400 if (bus->rqueue_size <= 0)
1402 if (bus->wqueue_size > 0)
1409 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1410 struct reply_callback *c;
1419 if (bus->state == BUS_AUTHENTICATING) {
1420 *timeout_usec = bus->auth_timeout;
1424 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1427 c = prioq_peek(bus->reply_callbacks_prioq);
1431 *timeout_usec = c->timeout;
1435 static int process_timeout(sd_bus *bus) {
1436 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1437 struct reply_callback *c;
1443 c = prioq_peek(bus->reply_callbacks_prioq);
1447 n = now(CLOCK_MONOTONIC);
1451 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1452 hashmap_remove(bus->reply_callbacks, &c->serial);
1454 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1457 return r < 0 ? r : 1;
1460 static int process_message(sd_bus *bus, sd_bus_message *m) {
1461 struct filter_callback *l;
1467 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1468 struct reply_callback *c;
1470 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1472 if (c->timeout != 0)
1473 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1475 r = c->callback(bus, 0, m, c->userdata);
1483 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1484 r = l->callback(bus, 0, m, l->userdata);
1492 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1495 /* Returns 0 when we didn't do anything. This should cause the
1496 * caller to invoke sd_bus_wait() before returning the next
1497 * time. Returns > 0 when we did something, which possibly
1498 * means *ret is filled in with an unprocessed message. */
1505 if (bus->state == BUS_OPENING) {
1516 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1518 socklen_t slen = sizeof(error);
1520 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1522 bus->last_connect_error = errno;
1523 else if (error != 0)
1524 bus->last_connect_error = error;
1525 else if (p.revents & (POLLERR|POLLHUP))
1526 bus->last_connect_error = ECONNREFUSED;
1528 r = bus_start_auth(bus);
1532 /* Try next address */
1533 r = bus_start_connect(bus);
1540 } else if (bus->state == BUS_AUTHENTICATING) {
1542 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1545 r = bus_write_auth(bus);
1549 r = bus_read_auth(bus);
1552 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1553 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1556 r = process_timeout(bus);
1560 r = dispatch_wqueue(bus);
1565 r = dispatch_rqueue(bus, &m);
1574 r = process_message(bus, m);
1584 if (sd_bus_message_is_method_call(m, NULL, NULL)) {
1585 const sd_bus_error e = SD_BUS_ERROR_INIT_CONST("org.freedesktop.DBus.Error.UnknownObject", "Unknown object.");
1586 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1588 r = sd_bus_message_new_method_error(bus, m, &e, &reply);
1592 r = sd_bus_send(bus, reply, NULL);
1600 assert_not_reached("Unknown state");
1609 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1620 e = sd_bus_get_events(bus);
1627 r = sd_bus_get_timeout(bus, &until);
1634 n = now(CLOCK_MONOTONIC);
1635 m = until > n ? until - n : 0;
1638 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1645 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1649 return r > 0 ? 1 : 0;
1652 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1658 if (bus->rqueue_size > 0)
1661 return bus_poll(bus, false, timeout_usec);
1664 int sd_bus_flush(sd_bus *bus) {
1672 r = ensure_running(bus);
1676 if (bus->wqueue_size <= 0)
1680 r = dispatch_wqueue(bus);
1684 if (bus->wqueue_size <= 0)
1687 r = bus_poll(bus, false, (uint64_t) -1);
1693 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1694 struct filter_callback *f;
1701 f = new(struct filter_callback, 1);
1704 f->callback = callback;
1705 f->userdata = userdata;
1707 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1711 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1712 struct filter_callback *f;
1719 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1720 if (f->callback == callback && f->userdata == userdata) {
1721 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);