chiark / gitweb /
f27d47cdea06bd808b7faa57a40204e5f951abf4
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32
33 #include "sd-bus.h"
34 #include "bus-internal.h"
35 #include "bus-message.h"
36 #include "bus-type.h"
37
38 #define WQUEUE_MAX 128
39
40 static void bus_free(sd_bus *b) {
41         struct filter_callback *f;
42         unsigned i;
43
44         assert(b);
45
46         if (b->fd >= 0)
47                 close_nointr_nofail(b->fd);
48
49         free(b->rbuffer);
50         free(b->unique_name);
51         free(b->auth_uid);
52         free(b->address);
53
54         for (i = 0; i < b->rqueue_size; i++)
55                 sd_bus_message_unref(b->rqueue[i]);
56         free(b->rqueue);
57
58         for (i = 0; i < b->wqueue_size; i++)
59                 sd_bus_message_unref(b->wqueue[i]);
60         free(b->wqueue);
61
62         hashmap_free_free(b->reply_callbacks);
63
64         while ((f = b->filter_callbacks)) {
65                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
66                 free(f);
67         }
68
69         free(b);
70 }
71
72 static sd_bus* bus_new(void) {
73         sd_bus *r;
74
75         r = new0(sd_bus, 1);
76         if (!r)
77                 return NULL;
78
79         r->n_ref = 1;
80         r->fd = -1;
81         r->message_version = 1;
82
83         /* We guarantee that wqueue always has space for at least one
84          * entry */
85         r->wqueue = new(sd_bus_message*, 1);
86         if (!r->wqueue) {
87                 free(r);
88                 return NULL;
89         }
90
91         return r;
92 };
93
94 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
95         const char *s;
96         int r;
97
98         assert(bus);
99         assert(reply);
100
101         bus->state = BUS_RUNNING;
102
103         r = sd_bus_message_read(reply, "s", &s);
104         if (r < 0)
105                 return r;
106
107         bus->unique_name = strdup(s);
108         if (!bus->unique_name)
109                 return -ENOMEM;
110
111         return 1;
112 }
113
114 static int bus_send_hello(sd_bus *bus) {
115         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
116         int r;
117
118         assert(bus);
119
120         r = sd_bus_message_new_method_call(
121                         bus,
122                         "org.freedesktop.DBus",
123                         "/",
124                         "org.freedesktop.DBus",
125                         "Hello",
126                         &m);
127         if (r < 0)
128                 return r;
129
130         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, (uint64_t) -1, NULL);
131         if (r < 0)
132                 return r;
133
134         bus->sent_hello = true;
135         return r;
136 }
137
138 static int bus_start_running(sd_bus *bus) {
139         assert(bus);
140
141         if (bus->sent_hello) {
142                 bus->state = BUS_HELLO;
143                 return 0;
144         }
145
146         bus->state = BUS_RUNNING;
147         return 0;
148 }
149
150 static int parse_address_key(const char **p, const char *key, char **value) {
151         size_t l, n = 0;
152         const char *a;
153         char *r = NULL;
154
155         assert(p);
156         assert(*p);
157         assert(key);
158         assert(value);
159
160         l = strlen(key);
161         if (strncmp(*p, key, l) != 0)
162                 return 0;
163
164         if ((*p)[l] != '=')
165                 return 0;
166
167         if (*value)
168                 return -EINVAL;
169
170         a = *p + l + 1;
171         while (*a != ',' && *a != 0) {
172                 char c, *t;
173
174                 if (*a == '%') {
175                         int x, y;
176
177                         x = unhexchar(a[1]);
178                         if (x < 0) {
179                                 free(r);
180                                 return x;
181                         }
182
183                         y = unhexchar(a[2]);
184                         if (y < 0) {
185                                 free(r);
186                                 return y;
187                         }
188
189                         c = (char) ((x << 4) | y);
190                         a += 3;
191                 } else {
192                         c = *a;
193                         a++;
194                 }
195
196                 t = realloc(r, n + 2);
197                 if (!t) {
198                         free(r);
199                         return -ENOMEM;
200                 }
201
202                 r = t;
203                 r[n++] = c;
204         }
205
206         if (!r) {
207                 r = strdup("");
208                 if (!r)
209                         return -ENOMEM;
210         } else
211                 r[n] = 0;
212
213         if (*a == ',')
214                 a++;
215
216         *p = a;
217         *value = r;
218         return 1;
219 }
220
221 static void skip_address_key(const char **p) {
222         assert(p);
223         assert(*p);
224
225         *p += strcspn(*p, ",");
226
227         if (**p == ',')
228                 (*p) ++;
229 }
230
231 static int bus_parse_next_address(sd_bus *b) {
232         const char *a, *p;
233         _cleanup_free_ char *guid = NULL;
234         int r;
235
236         assert(b);
237
238         if (!b->address)
239                 return 0;
240         if (b->address[b->address_index] == 0)
241                 return 0;
242
243         a = b->address + b->address_index;
244
245         zero(b->sockaddr);
246         b->sockaddr_size = 0;
247         b->peer = SD_ID128_NULL;
248
249         if (startswith(a, "unix:")) {
250                 _cleanup_free_ char *path = NULL, *abstract = NULL;
251
252                 p = a + 5;
253                 while (*p != 0) {
254                         r = parse_address_key(&p, "guid", &guid);
255                         if (r < 0)
256                                 return r;
257                         else if (r > 0)
258                                 continue;
259
260                         r = parse_address_key(&p, "path", &path);
261                         if (r < 0)
262                                 return r;
263                         else if (r > 0)
264                                 continue;
265
266                         r = parse_address_key(&p, "abstract", &abstract);
267                         if (r < 0)
268                                 return r;
269                         else if (r > 0)
270                                 continue;
271
272                         skip_address_key(&p);
273                 }
274
275                 if (!path && !abstract)
276                         return -EINVAL;
277
278                 if (path && abstract)
279                         return -EINVAL;
280
281                 if (path) {
282                         size_t l;
283
284                         l = strlen(path);
285                         if (l > sizeof(b->sockaddr.un.sun_path))
286                                 return -E2BIG;
287
288                         b->sockaddr.un.sun_family = AF_UNIX;
289                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
290                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
291                 } else if (abstract) {
292                         size_t l;
293
294                         l = strlen(abstract);
295                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
296                                 return -E2BIG;
297
298                         b->sockaddr.un.sun_family = AF_UNIX;
299                         b->sockaddr.un.sun_path[0] = 0;
300                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
301                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
302                 }
303
304         } else if (startswith(a, "tcp:")) {
305                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
306                 struct addrinfo hints, *result;
307
308                 p = a + 4;
309                 while (*p != 0) {
310                         r = parse_address_key(&p, "guid", &guid);
311                         if (r < 0)
312                                 return r;
313                         else if (r > 0)
314                                 continue;
315
316                         r = parse_address_key(&p, "host", &host);
317                         if (r < 0)
318                                 return r;
319                         else if (r > 0)
320                                 continue;
321
322                         r = parse_address_key(&p, "port", &port);
323                         if (r < 0)
324                                 return r;
325                         else if (r > 0)
326                                 continue;
327
328                         r = parse_address_key(&p, "family", &family);
329                         if (r < 0)
330                                 return r;
331                         else if (r > 0)
332                                 continue;
333
334                         skip_address_key(&p);
335                 }
336
337                 if (!host || !port)
338                         return -EINVAL;
339
340                 zero(hints);
341                 hints.ai_socktype = SOCK_STREAM;
342                 hints.ai_flags = AI_ADDRCONFIG;
343
344                 if (family) {
345                         if (streq(family, "ipv4"))
346                                 hints.ai_family = AF_INET;
347                         else if (streq(family, "ipv6"))
348                                 hints.ai_family = AF_INET6;
349                         else
350                                 return -EINVAL;
351                 }
352
353                 r = getaddrinfo(host, port, &hints, &result);
354                 if (r == EAI_SYSTEM)
355                         return -errno;
356                 else if (r != 0)
357                         return -EADDRNOTAVAIL;
358
359                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
360                 b->sockaddr_size = result->ai_addrlen;
361
362                 freeaddrinfo(result);
363         }
364
365         if (guid) {
366                 r = sd_id128_from_string(guid, &b->peer);
367                 if (r < 0)
368                         return r;
369         }
370
371         b->address_index = p - b->address;
372         return 1;
373 }
374
375 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
376
377         while (size > 0) {
378                 struct iovec *i = iov + *idx;
379
380                 if (i->iov_len > size) {
381                         i->iov_base = (uint8_t*) i->iov_base + size;
382                         i->iov_len -= size;
383                         return;
384                 }
385
386                 size -= i->iov_len;
387
388                 i->iov_base = NULL;
389                 i->iov_len = 0;
390
391                 (*idx) ++;
392         }
393 }
394
395 static int bus_write_auth(sd_bus *b) {
396         struct msghdr mh;
397         ssize_t k;
398
399         assert(b);
400         assert(b->state == BUS_AUTHENTICATING);
401
402         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
403                 return 0;
404
405         zero(mh);
406         mh.msg_iov = b->auth_iovec + b->auth_index;
407         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
408
409         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
410         if (k < 0)
411                 return errno == EAGAIN ? 0 : -errno;
412
413         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
414
415         return 1;
416 }
417
418 static int bus_auth_verify(sd_bus *b) {
419         char *e, *f;
420         sd_id128_t peer;
421         unsigned i;
422         int r;
423
424         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
425          * that's it */
426
427         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
428         if (!e)
429                 return 0;
430
431         f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
432         if (!f)
433                 return 0;
434
435         if (e - (char*) b->rbuffer != 3 + 32)
436                 return -EPERM;
437
438         if (memcmp(b->rbuffer, "OK ", 3))
439                 return -EPERM;
440
441         for (i = 0; i < 32; i += 2) {
442                 int x, y;
443
444                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
445                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
446
447                 if (x < 0 || y < 0)
448                         return -EINVAL;
449
450                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
451         }
452
453         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
454             !sd_id128_equal(b->peer, peer))
455                 return -EPERM;
456
457         b->peer = peer;
458
459         b->can_fds =
460                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
461                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
462
463         if (f + 2 > (char*) b->rbuffer + b->rbuffer_size) {
464                 b->rbuffer_size -= (f - (char*) b->rbuffer);
465                 memmove(b->rbuffer, f + 2, b->rbuffer_size);
466         }
467
468         b->rbuffer_size = 0;
469
470         r = bus_start_running(b);
471         if (r < 0)
472                 return r;
473
474         return 1;
475 }
476
477 static int bus_read_auth(sd_bus *b) {
478         struct msghdr mh;
479         struct iovec iov;
480         size_t n;
481         ssize_t k;
482         int r;
483         void *p;
484
485         assert(b);
486
487         r = bus_auth_verify(b);
488         if (r != 0)
489                 return r;
490
491         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
492         p = realloc(b->rbuffer, n);
493         if (!p)
494                 return -ENOMEM;
495
496         b->rbuffer = p;
497
498         zero(iov);
499         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
500         iov.iov_len = n - b->rbuffer_size;
501
502         zero(mh);
503         mh.msg_iov = &iov;
504         mh.msg_iovlen = 1;
505
506         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
507         if (k < 0)
508                 return errno == EAGAIN ? 0 : -errno;
509
510         b->rbuffer_size += k;
511
512         r = bus_auth_verify(b);
513         if (r != 0)
514                 return r;
515
516         return 0;
517 }
518
519 static int bus_start_auth(sd_bus *b) {
520         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
521         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
522
523         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
524         size_t l;
525
526         assert(b);
527
528         b->state = BUS_AUTHENTICATING;
529
530         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
531         char_array_0(text);
532
533         l = strlen(text);
534         b->auth_uid = hexmem(text, l);
535         if (!b->auth_uid)
536                 return -ENOMEM;
537
538         b->auth_iovec[0].iov_base = (void*) auth_prefix;
539         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
540         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
541         b->auth_iovec[1].iov_len = l * 2;
542         b->auth_iovec[2].iov_base = (void*) auth_suffix;
543         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
544         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
545
546         return bus_write_auth(b);
547 }
548
549 static int bus_start_connect(sd_bus *b) {
550         int r;
551
552         assert(b);
553         assert(b->fd < 0);
554
555         for (;;) {
556                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
557                         r = bus_parse_next_address(b);
558                         if (r < 0)
559                                 return r;
560                         if (r == 0)
561                                 return b->last_connect_error ? b->last_connect_error : -ECONNREFUSED;
562                 }
563
564                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
565                 if (b->fd < 0) {
566                         b->last_connect_error = -errno;
567                         zero(b->sockaddr);
568                         continue;
569                 }
570
571                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
572                 if (r < 0) {
573                         if (errno == EINPROGRESS)
574                                 return 0;
575
576                         b->last_connect_error = -errno;
577                         close_nointr_nofail(b->fd);
578                         b->fd = -1;
579                         zero(b->sockaddr);
580                         continue;
581                 }
582
583                 return bus_start_auth(b);
584         }
585 }
586
587 int sd_bus_open_system(sd_bus **ret) {
588         const char *e;
589         sd_bus *b;
590         int r;
591
592         if (!ret)
593                 return -EINVAL;
594
595         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
596         if (e) {
597                 r = sd_bus_open_address(e, &b);
598                 if (r < 0)
599                         return r;
600         } else {
601                 b = bus_new();
602                 if (!b)
603                         return -ENOMEM;
604
605                 b->sockaddr.un.sun_family = AF_UNIX;
606                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
607                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
608
609                 r = bus_start_connect(b);
610                 if (r < 0) {
611                         bus_free(b);
612                         return r;
613                 }
614         }
615
616         r = bus_send_hello(b);
617         if (r < 0) {
618                 sd_bus_unref(b);
619                 return r;
620         }
621
622         *ret = b;
623         return 0;
624 }
625
626 int sd_bus_open_user(sd_bus **ret) {
627         const char *e;
628         sd_bus *b;
629         size_t l;
630         int r;
631
632         if (!ret)
633                 return -EINVAL;
634
635         e = getenv("DBUS_SESSION_BUS_ADDRESS");
636         if (e) {
637                 r = sd_bus_open_address(e, &b);
638                 if (r < 0)
639                         return r;
640         } else {
641                 e = getenv("XDG_RUNTIME_DIR");
642                 if (!e)
643                         return -ENOENT;
644
645                 l = strlen(e);
646                 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
647                         return -E2BIG;
648
649                 b = bus_new();
650                 if (!b)
651                         return -ENOMEM;
652
653                 b->sockaddr.un.sun_family = AF_UNIX;
654                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
655                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
656
657                 r = bus_start_connect(b);
658                 if (r < 0) {
659                         bus_free(b);
660                         return r;
661                 }
662         }
663
664         r = bus_send_hello(b);
665         if (r < 0) {
666                 sd_bus_unref(b);
667                 return r;
668         }
669
670         *ret = b;
671         return 0;
672 }
673
674 int sd_bus_open_address(const char *address, sd_bus **ret) {
675         sd_bus *b;
676         int r;
677
678         if (!address)
679                 return -EINVAL;
680         if (!ret)
681                 return -EINVAL;
682
683         b = bus_new();
684         if (!b)
685                 return -ENOMEM;
686
687         b->address = strdup(address);
688         if (!b->address) {
689                 bus_free(b);
690                 return -ENOMEM;
691         }
692
693         r = bus_start_connect(b);
694         if (r < 0) {
695                 bus_free(b);
696                 return r;
697         }
698
699         *ret = b;
700         return 0;
701 }
702
703 int sd_bus_open_fd(int fd, sd_bus **ret) {
704         sd_bus *b;
705         int r;
706
707         if (fd < 0)
708                 return -EINVAL;
709         if (!ret)
710                 return -EINVAL;
711
712         b = bus_new();
713         if (!b)
714                 return -ENOMEM;
715
716         b->fd = fd;
717         fd_nonblock(b->fd, true);
718         fd_cloexec(b->fd, true);
719
720         r = bus_start_auth(b);
721         if (r < 0) {
722                 bus_free(b);
723                 return r;
724         }
725
726         *ret = b;
727         return 0;
728 }
729
730 void sd_bus_close(sd_bus *bus) {
731         if (!bus)
732                 return;
733         if (bus->fd < 0)
734                 return;
735
736         close_nointr_nofail(bus->fd);
737         bus->fd = -1;
738 }
739
740 sd_bus *sd_bus_ref(sd_bus *bus) {
741         if (!bus)
742                 return NULL;
743
744         assert(bus->n_ref > 0);
745
746         bus->n_ref++;
747         return bus;
748 }
749
750 sd_bus *sd_bus_unref(sd_bus *bus) {
751         if (!bus)
752                 return NULL;
753
754         assert(bus->n_ref > 0);
755         bus->n_ref--;
756
757         if (bus->n_ref <= 0)
758                 bus_free(bus);
759
760         return NULL;
761 }
762
763 int sd_bus_is_running(sd_bus *bus) {
764         if (!bus)
765                 return -EINVAL;
766
767         if (bus->fd < 0)
768                 return -ENOTCONN;
769
770         return bus->state == BUS_RUNNING;
771 }
772
773 int sd_bus_can_send(sd_bus *bus, char type) {
774
775         if (!bus)
776                 return -EINVAL;
777         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
778                 return -EAGAIN;
779
780         if (type == SD_BUS_TYPE_UNIX_FD)
781                 return bus->can_fds;
782
783         return bus_type_is_valid(type);
784 }
785
786 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
787         assert(m);
788
789         if (m->header->version > b->message_version)
790                 return -EPERM;
791
792         if (m->sealed)
793                 return 0;
794
795         return bus_message_seal(m, ++b->serial);
796 }
797
798 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
799         struct msghdr mh;
800         struct iovec *iov;
801         ssize_t k;
802         size_t n;
803         unsigned j;
804
805         assert(bus);
806         assert(m);
807         assert(idx);
808         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
809
810         n = m->n_iovec * sizeof(struct iovec);
811         iov = alloca(n);
812         memcpy(iov, m->iovec, n);
813
814         j = 0;
815         iovec_advance(iov, &j, *idx);
816
817         zero(mh);
818         mh.msg_iov = iov;
819         mh.msg_iovlen = m->n_iovec;
820
821         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
822         if (k < 0)
823                 return -errno;
824
825         *idx += (size_t) k;
826         iovec_advance(iov, &j, *idx);
827
828         return j >= m->n_iovec;
829 }
830
831 static int message_read_need(sd_bus *bus, size_t *need) {
832         uint32_t a, b;
833         uint8_t e;
834
835         assert(bus);
836         assert(need);
837         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
838
839         if (bus->rbuffer_size < sizeof(struct bus_header)) {
840                 *need = sizeof(struct bus_header);
841                 return 0;
842         }
843
844         a = ((const uint32_t*) bus->rbuffer)[1];
845         b = ((const uint32_t*) bus->rbuffer)[3];
846
847         e = ((const uint8_t*) bus->rbuffer)[0];
848         if (e == SD_BUS_LITTLE_ENDIAN) {
849                 a = le32toh(a);
850                 b = le32toh(b);
851         } else if (e == SD_BUS_BIG_ENDIAN) {
852                 a = be32toh(a);
853                 b = be32toh(b);
854         } else
855                 return -EBADMSG;
856
857         *need = sizeof(struct bus_header) + ALIGN_TO(b, 8) + a;
858         return 0;
859 }
860
861 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
862         sd_bus_message *t;
863         void *b = NULL;
864         int r;
865
866         assert(bus);
867         assert(m);
868         assert(bus->rbuffer_size >= size);
869         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
870
871         if (bus->rbuffer_size > size) {
872                 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
873                 if (!b) {
874                         free(t);
875                         return -ENOMEM;
876                 }
877         }
878
879         r = bus_message_from_malloc(bus->rbuffer, size, &t);
880         if (r < 0) {
881                 free(b);
882                 return r;
883         }
884
885         bus->rbuffer = b;
886         bus->rbuffer_size -= size;
887
888         *m = t;
889         return 1;
890 }
891
892 static int message_read(sd_bus *bus, sd_bus_message **m) {
893         struct msghdr mh;
894         struct iovec iov;
895         ssize_t k;
896         size_t need;
897         int r;
898         void *b;
899
900         assert(bus);
901         assert(m);
902         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
903
904         r = message_read_need(bus, &need);
905         if (r < 0)
906                 return r;
907
908         if (bus->rbuffer_size >= need)
909                 return message_make(bus, need, m);
910
911         b = realloc(bus->rbuffer, need);
912         if (!b)
913                 return -ENOMEM;
914
915         bus->rbuffer = b;
916
917         zero(iov);
918         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
919         iov.iov_len = need - bus->rbuffer_size;
920
921         zero(mh);
922         mh.msg_iov = &iov;
923         mh.msg_iovlen = 1;
924
925         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
926         if (k < 0)
927                 return errno == EAGAIN ? 0 : -errno;
928
929         bus->rbuffer_size += k;
930
931         r = message_read_need(bus, &need);
932         if (r < 0)
933                 return r;
934
935         if (bus->rbuffer_size >= need)
936                 return message_make(bus, need, m);
937
938         return 0;
939 }
940
941 static int dispatch_wqueue(sd_bus *bus) {
942         int r, c = 0;
943
944         assert(bus);
945         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
946
947         if (bus->fd < 0)
948                 return -ENOTCONN;
949
950         while (bus->wqueue_size > 0) {
951
952                 r = message_write(bus, bus->wqueue[0], &bus->windex);
953                 if (r < 0) {
954                         sd_bus_close(bus);
955                         return r;
956                 } else if (r == 0)
957                         /* Wasn't fully written yet... */
958                         break;
959                 else {
960                         /* Fully written. Let's drop the entry from
961                          * the queue.
962                          *
963                          * This isn't particularly optimized, but
964                          * well, this is supposed to be our worst-case
965                          * buffer only, and the socket buffer is
966                          * supposed to be our primary buffer, and if
967                          * it got full, then all bets are off
968                          * anyway. */
969
970                         sd_bus_message_unref(bus->wqueue[0]);
971                         bus->wqueue_size --;
972                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
973                         bus->windex = 0;
974
975                         c++;
976                 }
977         }
978
979         return c;
980 }
981
982 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
983         int r;
984
985         assert(bus);
986         assert(m);
987         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
988
989         if (bus->fd < 0)
990                 return -ENOTCONN;
991
992         if (bus->rqueue_size > 0) {
993                 /* Dispatch a queued message */
994
995                 *m = bus->rqueue[0];
996                 bus->rqueue_size --;
997                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
998                 return 1;
999         }
1000
1001         /* Try to read a new message */
1002         r = message_read(bus, m);
1003         if (r < 0) {
1004                 sd_bus_close(bus);
1005                 return r;
1006         }
1007
1008         return r;
1009 }
1010
1011 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1012         int r;
1013
1014         if (!bus)
1015                 return -EINVAL;
1016         if (bus->fd < 0)
1017                 return -ENOTCONN;
1018         if (!m)
1019                 return -EINVAL;
1020
1021         r = bus_seal_message(bus, m);
1022         if (r < 0)
1023                 return r;
1024
1025         /* If this is a reply and no reply was requested, then let's
1026          * suppress this, if we can */
1027         if (m->dont_send && !serial)
1028                 return 0;
1029
1030         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1031                 size_t idx = 0;
1032
1033                 r = message_write(bus, m, &idx);
1034                 if (r < 0) {
1035                         sd_bus_close(bus);
1036                         return r;
1037                 } else if (r == 0)  {
1038                         /* Wasn't fully written. So let's remember how
1039                          * much was written. Note that the first entry
1040                          * of the wqueue array is always allocated so
1041                          * that we always can remember how much was
1042                          * written. */
1043                         bus->wqueue[0] = sd_bus_message_ref(m);
1044                         bus->wqueue_size = 1;
1045                         bus->windex = idx;
1046                 }
1047         } else {
1048                 sd_bus_message **q;
1049
1050                 /* Just append it to the queue. */
1051
1052                 if (bus->wqueue_size >= WQUEUE_MAX)
1053                         return -ENOBUFS;
1054
1055                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1056                 if (!q)
1057                         return -ENOMEM;
1058
1059                 bus->wqueue = q;
1060                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1061         }
1062
1063         if (serial)
1064                 *serial = BUS_MESSAGE_SERIAL(m);
1065
1066         return 0;
1067 }
1068
1069 static usec_t calc_elapse(uint64_t usec) {
1070         if (usec == (uint64_t) -1)
1071                 return 0;
1072
1073         if (usec == 0)
1074                 usec = SD_BUS_DEFAULT_TIMEOUT;
1075
1076         return now(CLOCK_MONOTONIC) + usec;
1077 }
1078
1079 int sd_bus_send_with_reply(
1080                 sd_bus *bus,
1081                 sd_bus_message *m,
1082                 sd_message_handler_t callback,
1083                 void *userdata,
1084                 uint64_t usec,
1085                 uint64_t *serial) {
1086
1087         struct reply_callback *c;
1088         int r;
1089
1090         if (!bus)
1091                 return -EINVAL;
1092         if (bus->fd < 0)
1093                 return -ENOTCONN;
1094         if (!m)
1095                 return -EINVAL;
1096         if (!callback)
1097                 return -EINVAL;
1098         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1099                 return -EINVAL;
1100         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1101                 return -EINVAL;
1102
1103         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1104         if (r < 0)
1105                 return r;
1106
1107         r = bus_seal_message(bus, m);
1108         if (r < 0)
1109                 return r;
1110
1111         c = new(struct reply_callback, 1);
1112         if (!c)
1113                 return -ENOMEM;
1114
1115         c->callback = callback;
1116         c->userdata = userdata;
1117         c->serial = BUS_MESSAGE_SERIAL(m);
1118         c->timeout = calc_elapse(usec);
1119
1120         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1121         if (r < 0) {
1122                 free(c);
1123                 return r;
1124         }
1125
1126         r = sd_bus_send(bus, m, serial);
1127         if (r < 0) {
1128                 hashmap_remove(bus->reply_callbacks, &c->serial);
1129                 free(c);
1130                 return r;
1131         }
1132
1133         return r;
1134 }
1135
1136 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1137         struct reply_callbacks *c;
1138
1139         if (!bus)
1140                 return -EINVAL;
1141         if (serial == 0)
1142                 return -EINVAL;
1143
1144         c = hashmap_remove(bus->reply_callbacks, &serial);
1145         if (!c)
1146                 return 0;
1147
1148         free(c);
1149         return 1;
1150 }
1151
1152 static int ensure_running(sd_bus *bus) {
1153         int r;
1154
1155         assert(bus);
1156
1157         r = sd_bus_is_running(bus);
1158         if (r != 0)
1159                 return r;
1160
1161         for (;;) {
1162                 r = sd_bus_process(bus, NULL);
1163                 if (r < 0)
1164                         return r;
1165
1166                 r = sd_bus_is_running(bus);
1167                 if (r != 0)
1168                         return r;
1169
1170                 r = sd_bus_wait(bus, (uint64_t) -1);
1171                 if (r < 0)
1172                         return r;
1173         }
1174 }
1175
1176 int sd_bus_send_with_reply_and_block(
1177                 sd_bus *bus,
1178                 sd_bus_message *m,
1179                 uint64_t usec,
1180                 sd_bus_error *error,
1181                 sd_bus_message **reply) {
1182
1183         int r;
1184         usec_t timeout;
1185         uint64_t serial;
1186         bool room = false;
1187
1188         if (!bus)
1189                 return -EINVAL;
1190         if (bus->fd < 0)
1191                 return -ENOTCONN;
1192         if (!m)
1193                 return -EINVAL;
1194         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1195                 return -EINVAL;
1196         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1197                 return -EINVAL;
1198         if (bus_error_is_dirty(error))
1199                 return -EINVAL;
1200
1201         r = ensure_running(bus);
1202         if (r < 0)
1203                 return r;
1204
1205         r = sd_bus_send(bus, m, &serial);
1206         if (r < 0)
1207                 return r;
1208
1209         timeout = calc_elapse(usec);
1210
1211         for (;;) {
1212                 usec_t left;
1213                 sd_bus_message *incoming;
1214
1215                 if (!room) {
1216                         sd_bus_message **q;
1217
1218                         /* Make sure there's room for queuing this
1219                          * locally, before we read the message */
1220
1221                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1222                         if (!q)
1223                                 return -ENOMEM;
1224
1225                         bus->rqueue = q;
1226                         room = true;
1227                 }
1228
1229                 r = message_read(bus, &incoming);
1230                 if (r < 0)
1231                         return r;
1232                 if (r > 0) {
1233                         /* bus_message_dump(incoming); */
1234                         /* sd_bus_message_rewind(incoming, true); */
1235
1236                         if (incoming->reply_serial == serial) {
1237                                 /* Found a match! */
1238
1239                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1240                                         *reply = incoming;
1241                                         return 0;
1242                                 }
1243
1244                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1245                                         int k;
1246
1247                                         r = sd_bus_error_copy(error, &incoming->error);
1248                                         if (r < 0) {
1249                                                 sd_bus_message_unref(incoming);
1250                                                 return r;
1251                                         }
1252
1253                                         k = bus_error_to_errno(&incoming->error);
1254                                         sd_bus_message_unref(incoming);
1255                                         return k;
1256                                 }
1257
1258                                 sd_bus_message_unref(incoming);
1259                                 return -EIO;
1260                         }
1261
1262                         /* There's already guaranteed to be room for
1263                          * this, so need to resize things here */
1264                         bus->rqueue[bus->rqueue_size ++] = incoming;
1265                         room = false;
1266
1267                         /* Try to read more, right-away */
1268                         continue;
1269                 }
1270
1271                 if (timeout > 0) {
1272                         usec_t n;
1273
1274                         n = now(CLOCK_MONOTONIC);
1275                         if (n >= timeout)
1276                                 return -ETIMEDOUT;
1277
1278                         left = timeout - n;
1279                 } else
1280                         left = (uint64_t) -1;
1281
1282                 r = sd_bus_wait(bus, left);
1283                 if (r < 0)
1284                         return r;
1285
1286                 r = dispatch_wqueue(bus);
1287                 if (r < 0)
1288                         return r;
1289         }
1290 }
1291
1292 int sd_bus_get_fd(sd_bus *bus) {
1293         if (!bus)
1294                 return -EINVAL;
1295
1296         if (bus->fd < 0)
1297                 return -ENOTCONN;
1298
1299         return bus->fd;
1300 }
1301
1302 int sd_bus_get_events(sd_bus *bus) {
1303         int flags = 0;
1304
1305         if (!bus)
1306                 return -EINVAL;
1307         if (bus->fd < 0)
1308                 return -ENOTCONN;
1309
1310         if (bus->state == BUS_OPENING)
1311                 flags |= POLLOUT;
1312         else if (bus->state == BUS_AUTHENTICATING) {
1313
1314                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1315                         flags |= POLLOUT;
1316
1317                 flags |= POLLIN;
1318
1319         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1320                 if (bus->rqueue_size <= 0)
1321                         flags |= POLLIN;
1322                 if (bus->wqueue_size > 0)
1323                         flags |= POLLOUT;
1324         }
1325
1326         return flags;
1327 }
1328
1329 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1330         int r;
1331
1332         if (!bus)
1333                 return -EINVAL;
1334         if (bus->fd < 0)
1335                 return -ENOTCONN;
1336
1337         if (bus->state == BUS_OPENING) {
1338                 struct pollfd p;
1339
1340                 zero(p);
1341                 p.fd = bus->fd;
1342                 p.events = POLLOUT;
1343
1344                 r = poll(&p, 1, 0);
1345                 if (r < 0)
1346                         return -errno;
1347
1348                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1349                         int error = 0;
1350                         socklen_t slen = sizeof(error);
1351
1352                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1353                         if (r < 0)
1354                                 return -errno;
1355
1356                         if (error != 0)
1357                                 bus->last_connect_error = -error;
1358                         else if (p.revents & (POLLERR|POLLHUP))
1359                                 bus->last_connect_error = -ECONNREFUSED;
1360                         else
1361                                 return bus_start_auth(bus);
1362
1363                         /* Try next address */
1364                         return bus_start_connect(bus);
1365                 }
1366
1367                 return 0;
1368
1369         } else if (bus->state == BUS_AUTHENTICATING) {
1370
1371                 r = bus_write_auth(bus);
1372                 if (r < 0)
1373                         return r;
1374
1375                 r = bus_read_auth(bus);
1376                 if (r < 0)
1377                         return r;
1378
1379                 return 0;
1380
1381         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1382                 struct filter_callback *l;
1383                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1384
1385                 r = dispatch_wqueue(bus);
1386                 if (r < 0)
1387                         return r;
1388
1389                 r = dispatch_rqueue(bus, &m);
1390                 if (r <= 0)
1391                         return r;
1392
1393                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1394                         struct reply_callback *c;
1395
1396                         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1397                         if (c) {
1398                                 r = c->callback(bus, m, c->userdata);
1399                                 free(c);
1400
1401                                 if (r != 0)
1402                                         return r < 0 ? r : 0;
1403                         }
1404                 }
1405
1406                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1407                         r = l->callback(bus, m, l->userdata);
1408                         if (r != 0)
1409                                 return r < 0 ? r : 0;
1410                 }
1411
1412                 if (ret) {
1413                         *ret = m;
1414                         m = NULL;
1415                         return 1;
1416                 }
1417
1418                 if (sd_bus_message_is_method_call(m, NULL, NULL)) {
1419                         const sd_bus_error e = SD_BUS_ERROR_INIT_CONST("org.freedesktop.DBus.Error.UnknownObject", "Unknown object.");
1420                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1421
1422                         r = sd_bus_message_new_method_error(bus, m, &e, &reply);
1423                         if (r < 0)
1424                                 return r;
1425
1426                         r = sd_bus_send(bus, reply, NULL);
1427                         if (r < 0)
1428                                 return r;
1429                 }
1430
1431                 return 0;
1432         }
1433
1434         assert_not_reached("Unknown state");
1435 }
1436
1437 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1438         struct pollfd p;
1439         int r, e;
1440         struct timespec ts;
1441
1442         if (!bus)
1443                 return -EINVAL;
1444         if (bus->fd < 0)
1445                 return -ENOTCONN;
1446
1447         if (bus->rqueue_size > 0)
1448                 return 0;
1449
1450         e = sd_bus_get_events(bus);
1451         if (e < 0)
1452                 return e;
1453
1454         zero(p);
1455         p.fd = bus->fd;
1456         p.events = e;
1457
1458         r = ppoll(&p, 1, timeout_usec == (uint64_t) -1 ? NULL : timespec_store(&ts, timeout_usec), NULL);
1459         if (r < 0)
1460                 return -errno;
1461
1462         return r;
1463 }
1464
1465 int sd_bus_flush(sd_bus *bus) {
1466         int r;
1467
1468         if (!bus)
1469                 return -EINVAL;
1470         if (bus->fd < 0)
1471                 return -ENOTCONN;
1472
1473         r = ensure_running(bus);
1474         if (r < 0)
1475                 return r;
1476
1477         if (bus->wqueue_size <= 0)
1478                 return 0;
1479
1480         for (;;) {
1481                 r = dispatch_wqueue(bus);
1482                 if (r < 0)
1483                         return r;
1484
1485                 if (bus->wqueue_size <= 0)
1486                         return 0;
1487
1488                 r = sd_bus_wait(bus, (uint64_t) -1);
1489                 if (r < 0)
1490                         return r;
1491         }
1492 }
1493
1494 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1495         struct filter_callback *f;
1496
1497         if (!bus)
1498                 return -EINVAL;
1499         if (!callback)
1500                 return -EINVAL;
1501
1502         f = new(struct filter_callback, 1);
1503         if (!f)
1504                 return -ENOMEM;
1505         f->callback = callback;
1506         f->userdata = userdata;
1507
1508         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1509         return 0;
1510 }
1511
1512 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1513         struct filter_callback *f;
1514
1515         if (!bus)
1516                 return -EINVAL;
1517         if (!callback)
1518                 return -EINVAL;
1519
1520         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1521                 if (f->callback == callback && f->userdata == userdata) {
1522                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1523                         free(f);
1524                         return 1;
1525                 }
1526         }
1527
1528         return 0;
1529 }