1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
34 #include "bus-internal.h"
35 #include "bus-message.h"
38 #define WQUEUE_MAX 128
40 static void bus_free(sd_bus *b) {
41 struct filter_callback *f;
46 close_nointr_nofail(b->fd);
53 hashmap_free_free(b->reply_callbacks);
55 while ((f = b->filter_callbacks)) {
56 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
63 static sd_bus* bus_new(void) {
72 r->message_version = 1;
74 /* We guarantee that wqueue always has space for at least one
76 r->wqueue = new(sd_bus_message*, 1);
85 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
92 bus->state = BUS_RUNNING;
94 r = sd_bus_message_read(reply, "s", &s);
98 bus->unique_name = strdup(s);
99 if (!bus->unique_name)
105 static int bus_send_hello(sd_bus *bus) {
106 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
111 r = sd_bus_message_new_method_call(
113 "org.freedesktop.DBus",
115 "org.freedesktop.DBus",
121 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, (uint64_t) -1, NULL);
128 static int bus_start_running(sd_bus *bus) {
133 if (bus->send_hello) {
134 bus->state = BUS_HELLO;
136 r = bus_send_hello(bus);
141 bus->state = BUS_RUNNING;
146 static int parse_address_key(const char **p, const char *key, char **value) {
157 if (!strncmp(*p, key, l) != 0)
167 while (*a != ';' && *a != 0) {
186 c = (char) ((x << 4) | y);
190 t = realloc(r, n + 1);
205 static void skip_address_key(const char **p) {
209 *p += strcspn(*p, ";");
212 static int bus_parse_next_address(sd_bus *b) {
214 _cleanup_free_ char *guid = NULL;
221 if (b->address[b->address_index] == 0)
224 a = b->address + b->address_index;
227 b->sockaddr_size = 0;
228 b->peer = SD_ID128_NULL;
230 if (startswith(a, "unix:")) {
231 _cleanup_free_ char *path = NULL, *abstract = NULL;
234 while (*p != 0 && *p != ';') {
235 r = parse_address_key(&p, "guid", &guid);
241 r = parse_address_key(&p, "path", &path);
247 r = parse_address_key(&p, "abstract", &abstract);
253 skip_address_key(&p);
256 if (!path && !abstract)
259 if (path && abstract)
266 if (l > sizeof(b->sockaddr.un.sun_path))
269 b->sockaddr.un.sun_family = AF_UNIX;
270 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
271 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
272 } else if (abstract) {
276 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
279 b->sockaddr.un.sun_family = AF_UNIX;
280 b->sockaddr.un.sun_path[0] = 0;
281 strncpy(b->sockaddr.un.sun_path+1, path, sizeof(b->sockaddr.un.sun_path)-1);
282 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
285 } else if (startswith(a, "tcp:")) {
286 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
287 struct addrinfo hints, *result;
290 while (*p != 0 && *p != ';') {
291 r = parse_address_key(&p, "guid", &guid);
297 r = parse_address_key(&p, "host", &host);
303 r = parse_address_key(&p, "port", &port);
309 r = parse_address_key(&p, "family", &family);
315 skip_address_key(&p);
322 hints.ai_socktype = SOCK_STREAM;
323 hints.ai_flags = AI_ADDRCONFIG;
326 if (streq(family, "ipv4"))
327 hints.ai_family = AF_INET;
328 else if (streq(family, "ipv6"))
329 hints.ai_family = AF_INET6;
334 r = getaddrinfo(host, port, &hints, &result);
338 return -EADDRNOTAVAIL;
340 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
341 b->sockaddr_size = result->ai_addrlen;
343 freeaddrinfo(result);
347 r = sd_id128_from_string(guid, &b->peer);
352 b->address_index = p - b->address;
356 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
359 struct iovec *i = iov + *idx;
361 if (i->iov_len > size) {
362 i->iov_base = (uint8_t*) i->iov_base + size;
376 static int bus_write_auth(sd_bus *b) {
381 assert(b->state == BUS_AUTHENTICATING);
383 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
387 mh.msg_iov = b->auth_iovec + b->auth_index;
388 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
390 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
392 return errno == EAGAIN ? 0 : -errno;
394 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
399 static int bus_auth_verify(sd_bus *b) {
405 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
408 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
412 f = memmem(e, b->rbuffer_size - (e - (char*) b->rbuffer), "\r\n", 2);
416 if (e - (char*) b->rbuffer != 3 + 32)
419 if (memcmp(b->rbuffer, "OK ", 3))
422 for (i = 0; i < 32; i += 2) {
425 x = unhexchar(((char*) b->rbuffer)[3 + i]);
426 y = unhexchar(((char*) b->rbuffer)[3 + i + 2]);
431 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
434 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
435 !sd_id128_equal(b->peer, peer))
441 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
442 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
444 if (f + 2 > (char*) b->rbuffer + b->rbuffer_size) {
445 b->rbuffer_size -= (f - (char*) b->rbuffer);
446 memmove(b->rbuffer, f + 2, b->rbuffer_size);
449 r = bus_start_running(b);
456 static int bus_read_auth(sd_bus *b) {
465 r = bus_auth_verify(b);
469 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
472 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
473 iov.iov_len = n - b->rbuffer_size;
479 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
481 return errno == EAGAIN ? 0 : -errno;
483 b->rbuffer_size += k;
485 r = bus_auth_verify(b);
492 static int bus_start_auth(sd_bus *b) {
493 static const char auth_prefix[] = "\0AUTH_EXTERNAL ";
494 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
496 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
501 b->state = BUS_AUTHENTICATING;
503 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
507 b->auth_uid = hexmem(text, l);
511 b->auth_iovec[0].iov_base = (void*) auth_prefix;
512 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
513 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
514 b->auth_iovec[1].iov_len = l * 2;
515 b->auth_iovec[2].iov_base = (void*) auth_suffix;
516 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
517 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
519 return bus_write_auth(b);
522 static int bus_start_connect(sd_bus *b) {
529 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
530 r = bus_parse_next_address(b);
534 return b->last_connect_error ? b->last_connect_error : -ECONNREFUSED;
537 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
539 b->last_connect_error = -errno;
544 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
546 if (errno == EINPROGRESS)
549 b->last_connect_error = -errno;
550 close_nointr_nofail(b->fd);
556 return bus_start_auth(b);
560 int sd_bus_open_system(sd_bus **ret) {
568 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
570 r = sd_bus_open_address(e, &b);
574 b->send_hello = true;
583 b->send_hello = true;
585 b->sockaddr.un.sun_family = AF_UNIX;
586 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
587 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
589 r = bus_start_connect(b);
599 int sd_bus_open_user(sd_bus **ret) {
608 e = getenv("DBUS_SESSION_BUS_ADDRESS");
610 r = sd_bus_open_address(e, &b);
614 b->send_hello = true;
619 e = getenv("XDG_RUNTIME_DIR");
624 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
631 b->send_hello = true;
633 b->sockaddr.un.sun_family = AF_UNIX;
634 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
635 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
637 r = bus_start_connect(b);
647 int sd_bus_open_address(const char *address, sd_bus **ret) {
660 b->address = strdup(address);
666 r = bus_start_connect(b);
676 int sd_bus_open_fd(int fd, sd_bus **ret) {
690 fd_nonblock(b->fd, true);
691 fd_cloexec(b->fd, true);
693 r = bus_start_auth(b);
703 void sd_bus_close(sd_bus *bus) {
709 close_nointr_nofail(bus->fd);
713 sd_bus *sd_bus_ref(sd_bus *bus) {
717 assert(bus->n_ref > 0);
723 sd_bus *sd_bus_unref(sd_bus *bus) {
727 assert(bus->n_ref > 0);
736 int sd_bus_is_running(sd_bus *bus) {
743 return bus->state == BUS_RUNNING;
746 int sd_bus_can_send(sd_bus *bus, char type) {
751 if (type == SD_BUS_TYPE_UNIX_FD)
754 return bus_type_is_valid(type);
757 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
763 return bus_message_seal(m, ++b->serial);
766 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
777 n = m->n_iovec * sizeof(struct iovec);
779 memcpy(iov, m->iovec, n);
782 iovec_advance(iov, &j, *idx);
786 mh.msg_iovlen = m->n_iovec;
788 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
793 iovec_advance(iov, &j, *idx);
795 return j > m->n_iovec;
798 static int message_read_need(sd_bus *bus, size_t *need) {
805 if (bus->rbuffer_size <= sizeof(struct bus_header)) {
806 *need = sizeof(struct bus_header);
810 a = ((const uint32_t*) bus->rbuffer)[1];
811 b = ((const uint32_t*) bus->rbuffer)[3];
813 e = ((const uint8_t*) bus->rbuffer)[0];
814 if (e == SD_BUS_LITTLE_ENDIAN) {
817 } else if (e == SD_BUS_BIG_ENDIAN) {
823 *need = sizeof(struct bus_header) + ALIGN_TO(a, 8) + b;
827 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
834 assert(bus->rbuffer_size >= size);
836 if (bus->rbuffer_size > size) {
837 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
844 r = bus_message_from_malloc(bus->rbuffer, size, &t);
851 bus->rbuffer_size -= size;
853 r = bus_message_parse(t);
855 sd_bus_message_unref(t);
863 static int message_read(sd_bus *bus, sd_bus_message **m) {
874 r = message_read_need(bus, &need);
878 if (bus->rbuffer_size >= need)
879 return message_make(bus, need, m);
881 b = realloc(bus->rbuffer, need);
886 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
887 iov.iov_len = need - bus->rbuffer_size;
893 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
895 return errno == EAGAIN ? 0 : -errno;
897 bus->rbuffer_size += k;
899 r = message_read_need(bus, &need);
903 if (bus->rbuffer_size >= need)
904 return message_make(bus, need, m);
909 static int dispatch_wqueue(sd_bus *bus) {
917 while (bus->wqueue_size > 0) {
919 r = message_write(bus, bus->wqueue[0], &bus->windex);
924 /* Wasn't fully written yet... */
927 /* Fully written. Let's drop the entry from
930 * This isn't particularly optimized, but
931 * well, this is supposed to be our worst-case
932 * buffer only, and the socket buffer is
933 * supposed to be our primary buffer, and if
934 * it got full, then all bets are off
937 sd_bus_message_unref(bus->wqueue[0]);
939 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
949 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
958 if (bus->rqueue_size > 0) {
959 /* Dispatch a queued message */
963 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
967 /* Try to read a new message */
968 r = message_read(bus, m);
977 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
986 if (m->header->version > bus->message_version)
989 r = bus_seal_message(bus, m);
993 /* If this is a reply and no reply was requested, then let's
994 * suppress this, if we can */
995 if (m->dont_send && !serial)
998 if (bus->wqueue_size <= 0) {
1001 r = message_write(bus, m, &idx);
1005 } else if (r == 0) {
1006 /* Wasn't fully written. So let's remember how
1007 * much was written. Note that the first entry
1008 * of the wqueue array is always allocated so
1009 * that we always can remember how much was
1011 bus->wqueue[0] = sd_bus_message_ref(m);
1012 bus->wqueue_size = 1;
1018 /* Just append it to the queue. */
1020 if (bus->wqueue_size >= WQUEUE_MAX)
1023 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1028 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1032 *serial = BUS_MESSAGE_SERIAL(m);
1037 static usec_t calc_elapse(uint64_t usec) {
1038 if (usec == (uint64_t) -1)
1042 usec = SD_BUS_DEFAULT_TIMEOUT;
1044 return now(CLOCK_MONOTONIC) + usec;
1047 int sd_bus_send_with_reply(
1050 sd_message_handler_t callback,
1055 struct reply_callback *c;
1066 if (!m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1069 r = bus_seal_message(bus, m);
1073 c = new(struct reply_callback, 1);
1077 c->callback = callback;
1078 c->userdata = userdata;
1079 c->serial = BUS_MESSAGE_SERIAL(m);
1080 c->timeout = calc_elapse(usec);
1082 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1088 r = sd_bus_send(bus, m, serial);
1090 hashmap_remove(bus->reply_callbacks, &c->serial);
1098 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1099 struct reply_callbacks *c;
1106 c = hashmap_remove(bus->reply_callbacks, &serial);
1114 int sd_bus_send_with_reply_and_block(
1118 sd_bus_error *error,
1119 sd_bus_message **reply) {
1132 if (!m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1134 if (sd_bus_error_is_dirty(error))
1137 r = sd_bus_send(bus, m, &serial);
1141 timeout = calc_elapse(usec);
1145 sd_bus_message *incoming;
1150 /* Make sure there's room for queuing this
1151 * locally, before we read the message */
1153 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1161 r = message_read(bus, &incoming);
1165 if (incoming->reply_serial == serial) {
1166 /* Found a match! */
1168 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1173 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1176 r = sd_bus_error_copy(error, &incoming->error);
1178 sd_bus_message_unref(incoming);
1182 k = bus_error_to_errno(&incoming->error);
1183 sd_bus_message_unref(incoming);
1187 sd_bus_message_unref(incoming);
1191 /* There's already guaranteed to be room for
1192 * this, so need to resize things here */
1193 bus->rqueue[bus->rqueue_size ++] = incoming;
1196 /* Try to read more, right-away */
1203 n = now(CLOCK_MONOTONIC);
1209 left = (uint64_t) -1;
1211 r = sd_bus_wait(bus, left);
1215 r = dispatch_wqueue(bus);
1221 int sd_bus_get_fd(sd_bus *bus) {
1231 int sd_bus_get_events(sd_bus *bus) {
1240 if (bus->state == BUS_OPENING)
1242 else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1243 if (bus->rqueue_size <= 0)
1245 if (bus->wqueue_size > 0)
1252 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1261 if (bus->state == BUS_OPENING) {
1272 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1274 socklen_t slen = sizeof(error);
1276 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1281 bus->last_connect_error = -error;
1282 else if (p.revents & (POLLERR|POLLHUP))
1283 bus->last_connect_error = -ECONNREFUSED;
1285 return bus_start_auth(bus);
1287 /* Try next address */
1288 return bus_start_connect(bus);
1293 } else if (bus->state == BUS_AUTHENTICATING) {
1295 r = bus_write_auth(bus);
1299 r = bus_read_auth(bus);
1303 return bus_start_running(bus);
1305 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1306 struct filter_callback *l;
1308 r = dispatch_wqueue(bus);
1312 r = dispatch_rqueue(bus, &m);
1316 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1317 struct reply_callback *c;
1319 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1321 r = c->callback(bus, m, c->userdata);
1325 sd_bus_message_unref(m);
1326 return r < 0 ? r : 0;
1331 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1332 r = l->callback(bus, m, l->userdata);
1334 sd_bus_message_unref(m);
1335 return r < 0 ? r : 0;
1344 sd_bus_message_unref(m);
1351 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1359 return -ECONNREFUSED;
1361 e = sd_bus_get_events(bus);
1369 r = ppoll(&p, 1, timeout_usec == (uint64_t) -1 ? NULL : timespec_store(&ts, timeout_usec), NULL);
1376 int sd_bus_flush(sd_bus *bus) {
1384 if (bus->state == BUS_RUNNING && bus->wqueue_size <= 0)
1388 r = dispatch_wqueue(bus);
1392 if (bus->state == BUS_RUNNING && bus->wqueue_size <= 0)
1395 r = sd_bus_wait(bus, (uint64_t) -1);
1401 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1402 struct filter_callback *f;
1409 f = new(struct filter_callback, 1);
1412 f->callback = callback;
1413 f->userdata = userdata;
1415 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1419 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1420 struct filter_callback *f;
1427 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1428 if (f->callback == callback && f->userdata == userdata) {
1429 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);