1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
34 #include "bus-internal.h"
35 #include "bus-message.h"
38 #define WQUEUE_MAX 128
40 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
42 static void bus_free(sd_bus *b) {
43 struct filter_callback *f;
49 close_nointr_nofail(b->fd);
56 for (i = 0; i < b->rqueue_size; i++)
57 sd_bus_message_unref(b->rqueue[i]);
60 for (i = 0; i < b->wqueue_size; i++)
61 sd_bus_message_unref(b->wqueue[i]);
64 hashmap_free_free(b->reply_callbacks);
65 prioq_free(b->reply_callbacks_prioq);
67 while ((f = b->filter_callbacks)) {
68 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
75 static sd_bus* bus_new(void) {
84 r->message_version = 1;
86 /* We guarantee that wqueue always has space for at least one
88 r->wqueue = new(sd_bus_message*, 1);
97 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
108 bus->state = BUS_RUNNING;
110 r = sd_bus_message_read(reply, "s", &s);
114 bus->unique_name = strdup(s);
115 if (!bus->unique_name)
121 static int bus_send_hello(sd_bus *bus) {
122 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
127 r = sd_bus_message_new_method_call(
129 "org.freedesktop.DBus",
131 "org.freedesktop.DBus",
137 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
141 bus->sent_hello = true;
145 static int bus_start_running(sd_bus *bus) {
148 if (bus->sent_hello) {
149 bus->state = BUS_HELLO;
153 bus->state = BUS_RUNNING;
157 static int parse_address_key(const char **p, const char *key, char **value) {
168 if (strncmp(*p, key, l) != 0)
178 while (*a != ',' && *a != 0) {
196 c = (char) ((x << 4) | y);
203 t = realloc(r, n + 2);
228 static void skip_address_key(const char **p) {
232 *p += strcspn(*p, ",");
238 static int bus_parse_next_address(sd_bus *b) {
240 _cleanup_free_ char *guid = NULL;
247 if (b->address[b->address_index] == 0)
250 a = b->address + b->address_index;
253 b->sockaddr_size = 0;
254 b->peer = SD_ID128_NULL;
256 if (startswith(a, "unix:")) {
257 _cleanup_free_ char *path = NULL, *abstract = NULL;
261 r = parse_address_key(&p, "guid", &guid);
267 r = parse_address_key(&p, "path", &path);
273 r = parse_address_key(&p, "abstract", &abstract);
279 skip_address_key(&p);
282 if (!path && !abstract)
285 if (path && abstract)
292 if (l > sizeof(b->sockaddr.un.sun_path))
295 b->sockaddr.un.sun_family = AF_UNIX;
296 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
297 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
298 } else if (abstract) {
301 l = strlen(abstract);
302 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
305 b->sockaddr.un.sun_family = AF_UNIX;
306 b->sockaddr.un.sun_path[0] = 0;
307 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
308 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
311 } else if (startswith(a, "tcp:")) {
312 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
313 struct addrinfo hints, *result;
317 r = parse_address_key(&p, "guid", &guid);
323 r = parse_address_key(&p, "host", &host);
329 r = parse_address_key(&p, "port", &port);
335 r = parse_address_key(&p, "family", &family);
341 skip_address_key(&p);
348 hints.ai_socktype = SOCK_STREAM;
349 hints.ai_flags = AI_ADDRCONFIG;
352 if (streq(family, "ipv4"))
353 hints.ai_family = AF_INET;
354 else if (streq(family, "ipv6"))
355 hints.ai_family = AF_INET6;
360 r = getaddrinfo(host, port, &hints, &result);
364 return -EADDRNOTAVAIL;
366 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
367 b->sockaddr_size = result->ai_addrlen;
369 freeaddrinfo(result);
373 r = sd_id128_from_string(guid, &b->peer);
378 b->address_index = p - b->address;
382 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
385 struct iovec *i = iov + *idx;
387 if (i->iov_len > size) {
388 i->iov_base = (uint8_t*) i->iov_base + size;
402 static int bus_write_auth(sd_bus *b) {
407 assert(b->state == BUS_AUTHENTICATING);
409 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
412 if (b->auth_timeout == 0)
413 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
416 mh.msg_iov = b->auth_iovec + b->auth_index;
417 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
419 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
421 return errno == EAGAIN ? 0 : -errno;
423 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
428 static int bus_auth_verify(sd_bus *b) {
434 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
437 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
441 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
445 if (e - (char*) b->rbuffer != 3 + 32)
448 if (memcmp(b->rbuffer, "OK ", 3))
451 for (i = 0; i < 32; i += 2) {
454 x = unhexchar(((char*) b->rbuffer)[3 + i]);
455 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
460 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
463 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
464 !sd_id128_equal(b->peer, peer))
470 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
471 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
473 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
474 memmove(b->rbuffer, f + 2, b->rbuffer_size);
476 r = bus_start_running(b);
483 static int bus_read_auth(sd_bus *b) {
493 r = bus_auth_verify(b);
497 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
498 p = realloc(b->rbuffer, n);
505 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
506 iov.iov_len = n - b->rbuffer_size;
512 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
514 return errno == EAGAIN ? 0 : -errno;
516 b->rbuffer_size += k;
518 r = bus_auth_verify(b);
525 static int bus_start_auth(sd_bus *b) {
526 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
527 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
529 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
534 b->state = BUS_AUTHENTICATING;
536 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
540 b->auth_uid = hexmem(text, l);
544 b->auth_iovec[0].iov_base = (void*) auth_prefix;
545 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
546 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
547 b->auth_iovec[1].iov_len = l * 2;
548 b->auth_iovec[2].iov_base = (void*) auth_suffix;
549 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
550 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
552 return bus_write_auth(b);
555 static int bus_start_connect(sd_bus *b) {
562 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
563 r = bus_parse_next_address(b);
567 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
570 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
572 b->last_connect_error = errno;
577 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
579 if (errno == EINPROGRESS)
582 b->last_connect_error = errno;
583 close_nointr_nofail(b->fd);
589 return bus_start_auth(b);
593 int sd_bus_open_system(sd_bus **ret) {
601 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
603 r = sd_bus_open_address(e, &b);
611 b->sockaddr.un.sun_family = AF_UNIX;
612 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
613 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
615 r = bus_start_connect(b);
622 r = bus_send_hello(b);
632 int sd_bus_open_user(sd_bus **ret) {
641 e = getenv("DBUS_SESSION_BUS_ADDRESS");
643 r = sd_bus_open_address(e, &b);
647 e = getenv("XDG_RUNTIME_DIR");
652 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
659 b->sockaddr.un.sun_family = AF_UNIX;
660 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
661 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
663 r = bus_start_connect(b);
670 r = bus_send_hello(b);
680 int sd_bus_open_address(const char *address, sd_bus **ret) {
693 b->address = strdup(address);
699 r = bus_start_connect(b);
709 int sd_bus_open_fd(int fd, sd_bus **ret) {
723 fd_nonblock(b->fd, true);
724 fd_cloexec(b->fd, true);
726 r = bus_start_auth(b);
736 void sd_bus_close(sd_bus *bus) {
742 close_nointr_nofail(bus->fd);
746 sd_bus *sd_bus_ref(sd_bus *bus) {
750 assert(bus->n_ref > 0);
756 sd_bus *sd_bus_unref(sd_bus *bus) {
760 assert(bus->n_ref > 0);
769 int sd_bus_is_open(sd_bus *bus) {
776 int sd_bus_is_running(sd_bus *bus) {
783 return bus->state == BUS_RUNNING;
786 int sd_bus_can_send(sd_bus *bus, char type) {
790 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
793 if (type == SD_BUS_TYPE_UNIX_FD)
796 return bus_type_is_valid(type);
799 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
802 if (m->header->version > b->message_version)
808 return bus_message_seal(m, ++b->serial);
811 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
821 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
826 n = m->n_iovec * sizeof(struct iovec);
828 memcpy(iov, m->iovec, n);
831 iovec_advance(iov, &j, *idx);
835 mh.msg_iovlen = m->n_iovec;
837 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
839 return errno == EAGAIN ? 0 : -errno;
845 static int message_read_need(sd_bus *bus, size_t *need) {
851 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
853 if (bus->rbuffer_size < sizeof(struct bus_header)) {
854 *need = sizeof(struct bus_header) + 8;
856 /* Minimum message size:
860 * Method Call: +2 string headers
861 * Signal: +3 string headers
862 * Method Error: +1 string headers
864 * Method Reply: +1 uint32 headers
866 * A string header is at least 9 bytes
867 * A uint32 header is at least 8 bytes
869 * Hence the minimum message size of a valid message
870 * is header + 8 bytes */
875 a = ((const uint32_t*) bus->rbuffer)[1];
876 b = ((const uint32_t*) bus->rbuffer)[3];
878 e = ((const uint8_t*) bus->rbuffer)[0];
879 if (e == SD_BUS_LITTLE_ENDIAN) {
882 } else if (e == SD_BUS_BIG_ENDIAN) {
888 *need = sizeof(struct bus_header) + ALIGN_TO(b, 8) + a;
892 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
899 assert(bus->rbuffer_size >= size);
900 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
902 if (bus->rbuffer_size > size) {
903 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
910 r = bus_message_from_malloc(bus->rbuffer, size, &t);
917 bus->rbuffer_size -= size;
923 static int message_read(sd_bus *bus, sd_bus_message **m) {
933 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
935 r = message_read_need(bus, &need);
939 if (bus->rbuffer_size >= need)
940 return message_make(bus, need, m);
942 b = realloc(bus->rbuffer, need);
949 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
950 iov.iov_len = need - bus->rbuffer_size;
956 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
958 return errno == EAGAIN ? 0 : -errno;
960 bus->rbuffer_size += k;
962 r = message_read_need(bus, &need);
966 if (bus->rbuffer_size >= need)
967 return message_make(bus, need, m);
972 static int dispatch_wqueue(sd_bus *bus) {
976 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
981 while (bus->wqueue_size > 0) {
983 r = message_write(bus, bus->wqueue[0], &bus->windex);
988 /* Didn't do anything this time */
990 else if (bus->windex >= bus->wqueue[0]->size) {
991 /* Fully written. Let's drop the entry from
994 * This isn't particularly optimized, but
995 * well, this is supposed to be our worst-case
996 * buffer only, and the socket buffer is
997 * supposed to be our primary buffer, and if
998 * it got full, then all bets are off
1001 sd_bus_message_unref(bus->wqueue[0]);
1002 bus->wqueue_size --;
1003 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1013 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1019 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1024 if (bus->rqueue_size > 0) {
1025 /* Dispatch a queued message */
1027 *m = bus->rqueue[0];
1028 bus->rqueue_size --;
1029 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1033 /* Try to read a new message */
1035 r = message_read(bus, &z);
1050 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1060 /* If the serial number isn't kept, then we know that no reply
1062 if (!serial && !m->sealed)
1063 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1065 r = bus_seal_message(bus, m);
1069 /* If this is a reply and no reply was requested, then let's
1070 * suppress this, if we can */
1071 if (m->dont_send && !serial)
1074 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1077 r = message_write(bus, m, &idx);
1081 } else if (idx < m->size) {
1082 /* Wasn't fully written. So let's remember how
1083 * much was written. Note that the first entry
1084 * of the wqueue array is always allocated so
1085 * that we always can remember how much was
1087 bus->wqueue[0] = sd_bus_message_ref(m);
1088 bus->wqueue_size = 1;
1094 /* Just append it to the queue. */
1096 if (bus->wqueue_size >= WQUEUE_MAX)
1099 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1104 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1108 *serial = BUS_MESSAGE_SERIAL(m);
1113 static usec_t calc_elapse(uint64_t usec) {
1114 if (usec == (uint64_t) -1)
1118 usec = BUS_DEFAULT_TIMEOUT;
1120 return now(CLOCK_MONOTONIC) + usec;
1123 static int timeout_compare(const void *a, const void *b) {
1124 const struct reply_callback *x = a, *y = b;
1126 if (x->timeout != 0 && y->timeout == 0)
1129 if (x->timeout == 0 && y->timeout != 0)
1132 if (x->timeout < y->timeout)
1135 if (x->timeout > y->timeout)
1141 int sd_bus_send_with_reply(
1144 sd_message_handler_t callback,
1149 struct reply_callback *c;
1160 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1162 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1165 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1169 if (usec != (uint64_t) -1) {
1170 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1175 r = bus_seal_message(bus, m);
1179 c = new(struct reply_callback, 1);
1183 c->callback = callback;
1184 c->userdata = userdata;
1185 c->serial = BUS_MESSAGE_SERIAL(m);
1186 c->timeout = calc_elapse(usec);
1188 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1194 if (c->timeout != 0) {
1195 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1198 sd_bus_send_with_reply_cancel(bus, c->serial);
1203 r = sd_bus_send(bus, m, serial);
1205 sd_bus_send_with_reply_cancel(bus, c->serial);
1212 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1213 struct reply_callback *c;
1220 c = hashmap_remove(bus->reply_callbacks, &serial);
1224 if (c->timeout != 0)
1225 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1231 static int ensure_running(sd_bus *bus) {
1236 r = sd_bus_is_running(bus);
1243 r = sd_bus_process(bus, NULL);
1248 k = sd_bus_is_running(bus);
1255 r = sd_bus_wait(bus, (uint64_t) -1);
1261 int sd_bus_send_with_reply_and_block(
1265 sd_bus_error *error,
1266 sd_bus_message **reply) {
1279 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1281 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1283 if (bus_error_is_dirty(error))
1286 r = ensure_running(bus);
1290 r = sd_bus_send(bus, m, &serial);
1294 timeout = calc_elapse(usec);
1298 sd_bus_message *incoming = NULL;
1303 /* Make sure there's room for queuing this
1304 * locally, before we read the message */
1306 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1314 r = message_read(bus, &incoming);
1319 if (incoming->reply_serial == serial) {
1320 /* Found a match! */
1322 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1327 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1330 r = sd_bus_error_copy(error, &incoming->error);
1332 sd_bus_message_unref(incoming);
1336 k = bus_error_to_errno(&incoming->error);
1337 sd_bus_message_unref(incoming);
1341 sd_bus_message_unref(incoming);
1345 /* There's already guaranteed to be room for
1346 * this, so need to resize things here */
1347 bus->rqueue[bus->rqueue_size ++] = incoming;
1350 /* Try to read more, right-away */
1359 n = now(CLOCK_MONOTONIC);
1365 left = (uint64_t) -1;
1367 r = bus_poll(bus, true, left);
1371 r = dispatch_wqueue(bus);
1377 int sd_bus_get_fd(sd_bus *bus) {
1387 int sd_bus_get_events(sd_bus *bus) {
1395 if (bus->state == BUS_OPENING)
1397 else if (bus->state == BUS_AUTHENTICATING) {
1399 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1404 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1405 if (bus->rqueue_size <= 0)
1407 if (bus->wqueue_size > 0)
1414 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1415 struct reply_callback *c;
1424 if (bus->state == BUS_AUTHENTICATING) {
1425 *timeout_usec = bus->auth_timeout;
1429 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1432 c = prioq_peek(bus->reply_callbacks_prioq);
1436 *timeout_usec = c->timeout;
1440 static int process_timeout(sd_bus *bus) {
1441 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1442 struct reply_callback *c;
1448 c = prioq_peek(bus->reply_callbacks_prioq);
1452 n = now(CLOCK_MONOTONIC);
1456 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1457 hashmap_remove(bus->reply_callbacks, &c->serial);
1459 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1462 return r < 0 ? r : 1;
1465 static int process_message(sd_bus *bus, sd_bus_message *m) {
1466 struct filter_callback *l;
1472 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1473 struct reply_callback *c;
1475 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1477 if (c->timeout != 0)
1478 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1480 r = c->callback(bus, 0, m, c->userdata);
1488 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1489 r = l->callback(bus, 0, m, l->userdata);
1497 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1500 /* Returns 0 when we didn't do anything. This should cause the
1501 * caller to invoke sd_bus_wait() before returning the next
1502 * time. Returns > 0 when we did something, which possibly
1503 * means *ret is filled in with an unprocessed message. */
1510 if (bus->state == BUS_OPENING) {
1521 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1523 socklen_t slen = sizeof(error);
1525 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1527 bus->last_connect_error = errno;
1528 else if (error != 0)
1529 bus->last_connect_error = error;
1530 else if (p.revents & (POLLERR|POLLHUP))
1531 bus->last_connect_error = ECONNREFUSED;
1533 r = bus_start_auth(bus);
1537 /* Try next address */
1538 r = bus_start_connect(bus);
1545 } else if (bus->state == BUS_AUTHENTICATING) {
1547 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1550 r = bus_write_auth(bus);
1554 r = bus_read_auth(bus);
1557 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1558 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1561 r = process_timeout(bus);
1565 r = dispatch_wqueue(bus);
1570 r = dispatch_rqueue(bus, &m);
1579 r = process_message(bus, m);
1589 if (sd_bus_message_is_method_call(m, NULL, NULL)) {
1590 const sd_bus_error e = SD_BUS_ERROR_INIT_CONST("org.freedesktop.DBus.Error.UnknownObject", "Unknown object.");
1591 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1593 r = sd_bus_message_new_method_error(bus, m, &e, &reply);
1597 r = sd_bus_send(bus, reply, NULL);
1605 assert_not_reached("Unknown state");
1614 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1625 e = sd_bus_get_events(bus);
1632 r = sd_bus_get_timeout(bus, &until);
1639 n = now(CLOCK_MONOTONIC);
1640 m = until > n ? until - n : 0;
1643 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1650 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1654 return r > 0 ? 1 : 0;
1657 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1663 if (bus->rqueue_size > 0)
1666 return bus_poll(bus, false, timeout_usec);
1669 int sd_bus_flush(sd_bus *bus) {
1677 r = ensure_running(bus);
1681 if (bus->wqueue_size <= 0)
1685 r = dispatch_wqueue(bus);
1689 if (bus->wqueue_size <= 0)
1692 r = bus_poll(bus, false, (uint64_t) -1);
1698 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1699 struct filter_callback *f;
1706 f = new(struct filter_callback, 1);
1709 f->callback = callback;
1710 f->userdata = userdata;
1712 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1716 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1717 struct filter_callback *f;
1724 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1725 if (f->callback == callback && f->userdata == userdata) {
1726 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);