chiark / gitweb /
bus: implement demarshaller
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32
33 #include "sd-bus.h"
34 #include "bus-internal.h"
35 #include "bus-message.h"
36 #include "bus-type.h"
37
38 #define WQUEUE_MAX 128
39
40 static void bus_free(sd_bus *b) {
41         struct filter_callback *f;
42
43         assert(b);
44
45         if (b->fd >= 0)
46                 close_nointr_nofail(b->fd);
47
48         free(b->rbuffer);
49         free(b->rqueue);
50         free(b->wqueue);
51         free(b->unique_name);
52
53         hashmap_free_free(b->reply_callbacks);
54
55         while ((f = b->filter_callbacks)) {
56                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
57                 free(f);
58         }
59
60         free(b);
61 }
62
63 static sd_bus* bus_new(void) {
64         sd_bus *r;
65
66         r = new0(sd_bus, 1);
67         if (!r)
68                 return NULL;
69
70         r->n_ref = 1;
71         r->fd = -1;
72         r->message_version = 1;
73
74         /* We guarantee that wqueue always has space for at least one
75          * entry */
76         r->wqueue = new(sd_bus_message*, 1);
77         if (!r->wqueue) {
78                 free(r);
79                 return NULL;
80         }
81
82         return r;
83 };
84
85 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
86         const char *s;
87         int r;
88
89         assert(bus);
90         assert(reply);
91
92         bus->state = BUS_RUNNING;
93
94         r = sd_bus_message_read(reply, "s", &s);
95         if (r < 0)
96                 return r;
97
98         bus->unique_name = strdup(s);
99         if (!bus->unique_name)
100                 return -ENOMEM;
101
102         return 1;
103 }
104
105 static int bus_send_hello(sd_bus *bus) {
106         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
107         int r;
108
109         assert(bus);
110
111         r = sd_bus_message_new_method_call(
112                         bus,
113                         "org.freedesktop.DBus",
114                         "/",
115                         "org.freedesktop.DBus",
116                         "Hello",
117                         &m);
118         if (r < 0)
119                 return r;
120
121         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, (uint64_t) -1, NULL);
122         if (r < 0)
123                 return r;
124
125         return 0;
126 }
127
128 static int bus_start_running(sd_bus *bus) {
129         int r;
130
131         assert(bus);
132
133         if (bus->send_hello) {
134                 bus->state = BUS_HELLO;
135
136                 r = bus_send_hello(bus);
137                 if (r < 0)
138                         return r;
139         }
140
141         bus->state = BUS_RUNNING;
142
143         return 0;
144 }
145
146 static int parse_address_key(const char **p, const char *key, char **value) {
147         size_t l, n = 0;
148         const char *a;
149         char *r = NULL;
150
151         assert(p);
152         assert(*p);
153         assert(key);
154         assert(value);
155
156         l = strlen(key);
157         if (!strncmp(*p, key, l) != 0)
158                 return 0;
159
160         if ((*p)[l] != '=')
161                 return 0;
162
163         if (*value)
164                 return -EINVAL;
165
166         a = *p + l + 1;
167         while (*a != ';' && *a != 0) {
168                 char c, *t;
169
170                 if (*a == '%') {
171                         int x, y;
172
173                         x = unhexchar(a[1]);
174                         if (x < 0) {
175                                 free(r);
176                                 return x;
177                         }
178
179                         y = unhexchar(a[2]);
180                         if (y < 0) {
181                                 free(r);
182                                 return y;
183                         }
184
185                         a += 3;
186                         c = (char) ((x << 4) | y);
187                 } else
188                         c = *a;
189
190                 t = realloc(r, n + 1);
191                 if (!t) {
192                         free(r);
193                         return -ENOMEM;
194                 }
195
196                 r = t;
197                 r[n++] = c;
198         }
199
200         *p = a;
201         *value = r;
202         return 1;
203 }
204
205 static void skip_address_key(const char **p) {
206         assert(p);
207         assert(*p);
208
209         *p += strcspn(*p, ";");
210 }
211
212 static int bus_parse_next_address(sd_bus *b) {
213         const char *a, *p;
214         _cleanup_free_ char *guid = NULL;
215         int r;
216
217         assert(b);
218
219         if (!b->address)
220                 return 0;
221         if (b->address[b->address_index] == 0)
222                 return 0;
223
224         a = b->address + b->address_index;
225
226         zero(b->sockaddr);
227         b->sockaddr_size = 0;
228         b->peer = SD_ID128_NULL;
229
230         if (startswith(a, "unix:")) {
231                 _cleanup_free_ char *path = NULL, *abstract = NULL;
232
233                 p = a + 5;
234                 while (*p != 0 && *p != ';') {
235                         r = parse_address_key(&p, "guid", &guid);
236                         if (r < 0)
237                                 return r;
238                         else if (r > 0)
239                                 continue;
240
241                         r = parse_address_key(&p, "path", &path);
242                         if (r < 0)
243                                 return r;
244                         else if (r > 0)
245                                 continue;
246
247                         r = parse_address_key(&p, "abstract", &abstract);
248                         if (r < 0)
249                                 return r;
250                         else if (r > 0)
251                                 continue;
252
253                         skip_address_key(&p);
254                 }
255
256                 if (!path && !abstract)
257                         return -EINVAL;
258
259                 if (path && abstract)
260                         return -EINVAL;
261
262                 if (path) {
263                         size_t l;
264
265                         l = strlen(path);
266                         if (l > sizeof(b->sockaddr.un.sun_path))
267                                 return -E2BIG;
268
269                         b->sockaddr.un.sun_family = AF_UNIX;
270                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
271                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
272                 } else if (abstract) {
273                         size_t l;
274
275                         l = strlen(path);
276                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
277                                 return -E2BIG;
278
279                         b->sockaddr.un.sun_family = AF_UNIX;
280                         b->sockaddr.un.sun_path[0] = 0;
281                         strncpy(b->sockaddr.un.sun_path+1, path, sizeof(b->sockaddr.un.sun_path)-1);
282                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
283                 }
284
285         } else if (startswith(a, "tcp:")) {
286                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
287                 struct addrinfo hints, *result;
288
289                 p = a + 4;
290                 while (*p != 0 && *p != ';') {
291                         r = parse_address_key(&p, "guid", &guid);
292                         if (r < 0)
293                                 return r;
294                         else if (r > 0)
295                                 continue;
296
297                         r = parse_address_key(&p, "host", &host);
298                         if (r < 0)
299                                 return r;
300                         else if (r > 0)
301                                 continue;
302
303                         r = parse_address_key(&p, "port", &port);
304                         if (r < 0)
305                                 return r;
306                         else if (r > 0)
307                                 continue;
308
309                         r = parse_address_key(&p, "family", &family);
310                         if (r < 0)
311                                 return r;
312                         else if (r > 0)
313                                 continue;
314
315                         skip_address_key(&p);
316                 }
317
318                 if (!host || !port)
319                         return -EINVAL;
320
321                 zero(hints);
322                 hints.ai_socktype = SOCK_STREAM;
323                 hints.ai_flags = AI_ADDRCONFIG;
324
325                 if (family) {
326                         if (streq(family, "ipv4"))
327                                 hints.ai_family = AF_INET;
328                         else if (streq(family, "ipv6"))
329                                 hints.ai_family = AF_INET6;
330                         else
331                                 return -EINVAL;
332                 }
333
334                 r = getaddrinfo(host, port, &hints, &result);
335                 if (r == EAI_SYSTEM)
336                         return -errno;
337                 else if (r != 0)
338                         return -EADDRNOTAVAIL;
339
340                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
341                 b->sockaddr_size = result->ai_addrlen;
342
343                 freeaddrinfo(result);
344         }
345
346         if (guid) {
347                 r = sd_id128_from_string(guid, &b->peer);
348                 if (r < 0)
349                         return r;
350         }
351
352         b->address_index = p - b->address;
353         return 1;
354 }
355
356 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
357
358         while (size > 0) {
359                 struct iovec *i = iov + *idx;
360
361                 if (i->iov_len > size) {
362                         i->iov_base = (uint8_t*) i->iov_base + size;
363                         i->iov_len -= size;
364                         return;
365                 }
366
367                 size -= i->iov_len;
368
369                 i->iov_base = NULL;
370                 i->iov_len = 0;
371
372                 (*idx) ++;
373         }
374 }
375
376 static int bus_write_auth(sd_bus *b) {
377         struct msghdr mh;
378         ssize_t k;
379
380         assert(b);
381         assert(b->state == BUS_AUTHENTICATING);
382
383         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
384                 return 0;
385
386         zero(mh);
387         mh.msg_iov = b->auth_iovec + b->auth_index;
388         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
389
390         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
391         if (k < 0)
392                 return errno == EAGAIN ? 0 : -errno;
393
394         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
395
396         return 1;
397 }
398
399 static int bus_auth_verify(sd_bus *b) {
400         char *e, *f;
401         sd_id128_t peer;
402         unsigned i;
403         int r;
404
405         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
406          * that's it */
407
408         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
409         if (!e)
410                 return 0;
411
412         f = memmem(e, b->rbuffer_size - (e - (char*) b->rbuffer), "\r\n", 2);
413         if (!f)
414                 return 0;
415
416         if (e - (char*) b->rbuffer != 3 + 32)
417                 return -EPERM;
418
419         if (memcmp(b->rbuffer, "OK ", 3))
420                 return -EPERM;
421
422         for (i = 0; i < 32; i += 2) {
423                 int x, y;
424
425                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
426                 y = unhexchar(((char*) b->rbuffer)[3 + i + 2]);
427
428                 if (x < 0 || y < 0)
429                         return -EINVAL;
430
431                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
432         }
433
434         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
435             !sd_id128_equal(b->peer, peer))
436                 return -EPERM;
437
438         b->peer = peer;
439
440         b->can_fds =
441                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
442                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
443
444         if (f + 2 > (char*) b->rbuffer + b->rbuffer_size) {
445                 b->rbuffer_size -= (f - (char*) b->rbuffer);
446                 memmove(b->rbuffer, f + 2, b->rbuffer_size);
447         }
448
449         r = bus_start_running(b);
450         if (r < 0)
451                 return r;
452
453         return 1;
454 }
455
456 static int bus_read_auth(sd_bus *b) {
457         struct msghdr mh;
458         struct iovec iov;
459         size_t n;
460         ssize_t k;
461         int r;
462
463         assert(b);
464
465         r = bus_auth_verify(b);
466         if (r != 0)
467                 return r;
468
469         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
470
471         zero(iov);
472         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
473         iov.iov_len = n - b->rbuffer_size;
474
475         zero(mh);
476         mh.msg_iov = &iov;
477         mh.msg_iovlen = 1;
478
479         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
480         if (k < 0)
481                 return errno == EAGAIN ? 0 : -errno;
482
483         b->rbuffer_size += k;
484
485         r = bus_auth_verify(b);
486         if (r != 0)
487                 return r;
488
489         return 0;
490 }
491
492 static int bus_start_auth(sd_bus *b) {
493         static const char auth_prefix[] = "\0AUTH_EXTERNAL ";
494         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
495
496         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
497         size_t l;
498
499         assert(b);
500
501         b->state = BUS_AUTHENTICATING;
502
503         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
504         char_array_0(text);
505
506         l = strlen(text);
507         b->auth_uid = hexmem(text, l);
508         if (!b->auth_uid)
509                 return -ENOMEM;
510
511         b->auth_iovec[0].iov_base = (void*) auth_prefix;
512         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
513         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
514         b->auth_iovec[1].iov_len = l * 2;
515         b->auth_iovec[2].iov_base = (void*) auth_suffix;
516         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
517         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
518
519         return bus_write_auth(b);
520 }
521
522 static int bus_start_connect(sd_bus *b) {
523         int r;
524
525         assert(b);
526         assert(b->fd < 0);
527
528         for (;;) {
529                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
530                         r = bus_parse_next_address(b);
531                         if (r < 0)
532                                 return r;
533                         if (r == 0)
534                                 return b->last_connect_error ? b->last_connect_error : -ECONNREFUSED;
535                 }
536
537                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
538                 if (b->fd < 0) {
539                         b->last_connect_error = -errno;
540                         zero(b->sockaddr);
541                         continue;
542                 }
543
544                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
545                 if (r < 0) {
546                         if (errno == EINPROGRESS)
547                                 return 0;
548
549                         b->last_connect_error = -errno;
550                         close_nointr_nofail(b->fd);
551                         b->fd = -1;
552                         zero(b->sockaddr);
553                         continue;
554                 }
555
556                 return bus_start_auth(b);
557         }
558 }
559
560 int sd_bus_open_system(sd_bus **ret) {
561         const char *e;
562         sd_bus *b;
563         int r;
564
565         if (!ret)
566                 return -EINVAL;
567
568         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
569         if (e) {
570                 r = sd_bus_open_address(e, &b);
571                 if (r < 0)
572                         return r;
573
574                 b->send_hello = true;
575                 *ret = b;
576                 return r;
577         }
578
579         b = bus_new();
580         if (!b)
581                 return -ENOMEM;
582
583         b->send_hello = true;
584
585         b->sockaddr.un.sun_family = AF_UNIX;
586         strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
587         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
588
589         r = bus_start_connect(b);
590         if (r < 0) {
591                 bus_free(b);
592                 return r;
593         }
594
595         *ret = b;
596         return 0;
597 }
598
599 int sd_bus_open_user(sd_bus **ret) {
600         const char *e;
601         sd_bus *b;
602         size_t l;
603         int r;
604
605         if (!ret)
606                 return -EINVAL;
607
608         e = getenv("DBUS_SESSION_BUS_ADDRESS");
609         if (e) {
610                 r = sd_bus_open_address(e, &b);
611                 if (r < 0)
612                         return r;
613
614                 b->send_hello = true;
615                 *ret = b;
616                 return r;
617         }
618
619         e = getenv("XDG_RUNTIME_DIR");
620         if (!e)
621                 return -ENOENT;
622
623         l = strlen(e);
624         if (l + 4 > sizeof(b->sockaddr.un.sun_path))
625                 return -E2BIG;
626
627         b = bus_new();
628         if (!b)
629                 return -ENOMEM;
630
631         b->send_hello = true;
632
633         b->sockaddr.un.sun_family = AF_UNIX;
634         memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
635         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
636
637         r = bus_start_connect(b);
638         if (r < 0) {
639                 bus_free(b);
640                 return r;
641         }
642
643         *ret = b;
644         return 0;
645 }
646
647 int sd_bus_open_address(const char *address, sd_bus **ret) {
648         sd_bus *b;
649         int r;
650
651         if (!address)
652                 return -EINVAL;
653         if (!ret)
654                 return -EINVAL;
655
656         b = bus_new();
657         if (!b)
658                 return -ENOMEM;
659
660         b->address = strdup(address);
661         if (!b->address) {
662                 bus_free(b);
663                 return -ENOMEM;
664         }
665
666         r = bus_start_connect(b);
667         if (r < 0) {
668                 bus_free(b);
669                 return r;
670         }
671
672         *ret = b;
673         return 0;
674 }
675
676 int sd_bus_open_fd(int fd, sd_bus **ret) {
677         sd_bus *b;
678         int r;
679
680         if (fd < 0)
681                 return -EINVAL;
682         if (!ret)
683                 return -EINVAL;
684
685         b = bus_new();
686         if (!b)
687                 return -ENOMEM;
688
689         b->fd = fd;
690         fd_nonblock(b->fd, true);
691         fd_cloexec(b->fd, true);
692
693         r = bus_start_auth(b);
694         if (r < 0) {
695                 bus_free(b);
696                 return r;
697         }
698
699         *ret = b;
700         return 0;
701 }
702
703 void sd_bus_close(sd_bus *bus) {
704         if (!bus)
705                 return;
706         if (bus->fd < 0)
707                 return;
708
709         close_nointr_nofail(bus->fd);
710         bus->fd = -1;
711 }
712
713 sd_bus *sd_bus_ref(sd_bus *bus) {
714         if (!bus)
715                 return NULL;
716
717         assert(bus->n_ref > 0);
718
719         bus->n_ref++;
720         return bus;
721 }
722
723 sd_bus *sd_bus_unref(sd_bus *bus) {
724         if (!bus)
725                 return NULL;
726
727         assert(bus->n_ref > 0);
728         bus->n_ref--;
729
730         if (bus->n_ref <= 0)
731                 bus_free(bus);
732
733         return NULL;
734 }
735
736 int sd_bus_is_running(sd_bus *bus) {
737         if (!bus)
738                 return -EINVAL;
739
740         if (bus->fd < 0)
741                 return -ENOTCONN;
742
743         return bus->state == BUS_RUNNING;
744 }
745
746 int sd_bus_can_send(sd_bus *bus, char type) {
747
748         if (!bus)
749                 return -EINVAL;
750
751         if (type == SD_BUS_TYPE_UNIX_FD)
752                 return bus->can_fds;
753
754         return bus_type_is_valid(type);
755 }
756
757 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
758         assert(m);
759
760         if (m->sealed)
761                 return 0;
762
763         return bus_message_seal(m, ++b->serial);
764 }
765
766 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
767         struct msghdr mh;
768         struct iovec *iov;
769         ssize_t k;
770         size_t n;
771         unsigned j;
772
773         assert(bus);
774         assert(m);
775         assert(idx);
776
777         n = m->n_iovec * sizeof(struct iovec);
778         iov = alloca(n);
779         memcpy(iov, m->iovec, n);
780
781         j = 0;
782         iovec_advance(iov, &j, *idx);
783
784         zero(mh);
785         mh.msg_iov = iov;
786         mh.msg_iovlen = m->n_iovec;
787
788         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
789         if (k < 0)
790                 return -errno;
791
792         *idx += (size_t) k;
793         iovec_advance(iov, &j, *idx);
794
795         return j > m->n_iovec;
796 }
797
798 static int message_read_need(sd_bus *bus, size_t *need) {
799         uint32_t a, b;
800         uint8_t e;
801
802         assert(bus);
803         assert(need);
804
805         if (bus->rbuffer_size <= sizeof(struct bus_header)) {
806                 *need = sizeof(struct bus_header);
807                 return 0;
808         }
809
810         a = ((const uint32_t*) bus->rbuffer)[1];
811         b = ((const uint32_t*) bus->rbuffer)[3];
812
813         e = ((const uint8_t*) bus->rbuffer)[0];
814         if (e == SD_BUS_LITTLE_ENDIAN) {
815                 a = le32toh(a);
816                 b = le32toh(b);
817         } else if (e == SD_BUS_BIG_ENDIAN) {
818                 a = be32toh(a);
819                 b = be32toh(b);
820         } else
821                 return -EIO;
822
823         *need = sizeof(struct bus_header) + ALIGN_TO(a, 8) + b;
824         return 0;
825 }
826
827 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
828         sd_bus_message *t;
829         void *b = NULL;
830         int r;
831
832         assert(bus);
833         assert(m);
834         assert(bus->rbuffer_size >= size);
835
836         t = new0(sd_bus_message, 1);
837         if (!t)
838                 return -ENOMEM;
839
840         if (bus->rbuffer_size > size) {
841                 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
842                 if (!b) {
843                         free(t);
844                         return -ENOMEM;
845                 }
846         }
847
848         t->n_ref = 1;
849
850         t->header = bus->rbuffer;
851         t->free_header = true;
852
853         t->fields = (uint8_t*) bus->rbuffer + sizeof(struct bus_header);
854         t->body = (uint8_t*) bus->rbuffer + sizeof(struct bus_header) + ALIGN_TO(BUS_MESSAGE_BODY_SIZE(t), 8);
855
856         bus->rbuffer = b;
857         bus->rbuffer_size -= size;
858
859         r = bus_message_parse(t);
860         if (r < 0) {
861                 sd_bus_message_unref(t);
862                 return r;
863         }
864
865         *m = t;
866         return 1;
867 }
868
869 static int message_read(sd_bus *bus, sd_bus_message **m) {
870         struct msghdr mh;
871         struct iovec iov;
872         ssize_t k;
873         size_t need;
874         int r;
875         void *b;
876
877         assert(bus);
878         assert(m);
879
880         r = message_read_need(bus, &need);
881         if (r < 0)
882                 return r;
883
884         if (bus->rbuffer_size >= need)
885                 return message_make(bus, need, m);
886
887         b = realloc(bus->rbuffer, need);
888         if (!b)
889                 return -ENOMEM;
890
891         zero(iov);
892         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
893         iov.iov_len = need - bus->rbuffer_size;
894
895         zero(mh);
896         mh.msg_iov = &iov;
897         mh.msg_iovlen = 1;
898
899         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
900         if (k < 0)
901                 return errno == EAGAIN ? 0 : -errno;
902
903         bus->rbuffer_size += k;
904
905         r = message_read_need(bus, &need);
906         if (r < 0)
907                 return r;
908
909         if (bus->rbuffer_size >= need)
910                 return message_make(bus, need, m);
911
912         return 0;
913 }
914
915 static int dispatch_wqueue(sd_bus *bus) {
916         int r, c = 0;
917
918         assert(bus);
919
920         if (bus->fd < 0)
921                 return -ENOTCONN;
922
923         while (bus->wqueue_size > 0) {
924
925                 r = message_write(bus, bus->wqueue[0], &bus->windex);
926                 if (r < 0) {
927                         sd_bus_close(bus);
928                         return r;
929                 } else if (r == 0)
930                         /* Wasn't fully written yet... */
931                         break;
932                 else {
933                         /* Fully written. Let's drop the entry from
934                          * the queue.
935                          *
936                          * This isn't particularly optimized, but
937                          * well, this is supposed to be our worst-case
938                          * buffer only, and the socket buffer is
939                          * supposed to be our primary buffer, and if
940                          * it got full, then all bets are off
941                          * anyway. */
942
943                         sd_bus_message_unref(bus->wqueue[0]);
944                         bus->wqueue_size --;
945                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
946                         bus->windex = 0;
947
948                         c++;
949                 }
950         }
951
952         return c;
953 }
954
955 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
956         int r;
957
958         assert(bus);
959         assert(m);
960
961         if (bus->fd < 0)
962                 return -ENOTCONN;
963
964         if (bus->rqueue_size > 0) {
965                 /* Dispatch a queued message */
966
967                 *m = bus->rqueue[0];
968                 bus->rqueue_size --;
969                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
970                 return 1;
971         }
972
973         /* Try to read a new message */
974         r = message_read(bus, m);
975         if (r < 0) {
976                 sd_bus_close(bus);
977                 return r;
978         }
979
980         return r;
981 }
982
983 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
984         int r;
985
986         if (!bus)
987                 return -EINVAL;
988         if (bus->fd < 0)
989                 return -ENOTCONN;
990         if (!m)
991                 return -EINVAL;
992         if (m->header->version > bus->message_version)
993                 return -EPERM;
994
995         r = bus_seal_message(bus, m);
996         if (r < 0)
997                 return r;
998
999         /* If this is a reply and no reply was requested, then let's
1000          * suppress this, if we can */
1001         if (m->dont_send && !serial)
1002                 return 0;
1003
1004         if (bus->wqueue_size <= 0) {
1005                 size_t idx = 0;
1006
1007                 r = message_write(bus, m, &idx);
1008                 if (r < 0) {
1009                         sd_bus_close(bus);
1010                         return r;
1011                 } else if (r == 0)  {
1012                         /* Wasn't fully written. So let's remember how
1013                          * much was written. Note that the first entry
1014                          * of the wqueue array is always allocated so
1015                          * that we always can remember how much was
1016                          * written. */
1017                         bus->wqueue[0] = sd_bus_message_ref(m);
1018                         bus->wqueue_size = 1;
1019                         bus->windex = idx;
1020                 }
1021         } else {
1022                 sd_bus_message **q;
1023
1024                 /* Just append it to the queue. */
1025
1026                 if (bus->wqueue_size >= WQUEUE_MAX)
1027                         return -ENOBUFS;
1028
1029                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1030                 if (!q)
1031                         return -ENOMEM;
1032
1033                 bus->wqueue = q;
1034                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1035         }
1036
1037         if (serial)
1038                 *serial = BUS_MESSAGE_SERIAL(m);
1039
1040         return 0;
1041 }
1042
1043 static usec_t calc_elapse(uint64_t usec) {
1044         if (usec == (uint64_t) -1)
1045                 return 0;
1046
1047         if (usec == 0)
1048                 usec = SD_BUS_DEFAULT_TIMEOUT;
1049
1050         return now(CLOCK_MONOTONIC) + usec;
1051 }
1052
1053 int sd_bus_send_with_reply(
1054                 sd_bus *bus,
1055                 sd_bus_message *m,
1056                 sd_message_handler_t callback,
1057                 void *userdata,
1058                 uint64_t usec,
1059                 uint64_t *serial) {
1060
1061         struct reply_callback *c;
1062         int r;
1063
1064         if (!bus)
1065                 return -EINVAL;
1066         if (!bus->fd < 0)
1067                 return -ENOTCONN;
1068         if (!m)
1069                 return -EINVAL;
1070         if (!callback)
1071                 return -EINVAL;
1072         if (!m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1073                 return -EINVAL;
1074
1075         r = bus_seal_message(bus, m);
1076         if (r < 0)
1077                 return r;
1078
1079         c = new(struct reply_callback, 1);
1080         if (!c)
1081                 return -ENOMEM;
1082
1083         c->callback = callback;
1084         c->userdata = userdata;
1085         c->serial = BUS_MESSAGE_SERIAL(m);
1086         c->timeout = calc_elapse(usec);
1087
1088         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1089         if (r < 0) {
1090                 free(c);
1091                 return r;
1092         }
1093
1094         r = sd_bus_send(bus, m, serial);
1095         if (r < 0) {
1096                 hashmap_remove(bus->reply_callbacks, &c->serial);
1097                 free(c);
1098                 return r;
1099         }
1100
1101         return r;
1102 }
1103
1104 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1105         struct reply_callbacks *c;
1106
1107         if (!bus)
1108                 return -EINVAL;
1109         if (serial == 0)
1110                 return -EINVAL;
1111
1112         c = hashmap_remove(bus->reply_callbacks, &serial);
1113         if (!c)
1114                 return 0;
1115
1116         free(c);
1117         return 1;
1118 }
1119
1120 int sd_bus_send_with_reply_and_block(
1121                 sd_bus *bus,
1122                 sd_bus_message *m,
1123                 uint64_t usec,
1124                 sd_bus_error *error,
1125                 sd_bus_message **reply) {
1126
1127         int r;
1128         usec_t timeout;
1129         uint64_t serial;
1130         bool room = false;
1131
1132         if (!bus)
1133                 return -EINVAL;
1134         if (!bus->fd < 0)
1135                 return -ENOTCONN;
1136         if (!m)
1137                 return -EINVAL;
1138         if (!m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1139                 return -EINVAL;
1140         if (sd_bus_error_is_set(error))
1141                 return -EINVAL;
1142
1143         r = sd_bus_send(bus, m, &serial);
1144         if (r < 0)
1145                 return r;
1146
1147         timeout = calc_elapse(usec);
1148
1149         for (;;) {
1150                 usec_t left;
1151                 sd_bus_message *incoming;
1152
1153                 if (!room) {
1154                         sd_bus_message **q;
1155
1156                         /* Make sure there's room for queuing this
1157                          * locally, before we read the message */
1158
1159                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1160                         if (!q)
1161                                 return -ENOMEM;
1162
1163                         bus->rqueue = q;
1164                         room = true;
1165                 }
1166
1167                 r = message_read(bus, &incoming);
1168                 if (r < 0)
1169                         return r;
1170                 if (r > 0) {
1171                         if (incoming->reply_serial == serial) {
1172                                 /* Found a match! */
1173
1174                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1175                                         *reply = incoming;
1176                                         return 0;
1177                                 }
1178
1179                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1180                                         int k;
1181
1182                                         r = sd_bus_error_copy(error, &incoming->error);
1183                                         if (r < 0) {
1184                                                 sd_bus_message_unref(incoming);
1185                                                 return r;
1186                                         }
1187
1188                                         k = bus_error_to_errno(&incoming->error);
1189                                         sd_bus_message_unref(incoming);
1190                                         return k;
1191                                 }
1192
1193                                 sd_bus_message_unref(incoming);
1194                                 return -EIO;
1195                         }
1196
1197                         /* There's already guaranteed to be room for
1198                          * this, so need to resize things here */
1199                         bus->rqueue[bus->rqueue_size ++] = incoming;
1200                         room = false;
1201
1202                         /* Try to read more, right-away */
1203                         continue;
1204                 }
1205
1206                 if (timeout > 0) {
1207                         usec_t n;
1208
1209                         n = now(CLOCK_MONOTONIC);
1210                         if (n >= timeout)
1211                                 return -ETIMEDOUT;
1212
1213                         left = timeout - n;
1214                 } else
1215                         left = (uint64_t) -1;
1216
1217                 r = sd_bus_wait(bus, left);
1218                 if (r < 0)
1219                         return r;
1220
1221                 r = dispatch_wqueue(bus);
1222                 if (r < 0)
1223                         return r;
1224         }
1225 }
1226
1227 int sd_bus_get_fd(sd_bus *bus) {
1228         if (!bus)
1229                 return -EINVAL;
1230
1231         if (bus->fd < 0)
1232                 return -EINVAL;
1233
1234         return bus->fd;
1235 }
1236
1237 int sd_bus_get_events(sd_bus *bus) {
1238         int flags = 0;
1239
1240         if (!bus)
1241                 return -EINVAL;
1242
1243         if (bus->fd < 0)
1244                 return -EINVAL;
1245
1246         if (bus->state == BUS_OPENING)
1247                 flags |= POLLOUT;
1248         else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1249                 if (bus->rqueue_size <= 0)
1250                         flags |= POLLIN;
1251                 if (bus->wqueue_size > 0)
1252                         flags |= POLLOUT;
1253         }
1254
1255         return flags;
1256 }
1257
1258 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1259         sd_bus_message *m;
1260         int r;
1261
1262         if (!bus)
1263                 return -EINVAL;
1264         if (bus->fd < 0)
1265                 return -ENOTCONN;
1266
1267         if (bus->state == BUS_OPENING) {
1268                 struct pollfd p;
1269
1270                 zero(p);
1271                 p.fd = bus->fd;
1272                 p.events = POLLOUT;
1273
1274                 r = poll(&p, 1, 0);
1275                 if (r < 0)
1276                         return -errno;
1277
1278                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1279                         int error;
1280                         socklen_t slen = sizeof(error);
1281
1282                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1283                         if (r < 0)
1284                                 return -errno;
1285
1286                         if (error != 0)
1287                                 bus->last_connect_error = -error;
1288                         else if (p.revents & (POLLERR|POLLHUP))
1289                                 bus->last_connect_error = -ECONNREFUSED;
1290                         else
1291                                 return bus_start_auth(bus);
1292
1293                         /* Try next address */
1294                         return bus_start_connect(bus);
1295                 }
1296
1297                 return 0;
1298
1299         } else if (bus->state == BUS_AUTHENTICATING) {
1300
1301                 r = bus_write_auth(bus);
1302                 if (r < 0)
1303                         return r;
1304
1305                 r = bus_read_auth(bus);
1306                 if (r <= 0)
1307                         return r;
1308
1309                 return bus_start_running(bus);
1310
1311         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1312                 struct filter_callback *l;
1313
1314                 r = dispatch_wqueue(bus);
1315                 if (r < 0)
1316                         return r;
1317
1318                 r = dispatch_rqueue(bus, &m);
1319                 if (r <= 0)
1320                         return r;
1321
1322                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1323                         struct reply_callback *c;
1324
1325                         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1326                         if (c) {
1327                                 r = c->callback(bus, m, c->userdata);
1328                                 free(c);
1329
1330                                 if (r != 0) {
1331                                         sd_bus_message_unref(m);
1332                                         return r < 0 ? r : 0;
1333                                 }
1334                         }
1335                 }
1336
1337                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1338                         r = l->callback(bus, m, l->userdata);
1339                         if (r != 0) {
1340                                 sd_bus_message_unref(m);
1341                                 return r < 0 ? r : 0;
1342                         }
1343                 }
1344
1345                 if (ret) {
1346                         *ret = m;
1347                         return 1;
1348                 }
1349
1350                 sd_bus_message_unref(m);
1351                 return 0;
1352         }
1353
1354         return -ENOTSUP;
1355 }
1356
1357 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1358         struct pollfd p;
1359         int r, e;
1360         struct timespec ts;
1361
1362         if (!bus)
1363                 return -EINVAL;
1364         if (bus->fd < 0)
1365                 return -ECONNREFUSED;
1366
1367         e = sd_bus_get_events(bus);
1368         if (e < 0)
1369                 return e;
1370
1371         zero(p);
1372         p.fd = bus->fd;
1373         p.events = e;
1374
1375         r = ppoll(&p, 1, timeout_usec == (uint64_t) -1 ? NULL : timespec_store(&ts, timeout_usec), NULL);
1376         if (r < 0)
1377                 return -EINVAL;
1378
1379         return r;
1380 }
1381
1382 int sd_bus_flush(sd_bus *bus) {
1383         int r;
1384
1385         if (!bus)
1386                 return -EINVAL;
1387         if (bus->fd < 0)
1388                 return -ENOTCONN;
1389
1390         if (bus->state == BUS_RUNNING && bus->wqueue_size <= 0)
1391                 return 0;
1392
1393         for (;;) {
1394                 r = dispatch_wqueue(bus);
1395                 if (r < 0)
1396                         return r;
1397
1398                 if (bus->state == BUS_RUNNING && bus->wqueue_size <= 0)
1399                         return 0;
1400
1401                 r = sd_bus_wait(bus, (uint64_t) -1);
1402                 if (r < 0)
1403                         return r;
1404         }
1405 }
1406
1407 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1408         struct filter_callback *f;
1409
1410         if (!bus)
1411                 return -EINVAL;
1412         if (!callback)
1413                 return -EINVAL;
1414
1415         f = new(struct filter_callback, 1);
1416         if (!f)
1417                 return -ENOMEM;
1418         f->callback = callback;
1419         f->userdata = userdata;
1420
1421         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1422         return 0;
1423 }
1424
1425 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1426         struct filter_callback *f;
1427
1428         if (!bus)
1429                 return -EINVAL;
1430         if (!callback)
1431                 return -EINVAL;
1432
1433         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1434                 if (f->callback == callback && f->userdata == userdata) {
1435                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1436                         free(f);
1437                         return 1;
1438                 }
1439         }
1440
1441         return 0;
1442 }