chiark / gitweb /
bus: add basic implementation of a native bus client library
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32
33 #include "sd-bus.h"
34 #include "bus-internal.h"
35 #include "bus-message.h"
36 #include "bus-type.h"
37
38 #define WQUEUE_MAX 128
39
40 static void bus_free(sd_bus *b) {
41         struct filter_callback *f;
42
43         assert(b);
44
45         if (b->fd >= 0)
46                 close_nointr_nofail(b->fd);
47
48         free(b->rbuffer);
49         free(b->rqueue);
50         free(b->wqueue);
51         free(b->unique_name);
52
53         hashmap_free_free(b->reply_callbacks);
54
55         while ((f = b->filter_callbacks)) {
56                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
57                 free(f);
58         }
59
60         free(b);
61 }
62
63 static sd_bus* bus_new(void) {
64         sd_bus *r;
65
66         r = new0(sd_bus, 1);
67         if (!r)
68                 return NULL;
69
70         r->n_ref = 1;
71         r->fd = -1;
72         r->message_version = 1;
73
74         /* We guarantee that wqueue always has space for at least one
75          * entry */
76         r->wqueue = new(sd_bus_message*, 1);
77         if (!r->wqueue) {
78                 free(r);
79                 return NULL;
80         }
81
82         return r;
83 };
84
85 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
86         const char *s;
87         int r;
88
89         assert(bus);
90         assert(reply);
91
92         bus->state = BUS_RUNNING;
93
94         r = sd_bus_message_read(reply, "s", &s);
95         if (r < 0)
96                 return r;
97
98         bus->unique_name = strdup(s);
99         if (!bus->unique_name)
100                 return -ENOMEM;
101
102         return 1;
103 }
104
105 static int bus_send_hello(sd_bus *bus) {
106         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
107         int r;
108
109         assert(bus);
110
111         r = sd_bus_message_new_method_call(
112                         bus,
113                         "org.freedesktop.DBus",
114                         "/",
115                         "org.freedesktop.DBus",
116                         "Hello",
117                         &m);
118         if (r < 0)
119                 return r;
120
121         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, (uint64_t) -1, NULL);
122         if (r < 0)
123                 return r;
124
125         return 0;
126 }
127
128 static int bus_start_running(sd_bus *bus) {
129         int r;
130
131         assert(bus);
132
133         if (bus->send_hello) {
134                 bus->state = BUS_HELLO;
135
136                 r = bus_send_hello(bus);
137                 if (r < 0)
138                         return r;
139         }
140
141         bus->state = BUS_RUNNING;
142
143         return 0;
144 }
145
/* Tries to parse one "key=value" pair of a D-Bus address at *p.
 *
 * If *p does not start with "<key>=" nothing is consumed and 0 is
 * returned. Otherwise the value is read up to the next ',' (key
 * separator), ';' (address separator) or the end of the string, with
 * "%XX" escapes decoded. The NUL-terminated result is stored in *value
 * (to be freed by the caller), *p is moved past the value and a
 * trailing ',', and 1 is returned.
 *
 * Returns -EINVAL if *value is already set (key given twice), -ENOMEM
 * on allocation failure, or the negative result of unhexchar() for a
 * malformed escape. */
static int parse_address_key(const char **p, const char *key, char **value) {
        size_t l, n = 0;
        const char *a;
        char *r = NULL;

        assert(p);
        assert(*p);
        assert(key);
        assert(value);

        l = strlen(key);
        if (strncmp(*p, key, l) != 0)
                return 0;

        if ((*p)[l] != '=')
                return 0;

        if (*value)
                return -EINVAL;

        a = *p + l + 1;
        while (*a != ',' && *a != ';' && *a != 0) {
                char c, *t;

                if (*a == '%') {
                        int x, y;

                        /* A truncated escape hits the terminating NUL,
                         * which is rejected as a non-hex character
                         * before a[2] is looked at. */
                        x = unhexchar(a[1]);
                        if (x < 0) {
                                free(r);
                                return x;
                        }

                        y = unhexchar(a[2]);
                        if (y < 0) {
                                free(r);
                                return y;
                        }

                        c = (char) ((x << 4) | y);
                        a += 3;
                } else
                        c = *(a++);

                /* Always keep room for the terminating NUL */
                t = realloc(r, n + 2);
                if (!t) {
                        free(r);
                        return -ENOMEM;
                }

                r = t;
                r[n++] = c;
        }

        if (r)
                r[n] = 0;
        else {
                /* Empty value: still hand out a valid string */
                r = strdup("");
                if (!r)
                        return -ENOMEM;
        }

        if (*a == ',')
                a++;

        *p = a;
        *value = r;
        return 1;
}
204
/* Skips one (unknown) "key=value" pair of a D-Bus address. Pairs are
 * separated by ',', whole addresses by ';': *p is left on the first
 * character after a ',' separator, or on the ';' / terminating NUL
 * that ends the current address. */
static void skip_address_key(const char **p) {
        assert(p);
        assert(*p);

        *p += strcspn(*p, ",;");

        if (**p == ',')
                (*p)++;
}
211
212 static int bus_parse_next_address(sd_bus *b) {
213         const char *a, *p;
214         _cleanup_free_ char *guid = NULL;
215         int r;
216
217         assert(b);
218
219         if (!b->address)
220                 return 0;
221         if (b->address[b->address_index] == 0)
222                 return 0;
223
224         a = b->address + b->address_index;
225
226         zero(b->sockaddr);
227         b->sockaddr_size = 0;
228         b->peer = SD_ID128_NULL;
229
230         if (startswith(a, "unix:")) {
231                 _cleanup_free_ char *path = NULL, *abstract = NULL;
232
233                 p = a + 5;
234                 while (*p != 0 && *p != ';') {
235                         r = parse_address_key(&p, "guid", &guid);
236                         if (r < 0)
237                                 return r;
238                         else if (r > 0)
239                                 continue;
240
241                         r = parse_address_key(&p, "path", &path);
242                         if (r < 0)
243                                 return r;
244                         else if (r > 0)
245                                 continue;
246
247                         r = parse_address_key(&p, "abstract", &abstract);
248                         if (r < 0)
249                                 return r;
250                         else if (r > 0)
251                                 continue;
252
253                         skip_address_key(&p);
254                 }
255
256                 if (!path && !abstract)
257                         return -EINVAL;
258
259                 if (path && abstract)
260                         return -EINVAL;
261
262                 if (path) {
263                         size_t l;
264
265                         l = strlen(path);
266                         if (l > sizeof(b->sockaddr.un.sun_path))
267                                 return -E2BIG;
268
269                         b->sockaddr.un.sun_family = AF_UNIX;
270                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
271                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
272                 } else if (abstract) {
273                         size_t l;
274
275                         l = strlen(path);
276                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
277                                 return -E2BIG;
278
279                         b->sockaddr.un.sun_family = AF_UNIX;
280                         b->sockaddr.un.sun_path[0] = 0;
281                         strncpy(b->sockaddr.un.sun_path+1, path, sizeof(b->sockaddr.un.sun_path)-1);
282                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
283                 }
284
285         } else if (startswith(a, "tcp:")) {
286                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
287                 struct addrinfo hints, *result;
288
289                 p = a + 4;
290                 while (*p != 0 && *p != ';') {
291                         r = parse_address_key(&p, "guid", &guid);
292                         if (r < 0)
293                                 return r;
294                         else if (r > 0)
295                                 continue;
296
297                         r = parse_address_key(&p, "host", &host);
298                         if (r < 0)
299                                 return r;
300                         else if (r > 0)
301                                 continue;
302
303                         r = parse_address_key(&p, "port", &port);
304                         if (r < 0)
305                                 return r;
306                         else if (r > 0)
307                                 continue;
308
309                         r = parse_address_key(&p, "family", &family);
310                         if (r < 0)
311                                 return r;
312                         else if (r > 0)
313                                 continue;
314
315                         skip_address_key(&p);
316                 }
317
318                 if (!host || !port)
319                         return -EINVAL;
320
321                 zero(hints);
322                 hints.ai_socktype = SOCK_STREAM;
323                 hints.ai_flags = AI_ADDRCONFIG;
324
325                 if (family) {
326                         if (streq(family, "ipv4"))
327                                 hints.ai_family = AF_INET;
328                         else if (streq(family, "ipv6"))
329                                 hints.ai_family = AF_INET6;
330                         else
331                                 return -EINVAL;
332                 }
333
334                 r = getaddrinfo(host, port, &hints, &result);
335                 if (r == EAI_SYSTEM)
336                         return -errno;
337                 else if (r != 0)
338                         return -EADDRNOTAVAIL;
339
340                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
341                 b->sockaddr_size = result->ai_addrlen;
342
343                 freeaddrinfo(result);
344         }
345
346         if (guid) {
347                 r = sd_id128_from_string(guid, &b->peer);
348                 if (r < 0)
349                         return r;
350         }
351
352         b->address_index = p - b->address;
353         return 1;
354 }
355
/* Consumes "size" bytes from the iovec array, starting at entry *idx.
 * Fully consumed entries are reset to NULL/0 and *idx is moved past
 * them; a partially consumed entry has its base and length adjusted
 * and stays the current entry. */
static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
        size_t left;

        for (left = size; left > 0; (*idx)++) {
                struct iovec *cur = &iov[*idx];

                if (left < cur->iov_len) {
                        /* Ends in the middle of this entry */
                        cur->iov_len -= left;
                        cur->iov_base = (uint8_t*) cur->iov_base + left;
                        return;
                }

                left -= cur->iov_len;

                cur->iov_len = 0;
                cur->iov_base = NULL;
        }
}
375
376 static int bus_write_auth(sd_bus *b) {
377         struct msghdr mh;
378         ssize_t k;
379
380         assert(b);
381         assert(b->state == BUS_AUTHENTICATING);
382
383         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
384                 return 0;
385
386         zero(mh);
387         mh.msg_iov = b->auth_iovec + b->auth_index;
388         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
389
390         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
391         if (k < 0)
392                 return errno == EAGAIN ? 0 : -errno;
393
394         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
395
396         return 1;
397 }
398
399 static int bus_auth_verify(sd_bus *b) {
400         char *e, *f;
401         sd_id128_t peer;
402         unsigned i;
403         int r;
404
405         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
406          * that's it */
407
408         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
409         if (!e)
410                 return 0;
411
412         f = memmem(e, b->rbuffer_size - (e - (char*) b->rbuffer), "\r\n", 2);
413         if (!f)
414                 return 0;
415
416         if (e - (char*) b->rbuffer != 3 + 32)
417                 return -EPERM;
418
419         if (memcmp(b->rbuffer, "OK ", 3))
420                 return -EPERM;
421
422         for (i = 0; i < 32; i += 2) {
423                 int x, y;
424
425                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
426                 y = unhexchar(((char*) b->rbuffer)[3 + i + 2]);
427
428                 if (x < 0 || y < 0)
429                         return -EINVAL;
430
431                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
432         }
433
434         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
435             !sd_id128_equal(b->peer, peer))
436                 return -EPERM;
437
438         b->peer = peer;
439
440         b->can_fds =
441                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
442                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
443
444         if (f + 2 > (char*) b->rbuffer + b->rbuffer_size) {
445                 b->rbuffer_size -= (f - (char*) b->rbuffer);
446                 memmove(b->rbuffer, f + 2, b->rbuffer_size);
447         }
448
449         r = bus_start_running(b);
450         if (r < 0)
451                 return r;
452
453         return 1;
454 }
455
456 static int bus_read_auth(sd_bus *b) {
457         struct msghdr mh;
458         struct iovec iov;
459         size_t n;
460         ssize_t k;
461         int r;
462
463         assert(b);
464
465         r = bus_auth_verify(b);
466         if (r != 0)
467                 return r;
468
469         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
470
471         zero(iov);
472         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
473         iov.iov_len = n - b->rbuffer_size;
474
475         zero(mh);
476         mh.msg_iov = &iov;
477         mh.msg_iovlen = 1;
478
479         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
480         if (k < 0)
481                 return errno == EAGAIN ? 0 : -errno;
482
483         b->rbuffer_size += k;
484
485         r = bus_auth_verify(b);
486         if (r != 0)
487                 return r;
488
489         return 0;
490 }
491
492 static int bus_start_auth(sd_bus *b) {
493         static const char auth_prefix[] = "\0AUTH_EXTERNAL ";
494         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
495
496         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
497         size_t l;
498
499         assert(b);
500
501         b->state = BUS_AUTHENTICATING;
502
503         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
504         char_array_0(text);
505
506         l = strlen(text);
507         b->auth_uid = hexmem(text, l);
508         if (!b->auth_uid)
509                 return -ENOMEM;
510
511         b->auth_iovec[0].iov_base = (void*) auth_prefix;
512         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
513         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
514         b->auth_iovec[1].iov_len = l * 2;
515         b->auth_iovec[2].iov_base = (void*) auth_suffix;
516         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
517         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
518
519         return bus_write_auth(b);
520 }
521
522 static int bus_start_connect(sd_bus *b) {
523         int r;
524
525         assert(b);
526         assert(b->fd < 0);
527
528         for (;;) {
529                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
530                         r = bus_parse_next_address(b);
531                         if (r < 0)
532                                 return r;
533                         if (r == 0)
534                                 return b->last_connect_error ? b->last_connect_error : -ECONNREFUSED;
535                 }
536
537                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
538                 if (b->fd < 0) {
539                         b->last_connect_error = -errno;
540                         zero(b->sockaddr);
541                         continue;
542                 }
543
544                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
545                 if (r < 0) {
546                         if (errno == EINPROGRESS)
547                                 return 0;
548
549                         b->last_connect_error = -errno;
550                         close_nointr_nofail(b->fd);
551                         b->fd = -1;
552                         zero(b->sockaddr);
553                         continue;
554                 }
555
556                 return bus_start_auth(b);
557         }
558 }
559
560 int sd_bus_open_system(sd_bus **ret) {
561         const char *e;
562         sd_bus *b;
563         int r;
564
565         if (!ret)
566                 return -EINVAL;
567
568         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
569         if (e) {
570                 r = sd_bus_open_address(e, &b);
571                 if (r < 0)
572                         return r;
573
574                 b->send_hello = true;
575                 *ret = b;
576                 return r;
577         }
578
579         b = bus_new();
580         if (!b)
581                 return -ENOMEM;
582
583         b->send_hello = true;
584
585         b->sockaddr.un.sun_family = AF_UNIX;
586         strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
587         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
588
589         r = bus_start_connect(b);
590         if (r < 0) {
591                 bus_free(b);
592                 return r;
593         }
594
595         *ret = b;
596         return 0;
597 }
598
599 int sd_bus_open_user(sd_bus **ret) {
600         const char *e;
601         sd_bus *b;
602         size_t l;
603         int r;
604
605         if (!ret)
606                 return -EINVAL;
607
608         e = getenv("DBUS_SESSION_BUS_ADDRESS");
609         if (e) {
610                 r = sd_bus_open_address(e, &b);
611                 if (r < 0)
612                         return r;
613
614                 b->send_hello = true;
615                 *ret = b;
616                 return r;
617         }
618
619         e = getenv("XDG_RUNTIME_DIR");
620         if (!e)
621                 return -ENOENT;
622
623         l = strlen(e);
624         if (l + 4 > sizeof(b->sockaddr.un.sun_path))
625                 return -E2BIG;
626
627         b = bus_new();
628         if (!b)
629                 return -ENOMEM;
630
631         b->send_hello = true;
632
633         b->sockaddr.un.sun_family = AF_UNIX;
634         memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
635         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
636
637         r = bus_start_connect(b);
638         if (r < 0) {
639                 bus_free(b);
640                 return r;
641         }
642
643         *ret = b;
644         return 0;
645 }
646
647 int sd_bus_open_address(const char *address, sd_bus **ret) {
648         sd_bus *b;
649         int r;
650
651         if (!address)
652                 return -EINVAL;
653         if (!ret)
654                 return -EINVAL;
655
656         b = bus_new();
657         if (!b)
658                 return -ENOMEM;
659
660         b->address = strdup(address);
661         if (!b->address) {
662                 bus_free(b);
663                 return -ENOMEM;
664         }
665
666         r = bus_start_connect(b);
667         if (r < 0) {
668                 bus_free(b);
669                 return r;
670         }
671
672         *ret = b;
673         return 0;
674 }
675
676 int sd_bus_open_fd(int fd, sd_bus **ret) {
677         sd_bus *b;
678         int r;
679
680         if (fd < 0)
681                 return -EINVAL;
682         if (!ret)
683                 return -EINVAL;
684
685         b = bus_new();
686         if (!b)
687                 return -ENOMEM;
688
689         b->fd = fd;
690         fd_nonblock(b->fd, true);
691         fd_cloexec(b->fd, true);
692
693         r = bus_start_auth(b);
694         if (r < 0) {
695                 bus_free(b);
696                 return r;
697         }
698
699         *ret = b;
700         return 0;
701 }
702
703 void sd_bus_close(sd_bus *bus) {
704         if (!bus)
705                 return;
706         if (bus->fd < 0)
707                 return;
708
709         close_nointr_nofail(bus->fd);
710         bus->fd = -1;
711 }
712
713 sd_bus *sd_bus_ref(sd_bus *bus) {
714         if (!bus)
715                 return NULL;
716
717         assert(bus->n_ref > 0);
718
719         bus->n_ref++;
720         return bus;
721 }
722
723 sd_bus *sd_bus_unref(sd_bus *bus) {
724         if (!bus)
725                 return NULL;
726
727         assert(bus->n_ref > 0);
728         bus->n_ref--;
729
730         if (bus->n_ref <= 0)
731                 bus_free(bus);
732
733         return NULL;
734 }
735
736 int sd_bus_is_running(sd_bus *bus) {
737         if (!bus)
738                 return -EINVAL;
739
740         if (bus->fd < 0)
741                 return -ENOTCONN;
742
743         return bus->state == BUS_RUNNING;
744 }
745
746 int sd_bus_can_send(sd_bus *bus, char type) {
747
748         if (!bus)
749                 return -EINVAL;
750
751         if (type == SD_BUS_TYPE_UNIX_FD)
752                 return bus->can_fds;
753
754         return bus_type_is_valid(type);
755 }
756
757 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
758         assert(m);
759
760         if (m->sealed)
761                 return 0;
762
763         return message_seal(m, ++b->serial);
764 }
765
766 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
767         struct msghdr mh;
768         struct iovec *iov;
769         ssize_t k;
770         size_t n;
771         unsigned j;
772
773         assert(bus);
774         assert(m);
775         assert(idx);
776
777         n = m->n_iovec * sizeof(struct iovec);
778         iov = alloca(n);
779         memcpy(iov, m->iovec, n);
780
781         j = 0;
782         iovec_advance(iov, &j, *idx);
783
784         zero(mh);
785         mh.msg_iov = iov;
786         mh.msg_iovlen = m->n_iovec;
787
788         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
789         if (k < 0)
790                 return -errno;
791
792         *idx += (size_t) k;
793         iovec_advance(iov, &j, *idx);
794
795         return j > m->n_iovec;
796 }
797
798 static int message_read_need(sd_bus *bus, size_t *need) {
799         uint32_t a, b;
800         uint8_t e;
801
802         assert(bus);
803         assert(need);
804
805         if (bus->rbuffer_size <= sizeof(struct bus_header)) {
806                 *need = sizeof(struct bus_header);
807                 return 0;
808         }
809
810         a = ((const uint32_t*) bus->rbuffer)[1];
811         b = ((const uint32_t*) bus->rbuffer)[3];
812
813         e = ((const uint8_t*) bus->rbuffer)[0];
814         if (e == SD_BUS_LITTLE_ENDIAN) {
815                 a = le32toh(a);
816                 b = le32toh(b);
817         } else if (e == SD_BUS_BIG_ENDIAN) {
818                 a = be32toh(a);
819                 b = be32toh(b);
820         } else
821                 return -EIO;
822
823         *need = sizeof(struct bus_header) + ALIGN_TO(a, 8) + b;
824         return 0;
825 }
826
827 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
828         sd_bus_message *t;
829         void *b = NULL;
830         int r;
831
832         assert(bus);
833         assert(m);
834         assert(bus->rbuffer_size >= size);
835
836         t = new0(sd_bus_message, 1);
837         if (!t)
838                 return -ENOMEM;
839
840         if (bus->rbuffer_size > size) {
841                 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
842                 if (!b) {
843                         free(t);
844                         return -ENOMEM;
845                 }
846         }
847
848         t->n_ref = 1;
849
850         t->header = bus->rbuffer;
851         t->free_header = true;
852
853         t->fields = (uint8_t*) bus->rbuffer + sizeof(struct bus_header);
854         t->body = (uint8_t*) bus->rbuffer + sizeof(struct bus_header) + ALIGN_TO(BUS_MESSAGE_BODY_SIZE(t), 8);
855
856         bus->rbuffer = b;
857         bus->rbuffer_size -= size;
858
859         r = message_parse(t);
860         if (r < 0) {
861                 sd_bus_message_unref(t);
862                 return r;
863         }
864
865         *m = t;
866         return 1;
867 }
868
869 static int message_read(sd_bus *bus, sd_bus_message **m) {
870         struct msghdr mh;
871         struct iovec iov;
872         ssize_t k;
873         size_t need;
874         int r;
875         void *b;
876
877         assert(bus);
878         assert(m);
879
880         r = message_read_need(bus, &need);
881         if (r < 0)
882                 return r;
883
884         if (bus->rbuffer_size >= need)
885                 return message_make(bus, need, m);
886
887         b = realloc(bus->rbuffer, need);
888         if (!b)
889                 return -ENOMEM;
890
891         zero(iov);
892         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
893         iov.iov_len = need - bus->rbuffer_size;
894
895         zero(mh);
896         mh.msg_iov = &iov;
897         mh.msg_iovlen = 1;
898
899         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
900         if (k < 0)
901                 return errno == EAGAIN ? 0 : -errno;
902
903         bus->rbuffer_size += k;
904
905         r = message_read_need(bus, &need);
906         if (r < 0)
907                 return r;
908
909         if (bus->rbuffer_size >= need)
910                 return message_make(bus, need, m);
911
912         return 0;
913 }
914
915 static int dispatch_wqueue(sd_bus *bus) {
916         int r, c = 0;
917
918         assert(bus);
919
920         if (bus->fd < 0)
921                 return -ENOTCONN;
922
923         while (bus->wqueue_size > 0) {
924
925                 r = message_write(bus, bus->wqueue[0], &bus->windex);
926                 if (r < 0) {
927                         sd_bus_close(bus);
928                         return r;
929                 } else if (r == 0)
930                         /* Wasn't fully written yet... */
931                         break;
932                 else {
933                         /* Fully written. Let's drop the entry from
934                          * the queue.
935                          *
936                          * This isn't particularly optimized, but
937                          * well, this is supposed to be our worst-case
938                          * buffer only, and the socket buffer is
939                          * supposed to be our primary buffer, and if
940                          * it got full, then all bets are off
941                          * anyway. */
942
943                         sd_bus_message_unref(bus->wqueue[0]);
944                         bus->wqueue_size --;
945                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
946                         bus->windex = 0;
947
948                         c++;
949                 }
950         }
951
952         return c;
953 }
954
955 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
956         int r;
957
958         assert(bus);
959         assert(m);
960
961         if (bus->fd < 0)
962                 return -ENOTCONN;
963
964         if (bus->rqueue_size > 0) {
965                 /* Dispatch a queued message */
966
967                 *m = bus->rqueue[0];
968                 bus->rqueue_size --;
969                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
970                 return 1;
971         }
972
973         /* Try to read a new message */
974         r = message_read(bus, m);
975         if (r < 0) {
976                 sd_bus_close(bus);
977                 return r;
978         }
979
980         return r;
981 }
982
983 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
984         int r;
985
986         if (!bus)
987                 return -EINVAL;
988         if (bus->fd < 0)
989                 return -ENOTCONN;
990         if (!m)
991                 return -EINVAL;
992         if (m->header->version > bus->message_version)
993                 return -EPERM;
994
995         r = bus_seal_message(bus, m);
996         if (r < 0)
997                 return r;
998
999         if (bus->wqueue_size <= 0) {
1000                 size_t idx = 0;
1001
1002                 r = message_write(bus, m, &idx);
1003                 if (r < 0) {
1004                         sd_bus_close(bus);
1005                         return r;
1006                 } else if (r == 0)  {
1007                         /* Wasn't fully written. So let's remember how
1008                          * much was written. Note that the first entry
1009                          * of the wqueue array is always allocated so
1010                          * that we always can remember how much was
1011                          * written. */
1012                         bus->wqueue[0] = sd_bus_message_ref(m);
1013                         bus->wqueue_size = 1;
1014                         bus->windex = idx;
1015                 }
1016         } else {
1017                 sd_bus_message **q;
1018
1019                 /* Just append it to the queue. */
1020
1021                 if (bus->wqueue_size >= WQUEUE_MAX)
1022                         return -ENOBUFS;
1023
1024                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1025                 if (!q)
1026                         return -ENOMEM;
1027
1028                 bus->wqueue = q;
1029                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1030         }
1031
1032         if (serial)
1033                 *serial = BUS_MESSAGE_SERIAL(m);
1034
1035         return 0;
1036 }
1037
1038 static usec_t calc_elapse(uint64_t usec) {
1039         if (usec == (uint64_t) -1)
1040                 return 0;
1041
1042         if (usec == 0)
1043                 usec = SD_BUS_DEFAULT_TIMEOUT;
1044
1045         return now(CLOCK_MONOTONIC) + usec;
1046 }
1047
1048 int sd_bus_send_with_reply(
1049                 sd_bus *bus,
1050                 sd_bus_message *m,
1051                 sd_message_handler_t callback,
1052                 void *userdata,
1053                 uint64_t usec,
1054                 uint64_t *serial) {
1055
1056         struct reply_callback *c;
1057         int r;
1058
1059         if (!bus)
1060                 return -EINVAL;
1061         if (!bus->fd < 0)
1062                 return -ENOTCONN;
1063         if (!m)
1064                 return -EINVAL;
1065         if (!callback)
1066                 return -EINVAL;
1067         if (!m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1068                 return -EINVAL;
1069
1070         r = bus_seal_message(bus, m);
1071         if (r < 0)
1072                 return r;
1073
1074         c = new(struct reply_callback, 1);
1075         if (!c)
1076                 return -ENOMEM;
1077
1078         c->callback = callback;
1079         c->userdata = userdata;
1080         c->serial = BUS_MESSAGE_SERIAL(m);
1081         c->timeout = calc_elapse(usec);
1082
1083         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1084         if (r < 0) {
1085                 free(c);
1086                 return r;
1087         }
1088
1089         r = sd_bus_send(bus, m, serial);
1090         if (r < 0) {
1091                 hashmap_remove(bus->reply_callbacks, &c->serial);
1092                 free(c);
1093                 return r;
1094         }
1095
1096         return r;
1097 }
1098
1099 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1100         struct reply_callbacks *c;
1101
1102         if (!bus)
1103                 return -EINVAL;
1104         if (serial == 0)
1105                 return -EINVAL;
1106
1107         c = hashmap_remove(bus->reply_callbacks, &serial);
1108         if (!c)
1109                 return 0;
1110
1111         free(c);
1112         return 1;
1113 }
1114
1115 int sd_bus_send_with_reply_and_block(
1116                 sd_bus *bus,
1117                 sd_bus_message *m,
1118                 uint64_t usec,
1119                 sd_bus_error *error,
1120                 sd_bus_message **reply) {
1121
1122         int r;
1123         usec_t timeout;
1124         uint64_t serial;
1125         bool room = false;
1126
1127         if (!bus)
1128                 return -EINVAL;
1129         if (!bus->fd < 0)
1130                 return -ENOTCONN;
1131         if (!m)
1132                 return -EINVAL;
1133         if (!m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1134                 return -EINVAL;
1135         if (sd_bus_error_is_set(error))
1136                 return -EINVAL;
1137
1138         r = sd_bus_send(bus, m, &serial);
1139         if (r < 0)
1140                 return r;
1141
1142         timeout = calc_elapse(usec);
1143
1144         for (;;) {
1145                 usec_t left;
1146                 sd_bus_message *incoming;
1147
1148                 if (!room) {
1149                         sd_bus_message **q;
1150
1151                         /* Make sure there's room for queuing this
1152                          * locally, before we read the message */
1153
1154                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1155                         if (!q)
1156                                 return -ENOMEM;
1157
1158                         bus->rqueue = q;
1159                         room = true;
1160                 }
1161
1162                 r = message_read(bus, &incoming);
1163                 if (r < 0)
1164                         return r;
1165                 if (r > 0) {
1166                         if (incoming->reply_serial == serial) {
1167                                 /* Found a match! */
1168
1169                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1170                                         *reply = incoming;
1171                                         return 0;
1172                                 }
1173
1174                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1175                                         int k;
1176
1177                                         r = sd_bus_error_copy(error, &incoming->error);
1178                                         if (r < 0) {
1179                                                 sd_bus_message_unref(incoming);
1180                                                 return r;
1181                                         }
1182
1183                                         k = bus_error_to_errno(&incoming->error);
1184                                         sd_bus_message_unref(incoming);
1185                                         return k;
1186                                 }
1187
1188                                 sd_bus_message_unref(incoming);
1189                                 return -EIO;
1190                         }
1191
1192                         /* There's already guaranteed to be room for
1193                          * this, so need to resize things here */
1194                         bus->rqueue[bus->rqueue_size ++] = incoming;
1195                         room = false;
1196
1197                         /* Try to read more, right-away */
1198                         continue;
1199                 }
1200
1201                 if (timeout > 0) {
1202                         usec_t n;
1203
1204                         n = now(CLOCK_MONOTONIC);
1205                         if (n >= timeout)
1206                                 return -ETIMEDOUT;
1207
1208                         left = timeout - n;
1209                 } else
1210                         left = (uint64_t) -1;
1211
1212                 r = sd_bus_wait(bus, left);
1213                 if (r < 0)
1214                         return r;
1215
1216                 r = dispatch_wqueue(bus);
1217                 if (r < 0)
1218                         return r;
1219         }
1220 }
1221
1222 int sd_bus_get_fd(sd_bus *bus) {
1223         if (!bus)
1224                 return -EINVAL;
1225
1226         if (bus->fd < 0)
1227                 return -EINVAL;
1228
1229         return bus->fd;
1230 }
1231
1232 int sd_bus_get_events(sd_bus *bus) {
1233         int flags = 0;
1234
1235         if (!bus)
1236                 return -EINVAL;
1237
1238         if (bus->fd < 0)
1239                 return -EINVAL;
1240
1241         if (bus->state == BUS_OPENING)
1242                 flags |= POLLOUT;
1243         else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1244                 if (bus->rqueue_size <= 0)
1245                         flags |= POLLIN;
1246                 if (bus->wqueue_size > 0)
1247                         flags |= POLLOUT;
1248         }
1249
1250         return flags;
1251 }
1252
1253 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1254         sd_bus_message *m;
1255         int r;
1256
1257         if (!bus)
1258                 return -EINVAL;
1259         if (bus->fd < 0)
1260                 return -ENOTCONN;
1261
1262         if (bus->state == BUS_OPENING) {
1263                 struct pollfd p;
1264
1265                 zero(p);
1266                 p.fd = bus->fd;
1267                 p.events = POLLOUT;
1268
1269                 r = poll(&p, 1, 0);
1270                 if (r < 0)
1271                         return -errno;
1272
1273                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1274                         int error;
1275                         socklen_t slen = sizeof(error);
1276
1277                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1278                         if (r < 0)
1279                                 return -errno;
1280
1281                         if (error != 0)
1282                                 bus->last_connect_error = -error;
1283                         else if (p.revents & (POLLERR|POLLHUP))
1284                                 bus->last_connect_error = -ECONNREFUSED;
1285                         else
1286                                 return bus_start_auth(bus);
1287
1288                         /* Try next address */
1289                         return bus_start_connect(bus);
1290                 }
1291
1292                 return 0;
1293
1294         } else if (bus->state == BUS_AUTHENTICATING) {
1295
1296                 r = bus_write_auth(bus);
1297                 if (r < 0)
1298                         return r;
1299
1300                 r = bus_read_auth(bus);
1301                 if (r <= 0)
1302                         return r;
1303
1304                 return bus_start_running(bus);
1305
1306         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1307                 struct filter_callback *l;
1308
1309                 r = dispatch_wqueue(bus);
1310                 if (r < 0)
1311                         return r;
1312
1313                 r = dispatch_rqueue(bus, &m);
1314                 if (r <= 0)
1315                         return r;
1316
1317                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1318                         struct reply_callback *c;
1319
1320                         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1321                         if (c) {
1322                                 r = c->callback(bus, m, c->userdata);
1323                                 free(c);
1324
1325                                 if (r != 0) {
1326                                         sd_bus_message_unref(m);
1327                                         return r < 0 ? r : 0;
1328                                 }
1329                         }
1330                 }
1331
1332                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1333                         r = l->callback(bus, m, l->userdata);
1334                         if (r != 0) {
1335                                 sd_bus_message_unref(m);
1336                                 return r < 0 ? r : 0;
1337                         }
1338                 }
1339
1340                 if (ret) {
1341                         *ret = m;
1342                         return 1;
1343                 }
1344
1345                 sd_bus_message_unref(m);
1346                 return 0;
1347         }
1348
1349         return -ENOTSUP;
1350 }
1351
1352 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1353         struct pollfd p;
1354         int r, e;
1355         struct timespec ts;
1356
1357         if (!bus)
1358                 return -EINVAL;
1359         if (bus->fd < 0)
1360                 return -ECONNREFUSED;
1361
1362         e = sd_bus_get_events(bus);
1363         if (e < 0)
1364                 return e;
1365
1366         zero(p);
1367         p.fd = bus->fd;
1368         p.events = e;
1369
1370         r = ppoll(&p, 1, timeout_usec == (uint64_t) -1 ? NULL : timespec_store(&ts, timeout_usec), NULL);
1371         if (r < 0)
1372                 return -EINVAL;
1373
1374         return r;
1375 }
1376
1377 int sd_bus_flush(sd_bus *bus) {
1378         int r;
1379
1380         if (!bus)
1381                 return -EINVAL;
1382         if (bus->fd < 0)
1383                 return -ENOTCONN;
1384
1385         if (bus->state == BUS_RUNNING && bus->wqueue_size <= 0)
1386                 return 0;
1387
1388         for (;;) {
1389                 r = dispatch_wqueue(bus);
1390                 if (r < 0)
1391                         return r;
1392
1393                 if (bus->state == BUS_RUNNING && bus->wqueue_size <= 0)
1394                         return 0;
1395
1396                 r = sd_bus_wait(bus, (uint64_t) -1);
1397                 if (r < 0)
1398                         return r;
1399         }
1400 }
1401
1402 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1403         struct filter_callback *f;
1404
1405         if (!bus)
1406                 return -EINVAL;
1407         if (!callback)
1408                 return -EINVAL;
1409
1410         f = new(struct filter_callback, 1);
1411         if (!f)
1412                 return -ENOMEM;
1413         f->callback = callback;
1414         f->userdata = userdata;
1415
1416         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1417         return 0;
1418 }
1419
1420 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1421         struct filter_callback *f;
1422
1423         if (!bus)
1424                 return -EINVAL;
1425         if (!callback)
1426                 return -EINVAL;
1427
1428         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1429                 if (f->callback == callback && f->userdata == userdata) {
1430                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1431                         free(f);
1432                         return 1;
1433                 }
1434         }
1435
1436         return 0;
1437 }