1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
34 #include "bus-internal.h"
35 #include "bus-message.h"
38 #define WQUEUE_MAX 128
40 static void bus_free(sd_bus *b) {
41 struct filter_callback *f;
46 close_nointr_nofail(b->fd);
53 hashmap_free_free(b->reply_callbacks);
55 while ((f = b->filter_callbacks)) {
56 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
63 static sd_bus* bus_new(void) {
72 r->message_version = 1;
74 /* We guarantee that wqueue always has space for at least one
76 r->wqueue = new(sd_bus_message*, 1);
85 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
92 bus->state = BUS_RUNNING;
94 r = sd_bus_message_read(reply, "s", &s);
98 bus->unique_name = strdup(s);
99 if (!bus->unique_name)
105 static int bus_send_hello(sd_bus *bus) {
106 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
111 r = sd_bus_message_new_method_call(
113 "org.freedesktop.DBus",
115 "org.freedesktop.DBus",
121 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, (uint64_t) -1, NULL);
128 static int bus_start_running(sd_bus *bus) {
133 if (bus->send_hello) {
134 bus->state = BUS_HELLO;
136 r = bus_send_hello(bus);
141 bus->state = BUS_RUNNING;
146 static int parse_address_key(const char **p, const char *key, char **value) {
157 if (!strncmp(*p, key, l) != 0)
167 while (*a != ';' && *a != 0) {
186 c = (char) ((x << 4) | y);
190 t = realloc(r, n + 1);
205 static void skip_address_key(const char **p) {
209 *p += strcspn(*p, ";");
212 static int bus_parse_next_address(sd_bus *b) {
214 _cleanup_free_ char *guid = NULL;
221 if (b->address[b->address_index] == 0)
224 a = b->address + b->address_index;
227 b->sockaddr_size = 0;
228 b->peer = SD_ID128_NULL;
230 if (startswith(a, "unix:")) {
231 _cleanup_free_ char *path = NULL, *abstract = NULL;
234 while (*p != 0 && *p != ';') {
235 r = parse_address_key(&p, "guid", &guid);
241 r = parse_address_key(&p, "path", &path);
247 r = parse_address_key(&p, "abstract", &abstract);
253 skip_address_key(&p);
256 if (!path && !abstract)
259 if (path && abstract)
266 if (l > sizeof(b->sockaddr.un.sun_path))
269 b->sockaddr.un.sun_family = AF_UNIX;
270 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
271 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
272 } else if (abstract) {
276 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
279 b->sockaddr.un.sun_family = AF_UNIX;
280 b->sockaddr.un.sun_path[0] = 0;
281 strncpy(b->sockaddr.un.sun_path+1, path, sizeof(b->sockaddr.un.sun_path)-1);
282 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
285 } else if (startswith(a, "tcp:")) {
286 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
287 struct addrinfo hints, *result;
290 while (*p != 0 && *p != ';') {
291 r = parse_address_key(&p, "guid", &guid);
297 r = parse_address_key(&p, "host", &host);
303 r = parse_address_key(&p, "port", &port);
309 r = parse_address_key(&p, "family", &family);
315 skip_address_key(&p);
322 hints.ai_socktype = SOCK_STREAM;
323 hints.ai_flags = AI_ADDRCONFIG;
326 if (streq(family, "ipv4"))
327 hints.ai_family = AF_INET;
328 else if (streq(family, "ipv6"))
329 hints.ai_family = AF_INET6;
334 r = getaddrinfo(host, port, &hints, &result);
338 return -EADDRNOTAVAIL;
340 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
341 b->sockaddr_size = result->ai_addrlen;
343 freeaddrinfo(result);
347 r = sd_id128_from_string(guid, &b->peer);
352 b->address_index = p - b->address;
356 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
359 struct iovec *i = iov + *idx;
361 if (i->iov_len > size) {
362 i->iov_base = (uint8_t*) i->iov_base + size;
376 static int bus_write_auth(sd_bus *b) {
381 assert(b->state == BUS_AUTHENTICATING);
383 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
387 mh.msg_iov = b->auth_iovec + b->auth_index;
388 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
390 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
392 return errno == EAGAIN ? 0 : -errno;
394 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
399 static int bus_auth_verify(sd_bus *b) {
405 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
408 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
412 f = memmem(e, b->rbuffer_size - (e - (char*) b->rbuffer), "\r\n", 2);
416 if (e - (char*) b->rbuffer != 3 + 32)
419 if (memcmp(b->rbuffer, "OK ", 3))
422 for (i = 0; i < 32; i += 2) {
425 x = unhexchar(((char*) b->rbuffer)[3 + i]);
426 y = unhexchar(((char*) b->rbuffer)[3 + i + 2]);
431 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
434 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
435 !sd_id128_equal(b->peer, peer))
441 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
442 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
444 if (f + 2 > (char*) b->rbuffer + b->rbuffer_size) {
445 b->rbuffer_size -= (f - (char*) b->rbuffer);
446 memmove(b->rbuffer, f + 2, b->rbuffer_size);
449 r = bus_start_running(b);
456 static int bus_read_auth(sd_bus *b) {
465 r = bus_auth_verify(b);
469 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
472 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
473 iov.iov_len = n - b->rbuffer_size;
479 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
481 return errno == EAGAIN ? 0 : -errno;
483 b->rbuffer_size += k;
485 r = bus_auth_verify(b);
492 static int bus_start_auth(sd_bus *b) {
493 static const char auth_prefix[] = "\0AUTH_EXTERNAL ";
494 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
496 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
501 b->state = BUS_AUTHENTICATING;
503 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
507 b->auth_uid = hexmem(text, l);
511 b->auth_iovec[0].iov_base = (void*) auth_prefix;
512 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
513 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
514 b->auth_iovec[1].iov_len = l * 2;
515 b->auth_iovec[2].iov_base = (void*) auth_suffix;
516 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
517 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
519 return bus_write_auth(b);
522 static int bus_start_connect(sd_bus *b) {
529 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
530 r = bus_parse_next_address(b);
534 return b->last_connect_error ? b->last_connect_error : -ECONNREFUSED;
537 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
539 b->last_connect_error = -errno;
544 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
546 if (errno == EINPROGRESS)
549 b->last_connect_error = -errno;
550 close_nointr_nofail(b->fd);
556 return bus_start_auth(b);
560 int sd_bus_open_system(sd_bus **ret) {
568 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
570 r = sd_bus_open_address(e, &b);
574 b->send_hello = true;
583 b->send_hello = true;
585 b->sockaddr.un.sun_family = AF_UNIX;
586 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
587 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
589 r = bus_start_connect(b);
599 int sd_bus_open_user(sd_bus **ret) {
608 e = getenv("DBUS_SESSION_BUS_ADDRESS");
610 r = sd_bus_open_address(e, &b);
614 b->send_hello = true;
619 e = getenv("XDG_RUNTIME_DIR");
624 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
631 b->send_hello = true;
633 b->sockaddr.un.sun_family = AF_UNIX;
634 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
635 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
637 r = bus_start_connect(b);
647 int sd_bus_open_address(const char *address, sd_bus **ret) {
660 b->address = strdup(address);
666 r = bus_start_connect(b);
676 int sd_bus_open_fd(int fd, sd_bus **ret) {
690 fd_nonblock(b->fd, true);
691 fd_cloexec(b->fd, true);
693 r = bus_start_auth(b);
703 void sd_bus_close(sd_bus *bus) {
709 close_nointr_nofail(bus->fd);
713 sd_bus *sd_bus_ref(sd_bus *bus) {
717 assert(bus->n_ref > 0);
723 sd_bus *sd_bus_unref(sd_bus *bus) {
727 assert(bus->n_ref > 0);
736 int sd_bus_is_running(sd_bus *bus) {
743 return bus->state == BUS_RUNNING;
746 int sd_bus_can_send(sd_bus *bus, char type) {
751 if (type == SD_BUS_TYPE_UNIX_FD)
754 return bus_type_is_valid(type);
757 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
763 return bus_message_seal(m, ++b->serial);
766 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
777 n = m->n_iovec * sizeof(struct iovec);
779 memcpy(iov, m->iovec, n);
782 iovec_advance(iov, &j, *idx);
786 mh.msg_iovlen = m->n_iovec;
788 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
793 iovec_advance(iov, &j, *idx);
795 return j > m->n_iovec;
798 static int message_read_need(sd_bus *bus, size_t *need) {
805 if (bus->rbuffer_size <= sizeof(struct bus_header)) {
806 *need = sizeof(struct bus_header);
810 a = ((const uint32_t*) bus->rbuffer)[1];
811 b = ((const uint32_t*) bus->rbuffer)[3];
813 e = ((const uint8_t*) bus->rbuffer)[0];
814 if (e == SD_BUS_LITTLE_ENDIAN) {
817 } else if (e == SD_BUS_BIG_ENDIAN) {
823 *need = sizeof(struct bus_header) + ALIGN_TO(a, 8) + b;
827 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
834 assert(bus->rbuffer_size >= size);
836 t = new0(sd_bus_message, 1);
840 if (bus->rbuffer_size > size) {
841 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
850 t->header = bus->rbuffer;
851 t->free_header = true;
853 t->fields = (uint8_t*) bus->rbuffer + sizeof(struct bus_header);
854 t->body = (uint8_t*) bus->rbuffer + sizeof(struct bus_header) + ALIGN_TO(BUS_MESSAGE_BODY_SIZE(t), 8);
857 bus->rbuffer_size -= size;
859 r = bus_message_parse(t);
861 sd_bus_message_unref(t);
869 static int message_read(sd_bus *bus, sd_bus_message **m) {
880 r = message_read_need(bus, &need);
884 if (bus->rbuffer_size >= need)
885 return message_make(bus, need, m);
887 b = realloc(bus->rbuffer, need);
892 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
893 iov.iov_len = need - bus->rbuffer_size;
899 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
901 return errno == EAGAIN ? 0 : -errno;
903 bus->rbuffer_size += k;
905 r = message_read_need(bus, &need);
909 if (bus->rbuffer_size >= need)
910 return message_make(bus, need, m);
915 static int dispatch_wqueue(sd_bus *bus) {
923 while (bus->wqueue_size > 0) {
925 r = message_write(bus, bus->wqueue[0], &bus->windex);
930 /* Wasn't fully written yet... */
933 /* Fully written. Let's drop the entry from
936 * This isn't particularly optimized, but
937 * well, this is supposed to be our worst-case
938 * buffer only, and the socket buffer is
939 * supposed to be our primary buffer, and if
940 * it got full, then all bets are off
943 sd_bus_message_unref(bus->wqueue[0]);
945 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
955 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
964 if (bus->rqueue_size > 0) {
965 /* Dispatch a queued message */
969 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
973 /* Try to read a new message */
974 r = message_read(bus, m);
983 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
992 if (m->header->version > bus->message_version)
995 r = bus_seal_message(bus, m);
999 /* If this is a reply and no reply was requested, then let's
1000 * suppress this, if we can */
1001 if (m->dont_send && !serial)
1004 if (bus->wqueue_size <= 0) {
1007 r = message_write(bus, m, &idx);
1011 } else if (r == 0) {
1012 /* Wasn't fully written. So let's remember how
1013 * much was written. Note that the first entry
1014 * of the wqueue array is always allocated so
1015 * that we always can remember how much was
1017 bus->wqueue[0] = sd_bus_message_ref(m);
1018 bus->wqueue_size = 1;
1024 /* Just append it to the queue. */
1026 if (bus->wqueue_size >= WQUEUE_MAX)
1029 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1034 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1038 *serial = BUS_MESSAGE_SERIAL(m);
1043 static usec_t calc_elapse(uint64_t usec) {
1044 if (usec == (uint64_t) -1)
1048 usec = SD_BUS_DEFAULT_TIMEOUT;
1050 return now(CLOCK_MONOTONIC) + usec;
1053 int sd_bus_send_with_reply(
1056 sd_message_handler_t callback,
1061 struct reply_callback *c;
1072 if (!m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1075 r = bus_seal_message(bus, m);
1079 c = new(struct reply_callback, 1);
1083 c->callback = callback;
1084 c->userdata = userdata;
1085 c->serial = BUS_MESSAGE_SERIAL(m);
1086 c->timeout = calc_elapse(usec);
1088 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1094 r = sd_bus_send(bus, m, serial);
1096 hashmap_remove(bus->reply_callbacks, &c->serial);
1104 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1105 struct reply_callbacks *c;
1112 c = hashmap_remove(bus->reply_callbacks, &serial);
1120 int sd_bus_send_with_reply_and_block(
1124 sd_bus_error *error,
1125 sd_bus_message **reply) {
1138 if (!m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1140 if (sd_bus_error_is_set(error))
1143 r = sd_bus_send(bus, m, &serial);
1147 timeout = calc_elapse(usec);
1151 sd_bus_message *incoming;
1156 /* Make sure there's room for queuing this
1157 * locally, before we read the message */
1159 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1167 r = message_read(bus, &incoming);
1171 if (incoming->reply_serial == serial) {
1172 /* Found a match! */
1174 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1179 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1182 r = sd_bus_error_copy(error, &incoming->error);
1184 sd_bus_message_unref(incoming);
1188 k = bus_error_to_errno(&incoming->error);
1189 sd_bus_message_unref(incoming);
1193 sd_bus_message_unref(incoming);
1197 /* There's already guaranteed to be room for
1198 * this, so need to resize things here */
1199 bus->rqueue[bus->rqueue_size ++] = incoming;
1202 /* Try to read more, right-away */
1209 n = now(CLOCK_MONOTONIC);
1215 left = (uint64_t) -1;
1217 r = sd_bus_wait(bus, left);
1221 r = dispatch_wqueue(bus);
1227 int sd_bus_get_fd(sd_bus *bus) {
1237 int sd_bus_get_events(sd_bus *bus) {
1246 if (bus->state == BUS_OPENING)
1248 else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1249 if (bus->rqueue_size <= 0)
1251 if (bus->wqueue_size > 0)
1258 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1267 if (bus->state == BUS_OPENING) {
1278 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1280 socklen_t slen = sizeof(error);
1282 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1287 bus->last_connect_error = -error;
1288 else if (p.revents & (POLLERR|POLLHUP))
1289 bus->last_connect_error = -ECONNREFUSED;
1291 return bus_start_auth(bus);
1293 /* Try next address */
1294 return bus_start_connect(bus);
1299 } else if (bus->state == BUS_AUTHENTICATING) {
1301 r = bus_write_auth(bus);
1305 r = bus_read_auth(bus);
1309 return bus_start_running(bus);
1311 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1312 struct filter_callback *l;
1314 r = dispatch_wqueue(bus);
1318 r = dispatch_rqueue(bus, &m);
1322 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1323 struct reply_callback *c;
1325 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1327 r = c->callback(bus, m, c->userdata);
1331 sd_bus_message_unref(m);
1332 return r < 0 ? r : 0;
1337 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1338 r = l->callback(bus, m, l->userdata);
1340 sd_bus_message_unref(m);
1341 return r < 0 ? r : 0;
1350 sd_bus_message_unref(m);
1357 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1365 return -ECONNREFUSED;
1367 e = sd_bus_get_events(bus);
1375 r = ppoll(&p, 1, timeout_usec == (uint64_t) -1 ? NULL : timespec_store(&ts, timeout_usec), NULL);
1382 int sd_bus_flush(sd_bus *bus) {
1390 if (bus->state == BUS_RUNNING && bus->wqueue_size <= 0)
1394 r = dispatch_wqueue(bus);
1398 if (bus->state == BUS_RUNNING && bus->wqueue_size <= 0)
1401 r = sd_bus_wait(bus, (uint64_t) -1);
1407 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1408 struct filter_callback *f;
1415 f = new(struct filter_callback, 1);
1418 f->callback = callback;
1419 f->userdata = userdata;
1421 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1425 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1426 struct filter_callback *f;
1433 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1434 if (f->callback == callback && f->userdata == userdata) {
1435 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);