chiark / gitweb /
bus: implicitly handle peer commands Ping() and GetMachineId()
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32
33 #include "sd-bus.h"
34 #include "bus-internal.h"
35 #include "bus-message.h"
36 #include "bus-type.h"
37
38 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
39
40 static void bus_free(sd_bus *b) {
41         struct filter_callback *f;
42         unsigned i;
43
44         assert(b);
45
46         if (b->fd >= 0)
47                 close_nointr_nofail(b->fd);
48
49         free(b->rbuffer);
50         free(b->unique_name);
51         free(b->auth_uid);
52         free(b->address);
53
54         for (i = 0; i < b->rqueue_size; i++)
55                 sd_bus_message_unref(b->rqueue[i]);
56         free(b->rqueue);
57
58         for (i = 0; i < b->wqueue_size; i++)
59                 sd_bus_message_unref(b->wqueue[i]);
60         free(b->wqueue);
61
62         hashmap_free_free(b->reply_callbacks);
63         prioq_free(b->reply_callbacks_prioq);
64
65         while ((f = b->filter_callbacks)) {
66                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
67                 free(f);
68         }
69
70         free(b);
71 }
72
73 static sd_bus* bus_new(void) {
74         sd_bus *r;
75
76         r = new0(sd_bus, 1);
77         if (!r)
78                 return NULL;
79
80         r->n_ref = 1;
81         r->fd = -1;
82         r->message_version = 1;
83
84         /* We guarantee that wqueue always has space for at least one
85          * entry */
86         r->wqueue = new(sd_bus_message*, 1);
87         if (!r->wqueue) {
88                 free(r);
89                 return NULL;
90         }
91
92         return r;
93 };
94
95 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
96         const char *s;
97         int r;
98
99         assert(bus);
100
101         if (error != 0)
102                 return -error;
103
104         assert(reply);
105
106         bus->state = BUS_RUNNING;
107
108         r = sd_bus_message_read(reply, "s", &s);
109         if (r < 0)
110                 return r;
111
112         bus->unique_name = strdup(s);
113         if (!bus->unique_name)
114                 return -ENOMEM;
115
116         return 1;
117 }
118
119 static int bus_send_hello(sd_bus *bus) {
120         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
121         int r;
122
123         assert(bus);
124
125         r = sd_bus_message_new_method_call(
126                         bus,
127                         "org.freedesktop.DBus",
128                         "/",
129                         "org.freedesktop.DBus",
130                         "Hello",
131                         &m);
132         if (r < 0)
133                 return r;
134
135         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
136         if (r < 0)
137                 return r;
138
139         bus->sent_hello = true;
140         return r;
141 }
142
143 static int bus_start_running(sd_bus *bus) {
144         assert(bus);
145
146         if (bus->sent_hello) {
147                 bus->state = BUS_HELLO;
148                 return 1;
149         }
150
151         bus->state = BUS_RUNNING;
152         return 1;
153 }
154
155 static int parse_address_key(const char **p, const char *key, char **value) {
156         size_t l, n = 0;
157         const char *a;
158         char *r = NULL;
159
160         assert(p);
161         assert(*p);
162         assert(key);
163         assert(value);
164
165         l = strlen(key);
166         if (strncmp(*p, key, l) != 0)
167                 return 0;
168
169         if ((*p)[l] != '=')
170                 return 0;
171
172         if (*value)
173                 return -EINVAL;
174
175         a = *p + l + 1;
176         while (*a != ',' && *a != 0) {
177                 char c, *t;
178
179                 if (*a == '%') {
180                         int x, y;
181
182                         x = unhexchar(a[1]);
183                         if (x < 0) {
184                                 free(r);
185                                 return x;
186                         }
187
188                         y = unhexchar(a[2]);
189                         if (y < 0) {
190                                 free(r);
191                                 return y;
192                         }
193
194                         c = (char) ((x << 4) | y);
195                         a += 3;
196                 } else {
197                         c = *a;
198                         a++;
199                 }
200
201                 t = realloc(r, n + 2);
202                 if (!t) {
203                         free(r);
204                         return -ENOMEM;
205                 }
206
207                 r = t;
208                 r[n++] = c;
209         }
210
211         if (!r) {
212                 r = strdup("");
213                 if (!r)
214                         return -ENOMEM;
215         } else
216                 r[n] = 0;
217
218         if (*a == ',')
219                 a++;
220
221         *p = a;
222         *value = r;
223         return 1;
224 }
225
226 static void skip_address_key(const char **p) {
227         assert(p);
228         assert(*p);
229
230         *p += strcspn(*p, ",");
231
232         if (**p == ',')
233                 (*p) ++;
234 }
235
236 static int bus_parse_next_address(sd_bus *b) {
237         const char *a, *p;
238         _cleanup_free_ char *guid = NULL;
239         int r;
240
241         assert(b);
242
243         if (!b->address)
244                 return 0;
245         if (b->address[b->address_index] == 0)
246                 return 0;
247
248         a = b->address + b->address_index;
249
250         zero(b->sockaddr);
251         b->sockaddr_size = 0;
252         b->peer = SD_ID128_NULL;
253
254         if (startswith(a, "unix:")) {
255                 _cleanup_free_ char *path = NULL, *abstract = NULL;
256
257                 p = a + 5;
258                 while (*p != 0) {
259                         r = parse_address_key(&p, "guid", &guid);
260                         if (r < 0)
261                                 return r;
262                         else if (r > 0)
263                                 continue;
264
265                         r = parse_address_key(&p, "path", &path);
266                         if (r < 0)
267                                 return r;
268                         else if (r > 0)
269                                 continue;
270
271                         r = parse_address_key(&p, "abstract", &abstract);
272                         if (r < 0)
273                                 return r;
274                         else if (r > 0)
275                                 continue;
276
277                         skip_address_key(&p);
278                 }
279
280                 if (!path && !abstract)
281                         return -EINVAL;
282
283                 if (path && abstract)
284                         return -EINVAL;
285
286                 if (path) {
287                         size_t l;
288
289                         l = strlen(path);
290                         if (l > sizeof(b->sockaddr.un.sun_path))
291                                 return -E2BIG;
292
293                         b->sockaddr.un.sun_family = AF_UNIX;
294                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
295                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
296                 } else if (abstract) {
297                         size_t l;
298
299                         l = strlen(abstract);
300                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
301                                 return -E2BIG;
302
303                         b->sockaddr.un.sun_family = AF_UNIX;
304                         b->sockaddr.un.sun_path[0] = 0;
305                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
306                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
307                 }
308
309         } else if (startswith(a, "tcp:")) {
310                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
311                 struct addrinfo hints, *result;
312
313                 p = a + 4;
314                 while (*p != 0) {
315                         r = parse_address_key(&p, "guid", &guid);
316                         if (r < 0)
317                                 return r;
318                         else if (r > 0)
319                                 continue;
320
321                         r = parse_address_key(&p, "host", &host);
322                         if (r < 0)
323                                 return r;
324                         else if (r > 0)
325                                 continue;
326
327                         r = parse_address_key(&p, "port", &port);
328                         if (r < 0)
329                                 return r;
330                         else if (r > 0)
331                                 continue;
332
333                         r = parse_address_key(&p, "family", &family);
334                         if (r < 0)
335                                 return r;
336                         else if (r > 0)
337                                 continue;
338
339                         skip_address_key(&p);
340                 }
341
342                 if (!host || !port)
343                         return -EINVAL;
344
345                 zero(hints);
346                 hints.ai_socktype = SOCK_STREAM;
347                 hints.ai_flags = AI_ADDRCONFIG;
348
349                 if (family) {
350                         if (streq(family, "ipv4"))
351                                 hints.ai_family = AF_INET;
352                         else if (streq(family, "ipv6"))
353                                 hints.ai_family = AF_INET6;
354                         else
355                                 return -EINVAL;
356                 }
357
358                 r = getaddrinfo(host, port, &hints, &result);
359                 if (r == EAI_SYSTEM)
360                         return -errno;
361                 else if (r != 0)
362                         return -EADDRNOTAVAIL;
363
364                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
365                 b->sockaddr_size = result->ai_addrlen;
366
367                 freeaddrinfo(result);
368         }
369
370         if (guid) {
371                 r = sd_id128_from_string(guid, &b->peer);
372                 if (r < 0)
373                         return r;
374         }
375
376         b->address_index = p - b->address;
377         return 1;
378 }
379
380 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
381
382         while (size > 0) {
383                 struct iovec *i = iov + *idx;
384
385                 if (i->iov_len > size) {
386                         i->iov_base = (uint8_t*) i->iov_base + size;
387                         i->iov_len -= size;
388                         return;
389                 }
390
391                 size -= i->iov_len;
392
393                 i->iov_base = NULL;
394                 i->iov_len = 0;
395
396                 (*idx) ++;
397         }
398 }
399
400 static int bus_write_auth(sd_bus *b) {
401         struct msghdr mh;
402         ssize_t k;
403
404         assert(b);
405         assert(b->state == BUS_AUTHENTICATING);
406
407         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
408                 return 0;
409
410         if (b->auth_timeout == 0)
411                 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
412
413         zero(mh);
414         mh.msg_iov = b->auth_iovec + b->auth_index;
415         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
416
417         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
418         if (k < 0)
419                 return errno == EAGAIN ? 0 : -errno;
420
421         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
422
423         return 1;
424 }
425
426 static int bus_auth_verify(sd_bus *b) {
427         char *e, *f;
428         sd_id128_t peer;
429         unsigned i;
430         int r;
431
432         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
433          * that's it */
434
435         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
436         if (!e)
437                 return 0;
438
439         f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
440         if (!f)
441                 return 0;
442
443         if (e - (char*) b->rbuffer != 3 + 32)
444                 return -EPERM;
445
446         if (memcmp(b->rbuffer, "OK ", 3))
447                 return -EPERM;
448
449         for (i = 0; i < 32; i += 2) {
450                 int x, y;
451
452                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
453                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
454
455                 if (x < 0 || y < 0)
456                         return -EINVAL;
457
458                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
459         }
460
461         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
462             !sd_id128_equal(b->peer, peer))
463                 return -EPERM;
464
465         b->peer = peer;
466
467         b->can_fds =
468                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
469                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
470
471         b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
472         memmove(b->rbuffer, f + 2, b->rbuffer_size);
473
474         r = bus_start_running(b);
475         if (r < 0)
476                 return r;
477
478         return 1;
479 }
480
481 static int bus_read_auth(sd_bus *b) {
482         struct msghdr mh;
483         struct iovec iov;
484         size_t n;
485         ssize_t k;
486         int r;
487         void *p;
488
489         assert(b);
490
491         r = bus_auth_verify(b);
492         if (r != 0)
493                 return r;
494
495         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
496
497         if (n > BUS_AUTH_SIZE_MAX)
498                 n = BUS_AUTH_SIZE_MAX;
499
500         if (b->rbuffer_size >= n)
501                 return -ENOBUFS;
502
503         p = realloc(b->rbuffer, n);
504         if (!p)
505                 return -ENOMEM;
506
507         b->rbuffer = p;
508
509         zero(iov);
510         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
511         iov.iov_len = n - b->rbuffer_size;
512
513         zero(mh);
514         mh.msg_iov = &iov;
515         mh.msg_iovlen = 1;
516
517         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
518         if (k < 0)
519                 return errno == EAGAIN ? 0 : -errno;
520
521         b->rbuffer_size += k;
522
523         r = bus_auth_verify(b);
524         if (r != 0)
525                 return r;
526
527         return 1;
528 }
529
530 static int bus_start_auth(sd_bus *b) {
531         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
532         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
533
534         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
535         size_t l;
536
537         assert(b);
538
539         b->state = BUS_AUTHENTICATING;
540
541         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
542         char_array_0(text);
543
544         l = strlen(text);
545         b->auth_uid = hexmem(text, l);
546         if (!b->auth_uid)
547                 return -ENOMEM;
548
549         b->auth_iovec[0].iov_base = (void*) auth_prefix;
550         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
551         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
552         b->auth_iovec[1].iov_len = l * 2;
553         b->auth_iovec[2].iov_base = (void*) auth_suffix;
554         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
555         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
556
557         return bus_write_auth(b);
558 }
559
560 static int bus_start_connect(sd_bus *b) {
561         int r;
562
563         assert(b);
564         assert(b->fd < 0);
565
566         for (;;) {
567                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
568                         r = bus_parse_next_address(b);
569                         if (r < 0)
570                                 return r;
571                         if (r == 0)
572                                 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
573                 }
574
575                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
576                 if (b->fd < 0) {
577                         b->last_connect_error = errno;
578                         zero(b->sockaddr);
579                         continue;
580                 }
581
582                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
583                 if (r < 0) {
584                         if (errno == EINPROGRESS)
585                                 return 1;
586
587                         b->last_connect_error = errno;
588                         close_nointr_nofail(b->fd);
589                         b->fd = -1;
590                         zero(b->sockaddr);
591                         continue;
592                 }
593
594                 return bus_start_auth(b);
595         }
596 }
597
598 int sd_bus_open_system(sd_bus **ret) {
599         const char *e;
600         sd_bus *b;
601         int r;
602
603         if (!ret)
604                 return -EINVAL;
605
606         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
607         if (e) {
608                 r = sd_bus_open_address(e, &b);
609                 if (r < 0)
610                         return r;
611         } else {
612                 b = bus_new();
613                 if (!b)
614                         return -ENOMEM;
615
616                 b->sockaddr.un.sun_family = AF_UNIX;
617                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
618                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
619
620                 r = bus_start_connect(b);
621                 if (r < 0) {
622                         bus_free(b);
623                         return r;
624                 }
625         }
626
627         r = bus_send_hello(b);
628         if (r < 0) {
629                 sd_bus_unref(b);
630                 return r;
631         }
632
633         *ret = b;
634         return 0;
635 }
636
637 int sd_bus_open_user(sd_bus **ret) {
638         const char *e;
639         sd_bus *b;
640         size_t l;
641         int r;
642
643         if (!ret)
644                 return -EINVAL;
645
646         e = getenv("DBUS_SESSION_BUS_ADDRESS");
647         if (e) {
648                 r = sd_bus_open_address(e, &b);
649                 if (r < 0)
650                         return r;
651         } else {
652                 e = getenv("XDG_RUNTIME_DIR");
653                 if (!e)
654                         return -ENOENT;
655
656                 l = strlen(e);
657                 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
658                         return -E2BIG;
659
660                 b = bus_new();
661                 if (!b)
662                         return -ENOMEM;
663
664                 b->sockaddr.un.sun_family = AF_UNIX;
665                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
666                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
667
668                 r = bus_start_connect(b);
669                 if (r < 0) {
670                         bus_free(b);
671                         return r;
672                 }
673         }
674
675         r = bus_send_hello(b);
676         if (r < 0) {
677                 sd_bus_unref(b);
678                 return r;
679         }
680
681         *ret = b;
682         return 0;
683 }
684
685 int sd_bus_open_address(const char *address, sd_bus **ret) {
686         sd_bus *b;
687         int r;
688
689         if (!address)
690                 return -EINVAL;
691         if (!ret)
692                 return -EINVAL;
693
694         b = bus_new();
695         if (!b)
696                 return -ENOMEM;
697
698         b->address = strdup(address);
699         if (!b->address) {
700                 bus_free(b);
701                 return -ENOMEM;
702         }
703
704         r = bus_start_connect(b);
705         if (r < 0) {
706                 bus_free(b);
707                 return r;
708         }
709
710         *ret = b;
711         return 0;
712 }
713
714 int sd_bus_open_fd(int fd, sd_bus **ret) {
715         sd_bus *b;
716         int r;
717
718         if (fd < 0)
719                 return -EINVAL;
720         if (!ret)
721                 return -EINVAL;
722
723         b = bus_new();
724         if (!b)
725                 return -ENOMEM;
726
727         b->fd = fd;
728         fd_nonblock(b->fd, true);
729         fd_cloexec(b->fd, true);
730
731         r = bus_start_auth(b);
732         if (r < 0) {
733                 bus_free(b);
734                 return r;
735         }
736
737         *ret = b;
738         return 0;
739 }
740
741 void sd_bus_close(sd_bus *bus) {
742         if (!bus)
743                 return;
744         if (bus->fd < 0)
745                 return;
746
747         close_nointr_nofail(bus->fd);
748         bus->fd = -1;
749 }
750
751 sd_bus *sd_bus_ref(sd_bus *bus) {
752         if (!bus)
753                 return NULL;
754
755         assert(bus->n_ref > 0);
756
757         bus->n_ref++;
758         return bus;
759 }
760
761 sd_bus *sd_bus_unref(sd_bus *bus) {
762         if (!bus)
763                 return NULL;
764
765         assert(bus->n_ref > 0);
766         bus->n_ref--;
767
768         if (bus->n_ref <= 0)
769                 bus_free(bus);
770
771         return NULL;
772 }
773
774 int sd_bus_is_open(sd_bus *bus) {
775         if (!bus)
776                 return -EINVAL;
777
778         return bus->fd >= 0;
779 }
780
781 int sd_bus_is_running(sd_bus *bus) {
782         if (!bus)
783                 return -EINVAL;
784
785         if (bus->fd < 0)
786                 return -ENOTCONN;
787
788         return bus->state == BUS_RUNNING;
789 }
790
791 int sd_bus_can_send(sd_bus *bus, char type) {
792
793         if (!bus)
794                 return -EINVAL;
795         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
796                 return -EAGAIN;
797
798         if (type == SD_BUS_TYPE_UNIX_FD)
799                 return bus->can_fds;
800
801         return bus_type_is_valid(type);
802 }
803
804 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
805         assert(m);
806
807         if (m->header->version > b->message_version)
808                 return -EPERM;
809
810         if (m->sealed)
811                 return 0;
812
813         return bus_message_seal(m, ++b->serial);
814 }
815
816 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
817         struct msghdr mh;
818         struct iovec *iov;
819         ssize_t k;
820         size_t n;
821         unsigned j;
822
823         assert(bus);
824         assert(m);
825         assert(idx);
826         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
827
828         if (*idx >= m->size)
829                 return 0;
830
831         n = m->n_iovec * sizeof(struct iovec);
832         iov = alloca(n);
833         memcpy(iov, m->iovec, n);
834
835         j = 0;
836         iovec_advance(iov, &j, *idx);
837
838         zero(mh);
839         mh.msg_iov = iov;
840         mh.msg_iovlen = m->n_iovec;
841
842         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
843         if (k < 0)
844                 return errno == EAGAIN ? 0 : -errno;
845
846         *idx += (size_t) k;
847         return 1;
848 }
849
850 static int message_read_need(sd_bus *bus, size_t *need) {
851         uint32_t a, b;
852         uint8_t e;
853         uint64_t sum;
854
855         assert(bus);
856         assert(need);
857         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
858
859         if (bus->rbuffer_size < sizeof(struct bus_header)) {
860                 *need = sizeof(struct bus_header) + 8;
861
862                 /* Minimum message size:
863                  *
864                  * Header +
865                  *
866                  *  Method Call: +2 string headers
867                  *       Signal: +3 string headers
868                  * Method Error: +1 string headers
869                  *               +1 uint32 headers
870                  * Method Reply: +1 uint32 headers
871                  *
872                  * A string header is at least 9 bytes
873                  * A uint32 header is at least 8 bytes
874                  *
875                  * Hence the minimum message size of a valid message
876                  * is header + 8 bytes */
877
878                 return 0;
879         }
880
881         a = ((const uint32_t*) bus->rbuffer)[1];
882         b = ((const uint32_t*) bus->rbuffer)[3];
883
884         e = ((const uint8_t*) bus->rbuffer)[0];
885         if (e == SD_BUS_LITTLE_ENDIAN) {
886                 a = le32toh(a);
887                 b = le32toh(b);
888         } else if (e == SD_BUS_BIG_ENDIAN) {
889                 a = be32toh(a);
890                 b = be32toh(b);
891         } else
892                 return -EBADMSG;
893
894         sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
895         if (sum >= BUS_MESSAGE_SIZE_MAX)
896                 return -ENOBUFS;
897
898         *need = (size_t) sum;
899         return 0;
900 }
901
902 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
903         sd_bus_message *t;
904         void *b = NULL;
905         int r;
906
907         assert(bus);
908         assert(m);
909         assert(bus->rbuffer_size >= size);
910         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
911
912         if (bus->rbuffer_size > size) {
913                 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
914                 if (!b) {
915                         free(t);
916                         return -ENOMEM;
917                 }
918         }
919
920         r = bus_message_from_malloc(bus->rbuffer, size, &t);
921         if (r < 0) {
922                 free(b);
923                 return r;
924         }
925
926         bus->rbuffer = b;
927         bus->rbuffer_size -= size;
928
929         *m = t;
930         return 1;
931 }
932
933 static int message_read(sd_bus *bus, sd_bus_message **m) {
934         struct msghdr mh;
935         struct iovec iov;
936         ssize_t k;
937         size_t need;
938         int r;
939         void *b;
940
941         assert(bus);
942         assert(m);
943         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
944
945         r = message_read_need(bus, &need);
946         if (r < 0)
947                 return r;
948
949         if (bus->rbuffer_size >= need)
950                 return message_make(bus, need, m);
951
952         b = realloc(bus->rbuffer, need);
953         if (!b)
954                 return -ENOMEM;
955
956         bus->rbuffer = b;
957
958         zero(iov);
959         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
960         iov.iov_len = need - bus->rbuffer_size;
961
962         zero(mh);
963         mh.msg_iov = &iov;
964         mh.msg_iovlen = 1;
965
966         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
967         if (k < 0)
968                 return errno == EAGAIN ? 0 : -errno;
969
970         bus->rbuffer_size += k;
971
972         r = message_read_need(bus, &need);
973         if (r < 0)
974                 return r;
975
976         if (bus->rbuffer_size >= need)
977                 return message_make(bus, need, m);
978
979         return 1;
980 }
981
982 static int dispatch_wqueue(sd_bus *bus) {
983         int r, ret = 0;
984
985         assert(bus);
986         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
987
988         if (bus->fd < 0)
989                 return -ENOTCONN;
990
991         while (bus->wqueue_size > 0) {
992
993                 r = message_write(bus, bus->wqueue[0], &bus->windex);
994                 if (r < 0) {
995                         sd_bus_close(bus);
996                         return r;
997                 } else if (r == 0)
998                         /* Didn't do anything this time */
999                         return ret;
1000                 else if (bus->windex >= bus->wqueue[0]->size) {
1001                         /* Fully written. Let's drop the entry from
1002                          * the queue.
1003                          *
1004                          * This isn't particularly optimized, but
1005                          * well, this is supposed to be our worst-case
1006                          * buffer only, and the socket buffer is
1007                          * supposed to be our primary buffer, and if
1008                          * it got full, then all bets are off
1009                          * anyway. */
1010
1011                         sd_bus_message_unref(bus->wqueue[0]);
1012                         bus->wqueue_size --;
1013                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1014                         bus->windex = 0;
1015
1016                         ret = 1;
1017                 }
1018         }
1019
1020         return ret;
1021 }
1022
1023 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1024         sd_bus_message *z;
1025         int r, ret = 0;
1026
1027         assert(bus);
1028         assert(m);
1029         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1030
1031         if (bus->fd < 0)
1032                 return -ENOTCONN;
1033
1034         if (bus->rqueue_size > 0) {
1035                 /* Dispatch a queued message */
1036
1037                 *m = bus->rqueue[0];
1038                 bus->rqueue_size --;
1039                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1040                 return 1;
1041         }
1042
1043         /* Try to read a new message */
1044         do {
1045                 r = message_read(bus, &z);
1046                 if (r < 0) {
1047                         sd_bus_close(bus);
1048                         return r;
1049                 }
1050                 if (r == 0)
1051                         return ret;
1052
1053                 r = 1;
1054         } while (!z);
1055
1056         *m = z;
1057         return 1;
1058 }
1059
1060 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1061         int r;
1062
1063         if (!bus)
1064                 return -EINVAL;
1065         if (bus->fd < 0)
1066                 return -ENOTCONN;
1067         if (!m)
1068                 return -EINVAL;
1069
1070         /* If the serial number isn't kept, then we know that no reply
1071          * is expected */
1072         if (!serial && !m->sealed)
1073                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1074
1075         r = bus_seal_message(bus, m);
1076         if (r < 0)
1077                 return r;
1078
1079         /* If this is a reply and no reply was requested, then let's
1080          * suppress this, if we can */
1081         if (m->dont_send && !serial)
1082                 return 0;
1083
1084         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1085                 size_t idx = 0;
1086
1087                 r = message_write(bus, m, &idx);
1088                 if (r < 0) {
1089                         sd_bus_close(bus);
1090                         return r;
1091                 } else if (idx < m->size)  {
1092                         /* Wasn't fully written. So let's remember how
1093                          * much was written. Note that the first entry
1094                          * of the wqueue array is always allocated so
1095                          * that we always can remember how much was
1096                          * written. */
1097                         bus->wqueue[0] = sd_bus_message_ref(m);
1098                         bus->wqueue_size = 1;
1099                         bus->windex = idx;
1100                 }
1101         } else {
1102                 sd_bus_message **q;
1103
1104                 /* Just append it to the queue. */
1105
1106                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1107                         return -ENOBUFS;
1108
1109                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1110                 if (!q)
1111                         return -ENOMEM;
1112
1113                 bus->wqueue = q;
1114                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1115         }
1116
1117         if (serial)
1118                 *serial = BUS_MESSAGE_SERIAL(m);
1119
1120         return 0;
1121 }
1122
1123 static usec_t calc_elapse(uint64_t usec) {
1124         if (usec == (uint64_t) -1)
1125                 return 0;
1126
1127         if (usec == 0)
1128                 usec = BUS_DEFAULT_TIMEOUT;
1129
1130         return now(CLOCK_MONOTONIC) + usec;
1131 }
1132
1133 static int timeout_compare(const void *a, const void *b) {
1134         const struct reply_callback *x = a, *y = b;
1135
1136         if (x->timeout != 0 && y->timeout == 0)
1137                 return -1;
1138
1139         if (x->timeout == 0 && y->timeout != 0)
1140                 return 1;
1141
1142         if (x->timeout < y->timeout)
1143                 return -1;
1144
1145         if (x->timeout > y->timeout)
1146                 return 1;
1147
1148         return 0;
1149 }
1150
1151 int sd_bus_send_with_reply(
1152                 sd_bus *bus,
1153                 sd_bus_message *m,
1154                 sd_message_handler_t callback,
1155                 void *userdata,
1156                 uint64_t usec,
1157                 uint64_t *serial) {
1158
1159         struct reply_callback *c;
1160         int r;
1161
1162         if (!bus)
1163                 return -EINVAL;
1164         if (bus->fd < 0)
1165                 return -ENOTCONN;
1166         if (!m)
1167                 return -EINVAL;
1168         if (!callback)
1169                 return -EINVAL;
1170         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1171                 return -EINVAL;
1172         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1173                 return -EINVAL;
1174
1175         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1176         if (r < 0)
1177                 return r;
1178
1179         if (usec != (uint64_t) -1) {
1180                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1181                 if (r < 0)
1182                         return r;
1183         }
1184
1185         r = bus_seal_message(bus, m);
1186         if (r < 0)
1187                 return r;
1188
1189         c = new(struct reply_callback, 1);
1190         if (!c)
1191                 return -ENOMEM;
1192
1193         c->callback = callback;
1194         c->userdata = userdata;
1195         c->serial = BUS_MESSAGE_SERIAL(m);
1196         c->timeout = calc_elapse(usec);
1197
1198         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1199         if (r < 0) {
1200                 free(c);
1201                 return r;
1202         }
1203
1204         if (c->timeout != 0) {
1205                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1206                 if (r < 0) {
1207                         c->timeout = 0;
1208                         sd_bus_send_with_reply_cancel(bus, c->serial);
1209                         return r;
1210                 }
1211         }
1212
1213         r = sd_bus_send(bus, m, serial);
1214         if (r < 0) {
1215                 sd_bus_send_with_reply_cancel(bus, c->serial);
1216                 return r;
1217         }
1218
1219         return r;
1220 }
1221
1222 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1223         struct reply_callback *c;
1224
1225         if (!bus)
1226                 return -EINVAL;
1227         if (serial == 0)
1228                 return -EINVAL;
1229
1230         c = hashmap_remove(bus->reply_callbacks, &serial);
1231         if (!c)
1232                 return 0;
1233
1234         if (c->timeout != 0)
1235                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1236
1237         free(c);
1238         return 1;
1239 }
1240
1241 static int ensure_running(sd_bus *bus) {
1242         int r;
1243
1244         assert(bus);
1245
1246         r = sd_bus_is_running(bus);
1247         if (r != 0)
1248                 return r;
1249
1250         for (;;) {
1251                 int k;
1252
1253                 r = sd_bus_process(bus, NULL);
1254
1255                 if (r < 0)
1256                         return r;
1257
1258                 k = sd_bus_is_running(bus);
1259                 if (k != 0)
1260                         return k;
1261
1262                 if (r > 0)
1263                         continue;
1264
1265                 r = sd_bus_wait(bus, (uint64_t) -1);
1266                 if (r < 0)
1267                         return r;
1268         }
1269 }
1270
1271 int sd_bus_send_with_reply_and_block(
1272                 sd_bus *bus,
1273                 sd_bus_message *m,
1274                 uint64_t usec,
1275                 sd_bus_error *error,
1276                 sd_bus_message **reply) {
1277
1278         int r;
1279         usec_t timeout;
1280         uint64_t serial;
1281         bool room = false;
1282
1283         if (!bus)
1284                 return -EINVAL;
1285         if (bus->fd < 0)
1286                 return -ENOTCONN;
1287         if (!m)
1288                 return -EINVAL;
1289         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1290                 return -EINVAL;
1291         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1292                 return -EINVAL;
1293         if (bus_error_is_dirty(error))
1294                 return -EINVAL;
1295
1296         r = ensure_running(bus);
1297         if (r < 0)
1298                 return r;
1299
1300         r = sd_bus_send(bus, m, &serial);
1301         if (r < 0)
1302                 return r;
1303
1304         timeout = calc_elapse(usec);
1305
1306         for (;;) {
1307                 usec_t left;
1308                 sd_bus_message *incoming = NULL;
1309
1310                 if (!room) {
1311                         sd_bus_message **q;
1312
1313                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1314                                 return -ENOBUFS;
1315
1316                         /* Make sure there's room for queuing this
1317                          * locally, before we read the message */
1318
1319                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1320                         if (!q)
1321                                 return -ENOMEM;
1322
1323                         bus->rqueue = q;
1324                         room = true;
1325                 }
1326
1327                 r = message_read(bus, &incoming);
1328                 if (r < 0)
1329                         return r;
1330                 if (incoming) {
1331
1332                         if (incoming->reply_serial == serial) {
1333                                 /* Found a match! */
1334
1335                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1336                                         *reply = incoming;
1337                                         return 0;
1338                                 }
1339
1340                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1341                                         int k;
1342
1343                                         r = sd_bus_error_copy(error, &incoming->error);
1344                                         if (r < 0) {
1345                                                 sd_bus_message_unref(incoming);
1346                                                 return r;
1347                                         }
1348
1349                                         k = bus_error_to_errno(&incoming->error);
1350                                         sd_bus_message_unref(incoming);
1351                                         return k;
1352                                 }
1353
1354                                 sd_bus_message_unref(incoming);
1355                                 return -EIO;
1356                         }
1357
1358                         /* There's already guaranteed to be room for
1359                          * this, so need to resize things here */
1360                         bus->rqueue[bus->rqueue_size ++] = incoming;
1361                         room = false;
1362
1363                         /* Try to read more, right-away */
1364                         continue;
1365                 }
1366                 if (r != 0)
1367                         continue;
1368
1369                 if (timeout > 0) {
1370                         usec_t n;
1371
1372                         n = now(CLOCK_MONOTONIC);
1373                         if (n >= timeout)
1374                                 return -ETIMEDOUT;
1375
1376                         left = timeout - n;
1377                 } else
1378                         left = (uint64_t) -1;
1379
1380                 r = bus_poll(bus, true, left);
1381                 if (r < 0)
1382                         return r;
1383
1384                 r = dispatch_wqueue(bus);
1385                 if (r < 0)
1386                         return r;
1387         }
1388 }
1389
1390 int sd_bus_get_fd(sd_bus *bus) {
1391         if (!bus)
1392                 return -EINVAL;
1393
1394         if (bus->fd < 0)
1395                 return -ENOTCONN;
1396
1397         return bus->fd;
1398 }
1399
1400 int sd_bus_get_events(sd_bus *bus) {
1401         int flags = 0;
1402
1403         if (!bus)
1404                 return -EINVAL;
1405         if (bus->fd < 0)
1406                 return -ENOTCONN;
1407
1408         if (bus->state == BUS_OPENING)
1409                 flags |= POLLOUT;
1410         else if (bus->state == BUS_AUTHENTICATING) {
1411
1412                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1413                         flags |= POLLOUT;
1414
1415                 flags |= POLLIN;
1416
1417         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1418                 if (bus->rqueue_size <= 0)
1419                         flags |= POLLIN;
1420                 if (bus->wqueue_size > 0)
1421                         flags |= POLLOUT;
1422         }
1423
1424         return flags;
1425 }
1426
1427 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1428         struct reply_callback *c;
1429
1430         if (!bus)
1431                 return -EINVAL;
1432         if (!timeout_usec)
1433                 return -EINVAL;
1434         if (bus->fd < 0)
1435                 return -ENOTCONN;
1436
1437         if (bus->state == BUS_AUTHENTICATING) {
1438                 *timeout_usec = bus->auth_timeout;
1439                 return 1;
1440         }
1441
1442         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1443                 return 0;
1444
1445         c = prioq_peek(bus->reply_callbacks_prioq);
1446         if (!c)
1447                 return 0;
1448
1449         *timeout_usec = c->timeout;
1450         return 1;
1451 }
1452
1453 static int process_timeout(sd_bus *bus) {
1454         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1455         struct reply_callback *c;
1456         usec_t n;
1457         int r;
1458
1459         assert(bus);
1460
1461         c = prioq_peek(bus->reply_callbacks_prioq);
1462         if (!c)
1463                 return 0;
1464
1465         n = now(CLOCK_MONOTONIC);
1466         if (c->timeout > n)
1467                 return 0;
1468
1469         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1470         hashmap_remove(bus->reply_callbacks, &c->serial);
1471
1472         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1473         free(c);
1474
1475         return r < 0 ? r : 1;
1476 }
1477
1478 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1479         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1480         int r;
1481
1482         assert(bus);
1483         assert(m);
1484
1485         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1486                 return 0;
1487
1488         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1489                 return 0;
1490
1491         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1492                 return 1;
1493
1494         if (streq_ptr(m->member, "Ping"))
1495                 r = sd_bus_message_new_method_return(bus, m, &reply);
1496         else if (streq_ptr(m->member, "GetMachineId")) {
1497                 sd_id128_t id;
1498                 char sid[33];
1499
1500                 r = sd_id128_get_machine(&id);
1501                 if (r < 0)
1502                         return r;
1503
1504                 r = sd_bus_message_new_method_return(bus, m, &reply);
1505                 if (r < 0)
1506                         return r;
1507
1508                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1509         } else {
1510                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1511
1512                 sd_bus_error_set(&error,
1513                                  "org.freedesktop.DBus.Error.UnknownMethod",
1514                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1515
1516                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1517         }
1518
1519         if (r < 0)
1520                 return r;
1521
1522         r = sd_bus_send(bus, reply, NULL);
1523         if (r < 0)
1524                 return r;
1525
1526         return 1;
1527 }
1528
1529 static int process_message(sd_bus *bus, sd_bus_message *m) {
1530         struct filter_callback *l;
1531         int r;
1532
1533         assert(bus);
1534         assert(m);
1535
1536         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1537                 struct reply_callback *c;
1538
1539                 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1540                 if (c) {
1541                         if (c->timeout != 0)
1542                                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1543
1544                         r = c->callback(bus, 0, m, c->userdata);
1545                         free(c);
1546
1547                         if (r != 0)
1548                                 return r;
1549                 }
1550         }
1551
1552         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1553                 r = l->callback(bus, 0, m, l->userdata);
1554                 if (r != 0)
1555                         return r;
1556         }
1557
1558         return process_builtin(bus, m);
1559 }
1560
1561 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1562         int r;
1563
1564         /* Returns 0 when we didn't do anything. This should cause the
1565          * caller to invoke sd_bus_wait() before returning the next
1566          * time. Returns > 0 when we did something, which possibly
1567          * means *ret is filled in with an unprocessed message. */
1568
1569         if (!bus)
1570                 return -EINVAL;
1571         if (bus->fd < 0)
1572                 return -ENOTCONN;
1573
1574         if (bus->state == BUS_OPENING) {
1575                 struct pollfd p;
1576
1577                 zero(p);
1578                 p.fd = bus->fd;
1579                 p.events = POLLOUT;
1580
1581                 r = poll(&p, 1, 0);
1582                 if (r < 0)
1583                         return -errno;
1584
1585                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1586                         int error = 0;
1587                         socklen_t slen = sizeof(error);
1588
1589                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1590                         if (r < 0)
1591                                 bus->last_connect_error = errno;
1592                         else if (error != 0)
1593                                 bus->last_connect_error = error;
1594                         else if (p.revents & (POLLERR|POLLHUP))
1595                                 bus->last_connect_error = ECONNREFUSED;
1596                         else {
1597                                 r = bus_start_auth(bus);
1598                                 goto null_message;
1599                         }
1600
1601                         /* Try next address */
1602                         r = bus_start_connect(bus);
1603                         goto null_message;
1604                 }
1605
1606                 r = 0;
1607                 goto null_message;
1608
1609         } else if (bus->state == BUS_AUTHENTICATING) {
1610
1611                 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1612                         return -ETIMEDOUT;
1613
1614                 r = bus_write_auth(bus);
1615                 if (r != 0)
1616                         goto null_message;
1617
1618                 r = bus_read_auth(bus);
1619                 goto null_message;
1620
1621         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1622                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1623                 int k;
1624
1625                 r = process_timeout(bus);
1626                 if (r != 0)
1627                         goto null_message;
1628
1629                 r = dispatch_wqueue(bus);
1630                 if (r != 0)
1631                         goto null_message;
1632
1633                 k = r;
1634                 r = dispatch_rqueue(bus, &m);
1635                 if (r < 0)
1636                         return r;
1637                 if (!m) {
1638                         if (r == 0)
1639                                 r = k;
1640                         goto null_message;
1641                 }
1642
1643                 r = process_message(bus, m);
1644                 if (r != 0)
1645                         goto null_message;
1646
1647                 if (ret) {
1648                         *ret = m;
1649                         m = NULL;
1650                         return 1;
1651                 }
1652
1653                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1654                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1655                         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1656
1657                         sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1658
1659                         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1660                         if (r < 0)
1661                                 return r;
1662
1663                         r = sd_bus_send(bus, reply, NULL);
1664                         if (r < 0)
1665                                 return r;
1666                 }
1667
1668                 return 1;
1669         }
1670
1671         assert_not_reached("Unknown state");
1672
1673 null_message:
1674         if (r >= 0 && ret)
1675                 *ret = NULL;
1676
1677         return r;
1678 }
1679
1680 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1681         struct pollfd p;
1682         int r, e;
1683         struct timespec ts;
1684         usec_t until, m;
1685
1686         assert(bus);
1687
1688         if (bus->fd < 0)
1689                 return -ENOTCONN;
1690
1691         e = sd_bus_get_events(bus);
1692         if (e < 0)
1693                 return e;
1694
1695         if (need_more)
1696                 e |= POLLIN;
1697
1698         r = sd_bus_get_timeout(bus, &until);
1699         if (r < 0)
1700                 return r;
1701         if (r == 0)
1702                 m = (uint64_t) -1;
1703         else {
1704                 usec_t n;
1705                 n = now(CLOCK_MONOTONIC);
1706                 m = until > n ? until - n : 0;
1707         }
1708
1709         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1710                 m = timeout_usec;
1711
1712         zero(p);
1713         p.fd = bus->fd;
1714         p.events = e;
1715
1716         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1717         if (r < 0)
1718                 return -errno;
1719
1720         return r > 0 ? 1 : 0;
1721 }
1722
1723 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1724
1725         if (!bus)
1726                 return -EINVAL;
1727         if (bus->fd < 0)
1728                 return -ENOTCONN;
1729         if (bus->rqueue_size > 0)
1730                 return 0;
1731
1732         return bus_poll(bus, false, timeout_usec);
1733 }
1734
1735 int sd_bus_flush(sd_bus *bus) {
1736         int r;
1737
1738         if (!bus)
1739                 return -EINVAL;
1740         if (bus->fd < 0)
1741                 return -ENOTCONN;
1742
1743         r = ensure_running(bus);
1744         if (r < 0)
1745                 return r;
1746
1747         if (bus->wqueue_size <= 0)
1748                 return 0;
1749
1750         for (;;) {
1751                 r = dispatch_wqueue(bus);
1752                 if (r < 0)
1753                         return r;
1754
1755                 if (bus->wqueue_size <= 0)
1756                         return 0;
1757
1758                 r = bus_poll(bus, false, (uint64_t) -1);
1759                 if (r < 0)
1760                         return r;
1761         }
1762 }
1763
1764 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1765         struct filter_callback *f;
1766
1767         if (!bus)
1768                 return -EINVAL;
1769         if (!callback)
1770                 return -EINVAL;
1771
1772         f = new(struct filter_callback, 1);
1773         if (!f)
1774                 return -ENOMEM;
1775         f->callback = callback;
1776         f->userdata = userdata;
1777
1778         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1779         return 0;
1780 }
1781
1782 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1783         struct filter_callback *f;
1784
1785         if (!bus)
1786                 return -EINVAL;
1787         if (!callback)
1788                 return -EINVAL;
1789
1790         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1791                 if (f->callback == callback && f->userdata == userdata) {
1792                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1793                         free(f);
1794                         return 1;
1795                 }
1796         }
1797
1798         return 0;
1799 }