1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
34 #include "bus-internal.h"
35 #include "bus-message.h"
38 static int ensure_running(sd_bus *bus);
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
41 static void bus_free(sd_bus *b) {
42 struct filter_callback *f;
48 close_nointr_nofail(b->fd);
55 for (i = 0; i < b->rqueue_size; i++)
56 sd_bus_message_unref(b->rqueue[i]);
59 for (i = 0; i < b->wqueue_size; i++)
60 sd_bus_message_unref(b->wqueue[i]);
63 hashmap_free_free(b->reply_callbacks);
64 prioq_free(b->reply_callbacks_prioq);
66 while ((f = b->filter_callbacks)) {
67 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
74 static sd_bus* bus_new(void) {
83 r->message_version = 1;
85 /* We guarantee that wqueue always has space for at least one
87 r->wqueue = new(sd_bus_message*, 1);
96 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
107 r = sd_bus_message_read(reply, "s", &s);
111 if (!service_name_is_valid(s) || s[0] != ':')
114 bus->unique_name = strdup(s);
115 if (!bus->unique_name)
118 bus->state = BUS_RUNNING;
123 static int bus_send_hello(sd_bus *bus) {
124 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
129 r = sd_bus_message_new_method_call(
131 "org.freedesktop.DBus",
133 "org.freedesktop.DBus",
139 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
143 bus->sent_hello = true;
147 static int bus_start_running(sd_bus *bus) {
150 if (bus->sent_hello) {
151 bus->state = BUS_HELLO;
155 bus->state = BUS_RUNNING;
159 static int parse_address_key(const char **p, const char *key, char **value) {
170 if (strncmp(*p, key, l) != 0)
180 while (*a != ',' && *a != 0) {
198 c = (char) ((x << 4) | y);
205 t = realloc(r, n + 2);
230 static void skip_address_key(const char **p) {
234 *p += strcspn(*p, ",");
240 static int bus_parse_next_address(sd_bus *b) {
242 _cleanup_free_ char *guid = NULL;
249 if (b->address[b->address_index] == 0)
252 a = b->address + b->address_index;
255 b->sockaddr_size = 0;
256 b->peer = SD_ID128_NULL;
258 if (startswith(a, "unix:")) {
259 _cleanup_free_ char *path = NULL, *abstract = NULL;
263 r = parse_address_key(&p, "guid", &guid);
269 r = parse_address_key(&p, "path", &path);
275 r = parse_address_key(&p, "abstract", &abstract);
281 skip_address_key(&p);
284 if (!path && !abstract)
287 if (path && abstract)
294 if (l > sizeof(b->sockaddr.un.sun_path))
297 b->sockaddr.un.sun_family = AF_UNIX;
298 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
299 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
300 } else if (abstract) {
303 l = strlen(abstract);
304 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
307 b->sockaddr.un.sun_family = AF_UNIX;
308 b->sockaddr.un.sun_path[0] = 0;
309 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
310 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
313 } else if (startswith(a, "tcp:")) {
314 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
315 struct addrinfo hints, *result;
319 r = parse_address_key(&p, "guid", &guid);
325 r = parse_address_key(&p, "host", &host);
331 r = parse_address_key(&p, "port", &port);
337 r = parse_address_key(&p, "family", &family);
343 skip_address_key(&p);
350 hints.ai_socktype = SOCK_STREAM;
351 hints.ai_flags = AI_ADDRCONFIG;
354 if (streq(family, "ipv4"))
355 hints.ai_family = AF_INET;
356 else if (streq(family, "ipv6"))
357 hints.ai_family = AF_INET6;
362 r = getaddrinfo(host, port, &hints, &result);
366 return -EADDRNOTAVAIL;
368 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
369 b->sockaddr_size = result->ai_addrlen;
371 freeaddrinfo(result);
375 r = sd_id128_from_string(guid, &b->peer);
380 b->address_index = p - b->address;
384 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
387 struct iovec *i = iov + *idx;
389 if (i->iov_len > size) {
390 i->iov_base = (uint8_t*) i->iov_base + size;
404 static int bus_write_auth(sd_bus *b) {
409 assert(b->state == BUS_AUTHENTICATING);
411 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
414 if (b->auth_timeout == 0)
415 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
418 mh.msg_iov = b->auth_iovec + b->auth_index;
419 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
421 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
423 return errno == EAGAIN ? 0 : -errno;
425 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
430 static int bus_auth_verify(sd_bus *b) {
436 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
439 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
443 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
447 if (e - (char*) b->rbuffer != 3 + 32)
450 if (memcmp(b->rbuffer, "OK ", 3))
453 for (i = 0; i < 32; i += 2) {
456 x = unhexchar(((char*) b->rbuffer)[3 + i]);
457 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
462 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
465 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
466 !sd_id128_equal(b->peer, peer))
472 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
473 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
475 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
476 memmove(b->rbuffer, f + 2, b->rbuffer_size);
478 r = bus_start_running(b);
485 static int bus_read_auth(sd_bus *b) {
495 r = bus_auth_verify(b);
499 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
501 if (n > BUS_AUTH_SIZE_MAX)
502 n = BUS_AUTH_SIZE_MAX;
504 if (b->rbuffer_size >= n)
507 p = realloc(b->rbuffer, n);
514 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
515 iov.iov_len = n - b->rbuffer_size;
521 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
523 return errno == EAGAIN ? 0 : -errno;
525 b->rbuffer_size += k;
527 r = bus_auth_verify(b);
534 static int bus_start_auth(sd_bus *b) {
535 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
536 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
538 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
543 b->state = BUS_AUTHENTICATING;
545 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
549 b->auth_uid = hexmem(text, l);
553 b->auth_iovec[0].iov_base = (void*) auth_prefix;
554 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
555 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
556 b->auth_iovec[1].iov_len = l * 2;
557 b->auth_iovec[2].iov_base = (void*) auth_suffix;
558 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
559 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
561 return bus_write_auth(b);
564 static int bus_start_connect(sd_bus *b) {
571 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
572 r = bus_parse_next_address(b);
576 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
579 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
581 b->last_connect_error = errno;
586 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
588 if (errno == EINPROGRESS)
591 b->last_connect_error = errno;
592 close_nointr_nofail(b->fd);
598 return bus_start_auth(b);
602 int sd_bus_open_system(sd_bus **ret) {
610 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
612 r = sd_bus_open_address(e, &b);
620 b->sockaddr.un.sun_family = AF_UNIX;
621 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
622 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
624 r = bus_start_connect(b);
631 r = bus_send_hello(b);
641 int sd_bus_open_user(sd_bus **ret) {
650 e = getenv("DBUS_SESSION_BUS_ADDRESS");
652 r = sd_bus_open_address(e, &b);
656 e = getenv("XDG_RUNTIME_DIR");
661 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
668 b->sockaddr.un.sun_family = AF_UNIX;
669 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
670 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
672 r = bus_start_connect(b);
679 r = bus_send_hello(b);
689 int sd_bus_open_address(const char *address, sd_bus **ret) {
702 b->address = strdup(address);
708 r = bus_start_connect(b);
718 int sd_bus_open_fd(int fd, sd_bus **ret) {
732 fd_nonblock(b->fd, true);
733 fd_cloexec(b->fd, true);
735 r = bus_start_auth(b);
745 void sd_bus_close(sd_bus *bus) {
751 close_nointr_nofail(bus->fd);
755 sd_bus *sd_bus_ref(sd_bus *bus) {
759 assert(bus->n_ref > 0);
765 sd_bus *sd_bus_unref(sd_bus *bus) {
769 assert(bus->n_ref > 0);
778 int sd_bus_is_open(sd_bus *bus) {
785 int sd_bus_can_send(sd_bus *bus, char type) {
791 if (type == SD_BUS_TYPE_UNIX_FD) {
792 r = ensure_running(bus);
799 return bus_type_is_valid(type);
802 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
810 r = ensure_running(bus);
818 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
821 if (m->header->version > b->message_version)
827 return bus_message_seal(m, ++b->serial);
830 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
840 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
845 n = m->n_iovec * sizeof(struct iovec);
847 memcpy(iov, m->iovec, n);
850 iovec_advance(iov, &j, *idx);
854 mh.msg_iovlen = m->n_iovec;
856 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
858 return errno == EAGAIN ? 0 : -errno;
864 static int message_read_need(sd_bus *bus, size_t *need) {
871 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
873 if (bus->rbuffer_size < sizeof(struct bus_header)) {
874 *need = sizeof(struct bus_header) + 8;
876 /* Minimum message size:
880 * Method Call: +2 string headers
881 * Signal: +3 string headers
882 * Method Error: +1 string headers
884 * Method Reply: +1 uint32 headers
886 * A string header is at least 9 bytes
887 * A uint32 header is at least 8 bytes
889 * Hence the minimum message size of a valid message
890 * is header + 8 bytes */
895 a = ((const uint32_t*) bus->rbuffer)[1];
896 b = ((const uint32_t*) bus->rbuffer)[3];
898 e = ((const uint8_t*) bus->rbuffer)[0];
899 if (e == SD_BUS_LITTLE_ENDIAN) {
902 } else if (e == SD_BUS_BIG_ENDIAN) {
908 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
909 if (sum >= BUS_MESSAGE_SIZE_MAX)
912 *need = (size_t) sum;
916 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
923 assert(bus->rbuffer_size >= size);
924 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
926 if (bus->rbuffer_size > size) {
927 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
934 r = bus_message_from_malloc(bus->rbuffer, size, &t);
941 bus->rbuffer_size -= size;
947 static int message_read(sd_bus *bus, sd_bus_message **m) {
957 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
959 r = message_read_need(bus, &need);
963 if (bus->rbuffer_size >= need)
964 return message_make(bus, need, m);
966 b = realloc(bus->rbuffer, need);
973 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
974 iov.iov_len = need - bus->rbuffer_size;
980 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
982 return errno == EAGAIN ? 0 : -errno;
984 bus->rbuffer_size += k;
986 r = message_read_need(bus, &need);
990 if (bus->rbuffer_size >= need)
991 return message_make(bus, need, m);
996 static int dispatch_wqueue(sd_bus *bus) {
1000 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1005 while (bus->wqueue_size > 0) {
1007 r = message_write(bus, bus->wqueue[0], &bus->windex);
1012 /* Didn't do anything this time */
1014 else if (bus->windex >= bus->wqueue[0]->size) {
1015 /* Fully written. Let's drop the entry from
1018 * This isn't particularly optimized, but
1019 * well, this is supposed to be our worst-case
1020 * buffer only, and the socket buffer is
1021 * supposed to be our primary buffer, and if
1022 * it got full, then all bets are off
1025 sd_bus_message_unref(bus->wqueue[0]);
1026 bus->wqueue_size --;
1027 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1037 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1038 sd_bus_message *z = NULL;
1043 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1048 if (bus->rqueue_size > 0) {
1049 /* Dispatch a queued message */
1051 *m = bus->rqueue[0];
1052 bus->rqueue_size --;
1053 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1057 /* Try to read a new message */
1059 r = message_read(bus, &z);
1074 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1084 /* If the serial number isn't kept, then we know that no reply
1086 if (!serial && !m->sealed)
1087 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1089 r = bus_seal_message(bus, m);
1093 /* If this is a reply and no reply was requested, then let's
1094 * suppress this, if we can */
1095 if (m->dont_send && !serial)
1098 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1101 r = message_write(bus, m, &idx);
1105 } else if (idx < m->size) {
1106 /* Wasn't fully written. So let's remember how
1107 * much was written. Note that the first entry
1108 * of the wqueue array is always allocated so
1109 * that we always can remember how much was
1111 bus->wqueue[0] = sd_bus_message_ref(m);
1112 bus->wqueue_size = 1;
1118 /* Just append it to the queue. */
1120 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1123 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1128 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1132 *serial = BUS_MESSAGE_SERIAL(m);
1137 static usec_t calc_elapse(uint64_t usec) {
1138 if (usec == (uint64_t) -1)
1142 usec = BUS_DEFAULT_TIMEOUT;
1144 return now(CLOCK_MONOTONIC) + usec;
1147 static int timeout_compare(const void *a, const void *b) {
1148 const struct reply_callback *x = a, *y = b;
1150 if (x->timeout != 0 && y->timeout == 0)
1153 if (x->timeout == 0 && y->timeout != 0)
1156 if (x->timeout < y->timeout)
1159 if (x->timeout > y->timeout)
1165 int sd_bus_send_with_reply(
1168 sd_message_handler_t callback,
1173 struct reply_callback *c;
1184 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1186 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1189 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1193 if (usec != (uint64_t) -1) {
1194 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1199 r = bus_seal_message(bus, m);
1203 c = new(struct reply_callback, 1);
1207 c->callback = callback;
1208 c->userdata = userdata;
1209 c->serial = BUS_MESSAGE_SERIAL(m);
1210 c->timeout = calc_elapse(usec);
1212 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1218 if (c->timeout != 0) {
1219 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1222 sd_bus_send_with_reply_cancel(bus, c->serial);
1227 r = sd_bus_send(bus, m, serial);
1229 sd_bus_send_with_reply_cancel(bus, c->serial);
1236 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1237 struct reply_callback *c;
1244 c = hashmap_remove(bus->reply_callbacks, &serial);
1248 if (c->timeout != 0)
1249 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1255 static int ensure_running(sd_bus *bus) {
1260 if (bus->state == BUS_RUNNING)
1264 r = sd_bus_process(bus, NULL);
1267 if (bus->state == BUS_RUNNING)
1272 r = sd_bus_wait(bus, (uint64_t) -1);
1278 int sd_bus_send_with_reply_and_block(
1282 sd_bus_error *error,
1283 sd_bus_message **reply) {
1296 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1298 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1300 if (bus_error_is_dirty(error))
1303 r = ensure_running(bus);
1307 r = sd_bus_send(bus, m, &serial);
1311 timeout = calc_elapse(usec);
1315 sd_bus_message *incoming = NULL;
1320 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1323 /* Make sure there's room for queuing this
1324 * locally, before we read the message */
1326 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1334 r = message_read(bus, &incoming);
1339 if (incoming->reply_serial == serial) {
1340 /* Found a match! */
1342 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1347 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1350 r = sd_bus_error_copy(error, &incoming->error);
1352 sd_bus_message_unref(incoming);
1356 k = bus_error_to_errno(&incoming->error);
1357 sd_bus_message_unref(incoming);
1361 sd_bus_message_unref(incoming);
1365 /* There's already guaranteed to be room for
1366 * this, so need to resize things here */
1367 bus->rqueue[bus->rqueue_size ++] = incoming;
1370 /* Try to read more, right-away */
1379 n = now(CLOCK_MONOTONIC);
1385 left = (uint64_t) -1;
1387 r = bus_poll(bus, true, left);
1391 r = dispatch_wqueue(bus);
1397 int sd_bus_get_fd(sd_bus *bus) {
1407 int sd_bus_get_events(sd_bus *bus) {
1415 if (bus->state == BUS_OPENING)
1417 else if (bus->state == BUS_AUTHENTICATING) {
1419 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1424 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1425 if (bus->rqueue_size <= 0)
1427 if (bus->wqueue_size > 0)
1434 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1435 struct reply_callback *c;
1444 if (bus->state == BUS_AUTHENTICATING) {
1445 *timeout_usec = bus->auth_timeout;
1449 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1452 c = prioq_peek(bus->reply_callbacks_prioq);
1456 *timeout_usec = c->timeout;
1460 static int process_timeout(sd_bus *bus) {
1461 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1462 struct reply_callback *c;
1468 c = prioq_peek(bus->reply_callbacks_prioq);
1472 n = now(CLOCK_MONOTONIC);
1476 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1477 hashmap_remove(bus->reply_callbacks, &c->serial);
1479 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1482 return r < 0 ? r : 1;
1485 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1486 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1492 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1495 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1498 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1501 if (streq_ptr(m->member, "Ping"))
1502 r = sd_bus_message_new_method_return(bus, m, &reply);
1503 else if (streq_ptr(m->member, "GetMachineId")) {
1507 r = sd_id128_get_machine(&id);
1511 r = sd_bus_message_new_method_return(bus, m, &reply);
1515 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1517 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1519 sd_bus_error_set(&error,
1520 "org.freedesktop.DBus.Error.UnknownMethod",
1521 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1523 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1529 r = sd_bus_send(bus, reply, NULL);
1536 static int process_message(sd_bus *bus, sd_bus_message *m) {
1537 struct filter_callback *l;
1543 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1544 struct reply_callback *c;
1546 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1548 if (c->timeout != 0)
1549 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1551 r = c->callback(bus, 0, m, c->userdata);
1559 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1560 r = l->callback(bus, 0, m, l->userdata);
1565 return process_builtin(bus, m);
1568 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1571 /* Returns 0 when we didn't do anything. This should cause the
1572 * caller to invoke sd_bus_wait() before returning the next
1573 * time. Returns > 0 when we did something, which possibly
1574 * means *ret is filled in with an unprocessed message. */
1581 if (bus->state == BUS_OPENING) {
1592 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1594 socklen_t slen = sizeof(error);
1596 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1598 bus->last_connect_error = errno;
1599 else if (error != 0)
1600 bus->last_connect_error = error;
1601 else if (p.revents & (POLLERR|POLLHUP))
1602 bus->last_connect_error = ECONNREFUSED;
1604 r = bus_start_auth(bus);
1608 /* Try next address */
1609 r = bus_start_connect(bus);
1616 } else if (bus->state == BUS_AUTHENTICATING) {
1618 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1621 r = bus_write_auth(bus);
1625 r = bus_read_auth(bus);
1628 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1629 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1632 r = process_timeout(bus);
1636 r = dispatch_wqueue(bus);
1641 r = dispatch_rqueue(bus, &m);
1650 r = process_message(bus, m);
1660 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1661 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1662 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1664 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1666 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1670 r = sd_bus_send(bus, reply, NULL);
1678 assert_not_reached("Unknown state");
1687 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1698 e = sd_bus_get_events(bus);
1705 r = sd_bus_get_timeout(bus, &until);
1712 n = now(CLOCK_MONOTONIC);
1713 m = until > n ? until - n : 0;
1716 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1723 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1727 return r > 0 ? 1 : 0;
1730 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1736 if (bus->rqueue_size > 0)
1739 return bus_poll(bus, false, timeout_usec);
1742 int sd_bus_flush(sd_bus *bus) {
1750 r = ensure_running(bus);
1754 if (bus->wqueue_size <= 0)
1758 r = dispatch_wqueue(bus);
1762 if (bus->wqueue_size <= 0)
1765 r = bus_poll(bus, false, (uint64_t) -1);
1771 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1772 struct filter_callback *f;
1779 f = new(struct filter_callback, 1);
1782 f->callback = callback;
1783 f->userdata = userdata;
1785 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1789 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1790 struct filter_callback *f;
1797 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1798 if (f->callback == callback && f->userdata == userdata) {
1799 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);