chiark / gitweb /
bus: when parsing enforce maximum container depth
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32
33 #include "sd-bus.h"
34 #include "bus-internal.h"
35 #include "bus-message.h"
36 #include "bus-type.h"
37
38 static int ensure_running(sd_bus *bus);
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
40
41 static void bus_free(sd_bus *b) {
42         struct filter_callback *f;
43         unsigned i;
44
45         assert(b);
46
47         if (b->fd >= 0)
48                 close_nointr_nofail(b->fd);
49
50         free(b->rbuffer);
51         free(b->unique_name);
52         free(b->auth_uid);
53         free(b->address);
54
55         for (i = 0; i < b->rqueue_size; i++)
56                 sd_bus_message_unref(b->rqueue[i]);
57         free(b->rqueue);
58
59         for (i = 0; i < b->wqueue_size; i++)
60                 sd_bus_message_unref(b->wqueue[i]);
61         free(b->wqueue);
62
63         hashmap_free_free(b->reply_callbacks);
64         prioq_free(b->reply_callbacks_prioq);
65
66         while ((f = b->filter_callbacks)) {
67                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
68                 free(f);
69         }
70
71         free(b);
72 }
73
74 static sd_bus* bus_new(void) {
75         sd_bus *r;
76
77         r = new0(sd_bus, 1);
78         if (!r)
79                 return NULL;
80
81         r->n_ref = 1;
82         r->fd = -1;
83         r->message_version = 1;
84
85         /* We guarantee that wqueue always has space for at least one
86          * entry */
87         r->wqueue = new(sd_bus_message*, 1);
88         if (!r->wqueue) {
89                 free(r);
90                 return NULL;
91         }
92
93         return r;
94 };
95
96 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
97         const char *s;
98         int r;
99
100         assert(bus);
101
102         if (error != 0)
103                 return -error;
104
105         assert(reply);
106
107         r = sd_bus_message_read(reply, "s", &s);
108         if (r < 0)
109                 return r;
110
111         if (!service_name_is_valid(s) || s[0] != ':')
112                 return -EBADMSG;
113
114         bus->unique_name = strdup(s);
115         if (!bus->unique_name)
116                 return -ENOMEM;
117
118         bus->state = BUS_RUNNING;
119
120         return 1;
121 }
122
123 static int bus_send_hello(sd_bus *bus) {
124         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
125         int r;
126
127         assert(bus);
128
129         r = sd_bus_message_new_method_call(
130                         bus,
131                         "org.freedesktop.DBus",
132                         "/",
133                         "org.freedesktop.DBus",
134                         "Hello",
135                         &m);
136         if (r < 0)
137                 return r;
138
139         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
140         if (r < 0)
141                 return r;
142
143         bus->sent_hello = true;
144         return r;
145 }
146
147 static int bus_start_running(sd_bus *bus) {
148         assert(bus);
149
150         if (bus->sent_hello) {
151                 bus->state = BUS_HELLO;
152                 return 1;
153         }
154
155         bus->state = BUS_RUNNING;
156         return 1;
157 }
158
159 static int parse_address_key(const char **p, const char *key, char **value) {
160         size_t l, n = 0;
161         const char *a;
162         char *r = NULL;
163
164         assert(p);
165         assert(*p);
166         assert(key);
167         assert(value);
168
169         l = strlen(key);
170         if (strncmp(*p, key, l) != 0)
171                 return 0;
172
173         if ((*p)[l] != '=')
174                 return 0;
175
176         if (*value)
177                 return -EINVAL;
178
179         a = *p + l + 1;
180         while (*a != ',' && *a != 0) {
181                 char c, *t;
182
183                 if (*a == '%') {
184                         int x, y;
185
186                         x = unhexchar(a[1]);
187                         if (x < 0) {
188                                 free(r);
189                                 return x;
190                         }
191
192                         y = unhexchar(a[2]);
193                         if (y < 0) {
194                                 free(r);
195                                 return y;
196                         }
197
198                         c = (char) ((x << 4) | y);
199                         a += 3;
200                 } else {
201                         c = *a;
202                         a++;
203                 }
204
205                 t = realloc(r, n + 2);
206                 if (!t) {
207                         free(r);
208                         return -ENOMEM;
209                 }
210
211                 r = t;
212                 r[n++] = c;
213         }
214
215         if (!r) {
216                 r = strdup("");
217                 if (!r)
218                         return -ENOMEM;
219         } else
220                 r[n] = 0;
221
222         if (*a == ',')
223                 a++;
224
225         *p = a;
226         *value = r;
227         return 1;
228 }
229
230 static void skip_address_key(const char **p) {
231         assert(p);
232         assert(*p);
233
234         *p += strcspn(*p, ",");
235
236         if (**p == ',')
237                 (*p) ++;
238 }
239
240 static int bus_parse_next_address(sd_bus *b) {
241         const char *a, *p;
242         _cleanup_free_ char *guid = NULL;
243         int r;
244
245         assert(b);
246
247         if (!b->address)
248                 return 0;
249         if (b->address[b->address_index] == 0)
250                 return 0;
251
252         a = b->address + b->address_index;
253
254         zero(b->sockaddr);
255         b->sockaddr_size = 0;
256         b->peer = SD_ID128_NULL;
257
258         if (startswith(a, "unix:")) {
259                 _cleanup_free_ char *path = NULL, *abstract = NULL;
260
261                 p = a + 5;
262                 while (*p != 0) {
263                         r = parse_address_key(&p, "guid", &guid);
264                         if (r < 0)
265                                 return r;
266                         else if (r > 0)
267                                 continue;
268
269                         r = parse_address_key(&p, "path", &path);
270                         if (r < 0)
271                                 return r;
272                         else if (r > 0)
273                                 continue;
274
275                         r = parse_address_key(&p, "abstract", &abstract);
276                         if (r < 0)
277                                 return r;
278                         else if (r > 0)
279                                 continue;
280
281                         skip_address_key(&p);
282                 }
283
284                 if (!path && !abstract)
285                         return -EINVAL;
286
287                 if (path && abstract)
288                         return -EINVAL;
289
290                 if (path) {
291                         size_t l;
292
293                         l = strlen(path);
294                         if (l > sizeof(b->sockaddr.un.sun_path))
295                                 return -E2BIG;
296
297                         b->sockaddr.un.sun_family = AF_UNIX;
298                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
299                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
300                 } else if (abstract) {
301                         size_t l;
302
303                         l = strlen(abstract);
304                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
305                                 return -E2BIG;
306
307                         b->sockaddr.un.sun_family = AF_UNIX;
308                         b->sockaddr.un.sun_path[0] = 0;
309                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
310                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
311                 }
312
313         } else if (startswith(a, "tcp:")) {
314                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
315                 struct addrinfo hints, *result;
316
317                 p = a + 4;
318                 while (*p != 0) {
319                         r = parse_address_key(&p, "guid", &guid);
320                         if (r < 0)
321                                 return r;
322                         else if (r > 0)
323                                 continue;
324
325                         r = parse_address_key(&p, "host", &host);
326                         if (r < 0)
327                                 return r;
328                         else if (r > 0)
329                                 continue;
330
331                         r = parse_address_key(&p, "port", &port);
332                         if (r < 0)
333                                 return r;
334                         else if (r > 0)
335                                 continue;
336
337                         r = parse_address_key(&p, "family", &family);
338                         if (r < 0)
339                                 return r;
340                         else if (r > 0)
341                                 continue;
342
343                         skip_address_key(&p);
344                 }
345
346                 if (!host || !port)
347                         return -EINVAL;
348
349                 zero(hints);
350                 hints.ai_socktype = SOCK_STREAM;
351                 hints.ai_flags = AI_ADDRCONFIG;
352
353                 if (family) {
354                         if (streq(family, "ipv4"))
355                                 hints.ai_family = AF_INET;
356                         else if (streq(family, "ipv6"))
357                                 hints.ai_family = AF_INET6;
358                         else
359                                 return -EINVAL;
360                 }
361
362                 r = getaddrinfo(host, port, &hints, &result);
363                 if (r == EAI_SYSTEM)
364                         return -errno;
365                 else if (r != 0)
366                         return -EADDRNOTAVAIL;
367
368                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
369                 b->sockaddr_size = result->ai_addrlen;
370
371                 freeaddrinfo(result);
372         }
373
374         if (guid) {
375                 r = sd_id128_from_string(guid, &b->peer);
376                 if (r < 0)
377                         return r;
378         }
379
380         b->address_index = p - b->address;
381         return 1;
382 }
383
384 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
385
386         while (size > 0) {
387                 struct iovec *i = iov + *idx;
388
389                 if (i->iov_len > size) {
390                         i->iov_base = (uint8_t*) i->iov_base + size;
391                         i->iov_len -= size;
392                         return;
393                 }
394
395                 size -= i->iov_len;
396
397                 i->iov_base = NULL;
398                 i->iov_len = 0;
399
400                 (*idx) ++;
401         }
402 }
403
404 static int bus_write_auth(sd_bus *b) {
405         struct msghdr mh;
406         ssize_t k;
407
408         assert(b);
409         assert(b->state == BUS_AUTHENTICATING);
410
411         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
412                 return 0;
413
414         if (b->auth_timeout == 0)
415                 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
416
417         zero(mh);
418         mh.msg_iov = b->auth_iovec + b->auth_index;
419         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
420
421         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
422         if (k < 0)
423                 return errno == EAGAIN ? 0 : -errno;
424
425         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
426
427         return 1;
428 }
429
430 static int bus_auth_verify(sd_bus *b) {
431         char *e, *f;
432         sd_id128_t peer;
433         unsigned i;
434         int r;
435
436         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
437          * that's it */
438
439         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
440         if (!e)
441                 return 0;
442
443         f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
444         if (!f)
445                 return 0;
446
447         if (e - (char*) b->rbuffer != 3 + 32)
448                 return -EPERM;
449
450         if (memcmp(b->rbuffer, "OK ", 3))
451                 return -EPERM;
452
453         for (i = 0; i < 32; i += 2) {
454                 int x, y;
455
456                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
457                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
458
459                 if (x < 0 || y < 0)
460                         return -EINVAL;
461
462                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
463         }
464
465         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
466             !sd_id128_equal(b->peer, peer))
467                 return -EPERM;
468
469         b->peer = peer;
470
471         b->can_fds =
472                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
473                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
474
475         b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
476         memmove(b->rbuffer, f + 2, b->rbuffer_size);
477
478         r = bus_start_running(b);
479         if (r < 0)
480                 return r;
481
482         return 1;
483 }
484
485 static int bus_read_auth(sd_bus *b) {
486         struct msghdr mh;
487         struct iovec iov;
488         size_t n;
489         ssize_t k;
490         int r;
491         void *p;
492
493         assert(b);
494
495         r = bus_auth_verify(b);
496         if (r != 0)
497                 return r;
498
499         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
500
501         if (n > BUS_AUTH_SIZE_MAX)
502                 n = BUS_AUTH_SIZE_MAX;
503
504         if (b->rbuffer_size >= n)
505                 return -ENOBUFS;
506
507         p = realloc(b->rbuffer, n);
508         if (!p)
509                 return -ENOMEM;
510
511         b->rbuffer = p;
512
513         zero(iov);
514         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
515         iov.iov_len = n - b->rbuffer_size;
516
517         zero(mh);
518         mh.msg_iov = &iov;
519         mh.msg_iovlen = 1;
520
521         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
522         if (k < 0)
523                 return errno == EAGAIN ? 0 : -errno;
524
525         b->rbuffer_size += k;
526
527         r = bus_auth_verify(b);
528         if (r != 0)
529                 return r;
530
531         return 1;
532 }
533
534 static int bus_start_auth(sd_bus *b) {
535         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
536         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
537
538         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
539         size_t l;
540
541         assert(b);
542
543         b->state = BUS_AUTHENTICATING;
544
545         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
546         char_array_0(text);
547
548         l = strlen(text);
549         b->auth_uid = hexmem(text, l);
550         if (!b->auth_uid)
551                 return -ENOMEM;
552
553         b->auth_iovec[0].iov_base = (void*) auth_prefix;
554         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
555         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
556         b->auth_iovec[1].iov_len = l * 2;
557         b->auth_iovec[2].iov_base = (void*) auth_suffix;
558         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
559         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
560
561         return bus_write_auth(b);
562 }
563
564 static int bus_start_connect(sd_bus *b) {
565         int r;
566
567         assert(b);
568         assert(b->fd < 0);
569
570         for (;;) {
571                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
572                         r = bus_parse_next_address(b);
573                         if (r < 0)
574                                 return r;
575                         if (r == 0)
576                                 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
577                 }
578
579                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
580                 if (b->fd < 0) {
581                         b->last_connect_error = errno;
582                         zero(b->sockaddr);
583                         continue;
584                 }
585
586                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
587                 if (r < 0) {
588                         if (errno == EINPROGRESS)
589                                 return 1;
590
591                         b->last_connect_error = errno;
592                         close_nointr_nofail(b->fd);
593                         b->fd = -1;
594                         zero(b->sockaddr);
595                         continue;
596                 }
597
598                 return bus_start_auth(b);
599         }
600 }
601
602 int sd_bus_open_system(sd_bus **ret) {
603         const char *e;
604         sd_bus *b;
605         int r;
606
607         if (!ret)
608                 return -EINVAL;
609
610         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
611         if (e) {
612                 r = sd_bus_open_address(e, &b);
613                 if (r < 0)
614                         return r;
615         } else {
616                 b = bus_new();
617                 if (!b)
618                         return -ENOMEM;
619
620                 b->sockaddr.un.sun_family = AF_UNIX;
621                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
622                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
623
624                 r = bus_start_connect(b);
625                 if (r < 0) {
626                         bus_free(b);
627                         return r;
628                 }
629         }
630
631         r = bus_send_hello(b);
632         if (r < 0) {
633                 sd_bus_unref(b);
634                 return r;
635         }
636
637         *ret = b;
638         return 0;
639 }
640
641 int sd_bus_open_user(sd_bus **ret) {
642         const char *e;
643         sd_bus *b;
644         size_t l;
645         int r;
646
647         if (!ret)
648                 return -EINVAL;
649
650         e = getenv("DBUS_SESSION_BUS_ADDRESS");
651         if (e) {
652                 r = sd_bus_open_address(e, &b);
653                 if (r < 0)
654                         return r;
655         } else {
656                 e = getenv("XDG_RUNTIME_DIR");
657                 if (!e)
658                         return -ENOENT;
659
660                 l = strlen(e);
661                 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
662                         return -E2BIG;
663
664                 b = bus_new();
665                 if (!b)
666                         return -ENOMEM;
667
668                 b->sockaddr.un.sun_family = AF_UNIX;
669                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
670                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
671
672                 r = bus_start_connect(b);
673                 if (r < 0) {
674                         bus_free(b);
675                         return r;
676                 }
677         }
678
679         r = bus_send_hello(b);
680         if (r < 0) {
681                 sd_bus_unref(b);
682                 return r;
683         }
684
685         *ret = b;
686         return 0;
687 }
688
689 int sd_bus_open_address(const char *address, sd_bus **ret) {
690         sd_bus *b;
691         int r;
692
693         if (!address)
694                 return -EINVAL;
695         if (!ret)
696                 return -EINVAL;
697
698         b = bus_new();
699         if (!b)
700                 return -ENOMEM;
701
702         b->address = strdup(address);
703         if (!b->address) {
704                 bus_free(b);
705                 return -ENOMEM;
706         }
707
708         r = bus_start_connect(b);
709         if (r < 0) {
710                 bus_free(b);
711                 return r;
712         }
713
714         *ret = b;
715         return 0;
716 }
717
718 int sd_bus_open_fd(int fd, sd_bus **ret) {
719         sd_bus *b;
720         int r;
721
722         if (fd < 0)
723                 return -EINVAL;
724         if (!ret)
725                 return -EINVAL;
726
727         b = bus_new();
728         if (!b)
729                 return -ENOMEM;
730
731         b->fd = fd;
732         fd_nonblock(b->fd, true);
733         fd_cloexec(b->fd, true);
734
735         r = bus_start_auth(b);
736         if (r < 0) {
737                 bus_free(b);
738                 return r;
739         }
740
741         *ret = b;
742         return 0;
743 }
744
745 void sd_bus_close(sd_bus *bus) {
746         if (!bus)
747                 return;
748         if (bus->fd < 0)
749                 return;
750
751         close_nointr_nofail(bus->fd);
752         bus->fd = -1;
753 }
754
755 sd_bus *sd_bus_ref(sd_bus *bus) {
756         if (!bus)
757                 return NULL;
758
759         assert(bus->n_ref > 0);
760
761         bus->n_ref++;
762         return bus;
763 }
764
765 sd_bus *sd_bus_unref(sd_bus *bus) {
766         if (!bus)
767                 return NULL;
768
769         assert(bus->n_ref > 0);
770         bus->n_ref--;
771
772         if (bus->n_ref <= 0)
773                 bus_free(bus);
774
775         return NULL;
776 }
777
778 int sd_bus_is_open(sd_bus *bus) {
779         if (!bus)
780                 return -EINVAL;
781
782         return bus->fd >= 0;
783 }
784
785 int sd_bus_can_send(sd_bus *bus, char type) {
786         int r;
787
788         if (!bus)
789                 return -EINVAL;
790
791         if (type == SD_BUS_TYPE_UNIX_FD) {
792                 r = ensure_running(bus);
793                 if (r < 0)
794                         return r;
795
796                 return bus->can_fds;
797         }
798
799         return bus_type_is_valid(type);
800 }
801
802 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
803         int r;
804
805         if (!bus)
806                 return -EINVAL;
807         if (!peer)
808                 return -EINVAL;
809
810         r = ensure_running(bus);
811         if (r < 0)
812                 return r;
813
814         *peer = bus->peer;
815         return 0;
816 }
817
818 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
819         assert(m);
820
821         if (m->header->version > b->message_version)
822                 return -EPERM;
823
824         if (m->sealed)
825                 return 0;
826
827         return bus_message_seal(m, ++b->serial);
828 }
829
830 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
831         struct msghdr mh;
832         struct iovec *iov;
833         ssize_t k;
834         size_t n;
835         unsigned j;
836
837         assert(bus);
838         assert(m);
839         assert(idx);
840         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
841
842         if (*idx >= m->size)
843                 return 0;
844
845         n = m->n_iovec * sizeof(struct iovec);
846         iov = alloca(n);
847         memcpy(iov, m->iovec, n);
848
849         j = 0;
850         iovec_advance(iov, &j, *idx);
851
852         zero(mh);
853         mh.msg_iov = iov;
854         mh.msg_iovlen = m->n_iovec;
855
856         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
857         if (k < 0)
858                 return errno == EAGAIN ? 0 : -errno;
859
860         *idx += (size_t) k;
861         return 1;
862 }
863
864 static int message_read_need(sd_bus *bus, size_t *need) {
865         uint32_t a, b;
866         uint8_t e;
867         uint64_t sum;
868
869         assert(bus);
870         assert(need);
871         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
872
873         if (bus->rbuffer_size < sizeof(struct bus_header)) {
874                 *need = sizeof(struct bus_header) + 8;
875
876                 /* Minimum message size:
877                  *
878                  * Header +
879                  *
880                  *  Method Call: +2 string headers
881                  *       Signal: +3 string headers
882                  * Method Error: +1 string headers
883                  *               +1 uint32 headers
884                  * Method Reply: +1 uint32 headers
885                  *
886                  * A string header is at least 9 bytes
887                  * A uint32 header is at least 8 bytes
888                  *
889                  * Hence the minimum message size of a valid message
890                  * is header + 8 bytes */
891
892                 return 0;
893         }
894
895         a = ((const uint32_t*) bus->rbuffer)[1];
896         b = ((const uint32_t*) bus->rbuffer)[3];
897
898         e = ((const uint8_t*) bus->rbuffer)[0];
899         if (e == SD_BUS_LITTLE_ENDIAN) {
900                 a = le32toh(a);
901                 b = le32toh(b);
902         } else if (e == SD_BUS_BIG_ENDIAN) {
903                 a = be32toh(a);
904                 b = be32toh(b);
905         } else
906                 return -EBADMSG;
907
908         sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
909         if (sum >= BUS_MESSAGE_SIZE_MAX)
910                 return -ENOBUFS;
911
912         *need = (size_t) sum;
913         return 0;
914 }
915
916 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
917         sd_bus_message *t;
918         void *b = NULL;
919         int r;
920
921         assert(bus);
922         assert(m);
923         assert(bus->rbuffer_size >= size);
924         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
925
926         if (bus->rbuffer_size > size) {
927                 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
928                 if (!b) {
929                         free(t);
930                         return -ENOMEM;
931                 }
932         }
933
934         r = bus_message_from_malloc(bus->rbuffer, size, &t);
935         if (r < 0) {
936                 free(b);
937                 return r;
938         }
939
940         bus->rbuffer = b;
941         bus->rbuffer_size -= size;
942
943         *m = t;
944         return 1;
945 }
946
947 static int message_read(sd_bus *bus, sd_bus_message **m) {
948         struct msghdr mh;
949         struct iovec iov;
950         ssize_t k;
951         size_t need;
952         int r;
953         void *b;
954
955         assert(bus);
956         assert(m);
957         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
958
959         r = message_read_need(bus, &need);
960         if (r < 0)
961                 return r;
962
963         if (bus->rbuffer_size >= need)
964                 return message_make(bus, need, m);
965
966         b = realloc(bus->rbuffer, need);
967         if (!b)
968                 return -ENOMEM;
969
970         bus->rbuffer = b;
971
972         zero(iov);
973         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
974         iov.iov_len = need - bus->rbuffer_size;
975
976         zero(mh);
977         mh.msg_iov = &iov;
978         mh.msg_iovlen = 1;
979
980         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
981         if (k < 0)
982                 return errno == EAGAIN ? 0 : -errno;
983
984         bus->rbuffer_size += k;
985
986         r = message_read_need(bus, &need);
987         if (r < 0)
988                 return r;
989
990         if (bus->rbuffer_size >= need)
991                 return message_make(bus, need, m);
992
993         return 1;
994 }
995
996 static int dispatch_wqueue(sd_bus *bus) {
997         int r, ret = 0;
998
999         assert(bus);
1000         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1001
1002         if (bus->fd < 0)
1003                 return -ENOTCONN;
1004
1005         while (bus->wqueue_size > 0) {
1006
1007                 r = message_write(bus, bus->wqueue[0], &bus->windex);
1008                 if (r < 0) {
1009                         sd_bus_close(bus);
1010                         return r;
1011                 } else if (r == 0)
1012                         /* Didn't do anything this time */
1013                         return ret;
1014                 else if (bus->windex >= bus->wqueue[0]->size) {
1015                         /* Fully written. Let's drop the entry from
1016                          * the queue.
1017                          *
1018                          * This isn't particularly optimized, but
1019                          * well, this is supposed to be our worst-case
1020                          * buffer only, and the socket buffer is
1021                          * supposed to be our primary buffer, and if
1022                          * it got full, then all bets are off
1023                          * anyway. */
1024
1025                         sd_bus_message_unref(bus->wqueue[0]);
1026                         bus->wqueue_size --;
1027                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1028                         bus->windex = 0;
1029
1030                         ret = 1;
1031                 }
1032         }
1033
1034         return ret;
1035 }
1036
1037 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1038         sd_bus_message *z = NULL;
1039         int r, ret = 0;
1040
1041         assert(bus);
1042         assert(m);
1043         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1044
1045         if (bus->fd < 0)
1046                 return -ENOTCONN;
1047
1048         if (bus->rqueue_size > 0) {
1049                 /* Dispatch a queued message */
1050
1051                 *m = bus->rqueue[0];
1052                 bus->rqueue_size --;
1053                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1054                 return 1;
1055         }
1056
1057         /* Try to read a new message */
1058         do {
1059                 r = message_read(bus, &z);
1060                 if (r < 0) {
1061                         sd_bus_close(bus);
1062                         return r;
1063                 }
1064                 if (r == 0)
1065                         return ret;
1066
1067                 r = 1;
1068         } while (!z);
1069
1070         *m = z;
1071         return 1;
1072 }
1073
1074 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1075         int r;
1076
1077         if (!bus)
1078                 return -EINVAL;
1079         if (bus->fd < 0)
1080                 return -ENOTCONN;
1081         if (!m)
1082                 return -EINVAL;
1083
1084         /* If the serial number isn't kept, then we know that no reply
1085          * is expected */
1086         if (!serial && !m->sealed)
1087                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1088
1089         r = bus_seal_message(bus, m);
1090         if (r < 0)
1091                 return r;
1092
1093         /* If this is a reply and no reply was requested, then let's
1094          * suppress this, if we can */
1095         if (m->dont_send && !serial)
1096                 return 0;
1097
1098         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1099                 size_t idx = 0;
1100
1101                 r = message_write(bus, m, &idx);
1102                 if (r < 0) {
1103                         sd_bus_close(bus);
1104                         return r;
1105                 } else if (idx < m->size)  {
1106                         /* Wasn't fully written. So let's remember how
1107                          * much was written. Note that the first entry
1108                          * of the wqueue array is always allocated so
1109                          * that we always can remember how much was
1110                          * written. */
1111                         bus->wqueue[0] = sd_bus_message_ref(m);
1112                         bus->wqueue_size = 1;
1113                         bus->windex = idx;
1114                 }
1115         } else {
1116                 sd_bus_message **q;
1117
1118                 /* Just append it to the queue. */
1119
1120                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1121                         return -ENOBUFS;
1122
1123                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1124                 if (!q)
1125                         return -ENOMEM;
1126
1127                 bus->wqueue = q;
1128                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1129         }
1130
1131         if (serial)
1132                 *serial = BUS_MESSAGE_SERIAL(m);
1133
1134         return 0;
1135 }
1136
1137 static usec_t calc_elapse(uint64_t usec) {
1138         if (usec == (uint64_t) -1)
1139                 return 0;
1140
1141         if (usec == 0)
1142                 usec = BUS_DEFAULT_TIMEOUT;
1143
1144         return now(CLOCK_MONOTONIC) + usec;
1145 }
1146
1147 static int timeout_compare(const void *a, const void *b) {
1148         const struct reply_callback *x = a, *y = b;
1149
1150         if (x->timeout != 0 && y->timeout == 0)
1151                 return -1;
1152
1153         if (x->timeout == 0 && y->timeout != 0)
1154                 return 1;
1155
1156         if (x->timeout < y->timeout)
1157                 return -1;
1158
1159         if (x->timeout > y->timeout)
1160                 return 1;
1161
1162         return 0;
1163 }
1164
1165 int sd_bus_send_with_reply(
1166                 sd_bus *bus,
1167                 sd_bus_message *m,
1168                 sd_message_handler_t callback,
1169                 void *userdata,
1170                 uint64_t usec,
1171                 uint64_t *serial) {
1172
1173         struct reply_callback *c;
1174         int r;
1175
1176         if (!bus)
1177                 return -EINVAL;
1178         if (bus->fd < 0)
1179                 return -ENOTCONN;
1180         if (!m)
1181                 return -EINVAL;
1182         if (!callback)
1183                 return -EINVAL;
1184         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1185                 return -EINVAL;
1186         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1187                 return -EINVAL;
1188
1189         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1190         if (r < 0)
1191                 return r;
1192
1193         if (usec != (uint64_t) -1) {
1194                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1195                 if (r < 0)
1196                         return r;
1197         }
1198
1199         r = bus_seal_message(bus, m);
1200         if (r < 0)
1201                 return r;
1202
1203         c = new(struct reply_callback, 1);
1204         if (!c)
1205                 return -ENOMEM;
1206
1207         c->callback = callback;
1208         c->userdata = userdata;
1209         c->serial = BUS_MESSAGE_SERIAL(m);
1210         c->timeout = calc_elapse(usec);
1211
1212         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1213         if (r < 0) {
1214                 free(c);
1215                 return r;
1216         }
1217
1218         if (c->timeout != 0) {
1219                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1220                 if (r < 0) {
1221                         c->timeout = 0;
1222                         sd_bus_send_with_reply_cancel(bus, c->serial);
1223                         return r;
1224                 }
1225         }
1226
1227         r = sd_bus_send(bus, m, serial);
1228         if (r < 0) {
1229                 sd_bus_send_with_reply_cancel(bus, c->serial);
1230                 return r;
1231         }
1232
1233         return r;
1234 }
1235
1236 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1237         struct reply_callback *c;
1238
1239         if (!bus)
1240                 return -EINVAL;
1241         if (serial == 0)
1242                 return -EINVAL;
1243
1244         c = hashmap_remove(bus->reply_callbacks, &serial);
1245         if (!c)
1246                 return 0;
1247
1248         if (c->timeout != 0)
1249                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1250
1251         free(c);
1252         return 1;
1253 }
1254
1255 static int ensure_running(sd_bus *bus) {
1256         int r;
1257
1258         assert(bus);
1259
1260         if (bus->state == BUS_RUNNING)
1261                 return 1;
1262
1263         for (;;) {
1264                 r = sd_bus_process(bus, NULL);
1265                 if (r < 0)
1266                         return r;
1267                 if (bus->state == BUS_RUNNING)
1268                         return 1;
1269                 if (r > 0)
1270                         continue;
1271
1272                 r = sd_bus_wait(bus, (uint64_t) -1);
1273                 if (r < 0)
1274                         return r;
1275         }
1276 }
1277
1278 int sd_bus_send_with_reply_and_block(
1279                 sd_bus *bus,
1280                 sd_bus_message *m,
1281                 uint64_t usec,
1282                 sd_bus_error *error,
1283                 sd_bus_message **reply) {
1284
1285         int r;
1286         usec_t timeout;
1287         uint64_t serial;
1288         bool room = false;
1289
1290         if (!bus)
1291                 return -EINVAL;
1292         if (bus->fd < 0)
1293                 return -ENOTCONN;
1294         if (!m)
1295                 return -EINVAL;
1296         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1297                 return -EINVAL;
1298         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1299                 return -EINVAL;
1300         if (bus_error_is_dirty(error))
1301                 return -EINVAL;
1302
1303         r = ensure_running(bus);
1304         if (r < 0)
1305                 return r;
1306
1307         r = sd_bus_send(bus, m, &serial);
1308         if (r < 0)
1309                 return r;
1310
1311         timeout = calc_elapse(usec);
1312
1313         for (;;) {
1314                 usec_t left;
1315                 sd_bus_message *incoming = NULL;
1316
1317                 if (!room) {
1318                         sd_bus_message **q;
1319
1320                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1321                                 return -ENOBUFS;
1322
1323                         /* Make sure there's room for queuing this
1324                          * locally, before we read the message */
1325
1326                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1327                         if (!q)
1328                                 return -ENOMEM;
1329
1330                         bus->rqueue = q;
1331                         room = true;
1332                 }
1333
1334                 r = message_read(bus, &incoming);
1335                 if (r < 0)
1336                         return r;
1337                 if (incoming) {
1338
1339                         if (incoming->reply_serial == serial) {
1340                                 /* Found a match! */
1341
1342                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1343                                         *reply = incoming;
1344                                         return 0;
1345                                 }
1346
1347                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1348                                         int k;
1349
1350                                         r = sd_bus_error_copy(error, &incoming->error);
1351                                         if (r < 0) {
1352                                                 sd_bus_message_unref(incoming);
1353                                                 return r;
1354                                         }
1355
1356                                         k = bus_error_to_errno(&incoming->error);
1357                                         sd_bus_message_unref(incoming);
1358                                         return k;
1359                                 }
1360
1361                                 sd_bus_message_unref(incoming);
1362                                 return -EIO;
1363                         }
1364
1365                         /* There's already guaranteed to be room for
1366                          * this, so need to resize things here */
1367                         bus->rqueue[bus->rqueue_size ++] = incoming;
1368                         room = false;
1369
1370                         /* Try to read more, right-away */
1371                         continue;
1372                 }
1373                 if (r != 0)
1374                         continue;
1375
1376                 if (timeout > 0) {
1377                         usec_t n;
1378
1379                         n = now(CLOCK_MONOTONIC);
1380                         if (n >= timeout)
1381                                 return -ETIMEDOUT;
1382
1383                         left = timeout - n;
1384                 } else
1385                         left = (uint64_t) -1;
1386
1387                 r = bus_poll(bus, true, left);
1388                 if (r < 0)
1389                         return r;
1390
1391                 r = dispatch_wqueue(bus);
1392                 if (r < 0)
1393                         return r;
1394         }
1395 }
1396
1397 int sd_bus_get_fd(sd_bus *bus) {
1398         if (!bus)
1399                 return -EINVAL;
1400
1401         if (bus->fd < 0)
1402                 return -ENOTCONN;
1403
1404         return bus->fd;
1405 }
1406
1407 int sd_bus_get_events(sd_bus *bus) {
1408         int flags = 0;
1409
1410         if (!bus)
1411                 return -EINVAL;
1412         if (bus->fd < 0)
1413                 return -ENOTCONN;
1414
1415         if (bus->state == BUS_OPENING)
1416                 flags |= POLLOUT;
1417         else if (bus->state == BUS_AUTHENTICATING) {
1418
1419                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1420                         flags |= POLLOUT;
1421
1422                 flags |= POLLIN;
1423
1424         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1425                 if (bus->rqueue_size <= 0)
1426                         flags |= POLLIN;
1427                 if (bus->wqueue_size > 0)
1428                         flags |= POLLOUT;
1429         }
1430
1431         return flags;
1432 }
1433
1434 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1435         struct reply_callback *c;
1436
1437         if (!bus)
1438                 return -EINVAL;
1439         if (!timeout_usec)
1440                 return -EINVAL;
1441         if (bus->fd < 0)
1442                 return -ENOTCONN;
1443
1444         if (bus->state == BUS_AUTHENTICATING) {
1445                 *timeout_usec = bus->auth_timeout;
1446                 return 1;
1447         }
1448
1449         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1450                 return 0;
1451
1452         c = prioq_peek(bus->reply_callbacks_prioq);
1453         if (!c)
1454                 return 0;
1455
1456         *timeout_usec = c->timeout;
1457         return 1;
1458 }
1459
1460 static int process_timeout(sd_bus *bus) {
1461         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1462         struct reply_callback *c;
1463         usec_t n;
1464         int r;
1465
1466         assert(bus);
1467
1468         c = prioq_peek(bus->reply_callbacks_prioq);
1469         if (!c)
1470                 return 0;
1471
1472         n = now(CLOCK_MONOTONIC);
1473         if (c->timeout > n)
1474                 return 0;
1475
1476         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1477         hashmap_remove(bus->reply_callbacks, &c->serial);
1478
1479         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1480         free(c);
1481
1482         return r < 0 ? r : 1;
1483 }
1484
1485 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1486         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1487         int r;
1488
1489         assert(bus);
1490         assert(m);
1491
1492         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1493                 return 0;
1494
1495         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1496                 return 0;
1497
1498         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1499                 return 1;
1500
1501         if (streq_ptr(m->member, "Ping"))
1502                 r = sd_bus_message_new_method_return(bus, m, &reply);
1503         else if (streq_ptr(m->member, "GetMachineId")) {
1504                 sd_id128_t id;
1505                 char sid[33];
1506
1507                 r = sd_id128_get_machine(&id);
1508                 if (r < 0)
1509                         return r;
1510
1511                 r = sd_bus_message_new_method_return(bus, m, &reply);
1512                 if (r < 0)
1513                         return r;
1514
1515                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1516         } else {
1517                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1518
1519                 sd_bus_error_set(&error,
1520                                  "org.freedesktop.DBus.Error.UnknownMethod",
1521                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1522
1523                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1524         }
1525
1526         if (r < 0)
1527                 return r;
1528
1529         r = sd_bus_send(bus, reply, NULL);
1530         if (r < 0)
1531                 return r;
1532
1533         return 1;
1534 }
1535
1536 static int process_message(sd_bus *bus, sd_bus_message *m) {
1537         struct filter_callback *l;
1538         int r;
1539
1540         assert(bus);
1541         assert(m);
1542
1543         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1544                 struct reply_callback *c;
1545
1546                 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1547                 if (c) {
1548                         if (c->timeout != 0)
1549                                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1550
1551                         r = c->callback(bus, 0, m, c->userdata);
1552                         free(c);
1553
1554                         if (r != 0)
1555                                 return r;
1556                 }
1557         }
1558
1559         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1560                 r = l->callback(bus, 0, m, l->userdata);
1561                 if (r != 0)
1562                         return r;
1563         }
1564
1565         return process_builtin(bus, m);
1566 }
1567
1568 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1569         int r;
1570
1571         /* Returns 0 when we didn't do anything. This should cause the
1572          * caller to invoke sd_bus_wait() before returning the next
1573          * time. Returns > 0 when we did something, which possibly
1574          * means *ret is filled in with an unprocessed message. */
1575
1576         if (!bus)
1577                 return -EINVAL;
1578         if (bus->fd < 0)
1579                 return -ENOTCONN;
1580
1581         if (bus->state == BUS_OPENING) {
1582                 struct pollfd p;
1583
1584                 zero(p);
1585                 p.fd = bus->fd;
1586                 p.events = POLLOUT;
1587
1588                 r = poll(&p, 1, 0);
1589                 if (r < 0)
1590                         return -errno;
1591
1592                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1593                         int error = 0;
1594                         socklen_t slen = sizeof(error);
1595
1596                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1597                         if (r < 0)
1598                                 bus->last_connect_error = errno;
1599                         else if (error != 0)
1600                                 bus->last_connect_error = error;
1601                         else if (p.revents & (POLLERR|POLLHUP))
1602                                 bus->last_connect_error = ECONNREFUSED;
1603                         else {
1604                                 r = bus_start_auth(bus);
1605                                 goto null_message;
1606                         }
1607
1608                         /* Try next address */
1609                         r = bus_start_connect(bus);
1610                         goto null_message;
1611                 }
1612
1613                 r = 0;
1614                 goto null_message;
1615
1616         } else if (bus->state == BUS_AUTHENTICATING) {
1617
1618                 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1619                         return -ETIMEDOUT;
1620
1621                 r = bus_write_auth(bus);
1622                 if (r != 0)
1623                         goto null_message;
1624
1625                 r = bus_read_auth(bus);
1626                 goto null_message;
1627
1628         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1629                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1630                 int k;
1631
1632                 r = process_timeout(bus);
1633                 if (r != 0)
1634                         goto null_message;
1635
1636                 r = dispatch_wqueue(bus);
1637                 if (r != 0)
1638                         goto null_message;
1639
1640                 k = r;
1641                 r = dispatch_rqueue(bus, &m);
1642                 if (r < 0)
1643                         return r;
1644                 if (!m) {
1645                         if (r == 0)
1646                                 r = k;
1647                         goto null_message;
1648                 }
1649
1650                 r = process_message(bus, m);
1651                 if (r != 0)
1652                         goto null_message;
1653
1654                 if (ret) {
1655                         *ret = m;
1656                         m = NULL;
1657                         return 1;
1658                 }
1659
1660                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1661                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1662                         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1663
1664                         sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1665
1666                         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1667                         if (r < 0)
1668                                 return r;
1669
1670                         r = sd_bus_send(bus, reply, NULL);
1671                         if (r < 0)
1672                                 return r;
1673                 }
1674
1675                 return 1;
1676         }
1677
1678         assert_not_reached("Unknown state");
1679
1680 null_message:
1681         if (r >= 0 && ret)
1682                 *ret = NULL;
1683
1684         return r;
1685 }
1686
1687 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1688         struct pollfd p;
1689         int r, e;
1690         struct timespec ts;
1691         usec_t until, m;
1692
1693         assert(bus);
1694
1695         if (bus->fd < 0)
1696                 return -ENOTCONN;
1697
1698         e = sd_bus_get_events(bus);
1699         if (e < 0)
1700                 return e;
1701
1702         if (need_more)
1703                 e |= POLLIN;
1704
1705         r = sd_bus_get_timeout(bus, &until);
1706         if (r < 0)
1707                 return r;
1708         if (r == 0)
1709                 m = (uint64_t) -1;
1710         else {
1711                 usec_t n;
1712                 n = now(CLOCK_MONOTONIC);
1713                 m = until > n ? until - n : 0;
1714         }
1715
1716         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1717                 m = timeout_usec;
1718
1719         zero(p);
1720         p.fd = bus->fd;
1721         p.events = e;
1722
1723         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1724         if (r < 0)
1725                 return -errno;
1726
1727         return r > 0 ? 1 : 0;
1728 }
1729
1730 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1731
1732         if (!bus)
1733                 return -EINVAL;
1734         if (bus->fd < 0)
1735                 return -ENOTCONN;
1736         if (bus->rqueue_size > 0)
1737                 return 0;
1738
1739         return bus_poll(bus, false, timeout_usec);
1740 }
1741
1742 int sd_bus_flush(sd_bus *bus) {
1743         int r;
1744
1745         if (!bus)
1746                 return -EINVAL;
1747         if (bus->fd < 0)
1748                 return -ENOTCONN;
1749
1750         r = ensure_running(bus);
1751         if (r < 0)
1752                 return r;
1753
1754         if (bus->wqueue_size <= 0)
1755                 return 0;
1756
1757         for (;;) {
1758                 r = dispatch_wqueue(bus);
1759                 if (r < 0)
1760                         return r;
1761
1762                 if (bus->wqueue_size <= 0)
1763                         return 0;
1764
1765                 r = bus_poll(bus, false, (uint64_t) -1);
1766                 if (r < 0)
1767                         return r;
1768         }
1769 }
1770
1771 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1772         struct filter_callback *f;
1773
1774         if (!bus)
1775                 return -EINVAL;
1776         if (!callback)
1777                 return -EINVAL;
1778
1779         f = new(struct filter_callback, 1);
1780         if (!f)
1781                 return -ENOMEM;
1782         f->callback = callback;
1783         f->userdata = userdata;
1784
1785         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1786         return 0;
1787 }
1788
1789 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1790         struct filter_callback *f;
1791
1792         if (!bus)
1793                 return -EINVAL;
1794         if (!callback)
1795                 return -EINVAL;
1796
1797         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1798                 if (f->callback == callback && f->userdata == userdata) {
1799                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1800                         free(f);
1801                         return 1;
1802                 }
1803         }
1804
1805         return 0;
1806 }