1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
34 #include "bus-internal.h"
35 #include "bus-message.h"
38 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
40 static void bus_free(sd_bus *b) {
41 struct filter_callback *f;
47 close_nointr_nofail(b->fd);
54 for (i = 0; i < b->rqueue_size; i++)
55 sd_bus_message_unref(b->rqueue[i]);
58 for (i = 0; i < b->wqueue_size; i++)
59 sd_bus_message_unref(b->wqueue[i]);
62 hashmap_free_free(b->reply_callbacks);
63 prioq_free(b->reply_callbacks_prioq);
65 while ((f = b->filter_callbacks)) {
66 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
73 static sd_bus* bus_new(void) {
82 r->message_version = 1;
84 /* We guarantee that wqueue always has space for at least one
86 r->wqueue = new(sd_bus_message*, 1);
95 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
106 bus->state = BUS_RUNNING;
108 r = sd_bus_message_read(reply, "s", &s);
112 bus->unique_name = strdup(s);
113 if (!bus->unique_name)
119 static int bus_send_hello(sd_bus *bus) {
120 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
125 r = sd_bus_message_new_method_call(
127 "org.freedesktop.DBus",
129 "org.freedesktop.DBus",
135 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
139 bus->sent_hello = true;
143 static int bus_start_running(sd_bus *bus) {
146 if (bus->sent_hello) {
147 bus->state = BUS_HELLO;
151 bus->state = BUS_RUNNING;
155 static int parse_address_key(const char **p, const char *key, char **value) {
166 if (strncmp(*p, key, l) != 0)
176 while (*a != ',' && *a != 0) {
194 c = (char) ((x << 4) | y);
201 t = realloc(r, n + 2);
226 static void skip_address_key(const char **p) {
230 *p += strcspn(*p, ",");
236 static int bus_parse_next_address(sd_bus *b) {
238 _cleanup_free_ char *guid = NULL;
245 if (b->address[b->address_index] == 0)
248 a = b->address + b->address_index;
251 b->sockaddr_size = 0;
252 b->peer = SD_ID128_NULL;
254 if (startswith(a, "unix:")) {
255 _cleanup_free_ char *path = NULL, *abstract = NULL;
259 r = parse_address_key(&p, "guid", &guid);
265 r = parse_address_key(&p, "path", &path);
271 r = parse_address_key(&p, "abstract", &abstract);
277 skip_address_key(&p);
280 if (!path && !abstract)
283 if (path && abstract)
290 if (l > sizeof(b->sockaddr.un.sun_path))
293 b->sockaddr.un.sun_family = AF_UNIX;
294 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
295 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
296 } else if (abstract) {
299 l = strlen(abstract);
300 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
303 b->sockaddr.un.sun_family = AF_UNIX;
304 b->sockaddr.un.sun_path[0] = 0;
305 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
306 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
309 } else if (startswith(a, "tcp:")) {
310 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
311 struct addrinfo hints, *result;
315 r = parse_address_key(&p, "guid", &guid);
321 r = parse_address_key(&p, "host", &host);
327 r = parse_address_key(&p, "port", &port);
333 r = parse_address_key(&p, "family", &family);
339 skip_address_key(&p);
346 hints.ai_socktype = SOCK_STREAM;
347 hints.ai_flags = AI_ADDRCONFIG;
350 if (streq(family, "ipv4"))
351 hints.ai_family = AF_INET;
352 else if (streq(family, "ipv6"))
353 hints.ai_family = AF_INET6;
358 r = getaddrinfo(host, port, &hints, &result);
362 return -EADDRNOTAVAIL;
364 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
365 b->sockaddr_size = result->ai_addrlen;
367 freeaddrinfo(result);
371 r = sd_id128_from_string(guid, &b->peer);
376 b->address_index = p - b->address;
380 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
383 struct iovec *i = iov + *idx;
385 if (i->iov_len > size) {
386 i->iov_base = (uint8_t*) i->iov_base + size;
400 static int bus_write_auth(sd_bus *b) {
405 assert(b->state == BUS_AUTHENTICATING);
407 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
410 if (b->auth_timeout == 0)
411 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
414 mh.msg_iov = b->auth_iovec + b->auth_index;
415 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
417 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
419 return errno == EAGAIN ? 0 : -errno;
421 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
426 static int bus_auth_verify(sd_bus *b) {
432 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
435 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
439 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
443 if (e - (char*) b->rbuffer != 3 + 32)
446 if (memcmp(b->rbuffer, "OK ", 3))
449 for (i = 0; i < 32; i += 2) {
452 x = unhexchar(((char*) b->rbuffer)[3 + i]);
453 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
458 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
461 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
462 !sd_id128_equal(b->peer, peer))
468 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
469 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
471 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
472 memmove(b->rbuffer, f + 2, b->rbuffer_size);
474 r = bus_start_running(b);
481 static int bus_read_auth(sd_bus *b) {
491 r = bus_auth_verify(b);
495 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
497 if (n > BUS_AUTH_SIZE_MAX)
498 n = BUS_AUTH_SIZE_MAX;
500 if (b->rbuffer_size >= n)
503 p = realloc(b->rbuffer, n);
510 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
511 iov.iov_len = n - b->rbuffer_size;
517 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
519 return errno == EAGAIN ? 0 : -errno;
521 b->rbuffer_size += k;
523 r = bus_auth_verify(b);
530 static int bus_start_auth(sd_bus *b) {
531 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
532 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
534 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
539 b->state = BUS_AUTHENTICATING;
541 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
545 b->auth_uid = hexmem(text, l);
549 b->auth_iovec[0].iov_base = (void*) auth_prefix;
550 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
551 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
552 b->auth_iovec[1].iov_len = l * 2;
553 b->auth_iovec[2].iov_base = (void*) auth_suffix;
554 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
555 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
557 return bus_write_auth(b);
560 static int bus_start_connect(sd_bus *b) {
567 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
568 r = bus_parse_next_address(b);
572 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
575 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
577 b->last_connect_error = errno;
582 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
584 if (errno == EINPROGRESS)
587 b->last_connect_error = errno;
588 close_nointr_nofail(b->fd);
594 return bus_start_auth(b);
598 int sd_bus_open_system(sd_bus **ret) {
606 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
608 r = sd_bus_open_address(e, &b);
616 b->sockaddr.un.sun_family = AF_UNIX;
617 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
618 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
620 r = bus_start_connect(b);
627 r = bus_send_hello(b);
637 int sd_bus_open_user(sd_bus **ret) {
646 e = getenv("DBUS_SESSION_BUS_ADDRESS");
648 r = sd_bus_open_address(e, &b);
652 e = getenv("XDG_RUNTIME_DIR");
657 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
664 b->sockaddr.un.sun_family = AF_UNIX;
665 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
666 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
668 r = bus_start_connect(b);
675 r = bus_send_hello(b);
685 int sd_bus_open_address(const char *address, sd_bus **ret) {
698 b->address = strdup(address);
704 r = bus_start_connect(b);
714 int sd_bus_open_fd(int fd, sd_bus **ret) {
728 fd_nonblock(b->fd, true);
729 fd_cloexec(b->fd, true);
731 r = bus_start_auth(b);
741 void sd_bus_close(sd_bus *bus) {
747 close_nointr_nofail(bus->fd);
751 sd_bus *sd_bus_ref(sd_bus *bus) {
755 assert(bus->n_ref > 0);
761 sd_bus *sd_bus_unref(sd_bus *bus) {
765 assert(bus->n_ref > 0);
774 int sd_bus_is_open(sd_bus *bus) {
781 int sd_bus_is_running(sd_bus *bus) {
788 return bus->state == BUS_RUNNING;
791 int sd_bus_can_send(sd_bus *bus, char type) {
795 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
798 if (type == SD_BUS_TYPE_UNIX_FD)
801 return bus_type_is_valid(type);
804 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
807 if (m->header->version > b->message_version)
813 return bus_message_seal(m, ++b->serial);
816 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
826 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
831 n = m->n_iovec * sizeof(struct iovec);
833 memcpy(iov, m->iovec, n);
836 iovec_advance(iov, &j, *idx);
840 mh.msg_iovlen = m->n_iovec;
842 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
844 return errno == EAGAIN ? 0 : -errno;
850 static int message_read_need(sd_bus *bus, size_t *need) {
857 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
859 if (bus->rbuffer_size < sizeof(struct bus_header)) {
860 *need = sizeof(struct bus_header) + 8;
862 /* Minimum message size:
866 * Method Call: +2 string headers
867 * Signal: +3 string headers
868 * Method Error: +1 string headers
870 * Method Reply: +1 uint32 headers
872 * A string header is at least 9 bytes
873 * A uint32 header is at least 8 bytes
875 * Hence the minimum message size of a valid message
876 * is header + 8 bytes */
881 a = ((const uint32_t*) bus->rbuffer)[1];
882 b = ((const uint32_t*) bus->rbuffer)[3];
884 e = ((const uint8_t*) bus->rbuffer)[0];
885 if (e == SD_BUS_LITTLE_ENDIAN) {
888 } else if (e == SD_BUS_BIG_ENDIAN) {
894 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
895 if (sum >= BUS_MESSAGE_SIZE_MAX)
898 *need = (size_t) sum;
902 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
909 assert(bus->rbuffer_size >= size);
910 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
912 if (bus->rbuffer_size > size) {
913 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
920 r = bus_message_from_malloc(bus->rbuffer, size, &t);
927 bus->rbuffer_size -= size;
933 static int message_read(sd_bus *bus, sd_bus_message **m) {
943 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
945 r = message_read_need(bus, &need);
949 if (bus->rbuffer_size >= need)
950 return message_make(bus, need, m);
952 b = realloc(bus->rbuffer, need);
959 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
960 iov.iov_len = need - bus->rbuffer_size;
966 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
968 return errno == EAGAIN ? 0 : -errno;
970 bus->rbuffer_size += k;
972 r = message_read_need(bus, &need);
976 if (bus->rbuffer_size >= need)
977 return message_make(bus, need, m);
982 static int dispatch_wqueue(sd_bus *bus) {
986 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
991 while (bus->wqueue_size > 0) {
993 r = message_write(bus, bus->wqueue[0], &bus->windex);
998 /* Didn't do anything this time */
1000 else if (bus->windex >= bus->wqueue[0]->size) {
1001 /* Fully written. Let's drop the entry from
1004 * This isn't particularly optimized, but
1005 * well, this is supposed to be our worst-case
1006 * buffer only, and the socket buffer is
1007 * supposed to be our primary buffer, and if
1008 * it got full, then all bets are off
1011 sd_bus_message_unref(bus->wqueue[0]);
1012 bus->wqueue_size --;
1013 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1023 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1029 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1034 if (bus->rqueue_size > 0) {
1035 /* Dispatch a queued message */
1037 *m = bus->rqueue[0];
1038 bus->rqueue_size --;
1039 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1043 /* Try to read a new message */
1045 r = message_read(bus, &z);
1060 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1070 /* If the serial number isn't kept, then we know that no reply
1072 if (!serial && !m->sealed)
1073 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1075 r = bus_seal_message(bus, m);
1079 /* If this is a reply and no reply was requested, then let's
1080 * suppress this, if we can */
1081 if (m->dont_send && !serial)
1084 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1087 r = message_write(bus, m, &idx);
1091 } else if (idx < m->size) {
1092 /* Wasn't fully written. So let's remember how
1093 * much was written. Note that the first entry
1094 * of the wqueue array is always allocated so
1095 * that we always can remember how much was
1097 bus->wqueue[0] = sd_bus_message_ref(m);
1098 bus->wqueue_size = 1;
1104 /* Just append it to the queue. */
1106 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1109 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1114 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1118 *serial = BUS_MESSAGE_SERIAL(m);
1123 static usec_t calc_elapse(uint64_t usec) {
1124 if (usec == (uint64_t) -1)
1128 usec = BUS_DEFAULT_TIMEOUT;
1130 return now(CLOCK_MONOTONIC) + usec;
1133 static int timeout_compare(const void *a, const void *b) {
1134 const struct reply_callback *x = a, *y = b;
1136 if (x->timeout != 0 && y->timeout == 0)
1139 if (x->timeout == 0 && y->timeout != 0)
1142 if (x->timeout < y->timeout)
1145 if (x->timeout > y->timeout)
1151 int sd_bus_send_with_reply(
1154 sd_message_handler_t callback,
1159 struct reply_callback *c;
1170 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1172 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1175 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1179 if (usec != (uint64_t) -1) {
1180 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1185 r = bus_seal_message(bus, m);
1189 c = new(struct reply_callback, 1);
1193 c->callback = callback;
1194 c->userdata = userdata;
1195 c->serial = BUS_MESSAGE_SERIAL(m);
1196 c->timeout = calc_elapse(usec);
1198 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1204 if (c->timeout != 0) {
1205 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1208 sd_bus_send_with_reply_cancel(bus, c->serial);
1213 r = sd_bus_send(bus, m, serial);
1215 sd_bus_send_with_reply_cancel(bus, c->serial);
1222 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1223 struct reply_callback *c;
1230 c = hashmap_remove(bus->reply_callbacks, &serial);
1234 if (c->timeout != 0)
1235 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1241 static int ensure_running(sd_bus *bus) {
1246 r = sd_bus_is_running(bus);
1253 r = sd_bus_process(bus, NULL);
1258 k = sd_bus_is_running(bus);
1265 r = sd_bus_wait(bus, (uint64_t) -1);
1271 int sd_bus_send_with_reply_and_block(
1275 sd_bus_error *error,
1276 sd_bus_message **reply) {
1289 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1291 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1293 if (bus_error_is_dirty(error))
1296 r = ensure_running(bus);
1300 r = sd_bus_send(bus, m, &serial);
1304 timeout = calc_elapse(usec);
1308 sd_bus_message *incoming = NULL;
1313 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1316 /* Make sure there's room for queuing this
1317 * locally, before we read the message */
1319 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1327 r = message_read(bus, &incoming);
1332 if (incoming->reply_serial == serial) {
1333 /* Found a match! */
1335 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1340 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1343 r = sd_bus_error_copy(error, &incoming->error);
1345 sd_bus_message_unref(incoming);
1349 k = bus_error_to_errno(&incoming->error);
1350 sd_bus_message_unref(incoming);
1354 sd_bus_message_unref(incoming);
1358 /* There's already guaranteed to be room for
1359 * this, so need to resize things here */
1360 bus->rqueue[bus->rqueue_size ++] = incoming;
1363 /* Try to read more, right-away */
1372 n = now(CLOCK_MONOTONIC);
1378 left = (uint64_t) -1;
1380 r = bus_poll(bus, true, left);
1384 r = dispatch_wqueue(bus);
1390 int sd_bus_get_fd(sd_bus *bus) {
1400 int sd_bus_get_events(sd_bus *bus) {
1408 if (bus->state == BUS_OPENING)
1410 else if (bus->state == BUS_AUTHENTICATING) {
1412 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1417 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1418 if (bus->rqueue_size <= 0)
1420 if (bus->wqueue_size > 0)
1427 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1428 struct reply_callback *c;
1437 if (bus->state == BUS_AUTHENTICATING) {
1438 *timeout_usec = bus->auth_timeout;
1442 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1445 c = prioq_peek(bus->reply_callbacks_prioq);
1449 *timeout_usec = c->timeout;
1453 static int process_timeout(sd_bus *bus) {
1454 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1455 struct reply_callback *c;
1461 c = prioq_peek(bus->reply_callbacks_prioq);
1465 n = now(CLOCK_MONOTONIC);
1469 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1470 hashmap_remove(bus->reply_callbacks, &c->serial);
1472 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1475 return r < 0 ? r : 1;
1478 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1479 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1485 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1488 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1491 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1494 if (streq_ptr(m->member, "Ping"))
1495 r = sd_bus_message_new_method_return(bus, m, &reply);
1496 else if (streq_ptr(m->member, "GetMachineId")) {
1500 r = sd_id128_get_machine(&id);
1504 r = sd_bus_message_new_method_return(bus, m, &reply);
1508 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1510 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1512 sd_bus_error_set(&error,
1513 "org.freedesktop.DBus.Error.UnknownMethod",
1514 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1516 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1522 r = sd_bus_send(bus, reply, NULL);
1529 static int process_message(sd_bus *bus, sd_bus_message *m) {
1530 struct filter_callback *l;
1536 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1537 struct reply_callback *c;
1539 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1541 if (c->timeout != 0)
1542 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1544 r = c->callback(bus, 0, m, c->userdata);
1552 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1553 r = l->callback(bus, 0, m, l->userdata);
1558 return process_builtin(bus, m);
1561 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1564 /* Returns 0 when we didn't do anything. This should cause the
1565 * caller to invoke sd_bus_wait() before returning the next
1566 * time. Returns > 0 when we did something, which possibly
1567 * means *ret is filled in with an unprocessed message. */
1574 if (bus->state == BUS_OPENING) {
1585 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1587 socklen_t slen = sizeof(error);
1589 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1591 bus->last_connect_error = errno;
1592 else if (error != 0)
1593 bus->last_connect_error = error;
1594 else if (p.revents & (POLLERR|POLLHUP))
1595 bus->last_connect_error = ECONNREFUSED;
1597 r = bus_start_auth(bus);
1601 /* Try next address */
1602 r = bus_start_connect(bus);
1609 } else if (bus->state == BUS_AUTHENTICATING) {
1611 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1614 r = bus_write_auth(bus);
1618 r = bus_read_auth(bus);
1621 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1622 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1625 r = process_timeout(bus);
1629 r = dispatch_wqueue(bus);
1634 r = dispatch_rqueue(bus, &m);
1643 r = process_message(bus, m);
1653 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1654 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1655 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1657 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1659 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1663 r = sd_bus_send(bus, reply, NULL);
1671 assert_not_reached("Unknown state");
1680 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1691 e = sd_bus_get_events(bus);
1698 r = sd_bus_get_timeout(bus, &until);
1705 n = now(CLOCK_MONOTONIC);
1706 m = until > n ? until - n : 0;
1709 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1716 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1720 return r > 0 ? 1 : 0;
1723 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1729 if (bus->rqueue_size > 0)
1732 return bus_poll(bus, false, timeout_usec);
1735 int sd_bus_flush(sd_bus *bus) {
1743 r = ensure_running(bus);
1747 if (bus->wqueue_size <= 0)
1751 r = dispatch_wqueue(bus);
1755 if (bus->wqueue_size <= 0)
1758 r = bus_poll(bus, false, (uint64_t) -1);
1764 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1765 struct filter_callback *f;
1772 f = new(struct filter_callback, 1);
1775 f->callback = callback;
1776 f->userdata = userdata;
1778 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1782 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1783 struct filter_callback *f;
1790 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1791 if (f->callback == callback && f->userdata == userdata) {
1792 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);