1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
35 #include "bus-internal.h"
36 #include "bus-message.h"
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
41 static void bus_free(sd_bus *b) {
42 struct filter_callback *f;
43 struct object_callback *c;
49 close_nointr_nofail(b->fd);
56 close_many(b->fds, b->n_fds);
59 for (i = 0; i < b->rqueue_size; i++)
60 sd_bus_message_unref(b->rqueue[i]);
63 for (i = 0; i < b->wqueue_size; i++)
64 sd_bus_message_unref(b->wqueue[i]);
67 hashmap_free_free(b->reply_callbacks);
68 prioq_free(b->reply_callbacks_prioq);
70 while ((f = b->filter_callbacks)) {
71 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
75 while ((c = hashmap_steal_first(b->object_callbacks))) {
80 hashmap_free(b->object_callbacks);
85 static sd_bus* bus_new(void) {
94 r->message_version = 1;
96 /* We guarantee that wqueue always has space for at least one
98 r->wqueue = new(sd_bus_message*, 1);
107 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
118 r = sd_bus_message_read(reply, "s", &s);
122 if (!service_name_is_valid(s) || s[0] != ':')
125 bus->unique_name = strdup(s);
126 if (!bus->unique_name)
129 bus->state = BUS_RUNNING;
134 static int bus_send_hello(sd_bus *bus) {
135 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
140 r = sd_bus_message_new_method_call(
142 "org.freedesktop.DBus",
144 "org.freedesktop.DBus",
150 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
154 bus->sent_hello = true;
158 static int bus_start_running(sd_bus *bus) {
161 if (bus->sent_hello) {
162 bus->state = BUS_HELLO;
166 bus->state = BUS_RUNNING;
170 static int parse_address_key(const char **p, const char *key, char **value) {
181 if (strncmp(*p, key, l) != 0)
191 while (*a != ',' && *a != 0) {
209 c = (char) ((x << 4) | y);
216 t = realloc(r, n + 2);
241 static void skip_address_key(const char **p) {
245 *p += strcspn(*p, ",");
251 static int bus_parse_next_address(sd_bus *b) {
253 _cleanup_free_ char *guid = NULL;
260 if (b->address[b->address_index] == 0)
263 a = b->address + b->address_index;
266 b->sockaddr_size = 0;
267 b->peer = SD_ID128_NULL;
269 if (startswith(a, "unix:")) {
270 _cleanup_free_ char *path = NULL, *abstract = NULL;
274 r = parse_address_key(&p, "guid", &guid);
280 r = parse_address_key(&p, "path", &path);
286 r = parse_address_key(&p, "abstract", &abstract);
292 skip_address_key(&p);
295 if (!path && !abstract)
298 if (path && abstract)
305 if (l > sizeof(b->sockaddr.un.sun_path))
308 b->sockaddr.un.sun_family = AF_UNIX;
309 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
310 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
311 } else if (abstract) {
314 l = strlen(abstract);
315 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
318 b->sockaddr.un.sun_family = AF_UNIX;
319 b->sockaddr.un.sun_path[0] = 0;
320 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
321 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
324 } else if (startswith(a, "tcp:")) {
325 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
326 struct addrinfo hints, *result;
330 r = parse_address_key(&p, "guid", &guid);
336 r = parse_address_key(&p, "host", &host);
342 r = parse_address_key(&p, "port", &port);
348 r = parse_address_key(&p, "family", &family);
354 skip_address_key(&p);
361 hints.ai_socktype = SOCK_STREAM;
362 hints.ai_flags = AI_ADDRCONFIG;
365 if (streq(family, "ipv4"))
366 hints.ai_family = AF_INET;
367 else if (streq(family, "ipv6"))
368 hints.ai_family = AF_INET6;
373 r = getaddrinfo(host, port, &hints, &result);
377 return -EADDRNOTAVAIL;
379 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
380 b->sockaddr_size = result->ai_addrlen;
382 freeaddrinfo(result);
386 r = sd_id128_from_string(guid, &b->peer);
391 b->address_index = p - b->address;
395 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
398 struct iovec *i = iov + *idx;
400 if (i->iov_len > size) {
401 i->iov_base = (uint8_t*) i->iov_base + size;
415 static int bus_write_auth(sd_bus *b) {
420 assert(b->state == BUS_AUTHENTICATING);
422 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
425 if (b->auth_timeout == 0)
426 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
429 mh.msg_iov = b->auth_iovec + b->auth_index;
430 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
432 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
434 return errno == EAGAIN ? 0 : -errno;
436 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
441 static int bus_auth_verify(sd_bus *b) {
447 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
450 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
454 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
458 if (e - (char*) b->rbuffer != 3 + 32)
461 if (memcmp(b->rbuffer, "OK ", 3))
464 for (i = 0; i < 32; i += 2) {
467 x = unhexchar(((char*) b->rbuffer)[3 + i]);
468 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
473 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
476 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
477 !sd_id128_equal(b->peer, peer))
483 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
484 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
486 b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
487 memmove(b->rbuffer, f + 2, b->rbuffer_size);
489 r = bus_start_running(b);
496 static int bus_read_auth(sd_bus *b) {
506 r = bus_auth_verify(b);
510 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
512 if (n > BUS_AUTH_SIZE_MAX)
513 n = BUS_AUTH_SIZE_MAX;
515 if (b->rbuffer_size >= n)
518 p = realloc(b->rbuffer, n);
525 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
526 iov.iov_len = n - b->rbuffer_size;
532 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
534 return errno == EAGAIN ? 0 : -errno;
538 b->rbuffer_size += k;
540 r = bus_auth_verify(b);
547 static int bus_setup_fd(sd_bus *b) {
552 /* Enable SO_PASSCRED + SO_PASSEC. We try this on any socket,
553 * just in case. This is actually irrelavant for */
555 setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
556 setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
558 /* Increase the buffers to a MB */
559 fd_inc_rcvbuf(b->fd, 1024*1024);
560 fd_inc_sndbuf(b->fd, 1024*1024);
565 static int bus_start_auth(sd_bus *b) {
566 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
567 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
569 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
574 b->state = BUS_AUTHENTICATING;
576 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
580 b->auth_uid = hexmem(text, l);
584 b->auth_iovec[0].iov_base = (void*) auth_prefix;
585 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
586 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
587 b->auth_iovec[1].iov_len = l * 2;
588 b->auth_iovec[2].iov_base = (void*) auth_suffix;
589 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
590 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
592 return bus_write_auth(b);
595 static int bus_start_connect(sd_bus *b) {
602 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
603 r = bus_parse_next_address(b);
607 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
610 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
612 b->last_connect_error = errno;
618 b->last_connect_error = errno;
622 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
624 if (errno == EINPROGRESS)
627 b->last_connect_error = errno;
631 return bus_start_auth(b);
637 close_nointr_nofail(b->fd);
643 int sd_bus_open_system(sd_bus **ret) {
651 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
653 r = sd_bus_open_address(e, &b);
661 b->sockaddr.un.sun_family = AF_UNIX;
662 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
663 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
665 r = bus_start_connect(b);
672 r = bus_send_hello(b);
682 int sd_bus_open_user(sd_bus **ret) {
691 e = getenv("DBUS_SESSION_BUS_ADDRESS");
693 r = sd_bus_open_address(e, &b);
697 e = getenv("XDG_RUNTIME_DIR");
702 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
709 b->sockaddr.un.sun_family = AF_UNIX;
710 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
711 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
713 r = bus_start_connect(b);
720 r = bus_send_hello(b);
730 int sd_bus_open_address(const char *address, sd_bus **ret) {
743 b->address = strdup(address);
749 r = bus_start_connect(b);
759 int sd_bus_open_fd(int fd, sd_bus **ret) {
774 r = fd_nonblock(b->fd, true);
778 fd_cloexec(b->fd, true);
786 r = bus_start_auth(b);
798 void sd_bus_close(sd_bus *bus) {
804 close_nointr_nofail(bus->fd);
808 sd_bus *sd_bus_ref(sd_bus *bus) {
812 assert(bus->n_ref > 0);
818 sd_bus *sd_bus_unref(sd_bus *bus) {
822 assert(bus->n_ref > 0);
831 int sd_bus_is_open(sd_bus *bus) {
838 int sd_bus_can_send(sd_bus *bus, char type) {
844 if (type == SD_BUS_TYPE_UNIX_FD) {
845 r = bus_ensure_running(bus);
852 return bus_type_is_valid(type);
855 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
863 r = bus_ensure_running(bus);
871 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
874 if (m->header->version > b->message_version)
880 return bus_message_seal(m, ++b->serial);
883 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
893 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
900 struct cmsghdr *control;
901 control = alloca(CMSG_SPACE(sizeof(int) * m->n_fds));
903 mh.msg_control = control;
904 control->cmsg_level = SOL_SOCKET;
905 control->cmsg_type = SCM_RIGHTS;
906 mh.msg_controllen = control->cmsg_len = CMSG_LEN(sizeof(int) * m->n_fds);
907 memcpy(CMSG_DATA(control), m->fds, sizeof(int) * m->n_fds);
910 n = m->n_iovec * sizeof(struct iovec);
912 memcpy(iov, m->iovec, n);
915 iovec_advance(iov, &j, *idx);
918 mh.msg_iovlen = m->n_iovec;
920 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
922 return errno == EAGAIN ? 0 : -errno;
928 static int message_read_need(sd_bus *bus, size_t *need) {
935 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
937 if (bus->rbuffer_size < sizeof(struct bus_header)) {
938 *need = sizeof(struct bus_header) + 8;
940 /* Minimum message size:
944 * Method Call: +2 string headers
945 * Signal: +3 string headers
946 * Method Error: +1 string headers
948 * Method Reply: +1 uint32 headers
950 * A string header is at least 9 bytes
951 * A uint32 header is at least 8 bytes
953 * Hence the minimum message size of a valid message
954 * is header + 8 bytes */
959 a = ((const uint32_t*) bus->rbuffer)[1];
960 b = ((const uint32_t*) bus->rbuffer)[3];
962 e = ((const uint8_t*) bus->rbuffer)[0];
963 if (e == SD_BUS_LITTLE_ENDIAN) {
966 } else if (e == SD_BUS_BIG_ENDIAN) {
972 sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
973 if (sum >= BUS_MESSAGE_SIZE_MAX)
976 *need = (size_t) sum;
980 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
987 assert(bus->rbuffer_size >= size);
988 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
990 if (bus->rbuffer_size > size) {
991 b = memdup((const uint8_t*) bus->rbuffer + size,
992 bus->rbuffer_size - size);
998 r = bus_message_from_malloc(bus->rbuffer, size,
999 bus->fds, bus->n_fds,
1000 bus->ucred_valid ? &bus->ucred : NULL,
1001 bus->label[0] ? bus->label : NULL,
1009 bus->rbuffer_size -= size;
1018 static int message_read(sd_bus *bus, sd_bus_message **m) {
1026 struct cmsghdr cmsghdr;
1027 uint8_t buf[CMSG_SPACE(sizeof(int) * BUS_FDS_MAX) +
1028 CMSG_SPACE(sizeof(struct ucred)) +
1029 CMSG_SPACE(NAME_MAX)]; /*selinux label */
1031 struct cmsghdr *cmsg;
1035 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1037 r = message_read_need(bus, &need);
1041 if (bus->rbuffer_size >= need)
1042 return message_make(bus, need, m);
1044 b = realloc(bus->rbuffer, need);
1051 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1052 iov.iov_len = need - bus->rbuffer_size;
1057 mh.msg_control = &control;
1058 mh.msg_controllen = sizeof(control);
1060 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1062 return errno == EAGAIN ? 0 : -errno;
1066 bus->rbuffer_size += k;
1068 for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1069 if (cmsg->cmsg_level == SOL_SOCKET &&
1070 cmsg->cmsg_type == SCM_RIGHTS) {
1073 n = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
1075 f = realloc(bus->fds, sizeof(int) + (bus->n_fds + n));
1077 close_many((int*) CMSG_DATA(cmsg), n);
1081 memcpy(f + bus->n_fds, CMSG_DATA(cmsg), n * sizeof(int));
1084 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1085 cmsg->cmsg_type == SCM_CREDENTIALS &&
1086 cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1088 memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1089 bus->ucred_valid = true;
1091 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1092 cmsg->cmsg_type == SCM_SECURITY) {
1095 l = cmsg->cmsg_len - CMSG_LEN(0);
1096 memcpy(&bus->label, CMSG_DATA(cmsg), l);
1101 r = message_read_need(bus, &need);
1105 if (bus->rbuffer_size >= need)
1106 return message_make(bus, need, m);
1111 static int dispatch_wqueue(sd_bus *bus) {
1115 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1120 while (bus->wqueue_size > 0) {
1122 r = message_write(bus, bus->wqueue[0], &bus->windex);
1127 /* Didn't do anything this time */
1129 else if (bus->windex >= bus->wqueue[0]->size) {
1130 /* Fully written. Let's drop the entry from
1133 * This isn't particularly optimized, but
1134 * well, this is supposed to be our worst-case
1135 * buffer only, and the socket buffer is
1136 * supposed to be our primary buffer, and if
1137 * it got full, then all bets are off
1140 sd_bus_message_unref(bus->wqueue[0]);
1141 bus->wqueue_size --;
1142 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1152 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1153 sd_bus_message *z = NULL;
1158 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1163 if (bus->rqueue_size > 0) {
1164 /* Dispatch a queued message */
1166 *m = bus->rqueue[0];
1167 bus->rqueue_size --;
1168 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1172 /* Try to read a new message */
1174 r = message_read(bus, &z);
1189 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1198 if (m->n_fds > 0 && !bus->can_fds)
1201 /* If the serial number isn't kept, then we know that no reply
1203 if (!serial && !m->sealed)
1204 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1206 r = bus_seal_message(bus, m);
1210 /* If this is a reply and no reply was requested, then let's
1211 * suppress this, if we can */
1212 if (m->dont_send && !serial)
1215 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1218 r = message_write(bus, m, &idx);
1222 } else if (idx < m->size) {
1223 /* Wasn't fully written. So let's remember how
1224 * much was written. Note that the first entry
1225 * of the wqueue array is always allocated so
1226 * that we always can remember how much was
1228 bus->wqueue[0] = sd_bus_message_ref(m);
1229 bus->wqueue_size = 1;
1235 /* Just append it to the queue. */
1237 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1240 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1245 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1249 *serial = BUS_MESSAGE_SERIAL(m);
1254 static usec_t calc_elapse(uint64_t usec) {
1255 if (usec == (uint64_t) -1)
1259 usec = BUS_DEFAULT_TIMEOUT;
1261 return now(CLOCK_MONOTONIC) + usec;
1264 static int timeout_compare(const void *a, const void *b) {
1265 const struct reply_callback *x = a, *y = b;
1267 if (x->timeout != 0 && y->timeout == 0)
1270 if (x->timeout == 0 && y->timeout != 0)
1273 if (x->timeout < y->timeout)
1276 if (x->timeout > y->timeout)
1282 int sd_bus_send_with_reply(
1285 sd_message_handler_t callback,
1290 struct reply_callback *c;
1301 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1303 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1306 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1310 if (usec != (uint64_t) -1) {
1311 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1316 r = bus_seal_message(bus, m);
1320 c = new(struct reply_callback, 1);
1324 c->callback = callback;
1325 c->userdata = userdata;
1326 c->serial = BUS_MESSAGE_SERIAL(m);
1327 c->timeout = calc_elapse(usec);
1329 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1335 if (c->timeout != 0) {
1336 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1339 sd_bus_send_with_reply_cancel(bus, c->serial);
1344 r = sd_bus_send(bus, m, serial);
1346 sd_bus_send_with_reply_cancel(bus, c->serial);
1353 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1354 struct reply_callback *c;
1361 c = hashmap_remove(bus->reply_callbacks, &serial);
1365 if (c->timeout != 0)
1366 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1372 int bus_ensure_running(sd_bus *bus) {
1377 if (bus->state == BUS_RUNNING)
1381 r = sd_bus_process(bus, NULL);
1384 if (bus->state == BUS_RUNNING)
1389 r = sd_bus_wait(bus, (uint64_t) -1);
1395 int sd_bus_send_with_reply_and_block(
1399 sd_bus_error *error,
1400 sd_bus_message **reply) {
1413 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1415 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1417 if (bus_error_is_dirty(error))
1420 r = bus_ensure_running(bus);
1424 r = sd_bus_send(bus, m, &serial);
1428 timeout = calc_elapse(usec);
1432 sd_bus_message *incoming = NULL;
1437 if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1440 /* Make sure there's room for queuing this
1441 * locally, before we read the message */
1443 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1451 r = message_read(bus, &incoming);
1456 if (incoming->reply_serial == serial) {
1457 /* Found a match! */
1459 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1464 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1467 r = sd_bus_error_copy(error, &incoming->error);
1469 sd_bus_message_unref(incoming);
1473 k = bus_error_to_errno(&incoming->error);
1474 sd_bus_message_unref(incoming);
1478 sd_bus_message_unref(incoming);
1482 /* There's already guaranteed to be room for
1483 * this, so need to resize things here */
1484 bus->rqueue[bus->rqueue_size ++] = incoming;
1487 /* Try to read more, right-away */
1496 n = now(CLOCK_MONOTONIC);
1502 left = (uint64_t) -1;
1504 r = bus_poll(bus, true, left);
1508 r = dispatch_wqueue(bus);
1514 int sd_bus_get_fd(sd_bus *bus) {
1524 int sd_bus_get_events(sd_bus *bus) {
1532 if (bus->state == BUS_OPENING)
1534 else if (bus->state == BUS_AUTHENTICATING) {
1536 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1541 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1542 if (bus->rqueue_size <= 0)
1544 if (bus->wqueue_size > 0)
1551 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1552 struct reply_callback *c;
1561 if (bus->state == BUS_AUTHENTICATING) {
1562 *timeout_usec = bus->auth_timeout;
1566 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1569 c = prioq_peek(bus->reply_callbacks_prioq);
1573 *timeout_usec = c->timeout;
1577 static int process_timeout(sd_bus *bus) {
1578 struct reply_callback *c;
1584 c = prioq_peek(bus->reply_callbacks_prioq);
1588 n = now(CLOCK_MONOTONIC);
1592 assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1593 hashmap_remove(bus->reply_callbacks, &c->serial);
1595 r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1598 return r < 0 ? r : 1;
1601 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1602 struct reply_callback *c;
1608 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1609 m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1612 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1616 if (c->timeout != 0)
1617 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1619 r = c->callback(bus, 0, m, c->userdata);
1625 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1626 struct filter_callback *l;
1629 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1630 r = l->callback(bus, 0, m, l->userdata);
1638 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1639 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1645 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1648 if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1651 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1654 if (streq_ptr(m->member, "Ping"))
1655 r = sd_bus_message_new_method_return(bus, m, &reply);
1656 else if (streq_ptr(m->member, "GetMachineId")) {
1660 r = sd_id128_get_machine(&id);
1664 r = sd_bus_message_new_method_return(bus, m, &reply);
1668 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1670 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1672 sd_bus_error_set(&error,
1673 "org.freedesktop.DBus.Error.UnknownMethod",
1674 "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1676 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1682 r = sd_bus_send(bus, reply, NULL);
1689 static int process_object(sd_bus *bus, sd_bus_message *m) {
1690 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1691 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1692 struct object_callback *c;
1700 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1703 if (hashmap_isempty(bus->object_callbacks))
1706 c = hashmap_get(bus->object_callbacks, m->path);
1708 r = c->callback(bus, 0, m, c->userdata);
1715 /* Look for fallback prefixes */
1716 p = strdupa(m->path);
1720 e = strrchr(p, '/');
1726 c = hashmap_get(bus->object_callbacks, p);
1727 if (c && c->is_fallback) {
1728 r = c->callback(bus, 0, m, c->userdata);
1739 sd_bus_error_set(&error,
1740 "org.freedesktop.DBus.Error.UnknownMethod",
1741 "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1743 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1747 r = sd_bus_send(bus, reply, NULL);
1754 static int process_message(sd_bus *bus, sd_bus_message *m) {
1760 r = process_reply(bus, m);
1764 r = process_filter(bus, m);
1768 r = process_builtin(bus, m);
1772 return process_object(bus, m);
1775 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1778 /* Returns 0 when we didn't do anything. This should cause the
1779 * caller to invoke sd_bus_wait() before returning the next
1780 * time. Returns > 0 when we did something, which possibly
1781 * means *ret is filled in with an unprocessed message. */
1788 if (bus->state == BUS_OPENING) {
1799 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1801 socklen_t slen = sizeof(error);
1803 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1805 bus->last_connect_error = errno;
1806 else if (error != 0)
1807 bus->last_connect_error = error;
1808 else if (p.revents & (POLLERR|POLLHUP))
1809 bus->last_connect_error = ECONNREFUSED;
1811 r = bus_start_auth(bus);
1815 /* Try next address */
1816 r = bus_start_connect(bus);
1823 } else if (bus->state == BUS_AUTHENTICATING) {
1825 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1828 r = bus_write_auth(bus);
1832 r = bus_read_auth(bus);
1835 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1836 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1839 r = process_timeout(bus);
1843 r = dispatch_wqueue(bus);
1848 r = dispatch_rqueue(bus, &m);
1857 r = process_message(bus, m);
1867 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1868 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1869 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1871 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1873 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1877 r = sd_bus_send(bus, reply, NULL);
1885 assert_not_reached("Unknown state");
1894 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1905 e = sd_bus_get_events(bus);
1912 r = sd_bus_get_timeout(bus, &until);
1919 n = now(CLOCK_MONOTONIC);
1920 m = until > n ? until - n : 0;
1923 if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1930 r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1934 return r > 0 ? 1 : 0;
1937 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1943 if (bus->rqueue_size > 0)
1946 return bus_poll(bus, false, timeout_usec);
1949 int sd_bus_flush(sd_bus *bus) {
1957 r = bus_ensure_running(bus);
1961 if (bus->wqueue_size <= 0)
1965 r = dispatch_wqueue(bus);
1969 if (bus->wqueue_size <= 0)
1972 r = bus_poll(bus, false, (uint64_t) -1);
1978 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1979 struct filter_callback *f;
1986 f = new(struct filter_callback, 1);
1989 f->callback = callback;
1990 f->userdata = userdata;
1992 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1996 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1997 struct filter_callback *f;
2004 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2005 if (f->callback == callback && f->userdata == userdata) {
2006 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2015 static int bus_add_object(
2019 sd_message_handler_t callback,
2022 struct object_callback *c;
2032 r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2036 c = new(struct object_callback, 1);
2040 c->path = strdup(path);
2046 c->callback = callback;
2047 c->userdata = userdata;
2048 c->is_fallback = fallback;
2050 r = hashmap_put(bus->object_callbacks, c->path, c);
2060 static int bus_remove_object(
2064 sd_message_handler_t callback,
2067 struct object_callback *c;
2076 c = hashmap_get(bus->object_callbacks, path);
2080 if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2083 assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2091 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2092 return bus_add_object(bus, false, path, callback, userdata);
2095 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2096 return bus_remove_object(bus, false, path, callback, userdata);
2099 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2100 return bus_add_object(bus, true, prefix, callback, userdata);
2103 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2104 return bus_remove_object(bus, true, prefix, callback, userdata);