1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
34 #include "bus-internal.h"
35 #include "bus-message.h"
/* Forward declarations for two static helpers that are called before their
 * definitions appear further down in this file. */
38 static int ensure_running(sd_bus *bus);
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Releases what a bus object holds: closes b->fd, drops the reference on
 * every message still sitting in rqueue and wqueue, frees the reply
 * callback hashmap with hashmap_free_free() and the timeout prioq with
 * prioq_free(), and unlinks each entry of the filter_callbacks list.
 * NOTE(review): whether the unlinked filter entries, the queue arrays and
 * b itself are freed afterwards is not visible here -- confirm. */
41 static void bus_free(sd_bus *b) {
42 struct filter_callback *f;
48 close_nointr_nofail(b->fd);
55 for (i = 0; i < b->rqueue_size; i++)
56 sd_bus_message_unref(b->rqueue[i]);
59 for (i = 0; i < b->wqueue_size; i++)
60 sd_bus_message_unref(b->wqueue[i]);
63 hashmap_free_free(b->reply_callbacks);
64 prioq_free(b->reply_callbacks_prioq);
66 while ((f = b->filter_callbacks)) {
67 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
/* Creates a new bus object. The visible lines set message_version to 1 and
 * pre-allocate wqueue with room for exactly one message pointer, so that
 * a partially written message can always be parked in wqueue[0].
 * NOTE(review): allocation of the object itself and the failure paths are
 * not visible here -- confirm. */
74 static sd_bus* bus_new(void) {
83 r->message_version = 1;
85 /* We guarantee that wqueue always has space for at least one
87 r->wqueue = new(sd_bus_message*, 1);
/* Reply handler that bus_send_hello() registers for its method call.
 * Switches the bus to BUS_RUNNING, reads one string from the reply and
 * stores a strdup() copy of it in bus->unique_name.
 * NOTE(review): handling of a non-zero error argument and of read or
 * allocation failures is not visible here -- confirm. */
96 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
107 bus->state = BUS_RUNNING;
109 r = sd_bus_message_read(reply, "s", &s);
113 bus->unique_name = strdup(s);
114 if (!bus->unique_name)
/* Builds a method call addressed to org.freedesktop.DBus, sends it with
 * hello_callback registered as the reply handler, and then records that
 * fact in bus->sent_hello. The message is unreferenced automatically when
 * m goes out of scope (_cleanup_bus_message_unref_).
 * NOTE(review): the object path and member name passed to
 * sd_bus_message_new_method_call() are not visible here -- confirm. */
120 static int bus_send_hello(sd_bus *bus) {
121 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
126 r = sd_bus_message_new_method_call(
128 "org.freedesktop.DBus",
130 "org.freedesktop.DBus",
136 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
140 bus->sent_hello = true;
/* Moves the bus out of the authentication phase. When a hello call has
 * been sent (bus->sent_hello) the state becomes BUS_HELLO; the other
 * visible assignment sets BUS_RUNNING.
 * NOTE(review): presumably BUS_RUNNING is only reached when sent_hello is
 * false; the branch structure and return values are not visible. */
144 static int bus_start_running(sd_bus *bus) {
147 if (bus->sent_hello) {
148 bus->state = BUS_HELLO;
152 bus->state = BUS_RUNNING;
/* Tries to parse one key of a bus address at *p. The start of *p is
 * compared against key with strncmp(); the value is then scanned up to
 * the next comma or the end of the string, a pair of nibble values x and
 * y is combined into a single byte, and the output buffer is grown with
 * realloc() as characters are appended.
 * NOTE(review): presumably the nibble pair comes from a hex escape in the
 * value and the result is handed back through *value with *p advanced;
 * none of that, nor the return convention, is visible here. */
156 static int parse_address_key(const char **p, const char *key, char **value) {
167 if (strncmp(*p, key, l) != 0)
177 while (*a != ',' && *a != 0) {
195 c = (char) ((x << 4) | y);
202 t = realloc(r, n + 2);
/* Skips over an address key that is not understood by advancing *p to the
 * next comma, or to the terminating NUL if there is none (strcspn).
 * NOTE(review): whether the comma itself is consumed afterwards is not
 * visible here. */
227 static void skip_address_key(const char **p) {
231 *p += strcspn(*p, ",");
/* Parses the next entry of b->address, starting at b->address_index, into
 * b->sockaddr / b->sockaddr_size, and remembers where parsing stopped in
 * b->address_index. sockaddr_size and peer are reset before parsing.
 *
 * unix: entries accept the keys guid, path and abstract; other keys are
 * skipped. The combinations (neither path nor abstract) and (both) are
 * tested explicitly. A path is copied into sun_path as is; an abstract
 * name is stored after a leading zero byte, and sockaddr_size accounts
 * for that extra byte.
 *
 * tcp: entries accept guid, host, port and family (ipv4 or ipv6), and
 * are resolved with getaddrinfo() using SOCK_STREAM and AI_ADDRCONFIG;
 * the first result is copied into b->sockaddr.
 *
 * A guid, if present, is converted with sd_id128_from_string() into
 * b->peer.
 * NOTE(review): the error codes returned for the rejected combinations
 * and for oversized paths are not visible here; -EADDRNOTAVAIL appears
 * next to the getaddrinfo() call, presumably for resolution failure. */
237 static int bus_parse_next_address(sd_bus *b) {
239 _cleanup_free_ char *guid = NULL;
246 if (b->address[b->address_index] == 0)
249 a = b->address + b->address_index;
252 b->sockaddr_size = 0;
253 b->peer = SD_ID128_NULL;
255 if (startswith(a, "unix:")) {
256 _cleanup_free_ char *path = NULL, *abstract = NULL;
260 r = parse_address_key(&p, "guid", &guid);
266 r = parse_address_key(&p, "path", &path);
272 r = parse_address_key(&p, "abstract", &abstract);
278 skip_address_key(&p);
281 if (!path && !abstract)
284 if (path && abstract)
291 if (l > sizeof(b->sockaddr.un.sun_path))
294 b->sockaddr.un.sun_family = AF_UNIX;
295 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
296 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
297 } else if (abstract) {
300 l = strlen(abstract);
301 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
304 b->sockaddr.un.sun_family = AF_UNIX;
305 b->sockaddr.un.sun_path[0] = 0;
306 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
307 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
310 } else if (startswith(a, "tcp:")) {
311 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
312 struct addrinfo hints, *result;
316 r = parse_address_key(&p, "guid", &guid);
322 r = parse_address_key(&p, "host", &host);
328 r = parse_address_key(&p, "port", &port);
334 r = parse_address_key(&p, "family", &family);
340 skip_address_key(&p);
347 hints.ai_socktype = SOCK_STREAM;
348 hints.ai_flags = AI_ADDRCONFIG;
351 if (streq(family, "ipv4"))
352 hints.ai_family = AF_INET;
353 else if (streq(family, "ipv6"))
354 hints.ai_family = AF_INET6;
359 r = getaddrinfo(host, port, &hints, &result);
363 return -EADDRNOTAVAIL;
365 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
366 b->sockaddr_size = result->ai_addrlen;
368 freeaddrinfo(result);
372 r = sd_id128_from_string(guid, &b->peer);
377 b->address_index = p - b->address;
/* Advances the position in an iovec array by size bytes. The entry at
 * iov[*idx] is examined; when it is longer than size, its iov_base is
 * moved forward by size.
 * NOTE(review): presumably iov_len is reduced to match and fully consumed
 * entries bump *idx; that part is not visible here. */
381 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
384 struct iovec *i = iov + *idx;
386 if (i->iov_len > size) {
387 i->iov_base = (uint8_t*) i->iov_base + size;
/* Sends as much of the pending authentication request as the socket
 * accepts without blocking. Requires state BUS_AUTHENTICATING. On the
 * first call (auth_timeout == 0) the deadline is set to now +
 * BUS_DEFAULT_TIMEOUT. The unsent part of auth_iovec is passed to
 * sendmsg() with MSG_DONTWAIT|MSG_NOSIGNAL; the visible error return
 * yields 0 for EAGAIN and -errno otherwise. After a successful send the
 * iovec position is advanced by the number of bytes written.
 * NOTE(review): the result of the auth_index >= ELEMENTSOF() check
 * (nothing left to send) and the final return value are not visible. */
401 static int bus_write_auth(sd_bus *b) {
406 assert(b->state == BUS_AUTHENTICATING);
408 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
411 if (b->auth_timeout == 0)
412 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
415 mh.msg_iov = b->auth_iovec + b->auth_index;
416 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
418 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
420 return errno == EAGAIN ? 0 : -errno;
422 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
/* Checks whether rbuffer already holds the complete authentication reply
 * and validates it. Two CRLF-terminated lines are located with memmem().
 * The first must be exactly 3 + 32 bytes long, start with the three bytes
 * O, K, space, and continue with 32 hex digits that are decoded into a
 * 128 bit peer id. Unless b->peer is the null id, the decoded id is
 * compared against it. The second line is compared with AGREE_UNIX_FD.
 * The consumed bytes are then removed from the front of rbuffer and
 * bus_start_running() is called.
 * NOTE(review): the return values for an incomplete or invalid reply, and
 * the variable receiving the AGREE_UNIX_FD comparison, are not visible. */
427 static int bus_auth_verify(sd_bus *b) {
433 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
436 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
440 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
444 if (e - (char*) b->rbuffer != 3 + 32)
447 if (memcmp(b->rbuffer, "OK ", 3))
450 for (i = 0; i < 32; i += 2) {
453 x = unhexchar(((char*) b->rbuffer)[3 + i]);
454 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
459 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
462 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
463 !sd_id128_equal(b->peer, peer))
469 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
470 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
472 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
473 memmove(b->rbuffer, f + 2, b->rbuffer_size);
475 r = bus_start_running(b);
/* Reads more of the authentication reply from the socket. The data that
 * is already buffered is verified first. The target buffer size n is the
 * larger of the expected reply length and twice the current fill level,
 * capped at BUS_AUTH_SIZE_MAX; rbuffer is grown to n with realloc() and
 * recvmsg() (MSG_DONTWAIT|MSG_NOSIGNAL) fills the free tail. The visible
 * error return yields 0 for EAGAIN and -errno otherwise. The number of
 * bytes received is added to rbuffer_size and the reply is verified
 * again.
 * NOTE(review): what is returned when rbuffer_size >= n (buffer already
 * at its cap) and on end of stream is not visible -- confirm. */
482 static int bus_read_auth(sd_bus *b) {
492 r = bus_auth_verify(b);
496 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
498 if (n > BUS_AUTH_SIZE_MAX)
499 n = BUS_AUTH_SIZE_MAX;
501 if (b->rbuffer_size >= n)
504 p = realloc(b->rbuffer, n);
511 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
512 iov.iov_len = n - b->rbuffer_size;
518 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
520 return errno == EAGAIN ? 0 : -errno;
522 b->rbuffer_size += k;
524 r = bus_auth_verify(b);
/* Enters BUS_AUTHENTICATING and prepares the authentication request as
 * three iovec entries: a fixed prefix (a zero byte followed by the AUTH
 * EXTERNAL command), the effective uid printed in decimal and then
 * hex-encoded with hexmem(), and a fixed suffix carrying the
 * NEGOTIATE_UNIX_FD and BEGIN commands. auth_size is the total length of
 * the three parts. The request is then handed to bus_write_auth().
 * NOTE(review): l is presumably strlen(text); its assignment and the
 * check of the hexmem() result are not visible here. */
531 static int bus_start_auth(sd_bus *b) {
532 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
533 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
535 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
540 b->state = BUS_AUTHENTICATING;
542 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
546 b->auth_uid = hexmem(text, l);
550 b->auth_iovec[0].iov_base = (void*) auth_prefix;
551 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
552 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
553 b->auth_iovec[1].iov_len = l * 2;
554 b->auth_iovec[2].iov_base = (void*) auth_suffix;
555 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
556 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
558 return bus_write_auth(b);
/* Starts a connection attempt. When no socket address is pending
 * (sa_family == AF_UNSPEC) the next address is parsed from the address
 * string first. A stream socket is created with SOCK_CLOEXEC and
 * SOCK_NONBLOCK and connect() is issued. EINPROGRESS is special-cased;
 * other connect errors are recorded in last_connect_error and the fd is
 * closed. When connect() succeeds immediately, authentication is started
 * right away.
 * NOTE(review): the visible return of -last_connect_error (or
 * -ECONNREFUSED when none was recorded) presumably fires once the address
 * list is exhausted; the retry loop and the state set on EINPROGRESS are
 * not visible here. */
561 static int bus_start_connect(sd_bus *b) {
568 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
569 r = bus_parse_next_address(b);
573 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
576 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
578 b->last_connect_error = errno;
583 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
585 if (errno == EINPROGRESS)
588 b->last_connect_error = errno;
589 close_nointr_nofail(b->fd);
595 return bus_start_auth(b);
/* Opens a connection to the system bus and returns it in *ret. The
 * environment variable DBUS_SYSTEM_BUS_ADDRESS is consulted and, via
 * sd_bus_open_address(), can supply the address. The other visible path
 * connects to the AF_UNIX socket /run/dbus/system_bus_socket. A hello
 * call is queued with bus_send_hello().
 * NOTE(review): presumably the fixed socket path is only used when the
 * variable is unset; the branch structure, error cleanup and return
 * values are not visible here. */
599 int sd_bus_open_system(sd_bus **ret) {
607 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
609 r = sd_bus_open_address(e, &b);
617 b->sockaddr.un.sun_family = AF_UNIX;
618 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
619 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
621 r = bus_start_connect(b);
628 r = bus_send_hello(b);
/* Opens a connection to the user bus and returns it in *ret. The
 * environment variable DBUS_SESSION_BUS_ADDRESS is consulted and, via
 * sd_bus_open_address(), can supply the address. The other visible path
 * builds an AF_UNIX socket path from XDG_RUNTIME_DIR with /bus appended,
 * after checking that l + 4 bytes fit into sun_path. A hello call is
 * queued with bus_send_hello().
 * NOTE(review): l is presumably strlen(e); behaviour when neither
 * variable is set, error cleanup and return values are not visible. */
638 int sd_bus_open_user(sd_bus **ret) {
647 e = getenv("DBUS_SESSION_BUS_ADDRESS");
649 r = sd_bus_open_address(e, &b);
653 e = getenv("XDG_RUNTIME_DIR");
658 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
665 b->sockaddr.un.sun_family = AF_UNIX;
666 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
667 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
669 r = bus_start_connect(b);
676 r = bus_send_hello(b);
/* Opens a connection to the bus described by an address string. A private
 * copy of address is stored in b->address and bus_start_connect() begins
 * the first connection attempt.
 * NOTE(review): creation of b, handling of a failed strdup() and the
 * assignment of *ret are not visible here -- confirm. */
686 int sd_bus_open_address(const char *address, sd_bus **ret) {
699 b->address = strdup(address);
705 r = bus_start_connect(b);
/* Sets up a bus connection on top of an already connected descriptor. The
 * descriptor is switched to non-blocking and close-on-exec mode and
 * authentication is started directly with bus_start_auth().
 * NOTE(review): how fd is stored in b->fd (taken over or duplicated) and
 * the assignment of *ret are not visible here -- confirm. */
715 int sd_bus_open_fd(int fd, sd_bus **ret) {
729 fd_nonblock(b->fd, true);
730 fd_cloexec(b->fd, true);
732 r = bus_start_auth(b);
/* Closes the connection by closing bus->fd with close_nointr_nofail().
 * NOTE(review): any guard for a NULL bus or an already closed fd, and
 * whether fd is reset afterwards, is not visible here -- confirm. */
742 void sd_bus_close(sd_bus *bus) {
748 close_nointr_nofail(bus->fd);
/* Reference-counting entry point; asserts that the object is still alive
 * (n_ref > 0) before touching it.
 * NOTE(review): presumably increments n_ref and returns bus; the actual
 * update and return are not visible here. */
752 sd_bus *sd_bus_ref(sd_bus *bus) {
756 assert(bus->n_ref > 0);
/* Reference-counting entry point; asserts that the object is still alive
 * (n_ref > 0) before touching it.
 * NOTE(review): presumably decrements n_ref and frees the bus when it
 * drops to zero; the actual update and return are not visible here. */
762 sd_bus *sd_bus_unref(sd_bus *bus) {
766 assert(bus->n_ref > 0);
/* NOTE(review): only the signature is visible here. Judging by the name
 * this reports whether the connection is open; the criterion used and
 * the return convention need to be confirmed against the body. */
775 int sd_bus_is_open(sd_bus *bus) {
/* Tells whether a value of the given D-Bus type can be sent on this bus.
 * For SD_BUS_TYPE_UNIX_FD the connection is first driven to the running
 * state with ensure_running(). The visible return reports
 * bus_type_is_valid(type).
 * NOTE(review): what is returned for SD_BUS_TYPE_UNIX_FD after
 * ensure_running() is not visible here -- confirm. */
782 int sd_bus_can_send(sd_bus *bus, char type) {
788 if (type == SD_BUS_TYPE_UNIX_FD) {
789 r = ensure_running(bus);
796 return bus_type_is_valid(type);
/* Drives the connection to the running state with ensure_running() before
 * answering.
 * NOTE(review): presumably the id stored in bus->peer is then copied to
 * *peer; that copy and the return values are not visible here. */
799 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
807 r = ensure_running(bus);
/* Seals message m for sending on bus b. The message header version is
 * compared against b->message_version, and the message is sealed with the
 * next serial number; b->serial is pre-incremented for that.
 * NOTE(review): the result of the version check (presumably a rejection)
 * and any handling of already sealed messages is not visible here. */
815 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
818 if (m->header->version > b->message_version)
824 return bus_message_seal(m, ++b->serial);
/* Writes as much of message m to the socket as is possible without
 * blocking. Requires state BUS_RUNNING or BUS_HELLO. The iovec array of
 * the message is copied to a scratch array, the scratch copy is advanced
 * by *idx bytes, and the result is passed to sendmsg() with
 * MSG_DONTWAIT|MSG_NOSIGNAL. The visible error return yields 0 for
 * EAGAIN and -errno otherwise.
 * NOTE(review): presumably *idx counts bytes already written and is
 * increased by the sendmsg() result; the update and the success return
 * value are not visible here. */
827 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
837 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
842 n = m->n_iovec * sizeof(struct iovec);
844 memcpy(iov, m->iovec, n);
847 iovec_advance(iov, &j, *idx);
851 mh.msg_iovlen = m->n_iovec;
853 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
855 return errno == EAGAIN ? 0 : -errno;
/* Computes in *need how many bytes must be buffered before the next
 * message can be parsed. Requires state BUS_RUNNING or BUS_HELLO. While
 * fewer bytes than a fixed header are buffered, the answer is the header
 * size plus 8. Otherwise two 32 bit words are taken from the buffered
 * header (word 1 into a, word 3 into b), the endianness marker in byte 0
 * is inspected, and the total is header size + b rounded up to a multiple
 * of 8 + a. Totals at or above BUS_MESSAGE_SIZE_MAX are tested for
 * separately.
 * NOTE(review): the byte swapping per endianness and the error codes for
 * an unknown marker or an oversized message are not visible here. */
861 static int message_read_need(sd_bus *bus, size_t *need) {
868 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
870 if (bus->rbuffer_size < sizeof(struct bus_header)) {
871 *need = sizeof(struct bus_header) + 8;
873 /* Minimum message size:
877 * Method Call: +2 string headers
878 * Signal: +3 string headers
879 * Method Error: +1 string headers
881 * Method Reply: +1 uint32 headers
883 * A string header is at least 9 bytes
884 * A uint32 header is at least 8 bytes
886 * Hence the minimum message size of a valid message
887 * is header + 8 bytes */
892 a = ((const uint32_t*) bus->rbuffer)[1];
893 b = ((const uint32_t*) bus->rbuffer)[3];
895 e = ((const uint8_t*) bus->rbuffer)[0];
896 if (e == SD_BUS_LITTLE_ENDIAN) {
899 } else if (e == SD_BUS_BIG_ENDIAN) {
905 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
906 if (sum >= BUS_MESSAGE_SIZE_MAX)
909 *need = (size_t) sum;
/* Turns the first size bytes of rbuffer into a message object. Requires
 * that at least size bytes are buffered and that the state is BUS_RUNNING
 * or BUS_HELLO. Bytes beyond size are duplicated with memdup() first, the
 * message is created from rbuffer with bus_message_from_malloc(), and
 * rbuffer_size is reduced by size.
 * NOTE(review): presumably the duplicate becomes the new rbuffer and the
 * message is returned through *m; neither step, nor cleanup on failure,
 * is visible here. */
913 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
920 assert(bus->rbuffer_size >= size);
921 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
923 if (bus->rbuffer_size > size) {
924 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
931 r = bus_message_from_malloc(bus->rbuffer, size, &t);
938 bus->rbuffer_size -= size;
/* Tries to produce one complete incoming message without blocking.
 * Requires state BUS_RUNNING or BUS_HELLO. If enough bytes are already
 * buffered the message is built right away. Otherwise rbuffer is grown to
 * the needed size, recvmsg() (MSG_DONTWAIT|MSG_NOSIGNAL) fills the free
 * tail, the needed size is recomputed, and the message is built if the
 * buffer is now sufficient. The visible error return yields 0 for EAGAIN
 * and -errno otherwise.
 * NOTE(review): the return value when the message is still incomplete and
 * the handling of end of stream are not visible here -- confirm. */
944 static int message_read(sd_bus *bus, sd_bus_message **m) {
954 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
956 r = message_read_need(bus, &need);
960 if (bus->rbuffer_size >= need)
961 return message_make(bus, need, m);
963 b = realloc(bus->rbuffer, need);
970 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
971 iov.iov_len = need - bus->rbuffer_size;
977 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
979 return errno == EAGAIN ? 0 : -errno;
981 bus->rbuffer_size += k;
983 r = message_read_need(bus, &need);
987 if (bus->rbuffer_size >= need)
988 return message_make(bus, need, m);
/* Flushes the write queue as far as the socket allows. Requires state
 * BUS_RUNNING or BUS_HELLO. While the queue is non-empty, the head
 * message is written starting at offset bus->windex. Once windex has
 * reached the size of the head message, its reference is dropped and the
 * remaining entries are shifted down by one with memmove().
 * NOTE(review): the reset of windex after a completed message and the
 * return values are not visible here -- confirm. */
993 static int dispatch_wqueue(sd_bus *bus) {
997 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1002 while (bus->wqueue_size > 0) {
1004 r = message_write(bus, bus->wqueue[0], &bus->windex);
1009 /* Didn't do anything this time */
1011 else if (bus->windex >= bus->wqueue[0]->size) {
1012 /* Fully written. Let's drop the entry from
1015 * This isn't particularly optimized, but
1016 * well, this is supposed to be our worst-case
1017 * buffer only, and the socket buffer is
1018 * supposed to be our primary buffer, and if
1019 * it got full, then all bets are off
1022 sd_bus_message_unref(bus->wqueue[0]);
1023 bus->wqueue_size --;
1024 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetches the next incoming message. Requires state BUS_RUNNING or
 * BUS_HELLO. A message already waiting in rqueue is handed out first: the
 * head entry is stored in *m and the remaining entries are shifted down
 * with memmove(). Otherwise a new message is read from the socket with
 * message_read().
 * NOTE(review): how the freshly read message z reaches *m and the return
 * values are not visible here -- confirm. */
1034 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1040 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1045 if (bus->rqueue_size > 0) {
1046 /* Dispatch a queued message */
1048 *m = bus->rqueue[0];
1049 bus->rqueue_size --;
1050 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1054 /* Try to read a new message */
1056 r = message_read(bus, &z);
/* Queues message m for sending on bus and reports its serial number
 * through *serial.
 *
 * When the caller passes no serial pointer and the message is not sealed
 * yet, the NO_REPLY_EXPECTED header flag is set. The message is then
 * sealed. Messages marked dont_send are tested for together with a
 * missing serial pointer. If the bus is in state BUS_RUNNING or BUS_HELLO
 * and the write queue is empty, a direct write is attempted; a partially
 * written message is parked in wqueue[0] with an extra reference. In the
 * other visible path the message is appended to the write queue, which
 * is grown with realloc() and bounded by BUS_WQUEUE_MAX.
 * NOTE(review): the error codes, the guard around the *serial assignment
 * and where the partial write offset is stored are not visible here. */
1071 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1081 /* If the serial number isn't kept, then we know that no reply
1083 if (!serial && !m->sealed)
1084 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1086 r = bus_seal_message(bus, m);
1090 /* If this is a reply and no reply was requested, then let's
1091 * suppress this, if we can */
1092 if (m->dont_send && !serial)
1095 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1098 r = message_write(bus, m, &idx);
1102 } else if (idx < m->size) {
1103 /* Wasn't fully written. So let's remember how
1104 * much was written. Note that the first entry
1105 * of the wqueue array is always allocated so
1106 * that we always can remember how much was
1108 bus->wqueue[0] = sd_bus_message_ref(m);
1109 bus->wqueue_size = 1;
1115 /* Just append it to the queue. */
1117 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1120 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1125 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1129 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout in microseconds into an absolute deadline
 * on CLOCK_MONOTONIC (now + usec). The value (uint64_t) -1 is tested for
 * separately, and BUS_DEFAULT_TIMEOUT is substituted in one case.
 * NOTE(review): presumably -1 yields 0, meaning no timeout (callers test
 * the stored timeout against 0), and the default applies when usec is 0;
 * the return and the condition are not visible here -- confirm. */
1134 static usec_t calc_elapse(uint64_t usec) {
1135 if (usec == (uint64_t) -1)
1139 usec = BUS_DEFAULT_TIMEOUT;
1141 return now(CLOCK_MONOTONIC) + usec;
/* Ordering function for the reply callback prioq (it is the comparator
 * passed to prioq_ensure_allocated()). Both arguments are struct
 * reply_callback pointers. Entries are first distinguished by whether
 * their timeout is zero or not, then by the timeout value itself.
 * NOTE(review): the values returned by each branch are not visible here;
 * presumably entries with a non-zero timeout sort before those without. */
1144 static int timeout_compare(const void *a, const void *b) {
1145 const struct reply_callback *x = a, *y = b;
1147 if (x->timeout != 0 && y->timeout == 0)
1150 if (x->timeout == 0 && y->timeout != 0)
1153 if (x->timeout < y->timeout)
1156 if (x->timeout > y->timeout)
/* Sends a method call and registers callback to be invoked with userdata
 * when the reply arrives or the timeout elapses.
 *
 * The message type is compared with METHOD_CALL and the NO_REPLY_EXPECTED
 * flag is tested. The reply callback hashmap is allocated on demand, and
 * so is the timeout prioq unless usec is (uint64_t) -1. The message is
 * sealed so that its serial is known, a struct reply_callback is filled
 * in (callback, userdata, serial, absolute timeout from calc_elapse) and
 * stored in the hashmap under its serial; entries with a non-zero
 * timeout are also put into the prioq. The message is then handed to
 * sd_bus_send(). sd_bus_send_with_reply_cancel() is called on two of the
 * visible paths to undo the registration.
 * NOTE(review): the parameter list is only partly visible; the error
 * codes for the rejected cases and the exact failure conditions that
 * trigger the cancel calls are not visible here. */
1162 int sd_bus_send_with_reply(
1165 sd_message_handler_t callback,
1170 struct reply_callback *c;
1181 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1183 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1186 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1190 if (usec != (uint64_t) -1) {
1191 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1196 r = bus_seal_message(bus, m);
1200 c = new(struct reply_callback, 1);
1204 c->callback = callback;
1205 c->userdata = userdata;
1206 c->serial = BUS_MESSAGE_SERIAL(m);
1207 c->timeout = calc_elapse(usec);
1209 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1215 if (c->timeout != 0) {
1216 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1219 sd_bus_send_with_reply_cancel(bus, c->serial);
1224 r = sd_bus_send(bus, m, serial);
1226 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels a pending reply callback identified by the serial of its method
 * call. The entry is removed from the reply callback hashmap and, if it
 * carries a non-zero timeout, from the timeout prioq as well.
 * NOTE(review): the handling of an unknown serial, the release of the
 * entry and the return values are not visible here -- confirm. */
1233 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1234 struct reply_callback *c;
1241 c = hashmap_remove(bus->reply_callbacks, &serial);
1245 if (c->timeout != 0)
1246 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the connection until it has reached BUS_RUNNING. The state is
 * checked, sd_bus_process() is called without a message output pointer,
 * the state is checked again, and sd_bus_wait() blocks without a timeout
 * ((uint64_t) -1).
 * NOTE(review): presumably these steps repeat in a loop until the state
 * is reached or an error occurs; the loop and the return values are not
 * visible here. */
1252 static int ensure_running(sd_bus *bus) {
1257 if (bus->state == BUS_RUNNING)
1261 r = sd_bus_process(bus, NULL);
1264 if (bus->state == BUS_RUNNING)
1269 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and waits synchronously for the matching reply.
 *
 * The message type is compared with METHOD_CALL, the NO_REPLY_EXPECTED
 * flag is tested, and error is checked with bus_error_is_dirty(). The
 * connection is driven to the running state, the message is sent and an
 * absolute deadline is computed with calc_elapse(usec). Incoming messages
 * are then read directly with message_read(). Before each read, room for
 * one more rqueue entry is reserved (bounded by BUS_RQUEUE_MAX), so that
 * an unrelated message can always be queued for later dispatching. A
 * message whose reply_serial equals the serial of the call ends the
 * wait: METHOD_RETURN and METHOD_ERROR are told apart, and for an error
 * reply the error is copied into error and converted with
 * bus_error_to_errno(). Between reads the function polls the socket with
 * bus_poll() and flushes pending writes with dispatch_wqueue().
 * NOTE(review): the parameter list is only partly visible; how the reply
 * is returned, the timeout result and the error codes are not visible
 * here -- confirm. */
1275 int sd_bus_send_with_reply_and_block(
1279 sd_bus_error *error,
1280 sd_bus_message **reply) {
1293 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1295 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1297 if (bus_error_is_dirty(error))
1300 r = ensure_running(bus);
1304 r = sd_bus_send(bus, m, &serial);
1308 timeout = calc_elapse(usec);
1312 sd_bus_message *incoming = NULL;
1317 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1320 /* Make sure there's room for queuing this
1321 * locally, before we read the message */
1323 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1331 r = message_read(bus, &incoming);
1336 if (incoming->reply_serial == serial) {
1337 /* Found a match! */
1339 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1344 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1347 r = sd_bus_error_copy(error, &incoming->error);
1349 sd_bus_message_unref(incoming);
1353 k = bus_error_to_errno(&incoming->error);
1354 sd_bus_message_unref(incoming);
1358 sd_bus_message_unref(incoming);
1362 /* There's already guaranteed to be room for
1363 * this, so no need to resize things here */
1364 bus->rqueue[bus->rqueue_size ++] = incoming;
1367 /* Try to read more, right-away */
1376 n = now(CLOCK_MONOTONIC);
1382 left = (uint64_t) -1;
1384 r = bus_poll(bus, true, left);
1388 r = dispatch_wqueue(bus);
/* NOTE(review): only the signature is visible here. Judging by the name
 * this returns the file descriptor of the connection (bus->fd); the
 * precondition checks and error values need to be confirmed. */
1394 int sd_bus_get_fd(sd_bus *bus) {
/* Computes the poll event mask the caller should wait for, depending on
 * the connection state: BUS_OPENING, BUS_AUTHENTICATING (where it matters
 * whether part of the auth request is still unsent), and BUS_RUNNING or
 * BUS_HELLO (where it matters whether rqueue is empty and whether wqueue
 * has pending messages).
 * NOTE(review): the flags set in each branch are not visible here;
 * presumably POLLOUT is requested while data is pending for writing and
 * POLLIN while nothing is queued for reading -- confirm. */
1404 int sd_bus_get_events(sd_bus *bus) {
1412 if (bus->state == BUS_OPENING)
1414 else if (bus->state == BUS_AUTHENTICATING) {
1416 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1421 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1422 if (bus->rqueue_size <= 0)
1424 if (bus->wqueue_size > 0)
/* Reports in *timeout_usec the absolute time at which the caller should
 * next wake up for this bus. While authenticating this is the
 * authentication deadline (auth_timeout). In state BUS_RUNNING or
 * BUS_HELLO it is the timeout of the entry at the head of the reply
 * callback prioq.
 * NOTE(review): the values reported for other states and for an empty
 * prioq, as well as the return convention, are not visible here. */
1431 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1432 struct reply_callback *c;
1441 if (bus->state == BUS_AUTHENTICATING) {
1442 *timeout_usec = bus->auth_timeout;
1446 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1449 c = prioq_peek(bus->reply_callbacks_prioq);
1453 *timeout_usec = c->timeout;
/* Handles one expired reply timeout. The head of the reply callback prioq
 * is inspected and the current CLOCK_MONOTONIC time is fetched. The entry
 * is then popped from the prioq and removed from the hashmap, and its
 * callback is invoked with error ETIMEDOUT and no message. Returns the
 * callback result if negative, otherwise 1.
 * NOTE(review): the early returns for an empty prioq or a timeout that
 * has not elapsed yet, and the release of the entry, are not visible. */
1457 static int process_timeout(sd_bus *bus) {
1458 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1459 struct reply_callback *c;
1465 c = prioq_peek(bus->reply_callbacks_prioq);
1469 n = now(CLOCK_MONOTONIC);
1473 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1474 hashmap_remove(bus->reply_callbacks, &c->serial);
1476 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1479 return r < 0 ? r : 1;
/* Answers method calls on the interface org.freedesktop.DBus.Peer on
 * behalf of the application. The message type, the interface name and
 * the NO_REPLY_EXPECTED flag are tested first. Ping is answered with a
 * plain method return. GetMachineId is answered with a method return
 * carrying the machine id from sd_id128_get_machine() as a string. Any
 * other member is answered with an
 * org.freedesktop.DBus.Error.UnknownMethod error. The reply is sent with
 * sd_bus_send() without requesting its serial.
 * NOTE(review): the return values of the early exits and of the success
 * path are not visible here -- confirm. */
1482 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1483 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1489 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1492 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1495 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1498 if (streq_ptr(m->member, "Ping"))
1499 r = sd_bus_message_new_method_return(bus, m, &reply);
1500 else if (streq_ptr(m->member, "GetMachineId")) {
1504 r = sd_id128_get_machine(&id);
1508 r = sd_bus_message_new_method_return(bus, m, &reply);
1512 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1514 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1516 sd_bus_error_set(&error,
1517 "org.freedesktop.DBus.Error.UnknownMethod",
1518 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1520 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1526 r = sd_bus_send(bus, reply, NULL);
/* Dispatches one incoming message. For a METHOD_RETURN or METHOD_ERROR
 * the reply callback registered under the reply serial of the message is
 * removed from the hashmap (and from the timeout prioq if it has a
 * non-zero timeout) and invoked with error 0 and the message. The
 * registered filter callbacks are invoked in list order, and finally the
 * message is offered to process_builtin().
 * NOTE(review): the handling of a missing reply callback, the release of
 * the entry and the conditions under which dispatching stops early are
 * not visible here -- confirm. */
1533 static int process_message(sd_bus *bus, sd_bus_message *m) {
1534 struct filter_callback *l;
1540 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1541 struct reply_callback *c;
1543 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1545 if (c->timeout != 0)
1546 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1548 r = c->callback(bus, 0, m, c->userdata);
1556 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1557 r = l->callback(bus, 0, m, l->userdata);
1562 return process_builtin(bus, m);
/* Performs one step of connection processing, depending on the state.
 *
 * BUS_OPENING: the poll result for the socket is inspected; on POLLOUT,
 * POLLERR or POLLHUP the pending socket error is fetched with
 * getsockopt(SO_ERROR) and recorded in last_connect_error (ECONNREFUSED
 * is recorded for POLLERR or POLLHUP without a socket error).
 * Authentication is started on one path, the next address is tried on
 * another.
 *
 * BUS_AUTHENTICATING: the authentication deadline is checked, then the
 * auth request is written and the auth reply is read.
 *
 * BUS_RUNNING or BUS_HELLO: expired reply timeouts are processed, the
 * write queue is flushed, one incoming message is fetched and passed to
 * process_message(); a method call is answered with an
 * org.freedesktop.DBus.Error.UnknownObject error on one visible path.
 *
 * Any other state hits assert_not_reached().
 * NOTE(review): the conditions selecting between the paths, the handling
 * of ret and the return values of each path are not visible here. */
1565 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1568 /* Returns 0 when we didn't do anything. This should cause the
1569 * caller to invoke sd_bus_wait() before returning the next
1570 * time. Returns > 0 when we did something, which possibly
1571 * means *ret is filled in with an unprocessed message. */
1578 if (bus->state == BUS_OPENING) {
1589 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1591 socklen_t slen = sizeof(error);
1593 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1595 bus->last_connect_error = errno;
1596 else if (error != 0)
1597 bus->last_connect_error = error;
1598 else if (p.revents & (POLLERR|POLLHUP))
1599 bus->last_connect_error = ECONNREFUSED;
1601 r = bus_start_auth(bus);
1605 /* Try next address */
1606 r = bus_start_connect(bus);
1613 } else if (bus->state == BUS_AUTHENTICATING) {
1615 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1618 r = bus_write_auth(bus);
1622 r = bus_read_auth(bus);
1625 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1626 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1629 r = process_timeout(bus);
1633 r = dispatch_wqueue(bus);
1638 r = dispatch_rqueue(bus, &m);
1647 r = process_message(bus, m);
1657 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1658 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1659 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1661 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1663 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1667 r = sd_bus_send(bus, reply, NULL);
1675 assert_not_reached("Unknown state");
/* Waits on the bus socket with ppoll(). The event mask comes from
 * sd_bus_get_events() and the wake-up deadline from sd_bus_get_timeout();
 * the deadline is converted into a relative timeout m (0 if it already
 * passed). A caller supplied timeout_usec other than (uint64_t) -1 is
 * tested against m when m is infinite or larger. An m of (uint64_t) -1
 * makes ppoll() wait without a timeout. Returns 1 if ppoll() reported at
 * least one event and 0 otherwise.
 * NOTE(review): the use of need_more, the assignment made when the caller
 * timeout wins and the error returns are not visible here. */
1684 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1695 e = sd_bus_get_events(bus);
1702 r = sd_bus_get_timeout(bus, &until);
1709 n = now(CLOCK_MONOTONIC);
1710 m = until > n ? until - n : 0;
1713 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1720 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1724 return r > 0 ? 1 : 0;
/* Waits until there is something to process on the bus or timeout_usec
 * has passed. The read queue is checked first; otherwise the call is
 * forwarded to bus_poll() with need_more set to false.
 * NOTE(review): presumably a non-empty rqueue makes this return
 * immediately; that return value is not visible here. */
1727 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1733 if (bus->rqueue_size > 0)
1736 return bus_poll(bus, false, timeout_usec);
/* Blocks until every queued outgoing message has been written. The
 * connection is driven to the running state first. The write queue is
 * checked for emptiness, dispatch_wqueue() is called, the queue is
 * checked again, and bus_poll() waits without a timeout.
 * NOTE(review): presumably the dispatch/poll steps repeat in a loop until
 * the queue is empty; the loop and the return values are not visible. */
1739 int sd_bus_flush(sd_bus *bus) {
1747 r = ensure_running(bus);
1751 if (bus->wqueue_size <= 0)
1755 r = dispatch_wqueue(bus);
1759 if (bus->wqueue_size <= 0)
1762 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback on the bus. A struct filter_callback is
 * allocated, filled with callback and userdata, and prepended to
 * bus->filter_callbacks, so the most recently added filter comes first
 * in the list.
 * NOTE(review): argument validation, the allocation failure path and the
 * return values are not visible here -- confirm. */
1768 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1769 struct filter_callback *f;
1776 f = new(struct filter_callback, 1);
1779 f->callback = callback;
1780 f->userdata = userdata;
1782 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1786 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1787 struct filter_callback *f;
1794 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1795 if (f->callback == callback && f->userdata == userdata) {
1796 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);