chiark / gitweb /
bus: introduce bus_error_is_dirty() independently of sd_bus_error_is_set()
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32
33 #include "sd-bus.h"
34 #include "bus-internal.h"
35 #include "bus-message.h"
36 #include "bus-type.h"
37
38 #define WQUEUE_MAX 128
39
40 static void bus_free(sd_bus *b) {
41         struct filter_callback *f;
42
43         assert(b);
44
45         if (b->fd >= 0)
46                 close_nointr_nofail(b->fd);
47
48         free(b->rbuffer);
49         free(b->rqueue);
50         free(b->wqueue);
51         free(b->unique_name);
52
53         hashmap_free_free(b->reply_callbacks);
54
55         while ((f = b->filter_callbacks)) {
56                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
57                 free(f);
58         }
59
60         free(b);
61 }
62
63 static sd_bus* bus_new(void) {
64         sd_bus *r;
65
66         r = new0(sd_bus, 1);
67         if (!r)
68                 return NULL;
69
70         r->n_ref = 1;
71         r->fd = -1;
72         r->message_version = 1;
73
74         /* We guarantee that wqueue always has space for at least one
75          * entry */
76         r->wqueue = new(sd_bus_message*, 1);
77         if (!r->wqueue) {
78                 free(r);
79                 return NULL;
80         }
81
82         return r;
83 };
84
85 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
86         const char *s;
87         int r;
88
89         assert(bus);
90         assert(reply);
91
92         bus->state = BUS_RUNNING;
93
94         r = sd_bus_message_read(reply, "s", &s);
95         if (r < 0)
96                 return r;
97
98         bus->unique_name = strdup(s);
99         if (!bus->unique_name)
100                 return -ENOMEM;
101
102         return 1;
103 }
104
105 static int bus_send_hello(sd_bus *bus) {
106         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
107         int r;
108
109         assert(bus);
110
111         r = sd_bus_message_new_method_call(
112                         bus,
113                         "org.freedesktop.DBus",
114                         "/",
115                         "org.freedesktop.DBus",
116                         "Hello",
117                         &m);
118         if (r < 0)
119                 return r;
120
121         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, (uint64_t) -1, NULL);
122         if (r < 0)
123                 return r;
124
125         return 0;
126 }
127
128 static int bus_start_running(sd_bus *bus) {
129         int r;
130
131         assert(bus);
132
133         if (bus->send_hello) {
134                 bus->state = BUS_HELLO;
135
136                 r = bus_send_hello(bus);
137                 if (r < 0)
138                         return r;
139         }
140
141         bus->state = BUS_RUNNING;
142
143         return 0;
144 }
145
146 static int parse_address_key(const char **p, const char *key, char **value) {
147         size_t l, n = 0;
148         const char *a;
149         char *r = NULL;
150
151         assert(p);
152         assert(*p);
153         assert(key);
154         assert(value);
155
156         l = strlen(key);
157         if (!strncmp(*p, key, l) != 0)
158                 return 0;
159
160         if ((*p)[l] != '=')
161                 return 0;
162
163         if (*value)
164                 return -EINVAL;
165
166         a = *p + l + 1;
167         while (*a != ';' && *a != 0) {
168                 char c, *t;
169
170                 if (*a == '%') {
171                         int x, y;
172
173                         x = unhexchar(a[1]);
174                         if (x < 0) {
175                                 free(r);
176                                 return x;
177                         }
178
179                         y = unhexchar(a[2]);
180                         if (y < 0) {
181                                 free(r);
182                                 return y;
183                         }
184
185                         a += 3;
186                         c = (char) ((x << 4) | y);
187                 } else
188                         c = *a;
189
190                 t = realloc(r, n + 1);
191                 if (!t) {
192                         free(r);
193                         return -ENOMEM;
194                 }
195
196                 r = t;
197                 r[n++] = c;
198         }
199
200         *p = a;
201         *value = r;
202         return 1;
203 }
204
205 static void skip_address_key(const char **p) {
206         assert(p);
207         assert(*p);
208
209         *p += strcspn(*p, ";");
210 }
211
212 static int bus_parse_next_address(sd_bus *b) {
213         const char *a, *p;
214         _cleanup_free_ char *guid = NULL;
215         int r;
216
217         assert(b);
218
219         if (!b->address)
220                 return 0;
221         if (b->address[b->address_index] == 0)
222                 return 0;
223
224         a = b->address + b->address_index;
225
226         zero(b->sockaddr);
227         b->sockaddr_size = 0;
228         b->peer = SD_ID128_NULL;
229
230         if (startswith(a, "unix:")) {
231                 _cleanup_free_ char *path = NULL, *abstract = NULL;
232
233                 p = a + 5;
234                 while (*p != 0 && *p != ';') {
235                         r = parse_address_key(&p, "guid", &guid);
236                         if (r < 0)
237                                 return r;
238                         else if (r > 0)
239                                 continue;
240
241                         r = parse_address_key(&p, "path", &path);
242                         if (r < 0)
243                                 return r;
244                         else if (r > 0)
245                                 continue;
246
247                         r = parse_address_key(&p, "abstract", &abstract);
248                         if (r < 0)
249                                 return r;
250                         else if (r > 0)
251                                 continue;
252
253                         skip_address_key(&p);
254                 }
255
256                 if (!path && !abstract)
257                         return -EINVAL;
258
259                 if (path && abstract)
260                         return -EINVAL;
261
262                 if (path) {
263                         size_t l;
264
265                         l = strlen(path);
266                         if (l > sizeof(b->sockaddr.un.sun_path))
267                                 return -E2BIG;
268
269                         b->sockaddr.un.sun_family = AF_UNIX;
270                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
271                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
272                 } else if (abstract) {
273                         size_t l;
274
275                         l = strlen(path);
276                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
277                                 return -E2BIG;
278
279                         b->sockaddr.un.sun_family = AF_UNIX;
280                         b->sockaddr.un.sun_path[0] = 0;
281                         strncpy(b->sockaddr.un.sun_path+1, path, sizeof(b->sockaddr.un.sun_path)-1);
282                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
283                 }
284
285         } else if (startswith(a, "tcp:")) {
286                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
287                 struct addrinfo hints, *result;
288
289                 p = a + 4;
290                 while (*p != 0 && *p != ';') {
291                         r = parse_address_key(&p, "guid", &guid);
292                         if (r < 0)
293                                 return r;
294                         else if (r > 0)
295                                 continue;
296
297                         r = parse_address_key(&p, "host", &host);
298                         if (r < 0)
299                                 return r;
300                         else if (r > 0)
301                                 continue;
302
303                         r = parse_address_key(&p, "port", &port);
304                         if (r < 0)
305                                 return r;
306                         else if (r > 0)
307                                 continue;
308
309                         r = parse_address_key(&p, "family", &family);
310                         if (r < 0)
311                                 return r;
312                         else if (r > 0)
313                                 continue;
314
315                         skip_address_key(&p);
316                 }
317
318                 if (!host || !port)
319                         return -EINVAL;
320
321                 zero(hints);
322                 hints.ai_socktype = SOCK_STREAM;
323                 hints.ai_flags = AI_ADDRCONFIG;
324
325                 if (family) {
326                         if (streq(family, "ipv4"))
327                                 hints.ai_family = AF_INET;
328                         else if (streq(family, "ipv6"))
329                                 hints.ai_family = AF_INET6;
330                         else
331                                 return -EINVAL;
332                 }
333
334                 r = getaddrinfo(host, port, &hints, &result);
335                 if (r == EAI_SYSTEM)
336                         return -errno;
337                 else if (r != 0)
338                         return -EADDRNOTAVAIL;
339
340                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
341                 b->sockaddr_size = result->ai_addrlen;
342
343                 freeaddrinfo(result);
344         }
345
346         if (guid) {
347                 r = sd_id128_from_string(guid, &b->peer);
348                 if (r < 0)
349                         return r;
350         }
351
352         b->address_index = p - b->address;
353         return 1;
354 }
355
356 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
357
358         while (size > 0) {
359                 struct iovec *i = iov + *idx;
360
361                 if (i->iov_len > size) {
362                         i->iov_base = (uint8_t*) i->iov_base + size;
363                         i->iov_len -= size;
364                         return;
365                 }
366
367                 size -= i->iov_len;
368
369                 i->iov_base = NULL;
370                 i->iov_len = 0;
371
372                 (*idx) ++;
373         }
374 }
375
376 static int bus_write_auth(sd_bus *b) {
377         struct msghdr mh;
378         ssize_t k;
379
380         assert(b);
381         assert(b->state == BUS_AUTHENTICATING);
382
383         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
384                 return 0;
385
386         zero(mh);
387         mh.msg_iov = b->auth_iovec + b->auth_index;
388         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
389
390         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
391         if (k < 0)
392                 return errno == EAGAIN ? 0 : -errno;
393
394         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
395
396         return 1;
397 }
398
399 static int bus_auth_verify(sd_bus *b) {
400         char *e, *f;
401         sd_id128_t peer;
402         unsigned i;
403         int r;
404
405         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
406          * that's it */
407
408         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
409         if (!e)
410                 return 0;
411
412         f = memmem(e, b->rbuffer_size - (e - (char*) b->rbuffer), "\r\n", 2);
413         if (!f)
414                 return 0;
415
416         if (e - (char*) b->rbuffer != 3 + 32)
417                 return -EPERM;
418
419         if (memcmp(b->rbuffer, "OK ", 3))
420                 return -EPERM;
421
422         for (i = 0; i < 32; i += 2) {
423                 int x, y;
424
425                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
426                 y = unhexchar(((char*) b->rbuffer)[3 + i + 2]);
427
428                 if (x < 0 || y < 0)
429                         return -EINVAL;
430
431                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
432         }
433
434         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
435             !sd_id128_equal(b->peer, peer))
436                 return -EPERM;
437
438         b->peer = peer;
439
440         b->can_fds =
441                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
442                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
443
444         if (f + 2 > (char*) b->rbuffer + b->rbuffer_size) {
445                 b->rbuffer_size -= (f - (char*) b->rbuffer);
446                 memmove(b->rbuffer, f + 2, b->rbuffer_size);
447         }
448
449         r = bus_start_running(b);
450         if (r < 0)
451                 return r;
452
453         return 1;
454 }
455
456 static int bus_read_auth(sd_bus *b) {
457         struct msghdr mh;
458         struct iovec iov;
459         size_t n;
460         ssize_t k;
461         int r;
462
463         assert(b);
464
465         r = bus_auth_verify(b);
466         if (r != 0)
467                 return r;
468
469         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
470
471         zero(iov);
472         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
473         iov.iov_len = n - b->rbuffer_size;
474
475         zero(mh);
476         mh.msg_iov = &iov;
477         mh.msg_iovlen = 1;
478
479         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
480         if (k < 0)
481                 return errno == EAGAIN ? 0 : -errno;
482
483         b->rbuffer_size += k;
484
485         r = bus_auth_verify(b);
486         if (r != 0)
487                 return r;
488
489         return 0;
490 }
491
492 static int bus_start_auth(sd_bus *b) {
493         static const char auth_prefix[] = "\0AUTH_EXTERNAL ";
494         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
495
496         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
497         size_t l;
498
499         assert(b);
500
501         b->state = BUS_AUTHENTICATING;
502
503         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
504         char_array_0(text);
505
506         l = strlen(text);
507         b->auth_uid = hexmem(text, l);
508         if (!b->auth_uid)
509                 return -ENOMEM;
510
511         b->auth_iovec[0].iov_base = (void*) auth_prefix;
512         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
513         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
514         b->auth_iovec[1].iov_len = l * 2;
515         b->auth_iovec[2].iov_base = (void*) auth_suffix;
516         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
517         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
518
519         return bus_write_auth(b);
520 }
521
522 static int bus_start_connect(sd_bus *b) {
523         int r;
524
525         assert(b);
526         assert(b->fd < 0);
527
528         for (;;) {
529                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
530                         r = bus_parse_next_address(b);
531                         if (r < 0)
532                                 return r;
533                         if (r == 0)
534                                 return b->last_connect_error ? b->last_connect_error : -ECONNREFUSED;
535                 }
536
537                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
538                 if (b->fd < 0) {
539                         b->last_connect_error = -errno;
540                         zero(b->sockaddr);
541                         continue;
542                 }
543
544                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
545                 if (r < 0) {
546                         if (errno == EINPROGRESS)
547                                 return 0;
548
549                         b->last_connect_error = -errno;
550                         close_nointr_nofail(b->fd);
551                         b->fd = -1;
552                         zero(b->sockaddr);
553                         continue;
554                 }
555
556                 return bus_start_auth(b);
557         }
558 }
559
560 int sd_bus_open_system(sd_bus **ret) {
561         const char *e;
562         sd_bus *b;
563         int r;
564
565         if (!ret)
566                 return -EINVAL;
567
568         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
569         if (e) {
570                 r = sd_bus_open_address(e, &b);
571                 if (r < 0)
572                         return r;
573
574                 b->send_hello = true;
575                 *ret = b;
576                 return r;
577         }
578
579         b = bus_new();
580         if (!b)
581                 return -ENOMEM;
582
583         b->send_hello = true;
584
585         b->sockaddr.un.sun_family = AF_UNIX;
586         strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
587         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
588
589         r = bus_start_connect(b);
590         if (r < 0) {
591                 bus_free(b);
592                 return r;
593         }
594
595         *ret = b;
596         return 0;
597 }
598
599 int sd_bus_open_user(sd_bus **ret) {
600         const char *e;
601         sd_bus *b;
602         size_t l;
603         int r;
604
605         if (!ret)
606                 return -EINVAL;
607
608         e = getenv("DBUS_SESSION_BUS_ADDRESS");
609         if (e) {
610                 r = sd_bus_open_address(e, &b);
611                 if (r < 0)
612                         return r;
613
614                 b->send_hello = true;
615                 *ret = b;
616                 return r;
617         }
618
619         e = getenv("XDG_RUNTIME_DIR");
620         if (!e)
621                 return -ENOENT;
622
623         l = strlen(e);
624         if (l + 4 > sizeof(b->sockaddr.un.sun_path))
625                 return -E2BIG;
626
627         b = bus_new();
628         if (!b)
629                 return -ENOMEM;
630
631         b->send_hello = true;
632
633         b->sockaddr.un.sun_family = AF_UNIX;
634         memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
635         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
636
637         r = bus_start_connect(b);
638         if (r < 0) {
639                 bus_free(b);
640                 return r;
641         }
642
643         *ret = b;
644         return 0;
645 }
646
647 int sd_bus_open_address(const char *address, sd_bus **ret) {
648         sd_bus *b;
649         int r;
650
651         if (!address)
652                 return -EINVAL;
653         if (!ret)
654                 return -EINVAL;
655
656         b = bus_new();
657         if (!b)
658                 return -ENOMEM;
659
660         b->address = strdup(address);
661         if (!b->address) {
662                 bus_free(b);
663                 return -ENOMEM;
664         }
665
666         r = bus_start_connect(b);
667         if (r < 0) {
668                 bus_free(b);
669                 return r;
670         }
671
672         *ret = b;
673         return 0;
674 }
675
676 int sd_bus_open_fd(int fd, sd_bus **ret) {
677         sd_bus *b;
678         int r;
679
680         if (fd < 0)
681                 return -EINVAL;
682         if (!ret)
683                 return -EINVAL;
684
685         b = bus_new();
686         if (!b)
687                 return -ENOMEM;
688
689         b->fd = fd;
690         fd_nonblock(b->fd, true);
691         fd_cloexec(b->fd, true);
692
693         r = bus_start_auth(b);
694         if (r < 0) {
695                 bus_free(b);
696                 return r;
697         }
698
699         *ret = b;
700         return 0;
701 }
702
703 void sd_bus_close(sd_bus *bus) {
704         if (!bus)
705                 return;
706         if (bus->fd < 0)
707                 return;
708
709         close_nointr_nofail(bus->fd);
710         bus->fd = -1;
711 }
712
713 sd_bus *sd_bus_ref(sd_bus *bus) {
714         if (!bus)
715                 return NULL;
716
717         assert(bus->n_ref > 0);
718
719         bus->n_ref++;
720         return bus;
721 }
722
723 sd_bus *sd_bus_unref(sd_bus *bus) {
724         if (!bus)
725                 return NULL;
726
727         assert(bus->n_ref > 0);
728         bus->n_ref--;
729
730         if (bus->n_ref <= 0)
731                 bus_free(bus);
732
733         return NULL;
734 }
735
736 int sd_bus_is_running(sd_bus *bus) {
737         if (!bus)
738                 return -EINVAL;
739
740         if (bus->fd < 0)
741                 return -ENOTCONN;
742
743         return bus->state == BUS_RUNNING;
744 }
745
746 int sd_bus_can_send(sd_bus *bus, char type) {
747
748         if (!bus)
749                 return -EINVAL;
750
751         if (type == SD_BUS_TYPE_UNIX_FD)
752                 return bus->can_fds;
753
754         return bus_type_is_valid(type);
755 }
756
757 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
758         assert(m);
759
760         if (m->sealed)
761                 return 0;
762
763         return bus_message_seal(m, ++b->serial);
764 }
765
766 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
767         struct msghdr mh;
768         struct iovec *iov;
769         ssize_t k;
770         size_t n;
771         unsigned j;
772
773         assert(bus);
774         assert(m);
775         assert(idx);
776
777         n = m->n_iovec * sizeof(struct iovec);
778         iov = alloca(n);
779         memcpy(iov, m->iovec, n);
780
781         j = 0;
782         iovec_advance(iov, &j, *idx);
783
784         zero(mh);
785         mh.msg_iov = iov;
786         mh.msg_iovlen = m->n_iovec;
787
788         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
789         if (k < 0)
790                 return -errno;
791
792         *idx += (size_t) k;
793         iovec_advance(iov, &j, *idx);
794
795         return j > m->n_iovec;
796 }
797
798 static int message_read_need(sd_bus *bus, size_t *need) {
799         uint32_t a, b;
800         uint8_t e;
801
802         assert(bus);
803         assert(need);
804
805         if (bus->rbuffer_size <= sizeof(struct bus_header)) {
806                 *need = sizeof(struct bus_header);
807                 return 0;
808         }
809
810         a = ((const uint32_t*) bus->rbuffer)[1];
811         b = ((const uint32_t*) bus->rbuffer)[3];
812
813         e = ((const uint8_t*) bus->rbuffer)[0];
814         if (e == SD_BUS_LITTLE_ENDIAN) {
815                 a = le32toh(a);
816                 b = le32toh(b);
817         } else if (e == SD_BUS_BIG_ENDIAN) {
818                 a = be32toh(a);
819                 b = be32toh(b);
820         } else
821                 return -EIO;
822
823         *need = sizeof(struct bus_header) + ALIGN_TO(a, 8) + b;
824         return 0;
825 }
826
827 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
828         sd_bus_message *t;
829         void *b = NULL;
830         int r;
831
832         assert(bus);
833         assert(m);
834         assert(bus->rbuffer_size >= size);
835
836         if (bus->rbuffer_size > size) {
837                 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
838                 if (!b) {
839                         free(t);
840                         return -ENOMEM;
841                 }
842         }
843
844         r = bus_message_from_malloc(bus->rbuffer, size, &t);
845         if (r < 0) {
846                 free(b);
847                 return r;
848         }
849
850         bus->rbuffer = b;
851         bus->rbuffer_size -= size;
852
853         r = bus_message_parse(t);
854         if (r < 0) {
855                 sd_bus_message_unref(t);
856                 return r;
857         }
858
859         *m = t;
860         return 1;
861 }
862
863 static int message_read(sd_bus *bus, sd_bus_message **m) {
864         struct msghdr mh;
865         struct iovec iov;
866         ssize_t k;
867         size_t need;
868         int r;
869         void *b;
870
871         assert(bus);
872         assert(m);
873
874         r = message_read_need(bus, &need);
875         if (r < 0)
876                 return r;
877
878         if (bus->rbuffer_size >= need)
879                 return message_make(bus, need, m);
880
881         b = realloc(bus->rbuffer, need);
882         if (!b)
883                 return -ENOMEM;
884
885         zero(iov);
886         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
887         iov.iov_len = need - bus->rbuffer_size;
888
889         zero(mh);
890         mh.msg_iov = &iov;
891         mh.msg_iovlen = 1;
892
893         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
894         if (k < 0)
895                 return errno == EAGAIN ? 0 : -errno;
896
897         bus->rbuffer_size += k;
898
899         r = message_read_need(bus, &need);
900         if (r < 0)
901                 return r;
902
903         if (bus->rbuffer_size >= need)
904                 return message_make(bus, need, m);
905
906         return 0;
907 }
908
909 static int dispatch_wqueue(sd_bus *bus) {
910         int r, c = 0;
911
912         assert(bus);
913
914         if (bus->fd < 0)
915                 return -ENOTCONN;
916
917         while (bus->wqueue_size > 0) {
918
919                 r = message_write(bus, bus->wqueue[0], &bus->windex);
920                 if (r < 0) {
921                         sd_bus_close(bus);
922                         return r;
923                 } else if (r == 0)
924                         /* Wasn't fully written yet... */
925                         break;
926                 else {
927                         /* Fully written. Let's drop the entry from
928                          * the queue.
929                          *
930                          * This isn't particularly optimized, but
931                          * well, this is supposed to be our worst-case
932                          * buffer only, and the socket buffer is
933                          * supposed to be our primary buffer, and if
934                          * it got full, then all bets are off
935                          * anyway. */
936
937                         sd_bus_message_unref(bus->wqueue[0]);
938                         bus->wqueue_size --;
939                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
940                         bus->windex = 0;
941
942                         c++;
943                 }
944         }
945
946         return c;
947 }
948
949 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
950         int r;
951
952         assert(bus);
953         assert(m);
954
955         if (bus->fd < 0)
956                 return -ENOTCONN;
957
958         if (bus->rqueue_size > 0) {
959                 /* Dispatch a queued message */
960
961                 *m = bus->rqueue[0];
962                 bus->rqueue_size --;
963                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
964                 return 1;
965         }
966
967         /* Try to read a new message */
968         r = message_read(bus, m);
969         if (r < 0) {
970                 sd_bus_close(bus);
971                 return r;
972         }
973
974         return r;
975 }
976
977 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
978         int r;
979
980         if (!bus)
981                 return -EINVAL;
982         if (bus->fd < 0)
983                 return -ENOTCONN;
984         if (!m)
985                 return -EINVAL;
986         if (m->header->version > bus->message_version)
987                 return -EPERM;
988
989         r = bus_seal_message(bus, m);
990         if (r < 0)
991                 return r;
992
993         /* If this is a reply and no reply was requested, then let's
994          * suppress this, if we can */
995         if (m->dont_send && !serial)
996                 return 0;
997
998         if (bus->wqueue_size <= 0) {
999                 size_t idx = 0;
1000
1001                 r = message_write(bus, m, &idx);
1002                 if (r < 0) {
1003                         sd_bus_close(bus);
1004                         return r;
1005                 } else if (r == 0)  {
1006                         /* Wasn't fully written. So let's remember how
1007                          * much was written. Note that the first entry
1008                          * of the wqueue array is always allocated so
1009                          * that we always can remember how much was
1010                          * written. */
1011                         bus->wqueue[0] = sd_bus_message_ref(m);
1012                         bus->wqueue_size = 1;
1013                         bus->windex = idx;
1014                 }
1015         } else {
1016                 sd_bus_message **q;
1017
1018                 /* Just append it to the queue. */
1019
1020                 if (bus->wqueue_size >= WQUEUE_MAX)
1021                         return -ENOBUFS;
1022
1023                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1024                 if (!q)
1025                         return -ENOMEM;
1026
1027                 bus->wqueue = q;
1028                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1029         }
1030
1031         if (serial)
1032                 *serial = BUS_MESSAGE_SERIAL(m);
1033
1034         return 0;
1035 }
1036
1037 static usec_t calc_elapse(uint64_t usec) {
1038         if (usec == (uint64_t) -1)
1039                 return 0;
1040
1041         if (usec == 0)
1042                 usec = SD_BUS_DEFAULT_TIMEOUT;
1043
1044         return now(CLOCK_MONOTONIC) + usec;
1045 }
1046
1047 int sd_bus_send_with_reply(
1048                 sd_bus *bus,
1049                 sd_bus_message *m,
1050                 sd_message_handler_t callback,
1051                 void *userdata,
1052                 uint64_t usec,
1053                 uint64_t *serial) {
1054
1055         struct reply_callback *c;
1056         int r;
1057
1058         if (!bus)
1059                 return -EINVAL;
1060         if (!bus->fd < 0)
1061                 return -ENOTCONN;
1062         if (!m)
1063                 return -EINVAL;
1064         if (!callback)
1065                 return -EINVAL;
1066         if (!m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1067                 return -EINVAL;
1068
1069         r = bus_seal_message(bus, m);
1070         if (r < 0)
1071                 return r;
1072
1073         c = new(struct reply_callback, 1);
1074         if (!c)
1075                 return -ENOMEM;
1076
1077         c->callback = callback;
1078         c->userdata = userdata;
1079         c->serial = BUS_MESSAGE_SERIAL(m);
1080         c->timeout = calc_elapse(usec);
1081
1082         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1083         if (r < 0) {
1084                 free(c);
1085                 return r;
1086         }
1087
1088         r = sd_bus_send(bus, m, serial);
1089         if (r < 0) {
1090                 hashmap_remove(bus->reply_callbacks, &c->serial);
1091                 free(c);
1092                 return r;
1093         }
1094
1095         return r;
1096 }
1097
1098 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1099         struct reply_callbacks *c;
1100
1101         if (!bus)
1102                 return -EINVAL;
1103         if (serial == 0)
1104                 return -EINVAL;
1105
1106         c = hashmap_remove(bus->reply_callbacks, &serial);
1107         if (!c)
1108                 return 0;
1109
1110         free(c);
1111         return 1;
1112 }
1113
1114 int sd_bus_send_with_reply_and_block(
1115                 sd_bus *bus,
1116                 sd_bus_message *m,
1117                 uint64_t usec,
1118                 sd_bus_error *error,
1119                 sd_bus_message **reply) {
1120
1121         int r;
1122         usec_t timeout;
1123         uint64_t serial;
1124         bool room = false;
1125
1126         if (!bus)
1127                 return -EINVAL;
1128         if (!bus->fd < 0)
1129                 return -ENOTCONN;
1130         if (!m)
1131                 return -EINVAL;
1132         if (!m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1133                 return -EINVAL;
1134         if (sd_bus_error_is_dirty(error))
1135                 return -EINVAL;
1136
1137         r = sd_bus_send(bus, m, &serial);
1138         if (r < 0)
1139                 return r;
1140
1141         timeout = calc_elapse(usec);
1142
1143         for (;;) {
1144                 usec_t left;
1145                 sd_bus_message *incoming;
1146
1147                 if (!room) {
1148                         sd_bus_message **q;
1149
1150                         /* Make sure there's room for queuing this
1151                          * locally, before we read the message */
1152
1153                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1154                         if (!q)
1155                                 return -ENOMEM;
1156
1157                         bus->rqueue = q;
1158                         room = true;
1159                 }
1160
1161                 r = message_read(bus, &incoming);
1162                 if (r < 0)
1163                         return r;
1164                 if (r > 0) {
1165                         if (incoming->reply_serial == serial) {
1166                                 /* Found a match! */
1167
1168                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1169                                         *reply = incoming;
1170                                         return 0;
1171                                 }
1172
1173                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1174                                         int k;
1175
1176                                         r = sd_bus_error_copy(error, &incoming->error);
1177                                         if (r < 0) {
1178                                                 sd_bus_message_unref(incoming);
1179                                                 return r;
1180                                         }
1181
1182                                         k = bus_error_to_errno(&incoming->error);
1183                                         sd_bus_message_unref(incoming);
1184                                         return k;
1185                                 }
1186
1187                                 sd_bus_message_unref(incoming);
1188                                 return -EIO;
1189                         }
1190
1191                         /* There's already guaranteed to be room for
1192                          * this, so need to resize things here */
1193                         bus->rqueue[bus->rqueue_size ++] = incoming;
1194                         room = false;
1195
1196                         /* Try to read more, right-away */
1197                         continue;
1198                 }
1199
1200                 if (timeout > 0) {
1201                         usec_t n;
1202
1203                         n = now(CLOCK_MONOTONIC);
1204                         if (n >= timeout)
1205                                 return -ETIMEDOUT;
1206
1207                         left = timeout - n;
1208                 } else
1209                         left = (uint64_t) -1;
1210
1211                 r = sd_bus_wait(bus, left);
1212                 if (r < 0)
1213                         return r;
1214
1215                 r = dispatch_wqueue(bus);
1216                 if (r < 0)
1217                         return r;
1218         }
1219 }
1220
1221 int sd_bus_get_fd(sd_bus *bus) {
1222         if (!bus)
1223                 return -EINVAL;
1224
1225         if (bus->fd < 0)
1226                 return -EINVAL;
1227
1228         return bus->fd;
1229 }
1230
1231 int sd_bus_get_events(sd_bus *bus) {
1232         int flags = 0;
1233
1234         if (!bus)
1235                 return -EINVAL;
1236
1237         if (bus->fd < 0)
1238                 return -EINVAL;
1239
1240         if (bus->state == BUS_OPENING)
1241                 flags |= POLLOUT;
1242         else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1243                 if (bus->rqueue_size <= 0)
1244                         flags |= POLLIN;
1245                 if (bus->wqueue_size > 0)
1246                         flags |= POLLOUT;
1247         }
1248
1249         return flags;
1250 }
1251
1252 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1253         sd_bus_message *m;
1254         int r;
1255
1256         if (!bus)
1257                 return -EINVAL;
1258         if (bus->fd < 0)
1259                 return -ENOTCONN;
1260
1261         if (bus->state == BUS_OPENING) {
1262                 struct pollfd p;
1263
1264                 zero(p);
1265                 p.fd = bus->fd;
1266                 p.events = POLLOUT;
1267
1268                 r = poll(&p, 1, 0);
1269                 if (r < 0)
1270                         return -errno;
1271
1272                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1273                         int error;
1274                         socklen_t slen = sizeof(error);
1275
1276                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1277                         if (r < 0)
1278                                 return -errno;
1279
1280                         if (error != 0)
1281                                 bus->last_connect_error = -error;
1282                         else if (p.revents & (POLLERR|POLLHUP))
1283                                 bus->last_connect_error = -ECONNREFUSED;
1284                         else
1285                                 return bus_start_auth(bus);
1286
1287                         /* Try next address */
1288                         return bus_start_connect(bus);
1289                 }
1290
1291                 return 0;
1292
1293         } else if (bus->state == BUS_AUTHENTICATING) {
1294
1295                 r = bus_write_auth(bus);
1296                 if (r < 0)
1297                         return r;
1298
1299                 r = bus_read_auth(bus);
1300                 if (r <= 0)
1301                         return r;
1302
1303                 return bus_start_running(bus);
1304
1305         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1306                 struct filter_callback *l;
1307
1308                 r = dispatch_wqueue(bus);
1309                 if (r < 0)
1310                         return r;
1311
1312                 r = dispatch_rqueue(bus, &m);
1313                 if (r <= 0)
1314                         return r;
1315
1316                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1317                         struct reply_callback *c;
1318
1319                         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1320                         if (c) {
1321                                 r = c->callback(bus, m, c->userdata);
1322                                 free(c);
1323
1324                                 if (r != 0) {
1325                                         sd_bus_message_unref(m);
1326                                         return r < 0 ? r : 0;
1327                                 }
1328                         }
1329                 }
1330
1331                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1332                         r = l->callback(bus, m, l->userdata);
1333                         if (r != 0) {
1334                                 sd_bus_message_unref(m);
1335                                 return r < 0 ? r : 0;
1336                         }
1337                 }
1338
1339                 if (ret) {
1340                         *ret = m;
1341                         return 1;
1342                 }
1343
1344                 sd_bus_message_unref(m);
1345                 return 0;
1346         }
1347
1348         return -ENOTSUP;
1349 }
1350
1351 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1352         struct pollfd p;
1353         int r, e;
1354         struct timespec ts;
1355
1356         if (!bus)
1357                 return -EINVAL;
1358         if (bus->fd < 0)
1359                 return -ECONNREFUSED;
1360
1361         e = sd_bus_get_events(bus);
1362         if (e < 0)
1363                 return e;
1364
1365         zero(p);
1366         p.fd = bus->fd;
1367         p.events = e;
1368
1369         r = ppoll(&p, 1, timeout_usec == (uint64_t) -1 ? NULL : timespec_store(&ts, timeout_usec), NULL);
1370         if (r < 0)
1371                 return -EINVAL;
1372
1373         return r;
1374 }
1375
1376 int sd_bus_flush(sd_bus *bus) {
1377         int r;
1378
1379         if (!bus)
1380                 return -EINVAL;
1381         if (bus->fd < 0)
1382                 return -ENOTCONN;
1383
1384         if (bus->state == BUS_RUNNING && bus->wqueue_size <= 0)
1385                 return 0;
1386
1387         for (;;) {
1388                 r = dispatch_wqueue(bus);
1389                 if (r < 0)
1390                         return r;
1391
1392                 if (bus->state == BUS_RUNNING && bus->wqueue_size <= 0)
1393                         return 0;
1394
1395                 r = sd_bus_wait(bus, (uint64_t) -1);
1396                 if (r < 0)
1397                         return r;
1398         }
1399 }
1400
1401 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1402         struct filter_callback *f;
1403
1404         if (!bus)
1405                 return -EINVAL;
1406         if (!callback)
1407                 return -EINVAL;
1408
1409         f = new(struct filter_callback, 1);
1410         if (!f)
1411                 return -ENOMEM;
1412         f->callback = callback;
1413         f->userdata = userdata;
1414
1415         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1416         return 0;
1417 }
1418
1419 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1420         struct filter_callback *f;
1421
1422         if (!bus)
1423                 return -EINVAL;
1424         if (!callback)
1425                 return -EINVAL;
1426
1427         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1428                 if (f->callback == callback && f->userdata == userdata) {
1429                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1430                         free(f);
1431                         return 1;
1432                 }
1433         }
1434
1435         return 0;
1436 }