1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
34 #include "bus-internal.h"
35 #include "bus-message.h"
/* Forward declaration: bus_poll() is defined near the end of this file but is
 * already called by functions that appear before its definition. */
38 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
/* Tears down a bus object. The visible steps close the socket fd, drop the
 * reference held on every message still sitting in the read and write queues,
 * release the reply-callback hashmap and priority queue, and unlink every
 * registered filter callback from the list. */
40 static void bus_free(sd_bus *b) {
41 struct filter_callback *f;
47 close_nointr_nofail(b->fd);
54 for (i = 0; i < b->rqueue_size; i++)
55 sd_bus_message_unref(b->rqueue[i]);
58 for (i = 0; i < b->wqueue_size; i++)
59 sd_bus_message_unref(b->wqueue[i]);
/* NOTE(review): hashmap_free_free() presumably frees the stored
 * reply_callback entries as well as the map -- confirm against the
 * hashmap API. */
62 hashmap_free_free(b->reply_callbacks);
63 prioq_free(b->reply_callbacks_prioq);
/* Keep taking the list head until no filter callbacks remain. */
65 while ((f = b->filter_callbacks)) {
66 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
/* Creates a new bus object and returns a pointer to it. The visible
 * initialization sets message_version to 1 and pre-allocates a write queue
 * with room for a single message pointer. */
73 static sd_bus* bus_new(void) {
82 r->message_version = 1;
84 /* We guarantee that wqueue always has space for at least one
86 r->wqueue = new(sd_bus_message*, 1);
/* Reply handler that bus_send_hello() registers for its method call.
 * Reads one string from the reply, tests that it is a valid service name
 * beginning with ':', stores a copy of it as bus->unique_name and moves the
 * bus to BUS_RUNNING.
 * NOTE(review): the values returned on the failure paths are not visible in
 * this excerpt. */
95 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
106 r = sd_bus_message_read(reply, "s", &s);
110 if (!service_name_is_valid(s) || s[0] != ':')
/* Keep a private copy; s points into the reply message. (presumably --
 * confirm the lifetime of strings returned by sd_bus_message_read) */
113 bus->unique_name = strdup(s);
114 if (!bus->unique_name)
117 bus->state = BUS_RUNNING;
/* Builds a method call addressed to "org.freedesktop.DBus", sends it with
 * hello_callback() as the reply handler, and records in bus->sent_hello that
 * the call has been issued. bus_start_running() later consults sent_hello to
 * choose the state that follows authentication. */
122 static int bus_send_hello(sd_bus *bus) {
123 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
128 r = sd_bus_message_new_method_call(
130 "org.freedesktop.DBus",
132 "org.freedesktop.DBus",
138 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
142 bus->sent_hello = true;
/* Chooses the state that follows authentication (called from
 * bus_auth_verify()). When a hello call was sent the bus enters BUS_HELLO;
 * the other visible assignment sets BUS_RUNNING.
 * NOTE(review): the statement separating the two paths is not visible in this
 * excerpt -- presumably an early return inside the if block. */
146 static int bus_start_running(sd_bus *bus) {
149 if (bus->sent_hello) {
150 bus->state = BUS_HELLO;
154 bus->state = BUS_RUNNING;
/* Parses one item of a bus address at *p if it matches the given key.
 * The visible lines compare the text at *p with key, scan the value up to the
 * next ',' or the end of the string, combine two 4-bit values into one byte
 * (c = x << 4 | y), and grow the result buffer with realloc().
 * NOTE(review): the two nibbles presumably come from a two-digit hex escape
 * in the address; the result is presumably handed back through *value --
 * confirm against the full body. */
158 static int parse_address_key(const char **p, const char *key, char **value) {
169 if (strncmp(*p, key, l) != 0)
179 while (*a != ',' && *a != 0) {
197 c = (char) ((x << 4) | y);
204 t = realloc(r, n + 2);
/* Skips over the address item at *p: advances *p to the next ',' or, when
 * there is none, to the terminating NUL. */
229 static void skip_address_key(const char **p) {
233 *p += strcspn(*p, ",");
/* Parses the next entry of b->address, starting at offset b->address_index,
 * into b->sockaddr and b->sockaddr_size, and stores in b->address_index how
 * far parsing got. Two transports are visible:
 *   "unix:" with keys guid, path and abstract
 *   "tcp:"  with keys guid, host, port and family
 * A guid is converted with sd_id128_from_string() into b->peer. */
239 static int bus_parse_next_address(sd_bus *b) {
241 _cleanup_free_ char *guid = NULL;
248 if (b->address[b->address_index] == 0)
251 a = b->address + b->address_index;
/* Reset the results of any previously parsed entry. */
254 b->sockaddr_size = 0;
255 b->peer = SD_ID128_NULL;
257 if (startswith(a, "unix:")) {
258 _cleanup_free_ char *path = NULL, *abstract = NULL;
262 r = parse_address_key(&p, "guid", &guid);
268 r = parse_address_key(&p, "path", &path);
274 r = parse_address_key(&p, "abstract", &abstract);
280 skip_address_key(&p);
/* Both "neither given" and "both given" are checked for explicitly. */
283 if (!path && !abstract)
286 if (path && abstract)
293 if (l > sizeof(b->sockaddr.un.sun_path))
296 b->sockaddr.un.sun_family = AF_UNIX;
297 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
298 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
299 } else if (abstract) {
302 l = strlen(abstract);
303 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
306 b->sockaddr.un.sun_family = AF_UNIX;
/* Abstract form: sun_path starts with a NUL byte, the name follows it, and
 * the address length counts that leading byte plus the name. */
307 b->sockaddr.un.sun_path[0] = 0;
308 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
309 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
312 } else if (startswith(a, "tcp:")) {
313 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
314 struct addrinfo hints, *result;
318 r = parse_address_key(&p, "guid", &guid);
324 r = parse_address_key(&p, "host", &host);
330 r = parse_address_key(&p, "port", &port);
336 r = parse_address_key(&p, "family", &family);
342 skip_address_key(&p);
349 hints.ai_socktype = SOCK_STREAM;
350 hints.ai_flags = AI_ADDRCONFIG;
/* "ipv4" and "ipv6" select the address family passed to the resolver. */
353 if (streq(family, "ipv4"))
354 hints.ai_family = AF_INET;
355 else if (streq(family, "ipv6"))
356 hints.ai_family = AF_INET6;
361 r = getaddrinfo(host, port, &hints, &result);
365 return -EADDRNOTAVAIL;
/* Only the first entry of the resolver result is used. */
367 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
368 b->sockaddr_size = result->ai_addrlen;
370 freeaddrinfo(result);
374 r = sd_id128_from_string(guid, &b->peer);
/* Remember where to resume for the next call. */
379 b->address_index = p - b->address;
/* Moves the position in an iovec array forward by size bytes. *idx is the
 * index of the current entry. In the visible branch, an entry longer than
 * size has its base pointer moved forward by size.
 * NOTE(review): the matching iov_len adjustment and the handling of fully
 * consumed entries are not visible in this excerpt. */
383 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
386 struct iovec *i = iov + *idx;
388 if (i->iov_len > size) {
389 i->iov_base = (uint8_t*) i->iov_base + size;
/* Sends the part of the authentication request that has not been written
 * yet: the entries of b->auth_iovec from b->auth_index onward, using a
 * non-blocking sendmsg(). Requires the BUS_AUTHENTICATING state. On the
 * visible error return, EAGAIN yields 0 and any other errno yields -errno. */
403 static int bus_write_auth(sd_bus *b) {
408 assert(b->state == BUS_AUTHENTICATING);
/* All iovec entries consumed means there is nothing left to send. */
410 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
/* Arm the authentication deadline the first time through. */
413 if (b->auth_timeout == 0)
414 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
417 mh.msg_iov = b->auth_iovec + b->auth_index;
418 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
420 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
422 return errno == EAGAIN ? 0 : -errno;
/* Account for the k bytes that were actually written. */
424 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
/* Examines the authentication response collected in b->rbuffer.
 * Looks for two lines terminated by CRLF. The first must be exactly "OK "
 * plus 32 characters, which are decoded pairwise with unhexchar() into a
 * 128-bit peer id and compared with b->peer unless b->peer is the null id.
 * The second line is compared with "AGREE_UNIX_FD". Both lines are then
 * removed from the front of rbuffer and bus_start_running() is called.
 * NOTE(review): what happens on a mismatch, and where the AGREE_UNIX_FD
 * result is stored, is not visible in this excerpt. */
429 static int bus_auth_verify(sd_bus *b) {
435 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
438 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
442 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
446 if (e - (char*) b->rbuffer != 3 + 32)
449 if (memcmp(b->rbuffer, "OK ", 3))
452 for (i = 0; i < 32; i += 2) {
455 x = unhexchar(((char*) b->rbuffer)[3 + i]);
456 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
461 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
464 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
465 !sd_id128_equal(b->peer, peer))
471 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
472 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
474 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
475 memmove(b->rbuffer, f + 2, b->rbuffer_size);
477 r = bus_start_running(b);
/* Reads more of the authentication response from the socket.
 * bus_auth_verify() is tried on the data already buffered and again after
 * the read. The buffer is grown to the larger of the full expected response
 * length and twice the current fill level, capped at BUS_AUTH_SIZE_MAX, and a
 * non-blocking recvmsg() fills the free tail. On the visible error return,
 * EAGAIN yields 0 and any other errno yields -errno. */
484 static int bus_read_auth(sd_bus *b) {
494 r = bus_auth_verify(b);
/* 3 + 32 + 2 covers "OK " + 32 hex digits + CRLF; the remainder covers the
 * AGREE_UNIX_FD line with its CRLF. */
498 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
500 if (n > BUS_AUTH_SIZE_MAX)
501 n = BUS_AUTH_SIZE_MAX;
/* Buffer already at the (capped) target size: no room to read more. */
503 if (b->rbuffer_size >= n)
506 p = realloc(b->rbuffer, n);
513 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
514 iov.iov_len = n - b->rbuffer_size;
520 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
522 return errno == EAGAIN ? 0 : -errno;
524 b->rbuffer_size += k;
526 r = bus_auth_verify(b);
/* Switches the bus to BUS_AUTHENTICATING and prepares the authentication
 * request as three iovec entries: a fixed prefix (a NUL byte followed by
 * "AUTH EXTERNAL "), the effective UID printed in decimal and then passed
 * through hexmem(), and a fixed suffix carrying NEGOTIATE_UNIX_FD and BEGIN.
 * The total length is stored in b->auth_size and sending is started with
 * bus_write_auth(), whose result is returned. */
533 static int bus_start_auth(sd_bus *b) {
534 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
535 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
537 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
542 b->state = BUS_AUTHENTICATING;
544 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
548 b->auth_uid = hexmem(text, l);
/* sizeof() - 1 drops the terminating NUL of each literal; the leading NUL of
 * the prefix is part of the literal and therefore is sent. */
552 b->auth_iovec[0].iov_base = (void*) auth_prefix;
553 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
/* The encoded UID is counted as two output characters per input byte. */
554 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
555 b->auth_iovec[1].iov_len = l * 2;
556 b->auth_iovec[2].iov_base = (void*) auth_suffix;
557 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
558 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
560 return bus_write_auth(b);
/* Starts a connection attempt. When no socket address is set (family
 * AF_UNSPEC) the next one is obtained from bus_parse_next_address(). A
 * non-blocking, close-on-exec stream socket is created and connect() is
 * called on it. EINPROGRESS is singled out from other connect() errors;
 * other failures are remembered in b->last_connect_error and the fd is
 * closed. On the visible success path the function continues with
 * bus_start_auth(). The visible failure return yields -last_connect_error
 * when one was recorded, else -ECONNREFUSED. */
563 static int bus_start_connect(sd_bus *b) {
570 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
571 r = bus_parse_next_address(b);
575 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
578 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
580 b->last_connect_error = errno;
585 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
587 if (errno == EINPROGRESS)
590 b->last_connect_error = errno;
591 close_nointr_nofail(b->fd);
597 return bus_start_auth(b);
/* Opens a connection to the system bus and hands it back through *ret.
 * Two address sources are visible: the DBUS_SYSTEM_BUS_ADDRESS environment
 * variable, passed to sd_bus_open_address(), and the fixed socket path
 * /run/dbus/system_bus_socket. On the fixed-path route the connection is
 * started with bus_start_connect() and a hello call is sent with
 * bus_send_hello().
 * NOTE(review): the condition selecting between the two sources is not
 * visible here -- presumably the environment variable wins when set. */
601 int sd_bus_open_system(sd_bus **ret) {
609 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
611 r = sd_bus_open_address(e, &b);
619 b->sockaddr.un.sun_family = AF_UNIX;
620 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
621 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
623 r = bus_start_connect(b);
630 r = bus_send_hello(b);
/* Opens a connection to the user (session) bus and hands it back through
 * *ret. Two address sources are visible: the DBUS_SESSION_BUS_ADDRESS
 * environment variable, passed to sd_bus_open_address(), and a socket path
 * built as $XDG_RUNTIME_DIR followed by "/bus". On the latter route the
 * connection is started with bus_start_connect() and a hello call is sent
 * with bus_send_hello().
 * NOTE(review): the condition selecting between the two sources is not
 * visible here -- presumably the environment variable wins when set. */
640 int sd_bus_open_user(sd_bus **ret) {
649 e = getenv("DBUS_SESSION_BUS_ADDRESS");
651 r = sd_bus_open_address(e, &b);
655 e = getenv("XDG_RUNTIME_DIR");
/* The directory plus the 4 bytes of "/bus" must fit into sun_path. */
660 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
667 b->sockaddr.un.sun_family = AF_UNIX;
/* mempcpy() returns the position just past the copied directory, so the
 * outer memcpy() appends "/bus" right behind it (without a NUL). */
668 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
669 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
671 r = bus_start_connect(b);
678 r = bus_send_hello(b);
/* Opens a bus connection for an explicit address string. The visible steps
 * store a private copy of the string in b->address and start connecting with
 * bus_start_connect(). The bus is handed back through *ret (assignment not
 * visible in this excerpt). */
688 int sd_bus_open_address(const char *address, sd_bus **ret) {
701 b->address = strdup(address);
707 r = bus_start_connect(b);
/* Creates a bus on top of a file descriptor supplied by the caller. The
 * descriptor is switched to non-blocking and close-on-exec mode and
 * authentication is started right away with bus_start_auth(); no connect
 * step is involved. */
717 int sd_bus_open_fd(int fd, sd_bus **ret) {
731 fd_nonblock(b->fd, true);
732 fd_cloexec(b->fd, true);
734 r = bus_start_auth(b);
/* Closes the file descriptor of the bus connection.
 * NOTE(review): any guard around the close and any reset of bus->fd are not
 * visible in this excerpt. */
744 void sd_bus_close(sd_bus *bus) {
750 close_nointr_nofail(bus->fd);
/* Reference-counting entry point; returns a bus pointer. The visible line
 * asserts that the object still has at least one reference.
 * NOTE(review): presumably increments bus->n_ref and returns bus -- the
 * statements doing so are not visible in this excerpt. */
754 sd_bus *sd_bus_ref(sd_bus *bus) {
758 assert(bus->n_ref > 0);
/* Reference-counting entry point; returns a bus pointer. The visible line
 * asserts that the object still has at least one reference.
 * NOTE(review): presumably decrements bus->n_ref and frees the object when
 * it reaches zero -- the statements doing so are not visible in this
 * excerpt. */
764 sd_bus *sd_bus_unref(sd_bus *bus) {
768 assert(bus->n_ref > 0);
/* NOTE(review): only the signature is visible in this excerpt. Judging by
 * the name, this reports whether the bus connection is open -- confirm
 * against the body. */
777 int sd_bus_is_open(sd_bus *bus) {
/* Reports whether a value of the given type code can be sent on this bus.
 * For SD_BUS_TYPE_UNIX_FD the bus is first driven to the running state with
 * bus_ensure_running(). The visible general return is the result of
 * bus_type_is_valid(type).
 * NOTE(review): the answer for the UNIX_FD case is presumably taken from the
 * AGREE_UNIX_FD outcome of authentication -- not visible here. */
784 int sd_bus_can_send(sd_bus *bus, char type) {
790 if (type == SD_BUS_TYPE_UNIX_FD) {
791 r = bus_ensure_running(bus);
798 return bus_type_is_valid(type);
/* Retrieves the id of the peer through *peer. The visible step drives the
 * bus to the running state with bus_ensure_running() first.
 * NOTE(review): the assignment to *peer is not visible in this excerpt;
 * presumably it copies bus->peer. */
801 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
809 r = bus_ensure_running(bus);
/* Seals message m for transmission on bus b. The message header version is
 * compared with b->message_version, and the message is sealed with the next
 * serial number: b->serial is incremented before use. Returns the result of
 * bus_message_seal(). */
817 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
820 if (m->header->version > b->message_version)
826 return bus_message_seal(m, ++b->serial);
/* Writes message m to the bus socket, resuming at byte offset *idx.
 * Works on a copy of the message iovec array, advanced by *idx bytes, so the
 * message itself is left untouched, and issues one non-blocking sendmsg().
 * Requires the BUS_RUNNING or BUS_HELLO state. On the visible error return,
 * EAGAIN yields 0 and any other errno yields -errno.
 * NOTE(review): the update of *idx after a successful write is not visible
 * in this excerpt. */
829 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
839 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
844 n = m->n_iovec * sizeof(struct iovec);
846 memcpy(iov, m->iovec, n);
849 iovec_advance(iov, &j, *idx);
853 mh.msg_iovlen = m->n_iovec;
855 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
857 return errno == EAGAIN ? 0 : -errno;
/* Computes in *need how many bytes must be buffered before the next message
 * is complete. While less than a full bus_header is buffered, header size
 * plus 8 is requested. Otherwise two 32-bit words are read from the buffered
 * header (word index 1 into a, word index 3 into b), the first byte selects
 * little or big endian handling, and the total is header size plus b rounded
 * up to a multiple of 8 plus a. The total is compared against
 * BUS_MESSAGE_SIZE_MAX before being stored.
 * NOTE(review): a and b are presumably the body length and the header field
 * array length -- confirm against the D-Bus wire format. */
863 static int message_read_need(sd_bus *bus, size_t *need) {
870 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
872 if (bus->rbuffer_size < sizeof(struct bus_header)) {
873 *need = sizeof(struct bus_header) + 8;
875 /* Minimum message size:
879 * Method Call: +2 string headers
880 * Signal: +3 string headers
881 * Method Error: +1 string headers
883 * Method Reply: +1 uint32 headers
885 * A string header is at least 9 bytes
886 * A uint32 header is at least 8 bytes
888 * Hence the minimum message size of a valid message
889 * is header + 8 bytes */
894 a = ((const uint32_t*) bus->rbuffer)[1];
895 b = ((const uint32_t*) bus->rbuffer)[3];
897 e = ((const uint8_t*) bus->rbuffer)[0];
898 if (e == SD_BUS_LITTLE_ENDIAN) {
901 } else if (e == SD_BUS_BIG_ENDIAN) {
/* The sum is formed in 64 bit so that it cannot wrap with 32-bit inputs. */
907 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
908 if (sum >= BUS_MESSAGE_SIZE_MAX)
911 *need = (size_t) sum;
/* Turns the first size bytes of bus->rbuffer into a message object.
 * Any bytes buffered beyond size are duplicated first, the buffer itself is
 * passed to bus_message_from_malloc(), and rbuffer_size is reduced by size.
 * Requires at least size buffered bytes and the BUS_RUNNING or BUS_HELLO
 * state.
 * NOTE(review): presumably the duplicate becomes the new rbuffer and the new
 * message is handed back through *m -- not visible in this excerpt. */
915 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
922 assert(bus->rbuffer_size >= size);
923 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
925 if (bus->rbuffer_size > size) {
926 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
933 r = bus_message_from_malloc(bus->rbuffer, size, &t);
940 bus->rbuffer_size -= size;
/* Tries to produce one complete incoming message. message_read_need()
 * determines the required byte count; when that much is already buffered the
 * message is built with message_make(). Otherwise rbuffer is grown to the
 * required size, one non-blocking recvmsg() fills the free tail, and the
 * check is repeated. Requires the BUS_RUNNING or BUS_HELLO state. On the
 * visible error return, EAGAIN yields 0 and any other errno yields -errno. */
946 static int message_read(sd_bus *bus, sd_bus_message **m) {
956 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
958 r = message_read_need(bus, &need);
962 if (bus->rbuffer_size >= need)
963 return message_make(bus, need, m);
965 b = realloc(bus->rbuffer, need);
972 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
973 iov.iov_len = need - bus->rbuffer_size;
979 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
981 return errno == EAGAIN ? 0 : -errno;
983 bus->rbuffer_size += k;
/* The requirement is recomputed because more header bytes may be available
 * now than before the read. */
985 r = message_read_need(bus, &need);
989 if (bus->rbuffer_size >= need)
990 return message_make(bus, need, m);
/* Flushes the write queue as far as the socket allows. While messages are
 * queued, the head message is written with message_write(), continuing at
 * byte offset bus->windex. Once windex reaches the size of the head message,
 * the reference on it is dropped and the remaining entries are shifted down
 * by one. Requires the BUS_RUNNING or BUS_HELLO state. */
995 static int dispatch_wqueue(sd_bus *bus) {
999 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1004 while (bus->wqueue_size > 0) {
1006 r = message_write(bus, bus->wqueue[0], &bus->windex);
1011 /* Didn't do anything this time */
1013 else if (bus->windex >= bus->wqueue[0]->size) {
1014 /* Fully written. Let's drop the entry from
1017 * This isn't particularly optimized, but
1018 * well, this is supposed to be our worst-case
1019 * buffer only, and the socket buffer is
1020 * supposed to be our primary buffer, and if
1021 * it got full, then all bets are off
1024 sd_bus_message_unref(bus->wqueue[0]);
1025 bus->wqueue_size --;
1026 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* Fetches the next incoming message into *m. A message already waiting in
 * the read queue is taken from its head, with the remaining entries shifted
 * down by one; otherwise a new message is read from the socket with
 * message_read(). Requires the BUS_RUNNING or BUS_HELLO state. */
1036 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1037 sd_bus_message *z = NULL;
1042 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1047 if (bus->rqueue_size > 0) {
1048 /* Dispatch a queued message */
1050 *m = bus->rqueue[0];
1051 bus->rqueue_size --;
1052 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1056 /* Try to read a new message */
1058 r = message_read(bus, &z);
/* Sends message m on the bus, writing it directly when possible and queuing
 * it otherwise.
 *
 * When the caller passes no serial pointer and the message is not yet
 * sealed, the NO_REPLY_EXPECTED header flag is set. The message is sealed
 * with bus_seal_message(). If the bus is in BUS_RUNNING or BUS_HELLO and the
 * write queue is empty, message_write() is attempted immediately; a message
 * that was only partly written is referenced and stored as the single entry
 * of the write queue. In the other case the queue, bounded by
 * BUS_WQUEUE_MAX, is grown by one slot and a reference to the message is
 * appended. The serial of the message is stored through *serial. */
1073 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1083 /* If the serial number isn't kept, then we know that no reply
1085 if (!serial && !m->sealed)
1086 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1088 r = bus_seal_message(bus, m);
1092 /* If this is a reply and no reply was requested, then let's
1093 * suppress this, if we can */
1094 if (m->dont_send && !serial)
1097 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1100 r = message_write(bus, m, &idx);
1104 } else if (idx < m->size) {
1105 /* Wasn't fully written. So let's remember how
1106 * much was written. Note that the first entry
1107 * of the wqueue array is always allocated so
1108 * that we always can remember how much was
1110 bus->wqueue[0] = sd_bus_message_ref(m);
1111 bus->wqueue_size = 1;
1117 /* Just append it to the queue. */
1119 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1122 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1127 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1131 *serial = BUS_MESSAGE_SERIAL(m);
/* Converts a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC deadline (current time plus usec). The value (uint64_t) -1
 * is treated specially, and BUS_DEFAULT_TIMEOUT can be substituted for the
 * given value.
 * NOTE(review): callers compare the result with 0 to mean "no timeout", so
 * the -1 case presumably returns 0; the condition guarding the default
 * substitution is not visible in this excerpt. */
1136 static usec_t calc_elapse(uint64_t usec) {
1137 if (usec == (uint64_t) -1)
1141 usec = BUS_DEFAULT_TIMEOUT;
1143 return now(CLOCK_MONOTONIC) + usec;
/* Comparison function handed to prioq_ensure_allocated() for the
 * reply-callback priority queue. Both arguments are reply_callback records.
 * A record with a non-zero timeout is distinguished from one with a zero
 * timeout first; two records are otherwise ordered by their timeout values.
 * NOTE(review): the returned values are not visible in this excerpt. */
1146 static int timeout_compare(const void *a, const void *b) {
1147 const struct reply_callback *x = a, *y = b;
1149 if (x->timeout != 0 && y->timeout == 0)
1152 if (x->timeout == 0 && y->timeout != 0)
1155 if (x->timeout < y->timeout)
1158 if (x->timeout > y->timeout)
/* Sends a method call and registers a callback to be invoked for its reply.
 *
 * The message type is checked against METHOD_CALL and the
 * NO_REPLY_EXPECTED flag is examined. The reply-callback hashmap is created
 * on demand, as is the timeout priority queue unless usec is (uint64_t) -1.
 * The message is sealed, and a reply_callback record holding the callback,
 * its userdata, the message serial and the deadline from calc_elapse() is
 * stored in the hashmap under the serial and, when its timeout is non-zero,
 * in the priority queue. The message is then passed to sd_bus_send().
 * sd_bus_send_with_reply_cancel() is used to undo the registration on the
 * visible failure paths after prioq_put() and sd_bus_send(). */
1164 int sd_bus_send_with_reply(
1167 sd_message_handler_t callback,
1172 struct reply_callback *c;
1183 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1185 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1188 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1192 if (usec != (uint64_t) -1) {
1193 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1198 r = bus_seal_message(bus, m);
1202 c = new(struct reply_callback, 1);
1206 c->callback = callback;
1207 c->userdata = userdata;
1208 c->serial = BUS_MESSAGE_SERIAL(m);
1209 c->timeout = calc_elapse(usec);
/* The key is the address of the serial stored inside the record itself. */
1211 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1217 if (c->timeout != 0) {
1218 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1221 sd_bus_send_with_reply_cancel(bus, c->serial);
1226 r = sd_bus_send(bus, m, serial);
1228 sd_bus_send_with_reply_cancel(bus, c->serial);
/* Cancels the reply callback registered for the given serial: the record is
 * removed from the reply-callback hashmap and, when it carries a non-zero
 * timeout, from the timeout priority queue as well.
 * NOTE(review): freeing of the record and the return values are not visible
 * in this excerpt. */
1235 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1236 struct reply_callback *c;
1243 c = hashmap_remove(bus->reply_callbacks, &serial);
1247 if (c->timeout != 0)
1248 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
/* Drives the connection until it reaches BUS_RUNNING. Visible steps: test
 * for BUS_RUNNING, call sd_bus_process(), test again, then block in
 * sd_bus_wait() with the timeout (uint64_t) -1, which bus_poll() treats as
 * "no timeout".
 * NOTE(review): the loop construct around these steps is not visible in this
 * excerpt. */
1254 int bus_ensure_running(sd_bus *bus) {
1259 if (bus->state == BUS_RUNNING)
1263 r = sd_bus_process(bus, NULL);
1266 if (bus->state == BUS_RUNNING)
1271 r = sd_bus_wait(bus, (uint64_t) -1);
/* Sends a method call and waits synchronously for the matching reply.
 *
 * The message type is checked against METHOD_CALL, the NO_REPLY_EXPECTED
 * flag is examined and the error structure is tested with
 * bus_error_is_dirty(). The bus is driven to the running state, the message
 * is sent with sd_bus_send() and a deadline is computed with calc_elapse().
 * Incoming messages are then read one at a time:
 *   - a message whose reply_serial equals the sent serial ends the wait; a
 *     METHOD_ERROR is copied into *error and converted with
 *     bus_error_to_errno();
 *   - any other message is appended to the read queue, for which one slot
 *     is reserved before each read and which is bounded by BUS_RQUEUE_MAX.
 * Between reads the function waits in bus_poll() and flushes pending writes
 * with dispatch_wqueue(). */
1277 int sd_bus_send_with_reply_and_block(
1281 sd_bus_error *error,
1282 sd_bus_message **reply) {
1295 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1297 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1299 if (bus_error_is_dirty(error))
1302 r = bus_ensure_running(bus);
1306 r = sd_bus_send(bus, m, &serial);
1310 timeout = calc_elapse(usec);
1314 sd_bus_message *incoming = NULL;
1319 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1322 /* Make sure there's room for queuing this
1323 * locally, before we read the message */
1325 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1333 r = message_read(bus, &incoming);
1338 if (incoming->reply_serial == serial) {
1339 /* Found a match! */
1341 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1346 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1349 r = sd_bus_error_copy(error, &incoming->error);
1351 sd_bus_message_unref(incoming);
1355 k = bus_error_to_errno(&incoming->error);
1356 sd_bus_message_unref(incoming);
1360 sd_bus_message_unref(incoming);
1364 /* There's already guaranteed to be room for
1365 * this, so no need to resize things here */
1366 bus->rqueue[bus->rqueue_size ++] = incoming;
1369 /* Try to read more, right-away */
1378 n = now(CLOCK_MONOTONIC);
1384 left = (uint64_t) -1;
1386 r = bus_poll(bus, true, left);
1390 r = dispatch_wqueue(bus);
/* NOTE(review): only the signature is visible in this excerpt. Judging by
 * the name, this returns the file descriptor of the bus connection --
 * confirm against the body. */
1396 int sd_bus_get_fd(sd_bus *bus) {
/* Computes the poll events the bus is currently interested in, depending on
 * its state. BUS_OPENING has its own case. In BUS_AUTHENTICATING the result
 * depends on whether parts of the auth request are still unsent (auth_index
 * below the number of auth_iovec entries). In BUS_RUNNING or BUS_HELLO it
 * depends on whether the read queue is empty and whether the write queue
 * holds anything.
 * NOTE(review): the concrete POLLIN/POLLOUT flags set in each case are not
 * visible in this excerpt. */
1406 int sd_bus_get_events(sd_bus *bus) {
1414 if (bus->state == BUS_OPENING)
1416 else if (bus->state == BUS_AUTHENTICATING) {
1418 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1423 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1424 if (bus->rqueue_size <= 0)
1426 if (bus->wqueue_size > 0)
/* Stores in *timeout_usec the next deadline the bus needs to be woken up
 * for: the authentication deadline while in BUS_AUTHENTICATING, otherwise
 * (in BUS_RUNNING or BUS_HELLO) the timeout of the record at the head of the
 * reply-callback priority queue. The stored values are absolute times, as
 * bus_poll() subtracts the current monotonic time from them.
 * NOTE(review): the value stored when there is no deadline, and the return
 * codes, are not visible in this excerpt. */
1433 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1434 struct reply_callback *c;
1443 if (bus->state == BUS_AUTHENTICATING) {
1444 *timeout_usec = bus->auth_timeout;
1448 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1451 c = prioq_peek(bus->reply_callbacks_prioq);
1455 *timeout_usec = c->timeout;
/* Handles expiry of the reply callback with the earliest deadline. The head
 * of the priority queue is inspected and the monotonic clock is read; the
 * record is then popped (asserting it is the one inspected), removed from
 * the hashmap, and its callback is invoked with ETIMEDOUT and no message.
 * Returns the callback result when negative, otherwise 1.
 * NOTE(review): the comparison of the deadline against the clock and the
 * freeing of the record are not visible in this excerpt. */
1459 static int process_timeout(sd_bus *bus) {
1460 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1461 struct reply_callback *c;
1467 c = prioq_peek(bus->reply_callbacks_prioq);
1471 n = now(CLOCK_MONOTONIC);
1475 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1476 hashmap_remove(bus->reply_callbacks, &c->serial);
1478 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1481 return r < 0 ? r : 1;
/* Answers method calls on the interface "org.freedesktop.DBus.Peer".
 * The message type, the interface name and the NO_REPLY_EXPECTED flag are
 * checked first. "Ping" gets a plain method return, "GetMachineId" gets a
 * method return carrying the machine id as a string, and any other member
 * gets an "org.freedesktop.DBus.Error.UnknownMethod" error. The reply is sent
 * with sd_bus_send() without asking for its serial. */
1484 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1485 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1491 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1494 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1497 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1500 if (streq_ptr(m->member, "Ping"))
1501 r = sd_bus_message_new_method_return(bus, m, &reply);
1502 else if (streq_ptr(m->member, "GetMachineId")) {
1506 r = sd_id128_get_machine(&id);
1510 r = sd_bus_message_new_method_return(bus, m, &reply);
1514 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1516 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1518 sd_bus_error_set(&error,
1519 "org.freedesktop.DBus.Error.UnknownMethod",
1520 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1522 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1528 r = sd_bus_send(bus, reply, NULL);
/* Dispatches one incoming message. For a METHOD_RETURN or METHOD_ERROR the
 * reply callback registered under the reply serial is removed from the
 * hashmap (and from the priority queue when it has a timeout) and invoked
 * with the message. The registered filter callbacks are then called in list
 * order, and on the final visible path the message is offered to
 * process_builtin().
 * NOTE(review): the conditions under which a callback result ends the
 * dispatch early are not visible in this excerpt. */
1535 static int process_message(sd_bus *bus, sd_bus_message *m) {
1536 struct filter_callback *l;
1542 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1543 struct reply_callback *c;
1545 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1547 if (c->timeout != 0)
1548 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1550 r = c->callback(bus, 0, m, c->userdata);
1558 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1559 r = l->callback(bus, 0, m, l->userdata);
1564 return process_builtin(bus, m);
/* Performs one step of work on the bus, depending on its state:
 *   BUS_OPENING        - examines the poll result of the pending connect and
 *                        SO_ERROR; records a failure in last_connect_error;
 *                        continues with bus_start_auth() or tries the next
 *                        address with bus_start_connect().
 *   BUS_AUTHENTICATING - checks the authentication deadline, then continues
 *                        with bus_write_auth() and bus_read_auth().
 *   BUS_RUNNING/HELLO  - runs process_timeout(), dispatch_wqueue(),
 *                        dispatch_rqueue() and process_message(); a method
 *                        call reaching the end of this path is answered with
 *                        an "org.freedesktop.DBus.Error.UnknownObject" error.
 * Any other state hits assert_not_reached(). */
1567 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1570 /* Returns 0 when we didn't do anything. This should cause the
1571 * caller to invoke sd_bus_wait() before returning the next
1572 * time. Returns > 0 when we did something, which possibly
1573 * means *ret is filled in with an unprocessed message. */
1580 if (bus->state == BUS_OPENING) {
1591 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1593 socklen_t slen = sizeof(error);
1595 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1597 bus->last_connect_error = errno;
1598 else if (error != 0)
1599 bus->last_connect_error = error;
1600 else if (p.revents & (POLLERR|POLLHUP))
1601 bus->last_connect_error = ECONNREFUSED;
1603 r = bus_start_auth(bus);
1607 /* Try next address */
1608 r = bus_start_connect(bus);
1615 } else if (bus->state == BUS_AUTHENTICATING) {
1617 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1620 r = bus_write_auth(bus);
1624 r = bus_read_auth(bus);
1627 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1628 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1631 r = process_timeout(bus);
1635 r = dispatch_wqueue(bus);
1640 r = dispatch_rqueue(bus, &m);
1649 r = process_message(bus, m);
1659 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1660 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1661 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1663 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1665 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1669 r = sd_bus_send(bus, reply, NULL);
1677 assert_not_reached("Unknown state");
/* Waits with ppoll() for activity on the bus. The events come from
 * sd_bus_get_events() and the bus-internal deadline from
 * sd_bus_get_timeout(); that deadline is turned into a relative wait,
 * clamped at 0 when it already passed. A caller-supplied timeout_usec other
 * than (uint64_t) -1 is compared with the internal wait and applies when the
 * internal wait is unlimited or longer. An effective wait of (uint64_t) -1
 * makes ppoll() block without a timeout. The visible return yields 1 when
 * ppoll() reported a ready descriptor and 0 otherwise.
 * NOTE(review): how need_more influences the events is not visible in this
 * excerpt. */
1686 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1697 e = sd_bus_get_events(bus);
1704 r = sd_bus_get_timeout(bus, &until);
1711 n = now(CLOCK_MONOTONIC);
1712 m = until > n ? until - n : 0;
1715 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1722 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1726 return r > 0 ? 1 : 0;
/* Waits up to timeout_usec for activity on the bus. A non-empty read queue
 * is checked for first; otherwise the wait is delegated to bus_poll() with
 * need_more set to false.
 * NOTE(review): presumably returns immediately when messages are already
 * queued -- the statement under the check is not visible in this excerpt. */
1729 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1735 if (bus->rqueue_size > 0)
1738 return bus_poll(bus, false, timeout_usec);
/* Writes out everything waiting in the write queue. The bus is first driven
 * to the running state with bus_ensure_running(). The visible steps then
 * test for an empty write queue, push data out with dispatch_wqueue(), test
 * again, and wait in bus_poll() without a timeout.
 * NOTE(review): the loop construct around these steps is not visible in this
 * excerpt. */
1741 int sd_bus_flush(sd_bus *bus) {
1749 r = bus_ensure_running(bus);
1753 if (bus->wqueue_size <= 0)
1757 r = dispatch_wqueue(bus);
1761 if (bus->wqueue_size <= 0)
1764 r = bus_poll(bus, false, (uint64_t) -1);
/* Registers a filter callback on the bus. A filter_callback record is
 * allocated, filled with the callback and its userdata, and put at the front
 * of bus->filter_callbacks. process_message() walks that list for each
 * incoming message, so the most recently added filter is called first. */
1770 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1771 struct filter_callback *f;
1778 f = new(struct filter_callback, 1);
1781 f->callback = callback;
1782 f->userdata = userdata;
1784 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
/* Unregisters a filter callback. The list of filters is searched for a
 * record whose callback and userdata both equal the arguments, and that
 * record is unlinked from bus->filter_callbacks.
 * NOTE(review): the rest of this function (freeing the record, return
 * values) lies beyond the end of this excerpt. */
1788 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1789 struct filter_callback *f;
1796 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1797 if (f->callback == callback && f->userdata == userdata) {
1798 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);