1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
34 #include "bus-internal.h"
35 #include "bus-message.h"
38 #define WQUEUE_MAX 128
/* bus_free: tears down a bus object.
 * The statements shown close b->fd, hand b->reply_callbacks to
 * hashmap_free_free() and unlink the head of b->filter_callbacks until the
 * list is empty.
 * NOTE(review): only part of the body is visible in this listing; what else
 * is released (and whether each filter entry is freed) must be confirmed. */
40 static void bus_free(sd_bus *b) {
41 struct filter_callback *f;
46 close_nointr_nofail(b->fd);
53 hashmap_free_free(b->reply_callbacks);
55 while ((f = b->filter_callbacks)) {
56 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
/* bus_new: creates a fresh bus object.
 * The statements shown set message_version to 1 and allocate the write
 * queue with room for exactly one sd_bus_message pointer, so the first
 * queue slot always exists. */
63 static sd_bus* bus_new(void) {
72 r->message_version = 1;
74 /* We guarantee that wqueue always has space for at least one
76 r->wqueue = new(sd_bus_message*, 1);
/* hello_callback: reply handler installed by bus_send_hello().
 * Switches the bus to BUS_RUNNING, reads one string ("s") from the reply
 * and stores a strdup()ed copy in bus->unique_name; the copy is checked
 * for allocation failure. */
85 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
92 bus->state = BUS_RUNNING;
94 r = sd_bus_message_read(reply, "s", &s);
98 bus->unique_name = strdup(s);
99 if (!bus->unique_name)
/* bus_send_hello: builds a method call whose arguments include
 * "org.freedesktop.DBus" twice and sends it with hello_callback as the reply
 * handler, NULL userdata, a timeout argument of (uint64_t) -1 and no serial
 * output. The message reference is dropped automatically on return through
 * the _cleanup_bus_message_unref_ attribute. */
105 static int bus_send_hello(sd_bus *bus) {
106 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
111 r = sd_bus_message_new_method_call(
113 "org.freedesktop.DBus",
115 "org.freedesktop.DBus",
121 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, (uint64_t) -1, NULL);
/* bus_start_running: moves the connection past authentication.
 * When bus->send_hello is set the state becomes BUS_HELLO and
 * bus_send_hello() is called; the listing also shows the state being set to
 * BUS_RUNNING (presumably on the path without a hello -- the branch
 * structure is not fully visible here). */
128 static int bus_start_running(sd_bus *bus) {
133 if (bus->send_hello) {
134 bus->state = BUS_HELLO;
136 r = bus_send_hello(bus);
141 bus->state = BUS_RUNNING;
/* parse_address_key: tries to parse one "key=value" item at *p.
 * The first l characters at *p are compared against key; the value is then
 * scanned up to the next ';' or the end of the string, characters built from
 * two nibbles (x, y) are assembled into one byte, and the output buffer is
 * grown with realloc() as bytes are appended.
 *
 * Fix: the key test used to read "!strncmp(...) != 0", which is true exactly
 * when the key DOES match (the negation binds tighter than the comparison).
 * The mismatch test is now the plain "strncmp(...) != 0". */
146 static int parse_address_key(const char **p, const char *key, char **value) {
157 if (strncmp(*p, key, l) != 0)
167 while (*a != ';' && *a != 0) {
186 c = (char) ((x << 4) | y);
190 t = realloc(r, n + 1);
/* skip_address_key: advances *p by strcspn(*p, ";"), i.e. to the next ';'
 * or to the terminating NUL when no ';' follows. */
205 static void skip_address_key(const char **p) {
209 *p += strcspn(*p, ";");
/* bus_parse_next_address: parses the next entry of b->address, starting at
 * b->address_index, into b->sockaddr / b->sockaddr_size and b->peer.
 *
 * "unix:" entries accept the keys guid, path and abstract; exactly one of
 * path and abstract must be given. An abstract address is stored with a
 * leading NUL byte in sun_path. "tcp:" entries accept guid, host, port and
 * family ("ipv4" / "ipv6") and are resolved with getaddrinfo(); the first
 * result is copied into b->sockaddr. Unknown keys are skipped. A guid is
 * converted with sd_id128_from_string() into b->peer. On completion
 * b->address_index is moved to where parsing stopped.
 *
 * Fix: the abstract branch copied from "path", which is necessarily NULL
 * there because path and abstract are mutually exclusive. It now copies from
 * "abstract". */
212 static int bus_parse_next_address(sd_bus *b) {
214 _cleanup_free_ char *guid = NULL;
221 if (b->address[b->address_index] == 0)
224 a = b->address + b->address_index;
227 b->sockaddr_size = 0;
228 b->peer = SD_ID128_NULL;
230 if (startswith(a, "unix:")) {
231 _cleanup_free_ char *path = NULL, *abstract = NULL;
234 while (*p != 0 && *p != ';') {
235 r = parse_address_key(&p, "guid", &guid);
241 r = parse_address_key(&p, "path", &path);
247 r = parse_address_key(&p, "abstract", &abstract);
253 skip_address_key(&p);
256 if (!path && !abstract)
259 if (path && abstract)
266 if (l > sizeof(b->sockaddr.un.sun_path))
269 b->sockaddr.un.sun_family = AF_UNIX;
270 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
271 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
272 } else if (abstract) {
276 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
279 b->sockaddr.un.sun_family = AF_UNIX;
280 b->sockaddr.un.sun_path[0] = 0;
281 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
282 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
285 } else if (startswith(a, "tcp:")) {
286 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
287 struct addrinfo hints, *result;
290 while (*p != 0 && *p != ';') {
291 r = parse_address_key(&p, "guid", &guid);
297 r = parse_address_key(&p, "host", &host);
303 r = parse_address_key(&p, "port", &port);
309 r = parse_address_key(&p, "family", &family);
315 skip_address_key(&p);
322 hints.ai_socktype = SOCK_STREAM;
323 hints.ai_flags = AI_ADDRCONFIG;
326 if (streq(family, "ipv4"))
327 hints.ai_family = AF_INET;
328 else if (streq(family, "ipv6"))
329 hints.ai_family = AF_INET6;
334 r = getaddrinfo(host, port, &hints, &result);
338 return -EADDRNOTAVAIL;
340 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
341 b->sockaddr_size = result->ai_addrlen;
343 freeaddrinfo(result);
347 r = sd_id128_from_string(guid, &b->peer);
352 b->address_index = p - b->address;
/* iovec_advance: consumes "size" bytes from the iovec array "iov", starting
 * at entry *idx. When the current entry is longer than the amount left to
 * consume, its base pointer is moved forward by that amount (the remaining
 * bookkeeping is on lines not shown in this listing). */
356 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
359 struct iovec *i = iov + *idx;
361 if (i->iov_len > size) {
362 i->iov_base = (uint8_t*) i->iov_base + size;
/* bus_write_auth: pushes the not-yet-sent part of the authentication request
 * to the socket. Requires state BUS_AUTHENTICATING. The entries of
 * b->auth_iovec from b->auth_index onwards are sent with a non-blocking
 * sendmsg() that does not raise SIGPIPE; on failure EAGAIN is reported as 0
 * and any other errno as its negative value. The iovec cursor is then
 * advanced by the number of bytes sent. An auth_index at or beyond the end
 * of the array is checked up front (the action taken is not visible here). */
376 static int bus_write_auth(sd_bus *b) {
381 assert(b->state == BUS_AUTHENTICATING);
383 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
387 mh.msg_iov = b->auth_iovec + b->auth_index;
388 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
390 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
392 return errno == EAGAIN ? 0 : -errno;
394 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
/* bus_auth_verify: checks the server's authentication reply held in
 * b->rbuffer. Two CRLF-terminated lines are looked for: e marks the end of
 * the first line, f the end of the second. The first line must be exactly
 * "OK " followed by 32 hex digits, which are decoded into a 128-bit peer id;
 * if b->peer is set it must equal that id. The second line is compared with
 * "AGREE_UNIX_FD". Finally bus_start_running() is called.
 *
 * Fixes:
 *  - the low nibble of every peer-id byte was read from offset 3 + i + 2
 *    (the high nibble of the NEXT byte, and the CR on the last iteration);
 *    it is now read from 3 + i + 1;
 *  - the search for the second CRLF started at e, where the first CRLF
 *    sits, so f was always equal to e; it now starts right behind the first
 *    CRLF and the remaining length is shortened by the same two bytes.
 *
 * NOTE(review): the leftover-trimming condition "f + 2 > buffer end" can
 * never hold for an f returned by memmem(), and the size adjustment does not
 * account for the two CRLF bytes. The consumed reply therefore appears to
 * stay in rbuffer. This was left untouched because the end of that branch is
 * not visible in this listing -- please verify and fix with the full body. */
399 static int bus_auth_verify(sd_bus *b) {
405 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
408 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
412 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
416 if (e - (char*) b->rbuffer != 3 + 32)
419 if (memcmp(b->rbuffer, "OK ", 3))
422 for (i = 0; i < 32; i += 2) {
425 x = unhexchar(((char*) b->rbuffer)[3 + i]);
426 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
431 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
434 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
435 !sd_id128_equal(b->peer, peer))
441 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
442 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
444 if (f + 2 > (char*) b->rbuffer + b->rbuffer_size) {
445 b->rbuffer_size -= (f - (char*) b->rbuffer);
446 memmove(b->rbuffer, f + 2, b->rbuffer_size);
449 r = bus_start_running(b);
/* bus_read_auth: reads more of the server's authentication reply.
 * bus_auth_verify() is tried first on what is already buffered. The target
 * buffer size n is the larger of the full expected reply ("OK " + 32 hex
 * digits + CRLF + "AGREE_UNIX_FD" + CRLF) and twice the current fill level.
 * New data is received non-blocking directly behind the buffered bytes;
 * EAGAIN is reported as 0, other errors as negative errno. The fill level
 * is bumped by the byte count and the reply is verified again. */
456 static int bus_read_auth(sd_bus *b) {
465 r = bus_auth_verify(b);
469 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
472 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
473 iov.iov_len = n - b->rbuffer_size;
479 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
481 return errno == EAGAIN ? 0 : -errno;
483 b->rbuffer_size += k;
485 r = bus_auth_verify(b);
/* bus_start_auth: prepares and starts sending the authentication request.
 * Sets the state to BUS_AUTHENTICATING, formats the effective UID as a
 * decimal string and hex-encodes it with hexmem() into b->auth_uid. The
 * request is described by three iovec entries: a prefix that begins with a
 * NUL byte followed by "AUTH_EXTERNAL ", the hex-encoded UID (2 * l bytes,
 * l being set on a line not shown here), and a suffix that negotiates unix
 * fd passing and ends with BEGIN. b->auth_size is the sum of the three
 * lengths. Sending is started through bus_write_auth(). */
492 static int bus_start_auth(sd_bus *b) {
493 static const char auth_prefix[] = "\0AUTH_EXTERNAL ";
494 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
496 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
501 b->state = BUS_AUTHENTICATING;
503 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
507 b->auth_uid = hexmem(text, l);
511 b->auth_iovec[0].iov_base = (void*) auth_prefix;
512 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
513 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
514 b->auth_iovec[1].iov_len = l * 2;
515 b->auth_iovec[2].iov_base = (void*) auth_suffix;
516 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
517 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
519 return bus_write_auth(b);
/* bus_start_connect: opens a socket to the current address and starts
 * authentication. While no address family is set (AF_UNSPEC) the next
 * address is parsed with bus_parse_next_address(); when that yields nothing
 * usable the function returns the last recorded connect error, or
 * -ECONNREFUSED if none was recorded. The socket is created as a
 * close-on-exec, non-blocking stream socket. socket() and connect() failures
 * are remembered in b->last_connect_error; EINPROGRESS from connect() is
 * handled separately, other failures close the fd. After a successful
 * connect bus_start_auth() is invoked. */
522 static int bus_start_connect(sd_bus *b) {
529 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
530 r = bus_parse_next_address(b);
534 return b->last_connect_error ? b->last_connect_error : -ECONNREFUSED;
537 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
539 b->last_connect_error = -errno;
544 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
546 if (errno == EINPROGRESS)
549 b->last_connect_error = -errno;
550 close_nointr_nofail(b->fd);
556 return bus_start_auth(b);
/* sd_bus_open_system: connects to the system bus.
 * The address is taken from the DBUS_SYSTEM_BUS_ADDRESS environment variable
 * and opened through sd_bus_open_address(); the other path shown uses the
 * fixed unix socket /run/dbus/system_bus_socket and bus_start_connect().
 * Both paths set send_hello. */
560 int sd_bus_open_system(sd_bus **ret) {
568 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
570 r = sd_bus_open_address(e, &b);
574 b->send_hello = true;
583 b->send_hello = true;
585 b->sockaddr.un.sun_family = AF_UNIX;
586 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
587 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
589 r = bus_start_connect(b);
/* sd_bus_open_user: connects to the user (session) bus.
 * The address is taken from DBUS_SESSION_BUS_ADDRESS and opened through
 * sd_bus_open_address(); the other path shown builds the unix socket path
 * "$XDG_RUNTIME_DIR/bus", after checking that the directory name plus the
 * four bytes of "/bus" fit into sun_path, and calls bus_start_connect().
 * Both paths set send_hello. The path is copied without a terminating NUL;
 * its exact length is conveyed through sockaddr_size. */
599 int sd_bus_open_user(sd_bus **ret) {
608 e = getenv("DBUS_SESSION_BUS_ADDRESS");
610 r = sd_bus_open_address(e, &b);
614 b->send_hello = true;
619 e = getenv("XDG_RUNTIME_DIR");
624 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
631 b->send_hello = true;
633 b->sockaddr.un.sun_family = AF_UNIX;
634 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
635 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
637 r = bus_start_connect(b);
/* sd_bus_open_address: opens a connection to an explicit bus address.
 * The address string is duplicated into b->address and the connection is
 * started with bus_start_connect(). */
647 int sd_bus_open_address(const char *address, sd_bus **ret) {
660 b->address = strdup(address);
666 r = bus_start_connect(b);
/* sd_bus_open_fd: opens a bus connection on a file descriptor.
 * b->fd is switched to non-blocking and close-on-exec mode and
 * authentication is started directly with bus_start_auth(). */
676 int sd_bus_open_fd(int fd, sd_bus **ret) {
690 fd_nonblock(b->fd, true);
691 fd_cloexec(b->fd, true);
693 r = bus_start_auth(b);
/* sd_bus_close: closes the bus connection's file descriptor via
 * close_nointr_nofail(). */
703 void sd_bus_close(sd_bus *bus) {
709 close_nointr_nofail(bus->fd);
/* sd_bus_ref: reference-count operation on a bus object; asserts that the
 * count is positive on entry. */
713 sd_bus *sd_bus_ref(sd_bus *bus) {
717 assert(bus->n_ref > 0);
/* sd_bus_unref: reference-count operation on a bus object; asserts that the
 * count is positive on entry. */
723 sd_bus *sd_bus_unref(sd_bus *bus) {
727 assert(bus->n_ref > 0);
/* sd_bus_is_running: returns non-zero exactly when the bus state is
 * BUS_RUNNING. */
736 int sd_bus_is_running(sd_bus *bus) {
743 return bus->state == BUS_RUNNING;
/* sd_bus_can_send: reports whether a value of the given type can be sent.
 * SD_BUS_TYPE_UNIX_FD is special-cased (its result is computed on a line
 * not shown here); every other type is answered by bus_type_is_valid(). */
746 int sd_bus_can_send(sd_bus *bus, char type) {
751 if (type == SD_BUS_TYPE_UNIX_FD)
754 return bus_type_is_valid(type);
/* bus_seal_message: seals message m with the next serial number; the bus's
 * serial counter is incremented first and the new value is used. */
757 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
763 return message_seal(m, ++b->serial);
/* message_write: tries to send message m on the bus socket, resuming after
 * the *idx bytes that were already written. The message's iovec array is
 * copied so the copy can be advanced past the written part without touching
 * the message itself; the send is non-blocking and does not raise SIGPIPE.
 * The return value is the result of comparing the iovec cursor j with
 * m->n_iovec.
 * NOTE(review): "j > m->n_iovec" looks unreachable if the cursor stops at
 * n_iovec once everything is consumed -- confirm against iovec_advance()
 * whether ">=" is intended. Also confirm that the second iovec_advance()
 * call is meant to advance by *idx rather than by the byte count just sent. */
766 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
777 n = m->n_iovec * sizeof(struct iovec);
779 memcpy(iov, m->iovec, n);
782 iovec_advance(iov, &j, *idx);
786 mh.msg_iovlen = m->n_iovec;
788 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
793 iovec_advance(iov, &j, *idx);
795 return j > m->n_iovec;
/* message_read_need: computes in *need how many bytes the read buffer must
 * hold before the next message is complete.
 *
 * While less than a full fixed header is buffered, the header size itself is
 * requested. Otherwise the two 32-bit length words of the header are read:
 * a (word 1, the body length) and b (word 3, the length of the header field
 * array), with byte 0 selecting little or big endian interpretation. The
 * total is the fixed header, the header fields padded to a multiple of 8,
 * and the body.
 *
 * Fixes:
 *  - the "not enough for a header" test was "<=", so a buffer holding
 *    exactly one header produced need == rbuffer_size, which the caller
 *    treats as a complete message; it is now "<";
 *  - the size formula padded the body length and added the field length
 *    unpadded; on the wire it is the header field array that is padded to an
 *    8-byte boundary, followed by the body. */
798 static int message_read_need(sd_bus *bus, size_t *need) {
805 if (bus->rbuffer_size < sizeof(struct bus_header)) {
806 *need = sizeof(struct bus_header);
810 a = ((const uint32_t*) bus->rbuffer)[1];
811 b = ((const uint32_t*) bus->rbuffer)[3];
813 e = ((const uint8_t*) bus->rbuffer)[0];
814 if (e == SD_BUS_LITTLE_ENDIAN) {
817 } else if (e == SD_BUS_BIG_ENDIAN) {
823 *need = sizeof(struct bus_header) + ALIGN_TO(b, 8) + a;
/* message_make: turns the first "size" bytes of the read buffer into a
 * message object. Requires at least "size" buffered bytes. A zeroed message
 * is allocated; bytes beyond "size" are duplicated with memdup() so they
 * survive; the message takes the read buffer as its header (free_header is
 * set), with the fields pointer right behind the fixed header. The buffered
 * byte count is reduced by "size" and the message is parsed; on parse
 * failure the message is unreferenced.
 * NOTE(review): the body pointer is placed behind the fixed header plus the
 * 8-aligned BODY size. On the wire the body follows the 8-aligned header
 * FIELDS, so this looks like the wrong length -- confirm which size macro is
 * intended. */
827 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
834 assert(bus->rbuffer_size >= size);
836 t = new0(sd_bus_message, 1);
840 if (bus->rbuffer_size > size) {
841 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
850 t->header = bus->rbuffer;
851 t->free_header = true;
853 t->fields = (uint8_t*) bus->rbuffer + sizeof(struct bus_header);
854 t->body = (uint8_t*) bus->rbuffer + sizeof(struct bus_header) + ALIGN_TO(BUS_MESSAGE_BODY_SIZE(t), 8);
857 bus->rbuffer_size -= size;
859 r = message_parse(t);
861 sd_bus_message_unref(t);
/* message_read: tries to obtain one complete message from the socket.
 * message_read_need() gives the number of bytes required; if that much is
 * already buffered the message is built at once with message_make().
 * Otherwise the read buffer is grown to the required size with realloc(),
 * data is received non-blocking behind the buffered bytes (EAGAIN is
 * reported as 0, other errors as negative errno), the fill level is updated
 * and the requirement is evaluated again, building the message if it is now
 * satisfied. */
869 static int message_read(sd_bus *bus, sd_bus_message **m) {
880 r = message_read_need(bus, &need);
884 if (bus->rbuffer_size >= need)
885 return message_make(bus, need, m);
887 b = realloc(bus->rbuffer, need);
892 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
893 iov.iov_len = need - bus->rbuffer_size;
899 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
901 return errno == EAGAIN ? 0 : -errno;
903 bus->rbuffer_size += k;
905 r = message_read_need(bus, &need);
909 if (bus->rbuffer_size >= need)
910 return message_make(bus, need, m);
/* dispatch_wqueue: flushes the write queue. While messages are queued, the
 * head message is written with message_write(), resuming at bus->windex.
 * A fully written head is unreferenced and the remaining entries are
 * shifted down by one slot with memmove().
 * NOTE(review): the memmove length uses bus->wqueue_size; this is only in
 * bounds if the size was decremented beforehand on a line not shown in this
 * listing -- confirm. */
915 static int dispatch_wqueue(sd_bus *bus) {
923 while (bus->wqueue_size > 0) {
925 r = message_write(bus, bus->wqueue[0], &bus->windex);
930 /* Wasn't fully written yet... */
933 /* Fully written. Let's drop the entry from
936 * This isn't particularly optimized, but
937 * well, this is supposed to be our worst-case
938 * buffer only, and the socket buffer is
939 * supposed to be our primary buffer, and if
940 * it got full, then all bets are off
943 sd_bus_message_unref(bus->wqueue[0]);
945 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
/* dispatch_rqueue: yields the next incoming message. A message already
 * waiting in the read queue is handed out first, with the remaining entries
 * shifted down by one slot; otherwise a new message is read from the socket
 * with message_read().
 * NOTE(review): the memmove length uses bus->rqueue_size; confirm that the
 * size is decremented beforehand on a line not shown in this listing. */
955 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
964 if (bus->rqueue_size > 0) {
965 /* Dispatch a queued message */
969 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
973 /* Try to read a new message */
974 r = message_read(bus, m);
/* sd_bus_send: sends message m on the bus, optionally returning its serial.
 * The message's header version is checked against bus->message_version and
 * the message is sealed. With an empty write queue the message is written
 * directly; if it could only be written partially, a reference to it is
 * stored in the always-present first queue slot and the queue size becomes
 * 1. With a non-empty queue the message is appended: the queue is limited to
 * WQUEUE_MAX entries and grown by one slot with realloc(). The serial is
 * read from the message with BUS_MESSAGE_SERIAL(). */
983 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
992 if (m->header->version > bus->message_version)
995 r = bus_seal_message(bus, m);
999 if (bus->wqueue_size <= 0) {
1002 r = message_write(bus, m, &idx);
1006 } else if (r == 0) {
1007 /* Wasn't fully written. So let's remember how
1008 * much was written. Note that the first entry
1009 * of the wqueue array is always allocated so
1010 * that we always can remember how much was
1012 bus->wqueue[0] = sd_bus_message_ref(m);
1013 bus->wqueue_size = 1;
1019 /* Just append it to the queue. */
1021 if (bus->wqueue_size >= WQUEUE_MAX)
1024 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1029 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1033 *serial = BUS_MESSAGE_SERIAL(m);
/* calc_elapse: converts a relative timeout in microseconds into an absolute
 * CLOCK_MONOTONIC deadline. The value (uint64_t) -1 is special-cased, and
 * SD_BUS_DEFAULT_TIMEOUT is substituted on one path (the conditions are only
 * partly visible here); otherwise the result is now + usec. */
1038 static usec_t calc_elapse(uint64_t usec) {
1039 if (usec == (uint64_t) -1)
1043 usec = SD_BUS_DEFAULT_TIMEOUT;
1045 return now(CLOCK_MONOTONIC) + usec;
/* sd_bus_send_with_reply: sends a method call and registers a callback for
 * its reply. The message must be a method call. It is sealed, a
 * reply_callback record holding the callback, its userdata, the message
 * serial and the absolute timeout (calc_elapse()) is allocated and entered
 * into bus->reply_callbacks keyed by serial, and the message is sent with
 * sd_bus_send(). If sending fails the record is removed from the hashmap
 * again.
 *
 * Fix: the type check read "!m->header->type != ...METHOD_CALL". The
 * negation applied to the type before the comparison, so the check compared
 * 0 or 1 with the constant rather than the message type itself. */
1048 int sd_bus_send_with_reply(
1051 sd_message_handler_t callback,
1056 struct reply_callback *c;
1067 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1070 r = bus_seal_message(bus, m);
1074 c = new(struct reply_callback, 1);
1078 c->callback = callback;
1079 c->userdata = userdata;
1080 c->serial = BUS_MESSAGE_SERIAL(m);
1081 c->timeout = calc_elapse(usec);
1083 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1089 r = sd_bus_send(bus, m, serial);
1091 hashmap_remove(bus->reply_callbacks, &c->serial);
/* sd_bus_send_with_reply_cancel: cancels a pending reply callback by
 * removing the entry keyed by "serial" from bus->reply_callbacks.
 *
 * Fix: the local was declared as "struct reply_callbacks *", a misspelling
 * of the "struct reply_callback" type used for the entries everywhere else
 * in this file. */
1099 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1100 struct reply_callback *c;
1107 c = hashmap_remove(bus->reply_callbacks, &serial);
/* sd_bus_send_with_reply_and_block: sends a method call and waits
 * synchronously for its reply. The message must be a method call and the
 * error object is checked with sd_bus_error_is_set() up front. After
 * sd_bus_send() an absolute deadline is computed with calc_elapse(). In the
 * wait loop one slot of the read queue is reserved before each
 * message_read(), so an unrelated message can always be queued. A message
 * whose reply_serial equals the request serial ends the wait: a method
 * return is handed out as the reply, a method error is copied into "error"
 * and converted with bus_error_to_errno(). Unrelated messages are appended
 * to the read queue. When nothing more can be read the function waits with
 * sd_bus_wait() for the remaining time and flushes pending writes with
 * dispatch_wqueue().
 *
 * Fixes: the type check read "!m->header->type != ...METHOD_CALL", negating
 * the type before the comparison; the comment on queueing said "so need to
 * resize" where "no need" is meant. */
1115 int sd_bus_send_with_reply_and_block(
1119 sd_bus_error *error,
1120 sd_bus_message **reply) {
1133 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1135 if (sd_bus_error_is_set(error))
1138 r = sd_bus_send(bus, m, &serial);
1142 timeout = calc_elapse(usec);
1146 sd_bus_message *incoming;
1151 /* Make sure there's room for queuing this
1152 * locally, before we read the message */
1154 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1162 r = message_read(bus, &incoming);
1166 if (incoming->reply_serial == serial) {
1167 /* Found a match! */
1169 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1174 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1177 r = sd_bus_error_copy(error, &incoming->error);
1179 sd_bus_message_unref(incoming);
1183 k = bus_error_to_errno(&incoming->error);
1184 sd_bus_message_unref(incoming);
1188 sd_bus_message_unref(incoming);
1192 /* There's already guaranteed to be room for
1193 * this, so no need to resize things here */
1194 bus->rqueue[bus->rqueue_size ++] = incoming;
1197 /* Try to read more, right-away */
1204 n = now(CLOCK_MONOTONIC);
1210 left = (uint64_t) -1;
1212 r = sd_bus_wait(bus, left);
1216 r = dispatch_wqueue(bus);
/* sd_bus_get_fd: accessor taking a bus object.
 * NOTE(review): the body is not visible in this listing; presumably it
 * returns the connection's file descriptor -- confirm. */
1222 int sd_bus_get_fd(sd_bus *bus) {
/* sd_bus_get_events: computes the poll events the caller should wait for.
 * The result depends on the state: BUS_OPENING is handled on its own; in
 * BUS_RUNNING or BUS_HELLO the decision looks at whether the read queue is
 * empty and whether the write queue holds pending messages (the flags set in
 * each case are on lines not shown here). */
1232 int sd_bus_get_events(sd_bus *bus) {
1241 if (bus->state == BUS_OPENING)
1243 else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1244 if (bus->rqueue_size <= 0)
1246 if (bus->wqueue_size > 0)
/* sd_bus_process: drives the connection state machine by one step.
 *
 * BUS_OPENING: once the socket reports POLLOUT, POLLERR or POLLHUP the
 * pending connect result is fetched with SO_ERROR. A failure is recorded in
 * last_connect_error (-ECONNREFUSED when only POLLERR/POLLHUP is known);
 * success proceeds to bus_start_auth(), failure tries the next address with
 * bus_start_connect().
 * BUS_AUTHENTICATING: continues writing the request and reading the reply,
 * then calls bus_start_running().
 * BUS_RUNNING / BUS_HELLO: flushes the write queue, fetches one incoming
 * message, and for reply messages looks up the callback registered under the
 * message's reply_serial; that callback is removed from the map and invoked.
 * Filter callbacks are then tried in list order. The message is unreferenced
 * when handled; negative callback results are passed on as errors, other
 * results on those paths are reported as 0.
 *
 * Fix: the reply-callback lookup was guarded by "METHOD_CALL ||
 * METHOD_RETURN". A method call is never a reply; the two reply types are
 * method return and method error (as handled in
 * sd_bus_send_with_reply_and_block()), so error replies never reached their
 * callback. The guard now tests METHOD_ERROR and METHOD_RETURN. */
1253 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1262 if (bus->state == BUS_OPENING) {
1273 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1275 socklen_t slen = sizeof(error);
1277 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1282 bus->last_connect_error = -error;
1283 else if (p.revents & (POLLERR|POLLHUP))
1284 bus->last_connect_error = -ECONNREFUSED;
1286 return bus_start_auth(bus);
1288 /* Try next address */
1289 return bus_start_connect(bus);
1294 } else if (bus->state == BUS_AUTHENTICATING) {
1296 r = bus_write_auth(bus);
1300 r = bus_read_auth(bus);
1304 return bus_start_running(bus);
1306 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1307 struct filter_callback *l;
1309 r = dispatch_wqueue(bus);
1313 r = dispatch_rqueue(bus, &m);
1317 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1318 struct reply_callback *c;
1320 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1322 r = c->callback(bus, m, c->userdata);
1326 sd_bus_message_unref(m);
1327 return r < 0 ? r : 0;
1332 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1333 r = l->callback(bus, m, l->userdata);
1335 sd_bus_message_unref(m);
1336 return r < 0 ? r : 0;
1345 sd_bus_message_unref(m);
/* sd_bus_wait: blocks until the bus socket is ready or the timeout expires.
 * The events to wait for come from sd_bus_get_events(); waiting is done with
 * ppoll() on a single descriptor. A timeout of (uint64_t) -1 means no
 * timeout; any other value is converted with timespec_store(). One early
 * exit returns -ECONNREFUSED (its condition is not visible here). */
1352 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1360 return -ECONNREFUSED;
1362 e = sd_bus_get_events(bus);
1370 r = ppoll(&p, 1, timeout_usec == (uint64_t) -1 ? NULL : timespec_store(&ts, timeout_usec), NULL);
/* sd_bus_flush: writes out all queued messages. The "done" condition --
 * state BUS_RUNNING with an empty write queue -- is tested before and after
 * each dispatch_wqueue() call; while it does not hold the function waits
 * without timeout in sd_bus_wait(). */
1377 int sd_bus_flush(sd_bus *bus) {
1385 if (bus->state == BUS_RUNNING && bus->wqueue_size <= 0)
1389 r = dispatch_wqueue(bus);
1393 if (bus->state == BUS_RUNNING && bus->wqueue_size <= 0)
1396 r = sd_bus_wait(bus, (uint64_t) -1);
/* sd_bus_add_filter: registers a message filter. A filter_callback record
 * is allocated, filled with the callback and its userdata, and prepended to
 * bus->filter_callbacks, so the most recently added filter comes first in
 * the list. */
1402 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1403 struct filter_callback *f;
1410 f = new(struct filter_callback, 1);
1413 f->callback = callback;
1414 f->userdata = userdata;
1416 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1420 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1421 struct filter_callback *f;
1428 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1429 if (f->callback == callback && f->userdata == userdata) {
1430 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);