chiark / gitweb /
09ea01b951545283b365740d54f9d84affb7fde1
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32
33 #include "sd-bus.h"
34 #include "bus-internal.h"
35 #include "bus-message.h"
36 #include "bus-type.h"
37
38 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
39
40 static void bus_free(sd_bus *b) {
41         struct filter_callback *f;
42         unsigned i;
43
44         assert(b);
45
46         if (b->fd >= 0)
47                 close_nointr_nofail(b->fd);
48
49         free(b->rbuffer);
50         free(b->unique_name);
51         free(b->auth_uid);
52         free(b->address);
53
54         for (i = 0; i < b->rqueue_size; i++)
55                 sd_bus_message_unref(b->rqueue[i]);
56         free(b->rqueue);
57
58         for (i = 0; i < b->wqueue_size; i++)
59                 sd_bus_message_unref(b->wqueue[i]);
60         free(b->wqueue);
61
62         hashmap_free_free(b->reply_callbacks);
63         prioq_free(b->reply_callbacks_prioq);
64
65         while ((f = b->filter_callbacks)) {
66                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
67                 free(f);
68         }
69
70         free(b);
71 }
72
73 static sd_bus* bus_new(void) {
74         sd_bus *r;
75
76         r = new0(sd_bus, 1);
77         if (!r)
78                 return NULL;
79
80         r->n_ref = 1;
81         r->fd = -1;
82         r->message_version = 1;
83
84         /* We guarantee that wqueue always has space for at least one
85          * entry */
86         r->wqueue = new(sd_bus_message*, 1);
87         if (!r->wqueue) {
88                 free(r);
89                 return NULL;
90         }
91
92         return r;
93 };
94
95 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
96         const char *s;
97         int r;
98
99         assert(bus);
100
101         if (error != 0)
102                 return -error;
103
104         assert(reply);
105
106         r = sd_bus_message_read(reply, "s", &s);
107         if (r < 0)
108                 return r;
109
110         if (!service_name_is_valid(s) || s[0] != ':')
111                 return -EBADMSG;
112
113         bus->unique_name = strdup(s);
114         if (!bus->unique_name)
115                 return -ENOMEM;
116
117         bus->state = BUS_RUNNING;
118
119         return 1;
120 }
121
122 static int bus_send_hello(sd_bus *bus) {
123         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
124         int r;
125
126         assert(bus);
127
128         r = sd_bus_message_new_method_call(
129                         bus,
130                         "org.freedesktop.DBus",
131                         "/",
132                         "org.freedesktop.DBus",
133                         "Hello",
134                         &m);
135         if (r < 0)
136                 return r;
137
138         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
139         if (r < 0)
140                 return r;
141
142         bus->sent_hello = true;
143         return r;
144 }
145
146 static int bus_start_running(sd_bus *bus) {
147         assert(bus);
148
149         if (bus->sent_hello) {
150                 bus->state = BUS_HELLO;
151                 return 1;
152         }
153
154         bus->state = BUS_RUNNING;
155         return 1;
156 }
157
158 static int parse_address_key(const char **p, const char *key, char **value) {
159         size_t l, n = 0;
160         const char *a;
161         char *r = NULL;
162
163         assert(p);
164         assert(*p);
165         assert(key);
166         assert(value);
167
168         l = strlen(key);
169         if (strncmp(*p, key, l) != 0)
170                 return 0;
171
172         if ((*p)[l] != '=')
173                 return 0;
174
175         if (*value)
176                 return -EINVAL;
177
178         a = *p + l + 1;
179         while (*a != ',' && *a != 0) {
180                 char c, *t;
181
182                 if (*a == '%') {
183                         int x, y;
184
185                         x = unhexchar(a[1]);
186                         if (x < 0) {
187                                 free(r);
188                                 return x;
189                         }
190
191                         y = unhexchar(a[2]);
192                         if (y < 0) {
193                                 free(r);
194                                 return y;
195                         }
196
197                         c = (char) ((x << 4) | y);
198                         a += 3;
199                 } else {
200                         c = *a;
201                         a++;
202                 }
203
204                 t = realloc(r, n + 2);
205                 if (!t) {
206                         free(r);
207                         return -ENOMEM;
208                 }
209
210                 r = t;
211                 r[n++] = c;
212         }
213
214         if (!r) {
215                 r = strdup("");
216                 if (!r)
217                         return -ENOMEM;
218         } else
219                 r[n] = 0;
220
221         if (*a == ',')
222                 a++;
223
224         *p = a;
225         *value = r;
226         return 1;
227 }
228
229 static void skip_address_key(const char **p) {
230         assert(p);
231         assert(*p);
232
233         *p += strcspn(*p, ",");
234
235         if (**p == ',')
236                 (*p) ++;
237 }
238
239 static int bus_parse_next_address(sd_bus *b) {
240         const char *a, *p;
241         _cleanup_free_ char *guid = NULL;
242         int r;
243
244         assert(b);
245
246         if (!b->address)
247                 return 0;
248         if (b->address[b->address_index] == 0)
249                 return 0;
250
251         a = b->address + b->address_index;
252
253         zero(b->sockaddr);
254         b->sockaddr_size = 0;
255         b->peer = SD_ID128_NULL;
256
257         if (startswith(a, "unix:")) {
258                 _cleanup_free_ char *path = NULL, *abstract = NULL;
259
260                 p = a + 5;
261                 while (*p != 0) {
262                         r = parse_address_key(&p, "guid", &guid);
263                         if (r < 0)
264                                 return r;
265                         else if (r > 0)
266                                 continue;
267
268                         r = parse_address_key(&p, "path", &path);
269                         if (r < 0)
270                                 return r;
271                         else if (r > 0)
272                                 continue;
273
274                         r = parse_address_key(&p, "abstract", &abstract);
275                         if (r < 0)
276                                 return r;
277                         else if (r > 0)
278                                 continue;
279
280                         skip_address_key(&p);
281                 }
282
283                 if (!path && !abstract)
284                         return -EINVAL;
285
286                 if (path && abstract)
287                         return -EINVAL;
288
289                 if (path) {
290                         size_t l;
291
292                         l = strlen(path);
293                         if (l > sizeof(b->sockaddr.un.sun_path))
294                                 return -E2BIG;
295
296                         b->sockaddr.un.sun_family = AF_UNIX;
297                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
298                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
299                 } else if (abstract) {
300                         size_t l;
301
302                         l = strlen(abstract);
303                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
304                                 return -E2BIG;
305
306                         b->sockaddr.un.sun_family = AF_UNIX;
307                         b->sockaddr.un.sun_path[0] = 0;
308                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
309                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
310                 }
311
312         } else if (startswith(a, "tcp:")) {
313                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
314                 struct addrinfo hints, *result;
315
316                 p = a + 4;
317                 while (*p != 0) {
318                         r = parse_address_key(&p, "guid", &guid);
319                         if (r < 0)
320                                 return r;
321                         else if (r > 0)
322                                 continue;
323
324                         r = parse_address_key(&p, "host", &host);
325                         if (r < 0)
326                                 return r;
327                         else if (r > 0)
328                                 continue;
329
330                         r = parse_address_key(&p, "port", &port);
331                         if (r < 0)
332                                 return r;
333                         else if (r > 0)
334                                 continue;
335
336                         r = parse_address_key(&p, "family", &family);
337                         if (r < 0)
338                                 return r;
339                         else if (r > 0)
340                                 continue;
341
342                         skip_address_key(&p);
343                 }
344
345                 if (!host || !port)
346                         return -EINVAL;
347
348                 zero(hints);
349                 hints.ai_socktype = SOCK_STREAM;
350                 hints.ai_flags = AI_ADDRCONFIG;
351
352                 if (family) {
353                         if (streq(family, "ipv4"))
354                                 hints.ai_family = AF_INET;
355                         else if (streq(family, "ipv6"))
356                                 hints.ai_family = AF_INET6;
357                         else
358                                 return -EINVAL;
359                 }
360
361                 r = getaddrinfo(host, port, &hints, &result);
362                 if (r == EAI_SYSTEM)
363                         return -errno;
364                 else if (r != 0)
365                         return -EADDRNOTAVAIL;
366
367                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
368                 b->sockaddr_size = result->ai_addrlen;
369
370                 freeaddrinfo(result);
371         }
372
373         if (guid) {
374                 r = sd_id128_from_string(guid, &b->peer);
375                 if (r < 0)
376                         return r;
377         }
378
379         b->address_index = p - b->address;
380         return 1;
381 }
382
383 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
384
385         while (size > 0) {
386                 struct iovec *i = iov + *idx;
387
388                 if (i->iov_len > size) {
389                         i->iov_base = (uint8_t*) i->iov_base + size;
390                         i->iov_len -= size;
391                         return;
392                 }
393
394                 size -= i->iov_len;
395
396                 i->iov_base = NULL;
397                 i->iov_len = 0;
398
399                 (*idx) ++;
400         }
401 }
402
403 static int bus_write_auth(sd_bus *b) {
404         struct msghdr mh;
405         ssize_t k;
406
407         assert(b);
408         assert(b->state == BUS_AUTHENTICATING);
409
410         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
411                 return 0;
412
413         if (b->auth_timeout == 0)
414                 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
415
416         zero(mh);
417         mh.msg_iov = b->auth_iovec + b->auth_index;
418         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
419
420         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
421         if (k < 0)
422                 return errno == EAGAIN ? 0 : -errno;
423
424         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
425
426         return 1;
427 }
428
429 static int bus_auth_verify(sd_bus *b) {
430         char *e, *f;
431         sd_id128_t peer;
432         unsigned i;
433         int r;
434
435         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
436          * that's it */
437
438         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
439         if (!e)
440                 return 0;
441
442         f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
443         if (!f)
444                 return 0;
445
446         if (e - (char*) b->rbuffer != 3 + 32)
447                 return -EPERM;
448
449         if (memcmp(b->rbuffer, "OK ", 3))
450                 return -EPERM;
451
452         for (i = 0; i < 32; i += 2) {
453                 int x, y;
454
455                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
456                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
457
458                 if (x < 0 || y < 0)
459                         return -EINVAL;
460
461                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
462         }
463
464         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
465             !sd_id128_equal(b->peer, peer))
466                 return -EPERM;
467
468         b->peer = peer;
469
470         b->can_fds =
471                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
472                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
473
474         b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
475         memmove(b->rbuffer, f + 2, b->rbuffer_size);
476
477         r = bus_start_running(b);
478         if (r < 0)
479                 return r;
480
481         return 1;
482 }
483
484 static int bus_read_auth(sd_bus *b) {
485         struct msghdr mh;
486         struct iovec iov;
487         size_t n;
488         ssize_t k;
489         int r;
490         void *p;
491
492         assert(b);
493
494         r = bus_auth_verify(b);
495         if (r != 0)
496                 return r;
497
498         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
499
500         if (n > BUS_AUTH_SIZE_MAX)
501                 n = BUS_AUTH_SIZE_MAX;
502
503         if (b->rbuffer_size >= n)
504                 return -ENOBUFS;
505
506         p = realloc(b->rbuffer, n);
507         if (!p)
508                 return -ENOMEM;
509
510         b->rbuffer = p;
511
512         zero(iov);
513         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
514         iov.iov_len = n - b->rbuffer_size;
515
516         zero(mh);
517         mh.msg_iov = &iov;
518         mh.msg_iovlen = 1;
519
520         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
521         if (k < 0)
522                 return errno == EAGAIN ? 0 : -errno;
523
524         b->rbuffer_size += k;
525
526         r = bus_auth_verify(b);
527         if (r != 0)
528                 return r;
529
530         return 1;
531 }
532
533 static int bus_start_auth(sd_bus *b) {
534         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
535         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
536
537         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
538         size_t l;
539
540         assert(b);
541
542         b->state = BUS_AUTHENTICATING;
543
544         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
545         char_array_0(text);
546
547         l = strlen(text);
548         b->auth_uid = hexmem(text, l);
549         if (!b->auth_uid)
550                 return -ENOMEM;
551
552         b->auth_iovec[0].iov_base = (void*) auth_prefix;
553         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
554         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
555         b->auth_iovec[1].iov_len = l * 2;
556         b->auth_iovec[2].iov_base = (void*) auth_suffix;
557         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
558         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
559
560         return bus_write_auth(b);
561 }
562
563 static int bus_start_connect(sd_bus *b) {
564         int r;
565
566         assert(b);
567         assert(b->fd < 0);
568
569         for (;;) {
570                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
571                         r = bus_parse_next_address(b);
572                         if (r < 0)
573                                 return r;
574                         if (r == 0)
575                                 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
576                 }
577
578                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
579                 if (b->fd < 0) {
580                         b->last_connect_error = errno;
581                         zero(b->sockaddr);
582                         continue;
583                 }
584
585                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
586                 if (r < 0) {
587                         if (errno == EINPROGRESS)
588                                 return 1;
589
590                         b->last_connect_error = errno;
591                         close_nointr_nofail(b->fd);
592                         b->fd = -1;
593                         zero(b->sockaddr);
594                         continue;
595                 }
596
597                 return bus_start_auth(b);
598         }
599 }
600
601 int sd_bus_open_system(sd_bus **ret) {
602         const char *e;
603         sd_bus *b;
604         int r;
605
606         if (!ret)
607                 return -EINVAL;
608
609         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
610         if (e) {
611                 r = sd_bus_open_address(e, &b);
612                 if (r < 0)
613                         return r;
614         } else {
615                 b = bus_new();
616                 if (!b)
617                         return -ENOMEM;
618
619                 b->sockaddr.un.sun_family = AF_UNIX;
620                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
621                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
622
623                 r = bus_start_connect(b);
624                 if (r < 0) {
625                         bus_free(b);
626                         return r;
627                 }
628         }
629
630         r = bus_send_hello(b);
631         if (r < 0) {
632                 sd_bus_unref(b);
633                 return r;
634         }
635
636         *ret = b;
637         return 0;
638 }
639
640 int sd_bus_open_user(sd_bus **ret) {
641         const char *e;
642         sd_bus *b;
643         size_t l;
644         int r;
645
646         if (!ret)
647                 return -EINVAL;
648
649         e = getenv("DBUS_SESSION_BUS_ADDRESS");
650         if (e) {
651                 r = sd_bus_open_address(e, &b);
652                 if (r < 0)
653                         return r;
654         } else {
655                 e = getenv("XDG_RUNTIME_DIR");
656                 if (!e)
657                         return -ENOENT;
658
659                 l = strlen(e);
660                 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
661                         return -E2BIG;
662
663                 b = bus_new();
664                 if (!b)
665                         return -ENOMEM;
666
667                 b->sockaddr.un.sun_family = AF_UNIX;
668                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
669                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
670
671                 r = bus_start_connect(b);
672                 if (r < 0) {
673                         bus_free(b);
674                         return r;
675                 }
676         }
677
678         r = bus_send_hello(b);
679         if (r < 0) {
680                 sd_bus_unref(b);
681                 return r;
682         }
683
684         *ret = b;
685         return 0;
686 }
687
688 int sd_bus_open_address(const char *address, sd_bus **ret) {
689         sd_bus *b;
690         int r;
691
692         if (!address)
693                 return -EINVAL;
694         if (!ret)
695                 return -EINVAL;
696
697         b = bus_new();
698         if (!b)
699                 return -ENOMEM;
700
701         b->address = strdup(address);
702         if (!b->address) {
703                 bus_free(b);
704                 return -ENOMEM;
705         }
706
707         r = bus_start_connect(b);
708         if (r < 0) {
709                 bus_free(b);
710                 return r;
711         }
712
713         *ret = b;
714         return 0;
715 }
716
717 int sd_bus_open_fd(int fd, sd_bus **ret) {
718         sd_bus *b;
719         int r;
720
721         if (fd < 0)
722                 return -EINVAL;
723         if (!ret)
724                 return -EINVAL;
725
726         b = bus_new();
727         if (!b)
728                 return -ENOMEM;
729
730         b->fd = fd;
731         fd_nonblock(b->fd, true);
732         fd_cloexec(b->fd, true);
733
734         r = bus_start_auth(b);
735         if (r < 0) {
736                 bus_free(b);
737                 return r;
738         }
739
740         *ret = b;
741         return 0;
742 }
743
744 void sd_bus_close(sd_bus *bus) {
745         if (!bus)
746                 return;
747         if (bus->fd < 0)
748                 return;
749
750         close_nointr_nofail(bus->fd);
751         bus->fd = -1;
752 }
753
754 sd_bus *sd_bus_ref(sd_bus *bus) {
755         if (!bus)
756                 return NULL;
757
758         assert(bus->n_ref > 0);
759
760         bus->n_ref++;
761         return bus;
762 }
763
764 sd_bus *sd_bus_unref(sd_bus *bus) {
765         if (!bus)
766                 return NULL;
767
768         assert(bus->n_ref > 0);
769         bus->n_ref--;
770
771         if (bus->n_ref <= 0)
772                 bus_free(bus);
773
774         return NULL;
775 }
776
777 int sd_bus_is_open(sd_bus *bus) {
778         if (!bus)
779                 return -EINVAL;
780
781         return bus->fd >= 0;
782 }
783
784 int sd_bus_can_send(sd_bus *bus, char type) {
785         int r;
786
787         if (!bus)
788                 return -EINVAL;
789
790         if (type == SD_BUS_TYPE_UNIX_FD) {
791                 r = bus_ensure_running(bus);
792                 if (r < 0)
793                         return r;
794
795                 return bus->can_fds;
796         }
797
798         return bus_type_is_valid(type);
799 }
800
801 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
802         int r;
803
804         if (!bus)
805                 return -EINVAL;
806         if (!peer)
807                 return -EINVAL;
808
809         r = bus_ensure_running(bus);
810         if (r < 0)
811                 return r;
812
813         *peer = bus->peer;
814         return 0;
815 }
816
817 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
818         assert(m);
819
820         if (m->header->version > b->message_version)
821                 return -EPERM;
822
823         if (m->sealed)
824                 return 0;
825
826         return bus_message_seal(m, ++b->serial);
827 }
828
829 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
830         struct msghdr mh;
831         struct iovec *iov;
832         ssize_t k;
833         size_t n;
834         unsigned j;
835
836         assert(bus);
837         assert(m);
838         assert(idx);
839         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
840
841         if (*idx >= m->size)
842                 return 0;
843
844         n = m->n_iovec * sizeof(struct iovec);
845         iov = alloca(n);
846         memcpy(iov, m->iovec, n);
847
848         j = 0;
849         iovec_advance(iov, &j, *idx);
850
851         zero(mh);
852         mh.msg_iov = iov;
853         mh.msg_iovlen = m->n_iovec;
854
855         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
856         if (k < 0)
857                 return errno == EAGAIN ? 0 : -errno;
858
859         *idx += (size_t) k;
860         return 1;
861 }
862
863 static int message_read_need(sd_bus *bus, size_t *need) {
864         uint32_t a, b;
865         uint8_t e;
866         uint64_t sum;
867
868         assert(bus);
869         assert(need);
870         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
871
872         if (bus->rbuffer_size < sizeof(struct bus_header)) {
873                 *need = sizeof(struct bus_header) + 8;
874
875                 /* Minimum message size:
876                  *
877                  * Header +
878                  *
879                  *  Method Call: +2 string headers
880                  *       Signal: +3 string headers
881                  * Method Error: +1 string headers
882                  *               +1 uint32 headers
883                  * Method Reply: +1 uint32 headers
884                  *
885                  * A string header is at least 9 bytes
886                  * A uint32 header is at least 8 bytes
887                  *
888                  * Hence the minimum message size of a valid message
889                  * is header + 8 bytes */
890
891                 return 0;
892         }
893
894         a = ((const uint32_t*) bus->rbuffer)[1];
895         b = ((const uint32_t*) bus->rbuffer)[3];
896
897         e = ((const uint8_t*) bus->rbuffer)[0];
898         if (e == SD_BUS_LITTLE_ENDIAN) {
899                 a = le32toh(a);
900                 b = le32toh(b);
901         } else if (e == SD_BUS_BIG_ENDIAN) {
902                 a = be32toh(a);
903                 b = be32toh(b);
904         } else
905                 return -EBADMSG;
906
907         sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
908         if (sum >= BUS_MESSAGE_SIZE_MAX)
909                 return -ENOBUFS;
910
911         *need = (size_t) sum;
912         return 0;
913 }
914
915 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
916         sd_bus_message *t;
917         void *b = NULL;
918         int r;
919
920         assert(bus);
921         assert(m);
922         assert(bus->rbuffer_size >= size);
923         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
924
925         if (bus->rbuffer_size > size) {
926                 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
927                 if (!b) {
928                         free(t);
929                         return -ENOMEM;
930                 }
931         }
932
933         r = bus_message_from_malloc(bus->rbuffer, size, &t);
934         if (r < 0) {
935                 free(b);
936                 return r;
937         }
938
939         bus->rbuffer = b;
940         bus->rbuffer_size -= size;
941
942         *m = t;
943         return 1;
944 }
945
946 static int message_read(sd_bus *bus, sd_bus_message **m) {
947         struct msghdr mh;
948         struct iovec iov;
949         ssize_t k;
950         size_t need;
951         int r;
952         void *b;
953
954         assert(bus);
955         assert(m);
956         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
957
958         r = message_read_need(bus, &need);
959         if (r < 0)
960                 return r;
961
962         if (bus->rbuffer_size >= need)
963                 return message_make(bus, need, m);
964
965         b = realloc(bus->rbuffer, need);
966         if (!b)
967                 return -ENOMEM;
968
969         bus->rbuffer = b;
970
971         zero(iov);
972         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
973         iov.iov_len = need - bus->rbuffer_size;
974
975         zero(mh);
976         mh.msg_iov = &iov;
977         mh.msg_iovlen = 1;
978
979         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
980         if (k < 0)
981                 return errno == EAGAIN ? 0 : -errno;
982
983         bus->rbuffer_size += k;
984
985         r = message_read_need(bus, &need);
986         if (r < 0)
987                 return r;
988
989         if (bus->rbuffer_size >= need)
990                 return message_make(bus, need, m);
991
992         return 1;
993 }
994
995 static int dispatch_wqueue(sd_bus *bus) {
996         int r, ret = 0;
997
998         assert(bus);
999         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1000
1001         if (bus->fd < 0)
1002                 return -ENOTCONN;
1003
1004         while (bus->wqueue_size > 0) {
1005
1006                 r = message_write(bus, bus->wqueue[0], &bus->windex);
1007                 if (r < 0) {
1008                         sd_bus_close(bus);
1009                         return r;
1010                 } else if (r == 0)
1011                         /* Didn't do anything this time */
1012                         return ret;
1013                 else if (bus->windex >= bus->wqueue[0]->size) {
1014                         /* Fully written. Let's drop the entry from
1015                          * the queue.
1016                          *
1017                          * This isn't particularly optimized, but
1018                          * well, this is supposed to be our worst-case
1019                          * buffer only, and the socket buffer is
1020                          * supposed to be our primary buffer, and if
1021                          * it got full, then all bets are off
1022                          * anyway. */
1023
1024                         sd_bus_message_unref(bus->wqueue[0]);
1025                         bus->wqueue_size --;
1026                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1027                         bus->windex = 0;
1028
1029                         ret = 1;
1030                 }
1031         }
1032
1033         return ret;
1034 }
1035
1036 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1037         sd_bus_message *z = NULL;
1038         int r, ret = 0;
1039
1040         assert(bus);
1041         assert(m);
1042         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1043
1044         if (bus->fd < 0)
1045                 return -ENOTCONN;
1046
1047         if (bus->rqueue_size > 0) {
1048                 /* Dispatch a queued message */
1049
1050                 *m = bus->rqueue[0];
1051                 bus->rqueue_size --;
1052                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1053                 return 1;
1054         }
1055
1056         /* Try to read a new message */
1057         do {
1058                 r = message_read(bus, &z);
1059                 if (r < 0) {
1060                         sd_bus_close(bus);
1061                         return r;
1062                 }
1063                 if (r == 0)
1064                         return ret;
1065
1066                 r = 1;
1067         } while (!z);
1068
1069         *m = z;
1070         return 1;
1071 }
1072
1073 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1074         int r;
1075
1076         if (!bus)
1077                 return -EINVAL;
1078         if (bus->fd < 0)
1079                 return -ENOTCONN;
1080         if (!m)
1081                 return -EINVAL;
1082
1083         /* If the serial number isn't kept, then we know that no reply
1084          * is expected */
1085         if (!serial && !m->sealed)
1086                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1087
1088         r = bus_seal_message(bus, m);
1089         if (r < 0)
1090                 return r;
1091
1092         /* If this is a reply and no reply was requested, then let's
1093          * suppress this, if we can */
1094         if (m->dont_send && !serial)
1095                 return 0;
1096
1097         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1098                 size_t idx = 0;
1099
1100                 r = message_write(bus, m, &idx);
1101                 if (r < 0) {
1102                         sd_bus_close(bus);
1103                         return r;
1104                 } else if (idx < m->size)  {
1105                         /* Wasn't fully written. So let's remember how
1106                          * much was written. Note that the first entry
1107                          * of the wqueue array is always allocated so
1108                          * that we always can remember how much was
1109                          * written. */
1110                         bus->wqueue[0] = sd_bus_message_ref(m);
1111                         bus->wqueue_size = 1;
1112                         bus->windex = idx;
1113                 }
1114         } else {
1115                 sd_bus_message **q;
1116
1117                 /* Just append it to the queue. */
1118
1119                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1120                         return -ENOBUFS;
1121
1122                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1123                 if (!q)
1124                         return -ENOMEM;
1125
1126                 bus->wqueue = q;
1127                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1128         }
1129
1130         if (serial)
1131                 *serial = BUS_MESSAGE_SERIAL(m);
1132
1133         return 0;
1134 }
1135
1136 static usec_t calc_elapse(uint64_t usec) {
1137         if (usec == (uint64_t) -1)
1138                 return 0;
1139
1140         if (usec == 0)
1141                 usec = BUS_DEFAULT_TIMEOUT;
1142
1143         return now(CLOCK_MONOTONIC) + usec;
1144 }
1145
1146 static int timeout_compare(const void *a, const void *b) {
1147         const struct reply_callback *x = a, *y = b;
1148
1149         if (x->timeout != 0 && y->timeout == 0)
1150                 return -1;
1151
1152         if (x->timeout == 0 && y->timeout != 0)
1153                 return 1;
1154
1155         if (x->timeout < y->timeout)
1156                 return -1;
1157
1158         if (x->timeout > y->timeout)
1159                 return 1;
1160
1161         return 0;
1162 }
1163
1164 int sd_bus_send_with_reply(
1165                 sd_bus *bus,
1166                 sd_bus_message *m,
1167                 sd_message_handler_t callback,
1168                 void *userdata,
1169                 uint64_t usec,
1170                 uint64_t *serial) {
1171
1172         struct reply_callback *c;
1173         int r;
1174
1175         if (!bus)
1176                 return -EINVAL;
1177         if (bus->fd < 0)
1178                 return -ENOTCONN;
1179         if (!m)
1180                 return -EINVAL;
1181         if (!callback)
1182                 return -EINVAL;
1183         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1184                 return -EINVAL;
1185         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1186                 return -EINVAL;
1187
1188         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1189         if (r < 0)
1190                 return r;
1191
1192         if (usec != (uint64_t) -1) {
1193                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1194                 if (r < 0)
1195                         return r;
1196         }
1197
1198         r = bus_seal_message(bus, m);
1199         if (r < 0)
1200                 return r;
1201
1202         c = new(struct reply_callback, 1);
1203         if (!c)
1204                 return -ENOMEM;
1205
1206         c->callback = callback;
1207         c->userdata = userdata;
1208         c->serial = BUS_MESSAGE_SERIAL(m);
1209         c->timeout = calc_elapse(usec);
1210
1211         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1212         if (r < 0) {
1213                 free(c);
1214                 return r;
1215         }
1216
1217         if (c->timeout != 0) {
1218                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1219                 if (r < 0) {
1220                         c->timeout = 0;
1221                         sd_bus_send_with_reply_cancel(bus, c->serial);
1222                         return r;
1223                 }
1224         }
1225
1226         r = sd_bus_send(bus, m, serial);
1227         if (r < 0) {
1228                 sd_bus_send_with_reply_cancel(bus, c->serial);
1229                 return r;
1230         }
1231
1232         return r;
1233 }
1234
1235 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1236         struct reply_callback *c;
1237
1238         if (!bus)
1239                 return -EINVAL;
1240         if (serial == 0)
1241                 return -EINVAL;
1242
1243         c = hashmap_remove(bus->reply_callbacks, &serial);
1244         if (!c)
1245                 return 0;
1246
1247         if (c->timeout != 0)
1248                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1249
1250         free(c);
1251         return 1;
1252 }
1253
1254 int bus_ensure_running(sd_bus *bus) {
1255         int r;
1256
1257         assert(bus);
1258
1259         if (bus->state == BUS_RUNNING)
1260                 return 1;
1261
1262         for (;;) {
1263                 r = sd_bus_process(bus, NULL);
1264                 if (r < 0)
1265                         return r;
1266                 if (bus->state == BUS_RUNNING)
1267                         return 1;
1268                 if (r > 0)
1269                         continue;
1270
1271                 r = sd_bus_wait(bus, (uint64_t) -1);
1272                 if (r < 0)
1273                         return r;
1274         }
1275 }
1276
1277 int sd_bus_send_with_reply_and_block(
1278                 sd_bus *bus,
1279                 sd_bus_message *m,
1280                 uint64_t usec,
1281                 sd_bus_error *error,
1282                 sd_bus_message **reply) {
1283
1284         int r;
1285         usec_t timeout;
1286         uint64_t serial;
1287         bool room = false;
1288
1289         if (!bus)
1290                 return -EINVAL;
1291         if (bus->fd < 0)
1292                 return -ENOTCONN;
1293         if (!m)
1294                 return -EINVAL;
1295         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1296                 return -EINVAL;
1297         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1298                 return -EINVAL;
1299         if (bus_error_is_dirty(error))
1300                 return -EINVAL;
1301
1302         r = bus_ensure_running(bus);
1303         if (r < 0)
1304                 return r;
1305
1306         r = sd_bus_send(bus, m, &serial);
1307         if (r < 0)
1308                 return r;
1309
1310         timeout = calc_elapse(usec);
1311
1312         for (;;) {
1313                 usec_t left;
1314                 sd_bus_message *incoming = NULL;
1315
1316                 if (!room) {
1317                         sd_bus_message **q;
1318
1319                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1320                                 return -ENOBUFS;
1321
1322                         /* Make sure there's room for queuing this
1323                          * locally, before we read the message */
1324
1325                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1326                         if (!q)
1327                                 return -ENOMEM;
1328
1329                         bus->rqueue = q;
1330                         room = true;
1331                 }
1332
1333                 r = message_read(bus, &incoming);
1334                 if (r < 0)
1335                         return r;
1336                 if (incoming) {
1337
1338                         if (incoming->reply_serial == serial) {
1339                                 /* Found a match! */
1340
1341                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1342                                         *reply = incoming;
1343                                         return 0;
1344                                 }
1345
1346                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1347                                         int k;
1348
1349                                         r = sd_bus_error_copy(error, &incoming->error);
1350                                         if (r < 0) {
1351                                                 sd_bus_message_unref(incoming);
1352                                                 return r;
1353                                         }
1354
1355                                         k = bus_error_to_errno(&incoming->error);
1356                                         sd_bus_message_unref(incoming);
1357                                         return k;
1358                                 }
1359
1360                                 sd_bus_message_unref(incoming);
1361                                 return -EIO;
1362                         }
1363
1364                         /* There's already guaranteed to be room for
1365                          * this, so need to resize things here */
1366                         bus->rqueue[bus->rqueue_size ++] = incoming;
1367                         room = false;
1368
1369                         /* Try to read more, right-away */
1370                         continue;
1371                 }
1372                 if (r != 0)
1373                         continue;
1374
1375                 if (timeout > 0) {
1376                         usec_t n;
1377
1378                         n = now(CLOCK_MONOTONIC);
1379                         if (n >= timeout)
1380                                 return -ETIMEDOUT;
1381
1382                         left = timeout - n;
1383                 } else
1384                         left = (uint64_t) -1;
1385
1386                 r = bus_poll(bus, true, left);
1387                 if (r < 0)
1388                         return r;
1389
1390                 r = dispatch_wqueue(bus);
1391                 if (r < 0)
1392                         return r;
1393         }
1394 }
1395
1396 int sd_bus_get_fd(sd_bus *bus) {
1397         if (!bus)
1398                 return -EINVAL;
1399
1400         if (bus->fd < 0)
1401                 return -ENOTCONN;
1402
1403         return bus->fd;
1404 }
1405
1406 int sd_bus_get_events(sd_bus *bus) {
1407         int flags = 0;
1408
1409         if (!bus)
1410                 return -EINVAL;
1411         if (bus->fd < 0)
1412                 return -ENOTCONN;
1413
1414         if (bus->state == BUS_OPENING)
1415                 flags |= POLLOUT;
1416         else if (bus->state == BUS_AUTHENTICATING) {
1417
1418                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1419                         flags |= POLLOUT;
1420
1421                 flags |= POLLIN;
1422
1423         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1424                 if (bus->rqueue_size <= 0)
1425                         flags |= POLLIN;
1426                 if (bus->wqueue_size > 0)
1427                         flags |= POLLOUT;
1428         }
1429
1430         return flags;
1431 }
1432
1433 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1434         struct reply_callback *c;
1435
1436         if (!bus)
1437                 return -EINVAL;
1438         if (!timeout_usec)
1439                 return -EINVAL;
1440         if (bus->fd < 0)
1441                 return -ENOTCONN;
1442
1443         if (bus->state == BUS_AUTHENTICATING) {
1444                 *timeout_usec = bus->auth_timeout;
1445                 return 1;
1446         }
1447
1448         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1449                 return 0;
1450
1451         c = prioq_peek(bus->reply_callbacks_prioq);
1452         if (!c)
1453                 return 0;
1454
1455         *timeout_usec = c->timeout;
1456         return 1;
1457 }
1458
1459 static int process_timeout(sd_bus *bus) {
1460         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1461         struct reply_callback *c;
1462         usec_t n;
1463         int r;
1464
1465         assert(bus);
1466
1467         c = prioq_peek(bus->reply_callbacks_prioq);
1468         if (!c)
1469                 return 0;
1470
1471         n = now(CLOCK_MONOTONIC);
1472         if (c->timeout > n)
1473                 return 0;
1474
1475         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1476         hashmap_remove(bus->reply_callbacks, &c->serial);
1477
1478         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1479         free(c);
1480
1481         return r < 0 ? r : 1;
1482 }
1483
1484 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1485         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1486         int r;
1487
1488         assert(bus);
1489         assert(m);
1490
1491         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1492                 return 0;
1493
1494         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1495                 return 0;
1496
1497         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1498                 return 1;
1499
1500         if (streq_ptr(m->member, "Ping"))
1501                 r = sd_bus_message_new_method_return(bus, m, &reply);
1502         else if (streq_ptr(m->member, "GetMachineId")) {
1503                 sd_id128_t id;
1504                 char sid[33];
1505
1506                 r = sd_id128_get_machine(&id);
1507                 if (r < 0)
1508                         return r;
1509
1510                 r = sd_bus_message_new_method_return(bus, m, &reply);
1511                 if (r < 0)
1512                         return r;
1513
1514                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1515         } else {
1516                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1517
1518                 sd_bus_error_set(&error,
1519                                  "org.freedesktop.DBus.Error.UnknownMethod",
1520                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1521
1522                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1523         }
1524
1525         if (r < 0)
1526                 return r;
1527
1528         r = sd_bus_send(bus, reply, NULL);
1529         if (r < 0)
1530                 return r;
1531
1532         return 1;
1533 }
1534
1535 static int process_message(sd_bus *bus, sd_bus_message *m) {
1536         struct filter_callback *l;
1537         int r;
1538
1539         assert(bus);
1540         assert(m);
1541
1542         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1543                 struct reply_callback *c;
1544
1545                 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1546                 if (c) {
1547                         if (c->timeout != 0)
1548                                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1549
1550                         r = c->callback(bus, 0, m, c->userdata);
1551                         free(c);
1552
1553                         if (r != 0)
1554                                 return r;
1555                 }
1556         }
1557
1558         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1559                 r = l->callback(bus, 0, m, l->userdata);
1560                 if (r != 0)
1561                         return r;
1562         }
1563
1564         return process_builtin(bus, m);
1565 }
1566
1567 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1568         int r;
1569
1570         /* Returns 0 when we didn't do anything. This should cause the
1571          * caller to invoke sd_bus_wait() before returning the next
1572          * time. Returns > 0 when we did something, which possibly
1573          * means *ret is filled in with an unprocessed message. */
1574
1575         if (!bus)
1576                 return -EINVAL;
1577         if (bus->fd < 0)
1578                 return -ENOTCONN;
1579
1580         if (bus->state == BUS_OPENING) {
1581                 struct pollfd p;
1582
1583                 zero(p);
1584                 p.fd = bus->fd;
1585                 p.events = POLLOUT;
1586
1587                 r = poll(&p, 1, 0);
1588                 if (r < 0)
1589                         return -errno;
1590
1591                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1592                         int error = 0;
1593                         socklen_t slen = sizeof(error);
1594
1595                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1596                         if (r < 0)
1597                                 bus->last_connect_error = errno;
1598                         else if (error != 0)
1599                                 bus->last_connect_error = error;
1600                         else if (p.revents & (POLLERR|POLLHUP))
1601                                 bus->last_connect_error = ECONNREFUSED;
1602                         else {
1603                                 r = bus_start_auth(bus);
1604                                 goto null_message;
1605                         }
1606
1607                         /* Try next address */
1608                         r = bus_start_connect(bus);
1609                         goto null_message;
1610                 }
1611
1612                 r = 0;
1613                 goto null_message;
1614
1615         } else if (bus->state == BUS_AUTHENTICATING) {
1616
1617                 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1618                         return -ETIMEDOUT;
1619
1620                 r = bus_write_auth(bus);
1621                 if (r != 0)
1622                         goto null_message;
1623
1624                 r = bus_read_auth(bus);
1625                 goto null_message;
1626
1627         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1628                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1629                 int k;
1630
1631                 r = process_timeout(bus);
1632                 if (r != 0)
1633                         goto null_message;
1634
1635                 r = dispatch_wqueue(bus);
1636                 if (r != 0)
1637                         goto null_message;
1638
1639                 k = r;
1640                 r = dispatch_rqueue(bus, &m);
1641                 if (r < 0)
1642                         return r;
1643                 if (!m) {
1644                         if (r == 0)
1645                                 r = k;
1646                         goto null_message;
1647                 }
1648
1649                 r = process_message(bus, m);
1650                 if (r != 0)
1651                         goto null_message;
1652
1653                 if (ret) {
1654                         *ret = m;
1655                         m = NULL;
1656                         return 1;
1657                 }
1658
1659                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1660                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1661                         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1662
1663                         sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1664
1665                         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1666                         if (r < 0)
1667                                 return r;
1668
1669                         r = sd_bus_send(bus, reply, NULL);
1670                         if (r < 0)
1671                                 return r;
1672                 }
1673
1674                 return 1;
1675         }
1676
1677         assert_not_reached("Unknown state");
1678
1679 null_message:
1680         if (r >= 0 && ret)
1681                 *ret = NULL;
1682
1683         return r;
1684 }
1685
1686 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1687         struct pollfd p;
1688         int r, e;
1689         struct timespec ts;
1690         usec_t until, m;
1691
1692         assert(bus);
1693
1694         if (bus->fd < 0)
1695                 return -ENOTCONN;
1696
1697         e = sd_bus_get_events(bus);
1698         if (e < 0)
1699                 return e;
1700
1701         if (need_more)
1702                 e |= POLLIN;
1703
1704         r = sd_bus_get_timeout(bus, &until);
1705         if (r < 0)
1706                 return r;
1707         if (r == 0)
1708                 m = (uint64_t) -1;
1709         else {
1710                 usec_t n;
1711                 n = now(CLOCK_MONOTONIC);
1712                 m = until > n ? until - n : 0;
1713         }
1714
1715         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1716                 m = timeout_usec;
1717
1718         zero(p);
1719         p.fd = bus->fd;
1720         p.events = e;
1721
1722         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1723         if (r < 0)
1724                 return -errno;
1725
1726         return r > 0 ? 1 : 0;
1727 }
1728
1729 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1730
1731         if (!bus)
1732                 return -EINVAL;
1733         if (bus->fd < 0)
1734                 return -ENOTCONN;
1735         if (bus->rqueue_size > 0)
1736                 return 0;
1737
1738         return bus_poll(bus, false, timeout_usec);
1739 }
1740
1741 int sd_bus_flush(sd_bus *bus) {
1742         int r;
1743
1744         if (!bus)
1745                 return -EINVAL;
1746         if (bus->fd < 0)
1747                 return -ENOTCONN;
1748
1749         r = bus_ensure_running(bus);
1750         if (r < 0)
1751                 return r;
1752
1753         if (bus->wqueue_size <= 0)
1754                 return 0;
1755
1756         for (;;) {
1757                 r = dispatch_wqueue(bus);
1758                 if (r < 0)
1759                         return r;
1760
1761                 if (bus->wqueue_size <= 0)
1762                         return 0;
1763
1764                 r = bus_poll(bus, false, (uint64_t) -1);
1765                 if (r < 0)
1766                         return r;
1767         }
1768 }
1769
1770 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1771         struct filter_callback *f;
1772
1773         if (!bus)
1774                 return -EINVAL;
1775         if (!callback)
1776                 return -EINVAL;
1777
1778         f = new(struct filter_callback, 1);
1779         if (!f)
1780                 return -ENOMEM;
1781         f->callback = callback;
1782         f->userdata = userdata;
1783
1784         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1785         return 0;
1786 }
1787
1788 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1789         struct filter_callback *f;
1790
1791         if (!bus)
1792                 return -EINVAL;
1793         if (!callback)
1794                 return -EINVAL;
1795
1796         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1797                 if (f->callback == callback && f->userdata == userdata) {
1798                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1799                         free(f);
1800                         return 1;
1801                 }
1802         }
1803
1804         return 0;
1805 }