1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2013 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
34 #include "bus-internal.h"
35 #include "bus-message.h"
38 #define WQUEUE_MAX 128
40 static void bus_free(sd_bus *b) {
41 struct filter_callback *f;
47 close_nointr_nofail(b->fd);
54 for (i = 0; i < b->rqueue_size; i++)
55 sd_bus_message_unref(b->rqueue[i]);
58 for (i = 0; i < b->wqueue_size; i++)
59 sd_bus_message_unref(b->wqueue[i]);
62 hashmap_free_free(b->reply_callbacks);
64 while ((f = b->filter_callbacks)) {
65 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
72 static sd_bus* bus_new(void) {
81 r->message_version = 1;
83 /* We guarantee that wqueue always has space for at least one
85 r->wqueue = new(sd_bus_message*, 1);
94 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
101 bus->state = BUS_RUNNING;
103 r = sd_bus_message_read(reply, "s", &s);
107 bus->unique_name = strdup(s);
108 if (!bus->unique_name)
114 static int bus_send_hello(sd_bus *bus) {
115 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
120 r = sd_bus_message_new_method_call(
122 "org.freedesktop.DBus",
124 "org.freedesktop.DBus",
130 r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, (uint64_t) -1, NULL);
134 bus->sent_hello = true;
138 static int bus_start_running(sd_bus *bus) {
141 if (bus->sent_hello) {
142 bus->state = BUS_HELLO;
146 bus->state = BUS_RUNNING;
150 static int parse_address_key(const char **p, const char *key, char **value) {
161 if (strncmp(*p, key, l) != 0)
171 while (*a != ',' && *a != 0) {
189 c = (char) ((x << 4) | y);
196 t = realloc(r, n + 2);
221 static void skip_address_key(const char **p) {
225 *p += strcspn(*p, ",");
231 static int bus_parse_next_address(sd_bus *b) {
233 _cleanup_free_ char *guid = NULL;
240 if (b->address[b->address_index] == 0)
243 a = b->address + b->address_index;
246 b->sockaddr_size = 0;
247 b->peer = SD_ID128_NULL;
249 if (startswith(a, "unix:")) {
250 _cleanup_free_ char *path = NULL, *abstract = NULL;
254 r = parse_address_key(&p, "guid", &guid);
260 r = parse_address_key(&p, "path", &path);
266 r = parse_address_key(&p, "abstract", &abstract);
272 skip_address_key(&p);
275 if (!path && !abstract)
278 if (path && abstract)
285 if (l > sizeof(b->sockaddr.un.sun_path))
288 b->sockaddr.un.sun_family = AF_UNIX;
289 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
290 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
291 } else if (abstract) {
294 l = strlen(abstract);
295 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
298 b->sockaddr.un.sun_family = AF_UNIX;
299 b->sockaddr.un.sun_path[0] = 0;
300 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
301 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
304 } else if (startswith(a, "tcp:")) {
305 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
306 struct addrinfo hints, *result;
310 r = parse_address_key(&p, "guid", &guid);
316 r = parse_address_key(&p, "host", &host);
322 r = parse_address_key(&p, "port", &port);
328 r = parse_address_key(&p, "family", &family);
334 skip_address_key(&p);
341 hints.ai_socktype = SOCK_STREAM;
342 hints.ai_flags = AI_ADDRCONFIG;
345 if (streq(family, "ipv4"))
346 hints.ai_family = AF_INET;
347 else if (streq(family, "ipv6"))
348 hints.ai_family = AF_INET6;
353 r = getaddrinfo(host, port, &hints, &result);
357 return -EADDRNOTAVAIL;
359 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
360 b->sockaddr_size = result->ai_addrlen;
362 freeaddrinfo(result);
366 r = sd_id128_from_string(guid, &b->peer);
371 b->address_index = p - b->address;
375 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
378 struct iovec *i = iov + *idx;
380 if (i->iov_len > size) {
381 i->iov_base = (uint8_t*) i->iov_base + size;
395 static int bus_write_auth(sd_bus *b) {
400 assert(b->state == BUS_AUTHENTICATING);
402 if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
406 mh.msg_iov = b->auth_iovec + b->auth_index;
407 mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
409 k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
411 return errno == EAGAIN ? 0 : -errno;
413 iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
418 static int bus_auth_verify(sd_bus *b) {
424 /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
427 e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
431 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
435 if (e - (char*) b->rbuffer != 3 + 32)
438 if (memcmp(b->rbuffer, "OK ", 3))
441 for (i = 0; i < 32; i += 2) {
444 x = unhexchar(((char*) b->rbuffer)[3 + i]);
445 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
450 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
453 if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
454 !sd_id128_equal(b->peer, peer))
460 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
461 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
463 if (f + 2 > (char*) b->rbuffer + b->rbuffer_size) {
464 b->rbuffer_size -= (f - (char*) b->rbuffer);
465 memmove(b->rbuffer, f + 2, b->rbuffer_size);
470 r = bus_start_running(b);
477 static int bus_read_auth(sd_bus *b) {
487 r = bus_auth_verify(b);
491 n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
492 p = realloc(b->rbuffer, n);
499 iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
500 iov.iov_len = n - b->rbuffer_size;
506 k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
508 return errno == EAGAIN ? 0 : -errno;
510 b->rbuffer_size += k;
512 r = bus_auth_verify(b);
519 static int bus_start_auth(sd_bus *b) {
520 static const char auth_prefix[] = "\0AUTH EXTERNAL ";
521 static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
523 char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
528 b->state = BUS_AUTHENTICATING;
530 snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
534 b->auth_uid = hexmem(text, l);
538 b->auth_iovec[0].iov_base = (void*) auth_prefix;
539 b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
540 b->auth_iovec[1].iov_base = (void*) b->auth_uid;
541 b->auth_iovec[1].iov_len = l * 2;
542 b->auth_iovec[2].iov_base = (void*) auth_suffix;
543 b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
544 b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
546 return bus_write_auth(b);
549 static int bus_start_connect(sd_bus *b) {
556 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
557 r = bus_parse_next_address(b);
561 return b->last_connect_error ? b->last_connect_error : -ECONNREFUSED;
564 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
566 b->last_connect_error = -errno;
571 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
573 if (errno == EINPROGRESS)
576 b->last_connect_error = -errno;
577 close_nointr_nofail(b->fd);
583 return bus_start_auth(b);
587 int sd_bus_open_system(sd_bus **ret) {
595 e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
597 r = sd_bus_open_address(e, &b);
605 b->sockaddr.un.sun_family = AF_UNIX;
606 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
607 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
609 r = bus_start_connect(b);
616 r = bus_send_hello(b);
626 int sd_bus_open_user(sd_bus **ret) {
635 e = getenv("DBUS_SESSION_BUS_ADDRESS");
637 r = sd_bus_open_address(e, &b);
641 e = getenv("XDG_RUNTIME_DIR");
646 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
653 b->sockaddr.un.sun_family = AF_UNIX;
654 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
655 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
657 r = bus_start_connect(b);
664 r = bus_send_hello(b);
674 int sd_bus_open_address(const char *address, sd_bus **ret) {
687 b->address = strdup(address);
693 r = bus_start_connect(b);
703 int sd_bus_open_fd(int fd, sd_bus **ret) {
717 fd_nonblock(b->fd, true);
718 fd_cloexec(b->fd, true);
720 r = bus_start_auth(b);
730 void sd_bus_close(sd_bus *bus) {
736 close_nointr_nofail(bus->fd);
740 sd_bus *sd_bus_ref(sd_bus *bus) {
744 assert(bus->n_ref > 0);
750 sd_bus *sd_bus_unref(sd_bus *bus) {
754 assert(bus->n_ref > 0);
763 int sd_bus_is_running(sd_bus *bus) {
770 return bus->state == BUS_RUNNING;
773 int sd_bus_can_send(sd_bus *bus, char type) {
777 if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
780 if (type == SD_BUS_TYPE_UNIX_FD)
783 return bus_type_is_valid(type);
786 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
789 if (m->header->version > b->message_version)
795 return bus_message_seal(m, ++b->serial);
798 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
808 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
810 n = m->n_iovec * sizeof(struct iovec);
812 memcpy(iov, m->iovec, n);
815 iovec_advance(iov, &j, *idx);
819 mh.msg_iovlen = m->n_iovec;
821 k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
826 iovec_advance(iov, &j, *idx);
828 return j >= m->n_iovec;
831 static int message_read_need(sd_bus *bus, size_t *need) {
837 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
839 if (bus->rbuffer_size < sizeof(struct bus_header)) {
840 *need = sizeof(struct bus_header);
844 a = ((const uint32_t*) bus->rbuffer)[1];
845 b = ((const uint32_t*) bus->rbuffer)[3];
847 e = ((const uint8_t*) bus->rbuffer)[0];
848 if (e == SD_BUS_LITTLE_ENDIAN) {
851 } else if (e == SD_BUS_BIG_ENDIAN) {
857 *need = sizeof(struct bus_header) + ALIGN_TO(b, 8) + a;
861 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
868 assert(bus->rbuffer_size >= size);
869 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
871 if (bus->rbuffer_size > size) {
872 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
879 r = bus_message_from_malloc(bus->rbuffer, size, &t);
886 bus->rbuffer_size -= size;
892 static int message_read(sd_bus *bus, sd_bus_message **m) {
902 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
904 r = message_read_need(bus, &need);
908 if (bus->rbuffer_size >= need)
909 return message_make(bus, need, m);
911 b = realloc(bus->rbuffer, need);
918 iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
919 iov.iov_len = need - bus->rbuffer_size;
925 k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
927 return errno == EAGAIN ? 0 : -errno;
929 bus->rbuffer_size += k;
931 r = message_read_need(bus, &need);
935 if (bus->rbuffer_size >= need)
936 return message_make(bus, need, m);
941 static int dispatch_wqueue(sd_bus *bus) {
945 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
950 while (bus->wqueue_size > 0) {
952 r = message_write(bus, bus->wqueue[0], &bus->windex);
957 /* Wasn't fully written yet... */
960 /* Fully written. Let's drop the entry from
963 * This isn't particularly optimized, but
964 * well, this is supposed to be our worst-case
965 * buffer only, and the socket buffer is
966 * supposed to be our primary buffer, and if
967 * it got full, then all bets are off
970 sd_bus_message_unref(bus->wqueue[0]);
972 memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
982 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
987 assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
992 if (bus->rqueue_size > 0) {
993 /* Dispatch a queued message */
997 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1001 /* Try to read a new message */
1002 r = message_read(bus, m);
1011 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1021 r = bus_seal_message(bus, m);
1025 /* If this is a reply and no reply was requested, then let's
1026 * suppress this, if we can */
1027 if (m->dont_send && !serial)
1030 if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1033 r = message_write(bus, m, &idx);
1037 } else if (r == 0) {
1038 /* Wasn't fully written. So let's remember how
1039 * much was written. Note that the first entry
1040 * of the wqueue array is always allocated so
1041 * that we always can remember how much was
1043 bus->wqueue[0] = sd_bus_message_ref(m);
1044 bus->wqueue_size = 1;
1050 /* Just append it to the queue. */
1052 if (bus->wqueue_size >= WQUEUE_MAX)
1055 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1060 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1064 *serial = BUS_MESSAGE_SERIAL(m);
1069 static usec_t calc_elapse(uint64_t usec) {
1070 if (usec == (uint64_t) -1)
1074 usec = SD_BUS_DEFAULT_TIMEOUT;
1076 return now(CLOCK_MONOTONIC) + usec;
1079 int sd_bus_send_with_reply(
1082 sd_message_handler_t callback,
1087 struct reply_callback *c;
1098 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1100 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1103 r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1107 r = bus_seal_message(bus, m);
1111 c = new(struct reply_callback, 1);
1115 c->callback = callback;
1116 c->userdata = userdata;
1117 c->serial = BUS_MESSAGE_SERIAL(m);
1118 c->timeout = calc_elapse(usec);
1120 r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1126 r = sd_bus_send(bus, m, serial);
1128 hashmap_remove(bus->reply_callbacks, &c->serial);
1136 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1137 struct reply_callbacks *c;
1144 c = hashmap_remove(bus->reply_callbacks, &serial);
1152 static int ensure_running(sd_bus *bus) {
1157 r = sd_bus_is_running(bus);
1162 r = sd_bus_process(bus, NULL);
1166 r = sd_bus_is_running(bus);
1170 r = sd_bus_wait(bus, (uint64_t) -1);
1176 int sd_bus_send_with_reply_and_block(
1180 sd_bus_error *error,
1181 sd_bus_message **reply) {
1194 if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1196 if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1198 if (bus_error_is_dirty(error))
1201 r = ensure_running(bus);
1205 r = sd_bus_send(bus, m, &serial);
1209 timeout = calc_elapse(usec);
1213 sd_bus_message *incoming;
1218 /* Make sure there's room for queuing this
1219 * locally, before we read the message */
1221 q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1229 r = message_read(bus, &incoming);
1233 /* bus_message_dump(incoming); */
1234 /* sd_bus_message_rewind(incoming, true); */
1236 if (incoming->reply_serial == serial) {
1237 /* Found a match! */
1239 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1244 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1247 r = sd_bus_error_copy(error, &incoming->error);
1249 sd_bus_message_unref(incoming);
1253 k = bus_error_to_errno(&incoming->error);
1254 sd_bus_message_unref(incoming);
1258 sd_bus_message_unref(incoming);
1262 /* There's already guaranteed to be room for
1263 * this, so need to resize things here */
1264 bus->rqueue[bus->rqueue_size ++] = incoming;
1267 /* Try to read more, right-away */
1274 n = now(CLOCK_MONOTONIC);
1280 left = (uint64_t) -1;
1282 r = sd_bus_wait(bus, left);
1286 r = dispatch_wqueue(bus);
1292 int sd_bus_get_fd(sd_bus *bus) {
1302 int sd_bus_get_events(sd_bus *bus) {
1310 if (bus->state == BUS_OPENING)
1312 else if (bus->state == BUS_AUTHENTICATING) {
1314 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1319 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1320 if (bus->rqueue_size <= 0)
1322 if (bus->wqueue_size > 0)
1329 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1337 if (bus->state == BUS_OPENING) {
1348 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1350 socklen_t slen = sizeof(error);
1352 r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1357 bus->last_connect_error = -error;
1358 else if (p.revents & (POLLERR|POLLHUP))
1359 bus->last_connect_error = -ECONNREFUSED;
1361 return bus_start_auth(bus);
1363 /* Try next address */
1364 return bus_start_connect(bus);
1369 } else if (bus->state == BUS_AUTHENTICATING) {
1371 r = bus_write_auth(bus);
1375 r = bus_read_auth(bus);
1381 } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1382 struct filter_callback *l;
1383 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1385 r = dispatch_wqueue(bus);
1389 r = dispatch_rqueue(bus, &m);
1393 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1394 struct reply_callback *c;
1396 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1398 r = c->callback(bus, m, c->userdata);
1402 return r < 0 ? r : 0;
1406 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1407 r = l->callback(bus, m, l->userdata);
1409 return r < 0 ? r : 0;
1418 if (sd_bus_message_is_method_call(m, NULL, NULL)) {
1419 const sd_bus_error e = SD_BUS_ERROR_INIT_CONST("org.freedesktop.DBus.Error.UnknownObject", "Unknown object.");
1420 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1422 r = sd_bus_message_new_method_error(bus, m, &e, &reply);
1426 r = sd_bus_send(bus, reply, NULL);
1434 assert_not_reached("Unknown state");
1437 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1447 if (bus->rqueue_size > 0)
1450 e = sd_bus_get_events(bus);
1458 r = ppoll(&p, 1, timeout_usec == (uint64_t) -1 ? NULL : timespec_store(&ts, timeout_usec), NULL);
1465 int sd_bus_flush(sd_bus *bus) {
1473 r = ensure_running(bus);
1477 if (bus->wqueue_size <= 0)
1481 r = dispatch_wqueue(bus);
1485 if (bus->wqueue_size <= 0)
1488 r = sd_bus_wait(bus, (uint64_t) -1);
1494 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1495 struct filter_callback *f;
1502 f = new(struct filter_callback, 1);
1505 f->callback = callback;
1506 f->userdata = userdata;
1508 LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1512 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1513 struct filter_callback *f;
1520 LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1521 if (f->callback == callback && f->userdata == userdata) {
1522 LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);