chiark / gitweb /
6a6d43fac0a03dd01eac4f38faf6527345601804
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32
33 #include "sd-bus.h"
34 #include "bus-internal.h"
35 #include "bus-message.h"
36 #include "bus-type.h"
37
38 static int ensure_running(sd_bus *bus);
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
40
41 static void bus_free(sd_bus *b) {
42         struct filter_callback *f;
43         unsigned i;
44
45         assert(b);
46
47         if (b->fd >= 0)
48                 close_nointr_nofail(b->fd);
49
50         free(b->rbuffer);
51         free(b->unique_name);
52         free(b->auth_uid);
53         free(b->address);
54
55         for (i = 0; i < b->rqueue_size; i++)
56                 sd_bus_message_unref(b->rqueue[i]);
57         free(b->rqueue);
58
59         for (i = 0; i < b->wqueue_size; i++)
60                 sd_bus_message_unref(b->wqueue[i]);
61         free(b->wqueue);
62
63         hashmap_free_free(b->reply_callbacks);
64         prioq_free(b->reply_callbacks_prioq);
65
66         while ((f = b->filter_callbacks)) {
67                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
68                 free(f);
69         }
70
71         free(b);
72 }
73
74 static sd_bus* bus_new(void) {
75         sd_bus *r;
76
77         r = new0(sd_bus, 1);
78         if (!r)
79                 return NULL;
80
81         r->n_ref = 1;
82         r->fd = -1;
83         r->message_version = 1;
84
85         /* We guarantee that wqueue always has space for at least one
86          * entry */
87         r->wqueue = new(sd_bus_message*, 1);
88         if (!r->wqueue) {
89                 free(r);
90                 return NULL;
91         }
92
93         return r;
94 };
95
96 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
97         const char *s;
98         int r;
99
100         assert(bus);
101
102         if (error != 0)
103                 return -error;
104
105         assert(reply);
106
107         bus->state = BUS_RUNNING;
108
109         r = sd_bus_message_read(reply, "s", &s);
110         if (r < 0)
111                 return r;
112
113         bus->unique_name = strdup(s);
114         if (!bus->unique_name)
115                 return -ENOMEM;
116
117         return 1;
118 }
119
120 static int bus_send_hello(sd_bus *bus) {
121         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
122         int r;
123
124         assert(bus);
125
126         r = sd_bus_message_new_method_call(
127                         bus,
128                         "org.freedesktop.DBus",
129                         "/",
130                         "org.freedesktop.DBus",
131                         "Hello",
132                         &m);
133         if (r < 0)
134                 return r;
135
136         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
137         if (r < 0)
138                 return r;
139
140         bus->sent_hello = true;
141         return r;
142 }
143
144 static int bus_start_running(sd_bus *bus) {
145         assert(bus);
146
147         if (bus->sent_hello) {
148                 bus->state = BUS_HELLO;
149                 return 1;
150         }
151
152         bus->state = BUS_RUNNING;
153         return 1;
154 }
155
156 static int parse_address_key(const char **p, const char *key, char **value) {
157         size_t l, n = 0;
158         const char *a;
159         char *r = NULL;
160
161         assert(p);
162         assert(*p);
163         assert(key);
164         assert(value);
165
166         l = strlen(key);
167         if (strncmp(*p, key, l) != 0)
168                 return 0;
169
170         if ((*p)[l] != '=')
171                 return 0;
172
173         if (*value)
174                 return -EINVAL;
175
176         a = *p + l + 1;
177         while (*a != ',' && *a != 0) {
178                 char c, *t;
179
180                 if (*a == '%') {
181                         int x, y;
182
183                         x = unhexchar(a[1]);
184                         if (x < 0) {
185                                 free(r);
186                                 return x;
187                         }
188
189                         y = unhexchar(a[2]);
190                         if (y < 0) {
191                                 free(r);
192                                 return y;
193                         }
194
195                         c = (char) ((x << 4) | y);
196                         a += 3;
197                 } else {
198                         c = *a;
199                         a++;
200                 }
201
202                 t = realloc(r, n + 2);
203                 if (!t) {
204                         free(r);
205                         return -ENOMEM;
206                 }
207
208                 r = t;
209                 r[n++] = c;
210         }
211
212         if (!r) {
213                 r = strdup("");
214                 if (!r)
215                         return -ENOMEM;
216         } else
217                 r[n] = 0;
218
219         if (*a == ',')
220                 a++;
221
222         *p = a;
223         *value = r;
224         return 1;
225 }
226
227 static void skip_address_key(const char **p) {
228         assert(p);
229         assert(*p);
230
231         *p += strcspn(*p, ",");
232
233         if (**p == ',')
234                 (*p) ++;
235 }
236
237 static int bus_parse_next_address(sd_bus *b) {
238         const char *a, *p;
239         _cleanup_free_ char *guid = NULL;
240         int r;
241
242         assert(b);
243
244         if (!b->address)
245                 return 0;
246         if (b->address[b->address_index] == 0)
247                 return 0;
248
249         a = b->address + b->address_index;
250
251         zero(b->sockaddr);
252         b->sockaddr_size = 0;
253         b->peer = SD_ID128_NULL;
254
255         if (startswith(a, "unix:")) {
256                 _cleanup_free_ char *path = NULL, *abstract = NULL;
257
258                 p = a + 5;
259                 while (*p != 0) {
260                         r = parse_address_key(&p, "guid", &guid);
261                         if (r < 0)
262                                 return r;
263                         else if (r > 0)
264                                 continue;
265
266                         r = parse_address_key(&p, "path", &path);
267                         if (r < 0)
268                                 return r;
269                         else if (r > 0)
270                                 continue;
271
272                         r = parse_address_key(&p, "abstract", &abstract);
273                         if (r < 0)
274                                 return r;
275                         else if (r > 0)
276                                 continue;
277
278                         skip_address_key(&p);
279                 }
280
281                 if (!path && !abstract)
282                         return -EINVAL;
283
284                 if (path && abstract)
285                         return -EINVAL;
286
287                 if (path) {
288                         size_t l;
289
290                         l = strlen(path);
291                         if (l > sizeof(b->sockaddr.un.sun_path))
292                                 return -E2BIG;
293
294                         b->sockaddr.un.sun_family = AF_UNIX;
295                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
296                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
297                 } else if (abstract) {
298                         size_t l;
299
300                         l = strlen(abstract);
301                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
302                                 return -E2BIG;
303
304                         b->sockaddr.un.sun_family = AF_UNIX;
305                         b->sockaddr.un.sun_path[0] = 0;
306                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
307                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
308                 }
309
310         } else if (startswith(a, "tcp:")) {
311                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
312                 struct addrinfo hints, *result;
313
314                 p = a + 4;
315                 while (*p != 0) {
316                         r = parse_address_key(&p, "guid", &guid);
317                         if (r < 0)
318                                 return r;
319                         else if (r > 0)
320                                 continue;
321
322                         r = parse_address_key(&p, "host", &host);
323                         if (r < 0)
324                                 return r;
325                         else if (r > 0)
326                                 continue;
327
328                         r = parse_address_key(&p, "port", &port);
329                         if (r < 0)
330                                 return r;
331                         else if (r > 0)
332                                 continue;
333
334                         r = parse_address_key(&p, "family", &family);
335                         if (r < 0)
336                                 return r;
337                         else if (r > 0)
338                                 continue;
339
340                         skip_address_key(&p);
341                 }
342
343                 if (!host || !port)
344                         return -EINVAL;
345
346                 zero(hints);
347                 hints.ai_socktype = SOCK_STREAM;
348                 hints.ai_flags = AI_ADDRCONFIG;
349
350                 if (family) {
351                         if (streq(family, "ipv4"))
352                                 hints.ai_family = AF_INET;
353                         else if (streq(family, "ipv6"))
354                                 hints.ai_family = AF_INET6;
355                         else
356                                 return -EINVAL;
357                 }
358
359                 r = getaddrinfo(host, port, &hints, &result);
360                 if (r == EAI_SYSTEM)
361                         return -errno;
362                 else if (r != 0)
363                         return -EADDRNOTAVAIL;
364
365                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
366                 b->sockaddr_size = result->ai_addrlen;
367
368                 freeaddrinfo(result);
369         }
370
371         if (guid) {
372                 r = sd_id128_from_string(guid, &b->peer);
373                 if (r < 0)
374                         return r;
375         }
376
377         b->address_index = p - b->address;
378         return 1;
379 }
380
381 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
382
383         while (size > 0) {
384                 struct iovec *i = iov + *idx;
385
386                 if (i->iov_len > size) {
387                         i->iov_base = (uint8_t*) i->iov_base + size;
388                         i->iov_len -= size;
389                         return;
390                 }
391
392                 size -= i->iov_len;
393
394                 i->iov_base = NULL;
395                 i->iov_len = 0;
396
397                 (*idx) ++;
398         }
399 }
400
401 static int bus_write_auth(sd_bus *b) {
402         struct msghdr mh;
403         ssize_t k;
404
405         assert(b);
406         assert(b->state == BUS_AUTHENTICATING);
407
408         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
409                 return 0;
410
411         if (b->auth_timeout == 0)
412                 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
413
414         zero(mh);
415         mh.msg_iov = b->auth_iovec + b->auth_index;
416         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
417
418         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
419         if (k < 0)
420                 return errno == EAGAIN ? 0 : -errno;
421
422         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
423
424         return 1;
425 }
426
427 static int bus_auth_verify(sd_bus *b) {
428         char *e, *f;
429         sd_id128_t peer;
430         unsigned i;
431         int r;
432
433         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
434          * that's it */
435
436         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
437         if (!e)
438                 return 0;
439
440         f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
441         if (!f)
442                 return 0;
443
444         if (e - (char*) b->rbuffer != 3 + 32)
445                 return -EPERM;
446
447         if (memcmp(b->rbuffer, "OK ", 3))
448                 return -EPERM;
449
450         for (i = 0; i < 32; i += 2) {
451                 int x, y;
452
453                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
454                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
455
456                 if (x < 0 || y < 0)
457                         return -EINVAL;
458
459                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
460         }
461
462         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
463             !sd_id128_equal(b->peer, peer))
464                 return -EPERM;
465
466         b->peer = peer;
467
468         b->can_fds =
469                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
470                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
471
472         b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
473         memmove(b->rbuffer, f + 2, b->rbuffer_size);
474
475         r = bus_start_running(b);
476         if (r < 0)
477                 return r;
478
479         return 1;
480 }
481
482 static int bus_read_auth(sd_bus *b) {
483         struct msghdr mh;
484         struct iovec iov;
485         size_t n;
486         ssize_t k;
487         int r;
488         void *p;
489
490         assert(b);
491
492         r = bus_auth_verify(b);
493         if (r != 0)
494                 return r;
495
496         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
497
498         if (n > BUS_AUTH_SIZE_MAX)
499                 n = BUS_AUTH_SIZE_MAX;
500
501         if (b->rbuffer_size >= n)
502                 return -ENOBUFS;
503
504         p = realloc(b->rbuffer, n);
505         if (!p)
506                 return -ENOMEM;
507
508         b->rbuffer = p;
509
510         zero(iov);
511         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
512         iov.iov_len = n - b->rbuffer_size;
513
514         zero(mh);
515         mh.msg_iov = &iov;
516         mh.msg_iovlen = 1;
517
518         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
519         if (k < 0)
520                 return errno == EAGAIN ? 0 : -errno;
521
522         b->rbuffer_size += k;
523
524         r = bus_auth_verify(b);
525         if (r != 0)
526                 return r;
527
528         return 1;
529 }
530
531 static int bus_start_auth(sd_bus *b) {
532         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
533         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
534
535         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
536         size_t l;
537
538         assert(b);
539
540         b->state = BUS_AUTHENTICATING;
541
542         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
543         char_array_0(text);
544
545         l = strlen(text);
546         b->auth_uid = hexmem(text, l);
547         if (!b->auth_uid)
548                 return -ENOMEM;
549
550         b->auth_iovec[0].iov_base = (void*) auth_prefix;
551         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
552         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
553         b->auth_iovec[1].iov_len = l * 2;
554         b->auth_iovec[2].iov_base = (void*) auth_suffix;
555         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
556         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
557
558         return bus_write_auth(b);
559 }
560
561 static int bus_start_connect(sd_bus *b) {
562         int r;
563
564         assert(b);
565         assert(b->fd < 0);
566
567         for (;;) {
568                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
569                         r = bus_parse_next_address(b);
570                         if (r < 0)
571                                 return r;
572                         if (r == 0)
573                                 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
574                 }
575
576                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
577                 if (b->fd < 0) {
578                         b->last_connect_error = errno;
579                         zero(b->sockaddr);
580                         continue;
581                 }
582
583                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
584                 if (r < 0) {
585                         if (errno == EINPROGRESS)
586                                 return 1;
587
588                         b->last_connect_error = errno;
589                         close_nointr_nofail(b->fd);
590                         b->fd = -1;
591                         zero(b->sockaddr);
592                         continue;
593                 }
594
595                 return bus_start_auth(b);
596         }
597 }
598
599 int sd_bus_open_system(sd_bus **ret) {
600         const char *e;
601         sd_bus *b;
602         int r;
603
604         if (!ret)
605                 return -EINVAL;
606
607         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
608         if (e) {
609                 r = sd_bus_open_address(e, &b);
610                 if (r < 0)
611                         return r;
612         } else {
613                 b = bus_new();
614                 if (!b)
615                         return -ENOMEM;
616
617                 b->sockaddr.un.sun_family = AF_UNIX;
618                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
619                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
620
621                 r = bus_start_connect(b);
622                 if (r < 0) {
623                         bus_free(b);
624                         return r;
625                 }
626         }
627
628         r = bus_send_hello(b);
629         if (r < 0) {
630                 sd_bus_unref(b);
631                 return r;
632         }
633
634         *ret = b;
635         return 0;
636 }
637
638 int sd_bus_open_user(sd_bus **ret) {
639         const char *e;
640         sd_bus *b;
641         size_t l;
642         int r;
643
644         if (!ret)
645                 return -EINVAL;
646
647         e = getenv("DBUS_SESSION_BUS_ADDRESS");
648         if (e) {
649                 r = sd_bus_open_address(e, &b);
650                 if (r < 0)
651                         return r;
652         } else {
653                 e = getenv("XDG_RUNTIME_DIR");
654                 if (!e)
655                         return -ENOENT;
656
657                 l = strlen(e);
658                 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
659                         return -E2BIG;
660
661                 b = bus_new();
662                 if (!b)
663                         return -ENOMEM;
664
665                 b->sockaddr.un.sun_family = AF_UNIX;
666                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
667                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
668
669                 r = bus_start_connect(b);
670                 if (r < 0) {
671                         bus_free(b);
672                         return r;
673                 }
674         }
675
676         r = bus_send_hello(b);
677         if (r < 0) {
678                 sd_bus_unref(b);
679                 return r;
680         }
681
682         *ret = b;
683         return 0;
684 }
685
686 int sd_bus_open_address(const char *address, sd_bus **ret) {
687         sd_bus *b;
688         int r;
689
690         if (!address)
691                 return -EINVAL;
692         if (!ret)
693                 return -EINVAL;
694
695         b = bus_new();
696         if (!b)
697                 return -ENOMEM;
698
699         b->address = strdup(address);
700         if (!b->address) {
701                 bus_free(b);
702                 return -ENOMEM;
703         }
704
705         r = bus_start_connect(b);
706         if (r < 0) {
707                 bus_free(b);
708                 return r;
709         }
710
711         *ret = b;
712         return 0;
713 }
714
715 int sd_bus_open_fd(int fd, sd_bus **ret) {
716         sd_bus *b;
717         int r;
718
719         if (fd < 0)
720                 return -EINVAL;
721         if (!ret)
722                 return -EINVAL;
723
724         b = bus_new();
725         if (!b)
726                 return -ENOMEM;
727
728         b->fd = fd;
729         fd_nonblock(b->fd, true);
730         fd_cloexec(b->fd, true);
731
732         r = bus_start_auth(b);
733         if (r < 0) {
734                 bus_free(b);
735                 return r;
736         }
737
738         *ret = b;
739         return 0;
740 }
741
742 void sd_bus_close(sd_bus *bus) {
743         if (!bus)
744                 return;
745         if (bus->fd < 0)
746                 return;
747
748         close_nointr_nofail(bus->fd);
749         bus->fd = -1;
750 }
751
752 sd_bus *sd_bus_ref(sd_bus *bus) {
753         if (!bus)
754                 return NULL;
755
756         assert(bus->n_ref > 0);
757
758         bus->n_ref++;
759         return bus;
760 }
761
762 sd_bus *sd_bus_unref(sd_bus *bus) {
763         if (!bus)
764                 return NULL;
765
766         assert(bus->n_ref > 0);
767         bus->n_ref--;
768
769         if (bus->n_ref <= 0)
770                 bus_free(bus);
771
772         return NULL;
773 }
774
775 int sd_bus_is_open(sd_bus *bus) {
776         if (!bus)
777                 return -EINVAL;
778
779         return bus->fd >= 0;
780 }
781
782 int sd_bus_can_send(sd_bus *bus, char type) {
783         int r;
784
785         if (!bus)
786                 return -EINVAL;
787
788         if (type == SD_BUS_TYPE_UNIX_FD) {
789                 r = ensure_running(bus);
790                 if (r < 0)
791                         return r;
792
793                 return bus->can_fds;
794         }
795
796         return bus_type_is_valid(type);
797 }
798
799 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
800         int r;
801
802         if (!bus)
803                 return -EINVAL;
804         if (!peer)
805                 return -EINVAL;
806
807         r = ensure_running(bus);
808         if (r < 0)
809                 return r;
810
811         *peer = bus->peer;
812         return 0;
813 }
814
815 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
816         assert(m);
817
818         if (m->header->version > b->message_version)
819                 return -EPERM;
820
821         if (m->sealed)
822                 return 0;
823
824         return bus_message_seal(m, ++b->serial);
825 }
826
827 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
828         struct msghdr mh;
829         struct iovec *iov;
830         ssize_t k;
831         size_t n;
832         unsigned j;
833
834         assert(bus);
835         assert(m);
836         assert(idx);
837         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
838
839         if (*idx >= m->size)
840                 return 0;
841
842         n = m->n_iovec * sizeof(struct iovec);
843         iov = alloca(n);
844         memcpy(iov, m->iovec, n);
845
846         j = 0;
847         iovec_advance(iov, &j, *idx);
848
849         zero(mh);
850         mh.msg_iov = iov;
851         mh.msg_iovlen = m->n_iovec;
852
853         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
854         if (k < 0)
855                 return errno == EAGAIN ? 0 : -errno;
856
857         *idx += (size_t) k;
858         return 1;
859 }
860
861 static int message_read_need(sd_bus *bus, size_t *need) {
862         uint32_t a, b;
863         uint8_t e;
864         uint64_t sum;
865
866         assert(bus);
867         assert(need);
868         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
869
870         if (bus->rbuffer_size < sizeof(struct bus_header)) {
871                 *need = sizeof(struct bus_header) + 8;
872
873                 /* Minimum message size:
874                  *
875                  * Header +
876                  *
877                  *  Method Call: +2 string headers
878                  *       Signal: +3 string headers
879                  * Method Error: +1 string headers
880                  *               +1 uint32 headers
881                  * Method Reply: +1 uint32 headers
882                  *
883                  * A string header is at least 9 bytes
884                  * A uint32 header is at least 8 bytes
885                  *
886                  * Hence the minimum message size of a valid message
887                  * is header + 8 bytes */
888
889                 return 0;
890         }
891
892         a = ((const uint32_t*) bus->rbuffer)[1];
893         b = ((const uint32_t*) bus->rbuffer)[3];
894
895         e = ((const uint8_t*) bus->rbuffer)[0];
896         if (e == SD_BUS_LITTLE_ENDIAN) {
897                 a = le32toh(a);
898                 b = le32toh(b);
899         } else if (e == SD_BUS_BIG_ENDIAN) {
900                 a = be32toh(a);
901                 b = be32toh(b);
902         } else
903                 return -EBADMSG;
904
905         sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
906         if (sum >= BUS_MESSAGE_SIZE_MAX)
907                 return -ENOBUFS;
908
909         *need = (size_t) sum;
910         return 0;
911 }
912
913 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
914         sd_bus_message *t;
915         void *b = NULL;
916         int r;
917
918         assert(bus);
919         assert(m);
920         assert(bus->rbuffer_size >= size);
921         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
922
923         if (bus->rbuffer_size > size) {
924                 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
925                 if (!b) {
926                         free(t);
927                         return -ENOMEM;
928                 }
929         }
930
931         r = bus_message_from_malloc(bus->rbuffer, size, &t);
932         if (r < 0) {
933                 free(b);
934                 return r;
935         }
936
937         bus->rbuffer = b;
938         bus->rbuffer_size -= size;
939
940         *m = t;
941         return 1;
942 }
943
944 static int message_read(sd_bus *bus, sd_bus_message **m) {
945         struct msghdr mh;
946         struct iovec iov;
947         ssize_t k;
948         size_t need;
949         int r;
950         void *b;
951
952         assert(bus);
953         assert(m);
954         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
955
956         r = message_read_need(bus, &need);
957         if (r < 0)
958                 return r;
959
960         if (bus->rbuffer_size >= need)
961                 return message_make(bus, need, m);
962
963         b = realloc(bus->rbuffer, need);
964         if (!b)
965                 return -ENOMEM;
966
967         bus->rbuffer = b;
968
969         zero(iov);
970         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
971         iov.iov_len = need - bus->rbuffer_size;
972
973         zero(mh);
974         mh.msg_iov = &iov;
975         mh.msg_iovlen = 1;
976
977         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
978         if (k < 0)
979                 return errno == EAGAIN ? 0 : -errno;
980
981         bus->rbuffer_size += k;
982
983         r = message_read_need(bus, &need);
984         if (r < 0)
985                 return r;
986
987         if (bus->rbuffer_size >= need)
988                 return message_make(bus, need, m);
989
990         return 1;
991 }
992
993 static int dispatch_wqueue(sd_bus *bus) {
994         int r, ret = 0;
995
996         assert(bus);
997         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
998
999         if (bus->fd < 0)
1000                 return -ENOTCONN;
1001
1002         while (bus->wqueue_size > 0) {
1003
1004                 r = message_write(bus, bus->wqueue[0], &bus->windex);
1005                 if (r < 0) {
1006                         sd_bus_close(bus);
1007                         return r;
1008                 } else if (r == 0)
1009                         /* Didn't do anything this time */
1010                         return ret;
1011                 else if (bus->windex >= bus->wqueue[0]->size) {
1012                         /* Fully written. Let's drop the entry from
1013                          * the queue.
1014                          *
1015                          * This isn't particularly optimized, but
1016                          * well, this is supposed to be our worst-case
1017                          * buffer only, and the socket buffer is
1018                          * supposed to be our primary buffer, and if
1019                          * it got full, then all bets are off
1020                          * anyway. */
1021
1022                         sd_bus_message_unref(bus->wqueue[0]);
1023                         bus->wqueue_size --;
1024                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1025                         bus->windex = 0;
1026
1027                         ret = 1;
1028                 }
1029         }
1030
1031         return ret;
1032 }
1033
1034 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1035         sd_bus_message *z;
1036         int r, ret = 0;
1037
1038         assert(bus);
1039         assert(m);
1040         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1041
1042         if (bus->fd < 0)
1043                 return -ENOTCONN;
1044
1045         if (bus->rqueue_size > 0) {
1046                 /* Dispatch a queued message */
1047
1048                 *m = bus->rqueue[0];
1049                 bus->rqueue_size --;
1050                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1051                 return 1;
1052         }
1053
1054         /* Try to read a new message */
1055         do {
1056                 r = message_read(bus, &z);
1057                 if (r < 0) {
1058                         sd_bus_close(bus);
1059                         return r;
1060                 }
1061                 if (r == 0)
1062                         return ret;
1063
1064                 r = 1;
1065         } while (!z);
1066
1067         *m = z;
1068         return 1;
1069 }
1070
1071 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1072         int r;
1073
1074         if (!bus)
1075                 return -EINVAL;
1076         if (bus->fd < 0)
1077                 return -ENOTCONN;
1078         if (!m)
1079                 return -EINVAL;
1080
1081         /* If the serial number isn't kept, then we know that no reply
1082          * is expected */
1083         if (!serial && !m->sealed)
1084                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1085
1086         r = bus_seal_message(bus, m);
1087         if (r < 0)
1088                 return r;
1089
1090         /* If this is a reply and no reply was requested, then let's
1091          * suppress this, if we can */
1092         if (m->dont_send && !serial)
1093                 return 0;
1094
1095         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1096                 size_t idx = 0;
1097
1098                 r = message_write(bus, m, &idx);
1099                 if (r < 0) {
1100                         sd_bus_close(bus);
1101                         return r;
1102                 } else if (idx < m->size)  {
1103                         /* Wasn't fully written. So let's remember how
1104                          * much was written. Note that the first entry
1105                          * of the wqueue array is always allocated so
1106                          * that we always can remember how much was
1107                          * written. */
1108                         bus->wqueue[0] = sd_bus_message_ref(m);
1109                         bus->wqueue_size = 1;
1110                         bus->windex = idx;
1111                 }
1112         } else {
1113                 sd_bus_message **q;
1114
1115                 /* Just append it to the queue. */
1116
1117                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1118                         return -ENOBUFS;
1119
1120                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1121                 if (!q)
1122                         return -ENOMEM;
1123
1124                 bus->wqueue = q;
1125                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1126         }
1127
1128         if (serial)
1129                 *serial = BUS_MESSAGE_SERIAL(m);
1130
1131         return 0;
1132 }
1133
1134 static usec_t calc_elapse(uint64_t usec) {
1135         if (usec == (uint64_t) -1)
1136                 return 0;
1137
1138         if (usec == 0)
1139                 usec = BUS_DEFAULT_TIMEOUT;
1140
1141         return now(CLOCK_MONOTONIC) + usec;
1142 }
1143
1144 static int timeout_compare(const void *a, const void *b) {
1145         const struct reply_callback *x = a, *y = b;
1146
1147         if (x->timeout != 0 && y->timeout == 0)
1148                 return -1;
1149
1150         if (x->timeout == 0 && y->timeout != 0)
1151                 return 1;
1152
1153         if (x->timeout < y->timeout)
1154                 return -1;
1155
1156         if (x->timeout > y->timeout)
1157                 return 1;
1158
1159         return 0;
1160 }
1161
1162 int sd_bus_send_with_reply(
1163                 sd_bus *bus,
1164                 sd_bus_message *m,
1165                 sd_message_handler_t callback,
1166                 void *userdata,
1167                 uint64_t usec,
1168                 uint64_t *serial) {
1169
1170         struct reply_callback *c;
1171         int r;
1172
1173         if (!bus)
1174                 return -EINVAL;
1175         if (bus->fd < 0)
1176                 return -ENOTCONN;
1177         if (!m)
1178                 return -EINVAL;
1179         if (!callback)
1180                 return -EINVAL;
1181         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1182                 return -EINVAL;
1183         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1184                 return -EINVAL;
1185
1186         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1187         if (r < 0)
1188                 return r;
1189
1190         if (usec != (uint64_t) -1) {
1191                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1192                 if (r < 0)
1193                         return r;
1194         }
1195
1196         r = bus_seal_message(bus, m);
1197         if (r < 0)
1198                 return r;
1199
1200         c = new(struct reply_callback, 1);
1201         if (!c)
1202                 return -ENOMEM;
1203
1204         c->callback = callback;
1205         c->userdata = userdata;
1206         c->serial = BUS_MESSAGE_SERIAL(m);
1207         c->timeout = calc_elapse(usec);
1208
1209         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1210         if (r < 0) {
1211                 free(c);
1212                 return r;
1213         }
1214
1215         if (c->timeout != 0) {
1216                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1217                 if (r < 0) {
1218                         c->timeout = 0;
1219                         sd_bus_send_with_reply_cancel(bus, c->serial);
1220                         return r;
1221                 }
1222         }
1223
1224         r = sd_bus_send(bus, m, serial);
1225         if (r < 0) {
1226                 sd_bus_send_with_reply_cancel(bus, c->serial);
1227                 return r;
1228         }
1229
1230         return r;
1231 }
1232
1233 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1234         struct reply_callback *c;
1235
1236         if (!bus)
1237                 return -EINVAL;
1238         if (serial == 0)
1239                 return -EINVAL;
1240
1241         c = hashmap_remove(bus->reply_callbacks, &serial);
1242         if (!c)
1243                 return 0;
1244
1245         if (c->timeout != 0)
1246                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1247
1248         free(c);
1249         return 1;
1250 }
1251
1252 static int ensure_running(sd_bus *bus) {
1253         int r;
1254
1255         assert(bus);
1256
1257         if (bus->state == BUS_RUNNING)
1258                 return 1;
1259
1260         for (;;) {
1261                 r = sd_bus_process(bus, NULL);
1262                 if (r < 0)
1263                         return r;
1264                 if (bus->state == BUS_RUNNING)
1265                         return 1;
1266                 if (r > 0)
1267                         continue;
1268
1269                 r = sd_bus_wait(bus, (uint64_t) -1);
1270                 if (r < 0)
1271                         return r;
1272         }
1273 }
1274
1275 int sd_bus_send_with_reply_and_block(
1276                 sd_bus *bus,
1277                 sd_bus_message *m,
1278                 uint64_t usec,
1279                 sd_bus_error *error,
1280                 sd_bus_message **reply) {
1281
1282         int r;
1283         usec_t timeout;
1284         uint64_t serial;
1285         bool room = false;
1286
1287         if (!bus)
1288                 return -EINVAL;
1289         if (bus->fd < 0)
1290                 return -ENOTCONN;
1291         if (!m)
1292                 return -EINVAL;
1293         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1294                 return -EINVAL;
1295         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1296                 return -EINVAL;
1297         if (bus_error_is_dirty(error))
1298                 return -EINVAL;
1299
1300         r = ensure_running(bus);
1301         if (r < 0)
1302                 return r;
1303
1304         r = sd_bus_send(bus, m, &serial);
1305         if (r < 0)
1306                 return r;
1307
1308         timeout = calc_elapse(usec);
1309
1310         for (;;) {
1311                 usec_t left;
1312                 sd_bus_message *incoming = NULL;
1313
1314                 if (!room) {
1315                         sd_bus_message **q;
1316
1317                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1318                                 return -ENOBUFS;
1319
1320                         /* Make sure there's room for queuing this
1321                          * locally, before we read the message */
1322
1323                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1324                         if (!q)
1325                                 return -ENOMEM;
1326
1327                         bus->rqueue = q;
1328                         room = true;
1329                 }
1330
1331                 r = message_read(bus, &incoming);
1332                 if (r < 0)
1333                         return r;
1334                 if (incoming) {
1335
1336                         if (incoming->reply_serial == serial) {
1337                                 /* Found a match! */
1338
1339                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1340                                         *reply = incoming;
1341                                         return 0;
1342                                 }
1343
1344                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1345                                         int k;
1346
1347                                         r = sd_bus_error_copy(error, &incoming->error);
1348                                         if (r < 0) {
1349                                                 sd_bus_message_unref(incoming);
1350                                                 return r;
1351                                         }
1352
1353                                         k = bus_error_to_errno(&incoming->error);
1354                                         sd_bus_message_unref(incoming);
1355                                         return k;
1356                                 }
1357
1358                                 sd_bus_message_unref(incoming);
1359                                 return -EIO;
1360                         }
1361
1362                         /* There's already guaranteed to be room for
1363                          * this, so need to resize things here */
1364                         bus->rqueue[bus->rqueue_size ++] = incoming;
1365                         room = false;
1366
1367                         /* Try to read more, right-away */
1368                         continue;
1369                 }
1370                 if (r != 0)
1371                         continue;
1372
1373                 if (timeout > 0) {
1374                         usec_t n;
1375
1376                         n = now(CLOCK_MONOTONIC);
1377                         if (n >= timeout)
1378                                 return -ETIMEDOUT;
1379
1380                         left = timeout - n;
1381                 } else
1382                         left = (uint64_t) -1;
1383
1384                 r = bus_poll(bus, true, left);
1385                 if (r < 0)
1386                         return r;
1387
1388                 r = dispatch_wqueue(bus);
1389                 if (r < 0)
1390                         return r;
1391         }
1392 }
1393
1394 int sd_bus_get_fd(sd_bus *bus) {
1395         if (!bus)
1396                 return -EINVAL;
1397
1398         if (bus->fd < 0)
1399                 return -ENOTCONN;
1400
1401         return bus->fd;
1402 }
1403
1404 int sd_bus_get_events(sd_bus *bus) {
1405         int flags = 0;
1406
1407         if (!bus)
1408                 return -EINVAL;
1409         if (bus->fd < 0)
1410                 return -ENOTCONN;
1411
1412         if (bus->state == BUS_OPENING)
1413                 flags |= POLLOUT;
1414         else if (bus->state == BUS_AUTHENTICATING) {
1415
1416                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1417                         flags |= POLLOUT;
1418
1419                 flags |= POLLIN;
1420
1421         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1422                 if (bus->rqueue_size <= 0)
1423                         flags |= POLLIN;
1424                 if (bus->wqueue_size > 0)
1425                         flags |= POLLOUT;
1426         }
1427
1428         return flags;
1429 }
1430
1431 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1432         struct reply_callback *c;
1433
1434         if (!bus)
1435                 return -EINVAL;
1436         if (!timeout_usec)
1437                 return -EINVAL;
1438         if (bus->fd < 0)
1439                 return -ENOTCONN;
1440
1441         if (bus->state == BUS_AUTHENTICATING) {
1442                 *timeout_usec = bus->auth_timeout;
1443                 return 1;
1444         }
1445
1446         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1447                 return 0;
1448
1449         c = prioq_peek(bus->reply_callbacks_prioq);
1450         if (!c)
1451                 return 0;
1452
1453         *timeout_usec = c->timeout;
1454         return 1;
1455 }
1456
1457 static int process_timeout(sd_bus *bus) {
1458         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1459         struct reply_callback *c;
1460         usec_t n;
1461         int r;
1462
1463         assert(bus);
1464
1465         c = prioq_peek(bus->reply_callbacks_prioq);
1466         if (!c)
1467                 return 0;
1468
1469         n = now(CLOCK_MONOTONIC);
1470         if (c->timeout > n)
1471                 return 0;
1472
1473         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1474         hashmap_remove(bus->reply_callbacks, &c->serial);
1475
1476         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1477         free(c);
1478
1479         return r < 0 ? r : 1;
1480 }
1481
1482 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1483         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1484         int r;
1485
1486         assert(bus);
1487         assert(m);
1488
1489         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1490                 return 0;
1491
1492         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1493                 return 0;
1494
1495         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1496                 return 1;
1497
1498         if (streq_ptr(m->member, "Ping"))
1499                 r = sd_bus_message_new_method_return(bus, m, &reply);
1500         else if (streq_ptr(m->member, "GetMachineId")) {
1501                 sd_id128_t id;
1502                 char sid[33];
1503
1504                 r = sd_id128_get_machine(&id);
1505                 if (r < 0)
1506                         return r;
1507
1508                 r = sd_bus_message_new_method_return(bus, m, &reply);
1509                 if (r < 0)
1510                         return r;
1511
1512                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1513         } else {
1514                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1515
1516                 sd_bus_error_set(&error,
1517                                  "org.freedesktop.DBus.Error.UnknownMethod",
1518                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1519
1520                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1521         }
1522
1523         if (r < 0)
1524                 return r;
1525
1526         r = sd_bus_send(bus, reply, NULL);
1527         if (r < 0)
1528                 return r;
1529
1530         return 1;
1531 }
1532
1533 static int process_message(sd_bus *bus, sd_bus_message *m) {
1534         struct filter_callback *l;
1535         int r;
1536
1537         assert(bus);
1538         assert(m);
1539
1540         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1541                 struct reply_callback *c;
1542
1543                 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1544                 if (c) {
1545                         if (c->timeout != 0)
1546                                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1547
1548                         r = c->callback(bus, 0, m, c->userdata);
1549                         free(c);
1550
1551                         if (r != 0)
1552                                 return r;
1553                 }
1554         }
1555
1556         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1557                 r = l->callback(bus, 0, m, l->userdata);
1558                 if (r != 0)
1559                         return r;
1560         }
1561
1562         return process_builtin(bus, m);
1563 }
1564
1565 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1566         int r;
1567
1568         /* Returns 0 when we didn't do anything. This should cause the
1569          * caller to invoke sd_bus_wait() before returning the next
1570          * time. Returns > 0 when we did something, which possibly
1571          * means *ret is filled in with an unprocessed message. */
1572
1573         if (!bus)
1574                 return -EINVAL;
1575         if (bus->fd < 0)
1576                 return -ENOTCONN;
1577
1578         if (bus->state == BUS_OPENING) {
1579                 struct pollfd p;
1580
1581                 zero(p);
1582                 p.fd = bus->fd;
1583                 p.events = POLLOUT;
1584
1585                 r = poll(&p, 1, 0);
1586                 if (r < 0)
1587                         return -errno;
1588
1589                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1590                         int error = 0;
1591                         socklen_t slen = sizeof(error);
1592
1593                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1594                         if (r < 0)
1595                                 bus->last_connect_error = errno;
1596                         else if (error != 0)
1597                                 bus->last_connect_error = error;
1598                         else if (p.revents & (POLLERR|POLLHUP))
1599                                 bus->last_connect_error = ECONNREFUSED;
1600                         else {
1601                                 r = bus_start_auth(bus);
1602                                 goto null_message;
1603                         }
1604
1605                         /* Try next address */
1606                         r = bus_start_connect(bus);
1607                         goto null_message;
1608                 }
1609
1610                 r = 0;
1611                 goto null_message;
1612
1613         } else if (bus->state == BUS_AUTHENTICATING) {
1614
1615                 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1616                         return -ETIMEDOUT;
1617
1618                 r = bus_write_auth(bus);
1619                 if (r != 0)
1620                         goto null_message;
1621
1622                 r = bus_read_auth(bus);
1623                 goto null_message;
1624
1625         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1626                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1627                 int k;
1628
1629                 r = process_timeout(bus);
1630                 if (r != 0)
1631                         goto null_message;
1632
1633                 r = dispatch_wqueue(bus);
1634                 if (r != 0)
1635                         goto null_message;
1636
1637                 k = r;
1638                 r = dispatch_rqueue(bus, &m);
1639                 if (r < 0)
1640                         return r;
1641                 if (!m) {
1642                         if (r == 0)
1643                                 r = k;
1644                         goto null_message;
1645                 }
1646
1647                 r = process_message(bus, m);
1648                 if (r != 0)
1649                         goto null_message;
1650
1651                 if (ret) {
1652                         *ret = m;
1653                         m = NULL;
1654                         return 1;
1655                 }
1656
1657                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1658                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1659                         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1660
1661                         sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1662
1663                         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1664                         if (r < 0)
1665                                 return r;
1666
1667                         r = sd_bus_send(bus, reply, NULL);
1668                         if (r < 0)
1669                                 return r;
1670                 }
1671
1672                 return 1;
1673         }
1674
1675         assert_not_reached("Unknown state");
1676
1677 null_message:
1678         if (r >= 0 && ret)
1679                 *ret = NULL;
1680
1681         return r;
1682 }
1683
1684 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1685         struct pollfd p;
1686         int r, e;
1687         struct timespec ts;
1688         usec_t until, m;
1689
1690         assert(bus);
1691
1692         if (bus->fd < 0)
1693                 return -ENOTCONN;
1694
1695         e = sd_bus_get_events(bus);
1696         if (e < 0)
1697                 return e;
1698
1699         if (need_more)
1700                 e |= POLLIN;
1701
1702         r = sd_bus_get_timeout(bus, &until);
1703         if (r < 0)
1704                 return r;
1705         if (r == 0)
1706                 m = (uint64_t) -1;
1707         else {
1708                 usec_t n;
1709                 n = now(CLOCK_MONOTONIC);
1710                 m = until > n ? until - n : 0;
1711         }
1712
1713         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1714                 m = timeout_usec;
1715
1716         zero(p);
1717         p.fd = bus->fd;
1718         p.events = e;
1719
1720         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1721         if (r < 0)
1722                 return -errno;
1723
1724         return r > 0 ? 1 : 0;
1725 }
1726
1727 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1728
1729         if (!bus)
1730                 return -EINVAL;
1731         if (bus->fd < 0)
1732                 return -ENOTCONN;
1733         if (bus->rqueue_size > 0)
1734                 return 0;
1735
1736         return bus_poll(bus, false, timeout_usec);
1737 }
1738
1739 int sd_bus_flush(sd_bus *bus) {
1740         int r;
1741
1742         if (!bus)
1743                 return -EINVAL;
1744         if (bus->fd < 0)
1745                 return -ENOTCONN;
1746
1747         r = ensure_running(bus);
1748         if (r < 0)
1749                 return r;
1750
1751         if (bus->wqueue_size <= 0)
1752                 return 0;
1753
1754         for (;;) {
1755                 r = dispatch_wqueue(bus);
1756                 if (r < 0)
1757                         return r;
1758
1759                 if (bus->wqueue_size <= 0)
1760                         return 0;
1761
1762                 r = bus_poll(bus, false, (uint64_t) -1);
1763                 if (r < 0)
1764                         return r;
1765         }
1766 }
1767
1768 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1769         struct filter_callback *f;
1770
1771         if (!bus)
1772                 return -EINVAL;
1773         if (!callback)
1774                 return -EINVAL;
1775
1776         f = new(struct filter_callback, 1);
1777         if (!f)
1778                 return -ENOMEM;
1779         f->callback = callback;
1780         f->userdata = userdata;
1781
1782         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1783         return 0;
1784 }
1785
1786 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1787         struct filter_callback *f;
1788
1789         if (!bus)
1790                 return -EINVAL;
1791         if (!callback)
1792                 return -EINVAL;
1793
1794         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1795                 if (f->callback == callback && f->userdata == userdata) {
1796                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1797                         free(f);
1798                         return 1;
1799                 }
1800         }
1801
1802         return 0;
1803 }