chiark / gitweb /
6eab4a5469cc44d4c71c401696594e5b643e3bd8
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "strv.h"
33
34 #include "sd-bus.h"
35 #include "bus-internal.h"
36 #include "bus-message.h"
37 #include "bus-type.h"
38 #include "bus-socket.h"
39
40 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
41
42 static void bus_free(sd_bus *b) {
43         struct filter_callback *f;
44         struct object_callback *c;
45         unsigned i;
46
47         assert(b);
48
49         if (b->fd >= 0)
50                 close_nointr_nofail(b->fd);
51
52         free(b->rbuffer);
53         free(b->unique_name);
54         free(b->auth_uid);
55         free(b->address);
56
57         free(b->exec_path);
58         strv_free(b->exec_argv);
59
60         close_many(b->fds, b->n_fds);
61         free(b->fds);
62
63         for (i = 0; i < b->rqueue_size; i++)
64                 sd_bus_message_unref(b->rqueue[i]);
65         free(b->rqueue);
66
67         for (i = 0; i < b->wqueue_size; i++)
68                 sd_bus_message_unref(b->wqueue[i]);
69         free(b->wqueue);
70
71         hashmap_free_free(b->reply_callbacks);
72         prioq_free(b->reply_callbacks_prioq);
73
74         while ((f = b->filter_callbacks)) {
75                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
76                 free(f);
77         }
78
79         while ((c = hashmap_steal_first(b->object_callbacks))) {
80                 free(c->path);
81                 free(c);
82         }
83
84         hashmap_free(b->object_callbacks);
85
86         free(b);
87 }
88
89 int sd_bus_new(sd_bus **ret) {
90         sd_bus *r;
91
92         if (!ret)
93                 return -EINVAL;
94
95         r = new0(sd_bus, 1);
96         if (!r)
97                 return -ENOMEM;
98
99         r->n_ref = 1;
100         r->fd = -1;
101         r->message_version = 1;
102         r->negotiate_fds = true;
103
104         /* We guarantee that wqueue always has space for at least one
105          * entry */
106         r->wqueue = new(sd_bus_message*, 1);
107         if (!r->wqueue) {
108                 free(r);
109                 return -ENOMEM;
110         }
111
112         *ret = r;
113         return 0;
114 }
115
116 int sd_bus_set_address(sd_bus *bus, const char *address) {
117         char *a;
118
119         if (!bus)
120                 return -EINVAL;
121         if (bus->state != BUS_UNSET)
122                 return -EPERM;
123         if (!address)
124                 return -EINVAL;
125
126         a = strdup(address);
127         if (!a)
128                 return -ENOMEM;
129
130         free(bus->address);
131         bus->address = a;
132
133         return 0;
134 }
135
136 int sd_bus_set_fd(sd_bus *bus, int fd) {
137         if (!bus)
138                 return -EINVAL;
139         if (bus->state != BUS_UNSET)
140                 return -EPERM;
141         if (fd < 0)
142                 return -EINVAL;
143
144         bus->fd = fd;
145         return 0;
146 }
147
148 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
149         char *p, **a;
150
151         if (!bus)
152                 return -EINVAL;
153         if (bus->state != BUS_UNSET)
154                 return -EPERM;
155         if (!path)
156                 return -EINVAL;
157         if (strv_isempty(argv))
158                 return -EINVAL;
159
160         p = strdup(path);
161         if (!p)
162                 return -ENOMEM;
163
164         a = strv_copy(argv);
165         if (!a) {
166                 free(p);
167                 return -ENOMEM;
168         }
169
170         free(bus->exec_path);
171         strv_free(bus->exec_argv);
172
173         bus->exec_path = p;
174         bus->exec_argv = a;
175
176         return 0;
177 }
178
179 int sd_bus_set_hello(sd_bus *bus, int b) {
180         if (!bus)
181                 return -EINVAL;
182         if (bus->state != BUS_UNSET)
183                 return -EPERM;
184
185         bus->send_hello = !!b;
186         return 0;
187 }
188
189 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
190         if (!bus)
191                 return -EINVAL;
192         if (bus->state != BUS_UNSET)
193                 return -EPERM;
194
195         bus->negotiate_fds = !!b;
196         return 0;
197 }
198
199 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
200         const char *s;
201         int r;
202
203         assert(bus);
204         assert(bus->state == BUS_HELLO);
205
206         if (error != 0)
207                 return -error;
208
209         assert(reply);
210
211         r = sd_bus_message_read(reply, "s", &s);
212         if (r < 0)
213                 return r;
214
215         if (!service_name_is_valid(s) || s[0] != ':')
216                 return -EBADMSG;
217
218         bus->unique_name = strdup(s);
219         if (!bus->unique_name)
220                 return -ENOMEM;
221
222         bus->state = BUS_RUNNING;
223
224         return 1;
225 }
226
227 static int bus_send_hello(sd_bus *bus) {
228         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
229         int r;
230
231         assert(bus);
232
233         if (!bus->send_hello)
234                 return 0;
235
236         r = sd_bus_message_new_method_call(
237                         bus,
238                         "org.freedesktop.DBus",
239                         "/",
240                         "org.freedesktop.DBus",
241                         "Hello",
242                         &m);
243         if (r < 0)
244                 return r;
245
246         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
247         if (r < 0)
248                 return r;
249
250         return r;
251 }
252
253 int bus_start_running(sd_bus *bus) {
254         assert(bus);
255
256         if (bus->send_hello) {
257                 bus->state = BUS_HELLO;
258                 return 1;
259         }
260
261         bus->state = BUS_RUNNING;
262         return 1;
263 }
264
/* Parses one value of an address key/value list at *p.
 *
 * If key is non-NULL, *p must start with "<key>="; otherwise 0 is
 * returned and nothing is touched. If key is NULL the value is
 * expected to start right at *p. The value runs up to the next ',',
 * ';' or the end of the string, with "%XX" sequences decoded to the
 * byte they encode.
 *
 * On success the decoded string replaces *value, *p is advanced past
 * the value (and past a terminating ','), and 1 is returned. Returns
 * -EINVAL if a keyed value was already set, the error of unhexchar()
 * for a malformed escape, or -ENOMEM. */
static int parse_address_key(const char **p, const char *key, char **value) {
        const char *cursor;
        char *buf = NULL;
        size_t len = 0;

        assert(p);
        assert(*p);
        assert(value);

        if (!key)
                cursor = *p;
        else {
                size_t key_len = strlen(key);

                /* Not our key? Then leave everything untouched */
                if (strncmp(*p, key, key_len) != 0 || (*p)[key_len] != '=')
                        return 0;

                /* The same key may not show up twice */
                if (*value)
                        return -EINVAL;

                cursor = *p + key_len + 1;
        }

        for (;;) {
                char ch, *grown;

                if (*cursor == ';' || *cursor == ',' || *cursor == 0)
                        break;

                if (*cursor != '%')
                        ch = *cursor++;
                else {
                        int hi, lo;

                        hi = unhexchar(cursor[1]);
                        if (hi < 0) {
                                free(buf);
                                return hi;
                        }

                        lo = unhexchar(cursor[2]);
                        if (lo < 0) {
                                free(buf);
                                return lo;
                        }

                        ch = (char) ((hi << 4) | lo);
                        cursor += 3;
                }

                /* Room for the new character plus the trailing NUL */
                grown = realloc(buf, len + 2);
                if (!grown) {
                        free(buf);
                        return -ENOMEM;
                }

                buf = grown;
                buf[len++] = ch;
        }

        if (buf)
                buf[len] = 0;
        else {
                /* Empty value */
                buf = strdup("");
                if (!buf)
                        return -ENOMEM;
        }

        if (*cursor == ',')
                cursor++;

        *p = cursor;

        free(*value);
        *value = buf;

        return 1;
}
341
/* Skips over one key/value pair at *p that none of the parsers
 * recognized. Advances *p to the end of the pair and, if the pair is
 * terminated by ',', past that comma.
 *
 * The scan stops at ';' as well as ',': ';' separates whole addresses,
 * and the callers' loops test for it to know when the current address
 * is over. Scanning for ',' only would run across the ';' and swallow
 * the beginning of the following address whenever the unknown key is
 * the last one of its address. A ';' is deliberately not consumed
 * here. */
static void skip_address_key(const char **p) {
        assert(p);
        assert(*p);

        *p += strcspn(*p, ",;");

        if (**p == ',')
                (*p) ++;
}
351
352 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
353         _cleanup_free_ char *path = NULL, *abstract = NULL;
354         size_t l;
355         int r;
356
357         assert(b);
358         assert(p);
359         assert(*p);
360         assert(guid);
361
362         while (**p != 0 && **p != ';') {
363                 r = parse_address_key(p, "guid", guid);
364                 if (r < 0)
365                         return r;
366                 else if (r > 0)
367                         continue;
368
369                 r = parse_address_key(p, "path", &path);
370                 if (r < 0)
371                         return r;
372                 else if (r > 0)
373                         continue;
374
375                 r = parse_address_key(p, "abstract", &abstract);
376                 if (r < 0)
377                         return r;
378                 else if (r > 0)
379                         continue;
380
381                 skip_address_key(p);
382         }
383
384         if (!path && !abstract)
385                 return -EINVAL;
386
387         if (path && abstract)
388                 return -EINVAL;
389
390         if (path) {
391                 l = strlen(path);
392                 if (l > sizeof(b->sockaddr.un.sun_path))
393                         return -E2BIG;
394
395                 b->sockaddr.un.sun_family = AF_UNIX;
396                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
397                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
398         } else if (abstract) {
399                 l = strlen(abstract);
400                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
401                         return -E2BIG;
402
403                 b->sockaddr.un.sun_family = AF_UNIX;
404                 b->sockaddr.un.sun_path[0] = 0;
405                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
406                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
407         }
408
409         return 0;
410 }
411
412 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
413         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
414         struct addrinfo hints, *result;
415         int r;
416
417         assert(b);
418         assert(p);
419         assert(*p);
420         assert(guid);
421
422         while (**p != 0 && **p != ';') {
423                 r = parse_address_key(p, "guid", guid);
424                 if (r < 0)
425                         return r;
426                 else if (r > 0)
427                         continue;
428
429                 r = parse_address_key(p, "host", &host);
430                 if (r < 0)
431                         return r;
432                 else if (r > 0)
433                         continue;
434
435                 r = parse_address_key(p, "port", &port);
436                 if (r < 0)
437                         return r;
438                 else if (r > 0)
439                         continue;
440
441                 r = parse_address_key(p, "family", &family);
442                 if (r < 0)
443                         return r;
444                 else if (r > 0)
445                         continue;
446
447                 skip_address_key(p);
448         }
449
450         if (!host || !port)
451                 return -EINVAL;
452
453         zero(hints);
454         hints.ai_socktype = SOCK_STREAM;
455         hints.ai_flags = AI_ADDRCONFIG;
456
457         if (family) {
458                 if (streq(family, "ipv4"))
459                         hints.ai_family = AF_INET;
460                 else if (streq(family, "ipv6"))
461                         hints.ai_family = AF_INET6;
462                 else
463                         return -EINVAL;
464         }
465
466         r = getaddrinfo(host, port, &hints, &result);
467         if (r == EAI_SYSTEM)
468                 return -errno;
469         else if (r != 0)
470                 return -EADDRNOTAVAIL;
471
472         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
473         b->sockaddr_size = result->ai_addrlen;
474
475         freeaddrinfo(result);
476
477         return 0;
478 }
479
480 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
481         char *path = NULL;
482         unsigned n_argv = 0, j;
483         char **argv = NULL;
484         int r;
485
486         assert(b);
487         assert(p);
488         assert(*p);
489         assert(guid);
490
491         while (**p != 0 && **p != ';') {
492                 r = parse_address_key(p, "guid", guid);
493                 if (r < 0)
494                         goto fail;
495                 else if (r > 0)
496                         continue;
497
498                 r = parse_address_key(p, "path", &path);
499                 if (r < 0)
500                         goto fail;
501                 else if (r > 0)
502                         continue;
503
504                 if (startswith(*p, "argv")) {
505                         unsigned ul;
506
507                         errno = 0;
508                         ul = strtoul(*p + 4, (char**) p, 10);
509                         if (errno != 0 || **p != '=' || ul > 256) {
510                                 r = -EINVAL;
511                                 goto fail;
512                         }
513
514                         (*p) ++;
515
516                         if (ul >= n_argv) {
517                                 char **x;
518
519                                 x = realloc(argv, sizeof(char*) * (ul + 2));
520                                 if (!x) {
521                                         r = -ENOMEM;
522                                         goto fail;
523                                 }
524
525                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
526
527                                 argv = x;
528                                 n_argv = ul + 1;
529                         }
530
531                         r = parse_address_key(p, NULL, argv + ul);
532                         if (r < 0)
533                                 goto fail;
534
535                         continue;
536                 }
537
538                 skip_address_key(p);
539         }
540
541         if (!path)
542                 goto fail;
543
544         /* Make sure there are no holes in the array, with the
545          * exception of argv[0] */
546         for (j = 1; j < n_argv; j++)
547                 if (!argv[j]) {
548                         r = -EINVAL;
549                         goto fail;
550                 }
551
552         if (argv && argv[0] == NULL) {
553                 argv[0] = strdup(path);
554                 if (!argv[0]) {
555                         r = -ENOMEM;
556                         goto fail;
557                 }
558         }
559
560         b->exec_path = path;
561         b->exec_argv = argv;
562         return 0;
563
564 fail:
565         for (j = 0; j < n_argv; j++)
566                 free(argv[j]);
567
568         free(argv);
569         free(path);
570         return r;
571 }
572
573 static void bus_reset_parsed_address(sd_bus *b) {
574         assert(b);
575
576         zero(b->sockaddr);
577         b->sockaddr_size = 0;
578         strv_free(b->exec_argv);
579         free(b->exec_path);
580         b->exec_path = NULL;
581         b->exec_argv = NULL;
582         b->peer = SD_ID128_NULL;
583 }
584
585 static int bus_parse_next_address(sd_bus *b) {
586         _cleanup_free_ char *guid = NULL;
587         const char *a;
588         int r;
589
590         assert(b);
591
592         if (!b->address)
593                 return 0;
594         if (b->address[b->address_index] == 0)
595                 return 0;
596
597         bus_reset_parsed_address(b);
598
599         a = b->address + b->address_index;
600
601         while (*a != 0) {
602
603                 if (*a == ';') {
604                         a++;
605                         continue;
606                 }
607
608                 if (startswith(a, "unix:")) {
609                         a += 5;
610
611                         r = parse_unix_address(b, &a, &guid);
612                         if (r < 0)
613                                 return r;
614                         break;
615
616                 } else if (startswith(a, "tcp:")) {
617
618                         a += 4;
619                         r = parse_tcp_address(b, &a, &guid);
620                         if (r < 0)
621                                 return r;
622
623                         break;
624
625                 } else if (startswith(a, "unixexec:")) {
626
627                         a += 9;
628                         r = parse_exec_address(b, &a, &guid);
629                         if (r < 0)
630                                 return r;
631
632                         break;
633
634                 }
635
636                 a = strchr(a, ';');
637                 if (!a)
638                         return 0;
639         }
640
641         if (guid) {
642                 r = sd_id128_from_string(guid, &b->peer);
643                 if (r < 0)
644                         return r;
645         }
646
647         b->address_index = a - b->address;
648         return 1;
649 }
650
651 static int bus_start_address(sd_bus *b) {
652         int r;
653
654         assert(b);
655
656         for (;;) {
657                 if (b->fd >= 0) {
658                         close_nointr_nofail(b->fd);
659                         b->fd = -1;
660                 }
661
662                 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
663
664                         r = bus_socket_connect(b);
665                         if (r >= 0)
666                                 return r;
667
668                         b->last_connect_error = -r;
669
670                 } else if (b->exec_path) {
671
672                         r = bus_socket_exec(b);
673                         if (r >= 0)
674                                 return r;
675
676                         b->last_connect_error = -r;
677                 }
678
679                 r = bus_parse_next_address(b);
680                 if (r < 0)
681                         return r;
682                 if (r == 0)
683                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
684         }
685 }
686
687 int bus_next_address(sd_bus *b) {
688         assert(b);
689
690         bus_reset_parsed_address(b);
691         return bus_start_address(b);
692 }
693
694 static int bus_start_fd(sd_bus *b) {
695         int r;
696
697         assert(b);
698
699         r = fd_nonblock(b->fd, true);
700         if (r < 0)
701                 return r;
702
703         r = fd_cloexec(b->fd, true);
704         if (r < 0)
705                 return r;
706
707         return bus_socket_take_fd(b);
708 }
709
710 int sd_bus_start(sd_bus *bus) {
711         int r;
712
713         if (!bus)
714                 return -EINVAL;
715         if (bus->state != BUS_UNSET)
716                 return -EPERM;
717
718         bus->state = BUS_OPENING;
719
720         if (bus->fd >= 0)
721                 r = bus_start_fd(bus);
722         else if (bus->address)
723                 r = bus_start_address(bus);
724         else
725                 return -EINVAL;
726
727         if (r < 0)
728                 return r;
729
730         return bus_send_hello(bus);
731 }
732
733 int sd_bus_open_system(sd_bus **ret) {
734         const char *e;
735         sd_bus *b;
736         int r;
737
738         if (!ret)
739                 return -EINVAL;
740
741         r = sd_bus_new(&b);
742         if (r < 0)
743                 return r;
744
745         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
746         if (e) {
747                 r = sd_bus_set_address(b, e);
748                 if (r < 0)
749                         goto fail;
750         } else {
751                 b->sockaddr.un.sun_family = AF_UNIX;
752                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
753                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
754         }
755
756         b->send_hello = true;
757
758         r = sd_bus_start(b);
759         if (r < 0)
760                 goto fail;
761
762         *ret = b;
763         return 0;
764
765 fail:
766         bus_free(b);
767         return r;
768 }
769
770 int sd_bus_open_user(sd_bus **ret) {
771         const char *e;
772         sd_bus *b;
773         size_t l;
774         int r;
775
776         if (!ret)
777                 return -EINVAL;
778
779         r = sd_bus_new(&b);
780         if (r < 0)
781                 return r;
782
783         e = getenv("DBUS_SESSION_BUS_ADDRESS");
784         if (e) {
785                 r = sd_bus_set_address(b, e);
786                 if (r < 0)
787                         goto fail;
788         } else {
789                 e = getenv("XDG_RUNTIME_DIR");
790                 if (!e) {
791                         r = -ENOENT;
792                         goto fail;
793                 }
794
795                 l = strlen(e);
796                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
797                         r = -E2BIG;
798                         goto fail;
799                 }
800
801                 b->sockaddr.un.sun_family = AF_UNIX;
802                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
803                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
804         }
805
806         b->send_hello = true;
807
808         r = sd_bus_start(b);
809         if (r < 0)
810                 goto fail;
811
812         *ret = b;
813         return 0;
814
815 fail:
816         bus_free(b);
817         return r;
818 }
819
820 void sd_bus_close(sd_bus *bus) {
821         if (!bus)
822                 return;
823         if (bus->fd < 0)
824                 return;
825
826         close_nointr_nofail(bus->fd);
827         bus->fd = -1;
828 }
829
830 sd_bus *sd_bus_ref(sd_bus *bus) {
831         if (!bus)
832                 return NULL;
833
834         assert(bus->n_ref > 0);
835
836         bus->n_ref++;
837         return bus;
838 }
839
840 sd_bus *sd_bus_unref(sd_bus *bus) {
841         if (!bus)
842                 return NULL;
843
844         assert(bus->n_ref > 0);
845         bus->n_ref--;
846
847         if (bus->n_ref <= 0)
848                 bus_free(bus);
849
850         return NULL;
851 }
852
853 int sd_bus_is_open(sd_bus *bus) {
854         if (!bus)
855                 return -EINVAL;
856
857         return bus->state != BUS_UNSET && bus->fd >= 0;
858 }
859
860 int sd_bus_can_send(sd_bus *bus, char type) {
861         int r;
862
863         if (!bus)
864                 return -EINVAL;
865         if (bus->fd < 0)
866                 return -ENOTCONN;
867
868         if (type == SD_BUS_TYPE_UNIX_FD) {
869                 if (!bus->negotiate_fds)
870                         return 0;
871
872                 r = bus_ensure_running(bus);
873                 if (r < 0)
874                         return r;
875
876                 return bus->can_fds;
877         }
878
879         return bus_type_is_valid(type);
880 }
881
882 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
883         int r;
884
885         if (!bus)
886                 return -EINVAL;
887         if (!peer)
888                 return -EINVAL;
889
890         r = bus_ensure_running(bus);
891         if (r < 0)
892                 return r;
893
894         *peer = bus->peer;
895         return 0;
896 }
897
898 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
899         assert(m);
900
901         if (m->header->version > b->message_version)
902                 return -EPERM;
903
904         if (m->sealed)
905                 return 0;
906
907         return bus_message_seal(m, ++b->serial);
908 }
909
910 static int dispatch_wqueue(sd_bus *bus) {
911         int r, ret = 0;
912
913         assert(bus);
914         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
915
916         if (bus->fd < 0)
917                 return -ENOTCONN;
918
919         while (bus->wqueue_size > 0) {
920
921                 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
922                 if (r < 0) {
923                         sd_bus_close(bus);
924                         return r;
925                 } else if (r == 0)
926                         /* Didn't do anything this time */
927                         return ret;
928                 else if (bus->windex >= bus->wqueue[0]->size) {
929                         /* Fully written. Let's drop the entry from
930                          * the queue.
931                          *
932                          * This isn't particularly optimized, but
933                          * well, this is supposed to be our worst-case
934                          * buffer only, and the socket buffer is
935                          * supposed to be our primary buffer, and if
936                          * it got full, then all bets are off
937                          * anyway. */
938
939                         sd_bus_message_unref(bus->wqueue[0]);
940                         bus->wqueue_size --;
941                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
942                         bus->windex = 0;
943
944                         ret = 1;
945                 }
946         }
947
948         return ret;
949 }
950
951 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
952         sd_bus_message *z = NULL;
953         int r, ret = 0;
954
955         assert(bus);
956         assert(m);
957         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
958
959         if (bus->fd < 0)
960                 return -ENOTCONN;
961
962         if (bus->rqueue_size > 0) {
963                 /* Dispatch a queued message */
964
965                 *m = bus->rqueue[0];
966                 bus->rqueue_size --;
967                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
968                 return 1;
969         }
970
971         /* Try to read a new message */
972         do {
973                 r = bus_socket_read_message(bus, &z);
974                 if (r < 0) {
975                         sd_bus_close(bus);
976                         return r;
977                 }
978                 if (r == 0)
979                         return ret;
980
981                 r = 1;
982         } while (!z);
983
984         *m = z;
985         return 1;
986 }
987
988 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
989         int r;
990
991         if (!bus)
992                 return -EINVAL;
993         if (bus->state == BUS_UNSET)
994                 return -ENOTCONN;
995         if (bus->fd < 0)
996                 return -ENOTCONN;
997         if (!m)
998                 return -EINVAL;
999
1000         if (m->n_fds > 0) {
1001                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1002                 if (r < 0)
1003                         return r;
1004                 if (r == 0)
1005                         return -ENOTSUP;
1006         }
1007
1008         /* If the serial number isn't kept, then we know that no reply
1009          * is expected */
1010         if (!serial && !m->sealed)
1011                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1012
1013         r = bus_seal_message(bus, m);
1014         if (r < 0)
1015                 return r;
1016
1017         /* If this is a reply and no reply was requested, then let's
1018          * suppress this, if we can */
1019         if (m->dont_send && !serial)
1020                 return 0;
1021
1022         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1023                 size_t idx = 0;
1024
1025                 r = bus_socket_write_message(bus, m, &idx);
1026                 if (r < 0) {
1027                         sd_bus_close(bus);
1028                         return r;
1029                 } else if (idx < m->size)  {
1030                         /* Wasn't fully written. So let's remember how
1031                          * much was written. Note that the first entry
1032                          * of the wqueue array is always allocated so
1033                          * that we always can remember how much was
1034                          * written. */
1035                         bus->wqueue[0] = sd_bus_message_ref(m);
1036                         bus->wqueue_size = 1;
1037                         bus->windex = idx;
1038                 }
1039         } else {
1040                 sd_bus_message **q;
1041
1042                 /* Just append it to the queue. */
1043
1044                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1045                         return -ENOBUFS;
1046
1047                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1048                 if (!q)
1049                         return -ENOMEM;
1050
1051                 bus->wqueue = q;
1052                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1053         }
1054
1055         if (serial)
1056                 *serial = BUS_MESSAGE_SERIAL(m);
1057
1058         return 0;
1059 }
1060
1061 static usec_t calc_elapse(uint64_t usec) {
1062         if (usec == (uint64_t) -1)
1063                 return 0;
1064
1065         if (usec == 0)
1066                 usec = BUS_DEFAULT_TIMEOUT;
1067
1068         return now(CLOCK_MONOTONIC) + usec;
1069 }
1070
1071 static int timeout_compare(const void *a, const void *b) {
1072         const struct reply_callback *x = a, *y = b;
1073
1074         if (x->timeout != 0 && y->timeout == 0)
1075                 return -1;
1076
1077         if (x->timeout == 0 && y->timeout != 0)
1078                 return 1;
1079
1080         if (x->timeout < y->timeout)
1081                 return -1;
1082
1083         if (x->timeout > y->timeout)
1084                 return 1;
1085
1086         return 0;
1087 }
1088
1089 int sd_bus_send_with_reply(
1090                 sd_bus *bus,
1091                 sd_bus_message *m,
1092                 sd_message_handler_t callback,
1093                 void *userdata,
1094                 uint64_t usec,
1095                 uint64_t *serial) {
1096
1097         struct reply_callback *c;
1098         int r;
1099
1100         if (!bus)
1101                 return -EINVAL;
1102         if (bus->state == BUS_UNSET)
1103                 return -ENOTCONN;
1104         if (bus->fd < 0)
1105                 return -ENOTCONN;
1106         if (!m)
1107                 return -EINVAL;
1108         if (!callback)
1109                 return -EINVAL;
1110         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1111                 return -EINVAL;
1112         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1113                 return -EINVAL;
1114
1115         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1116         if (r < 0)
1117                 return r;
1118
1119         if (usec != (uint64_t) -1) {
1120                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1121                 if (r < 0)
1122                         return r;
1123         }
1124
1125         r = bus_seal_message(bus, m);
1126         if (r < 0)
1127                 return r;
1128
1129         c = new(struct reply_callback, 1);
1130         if (!c)
1131                 return -ENOMEM;
1132
1133         c->callback = callback;
1134         c->userdata = userdata;
1135         c->serial = BUS_MESSAGE_SERIAL(m);
1136         c->timeout = calc_elapse(usec);
1137
1138         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1139         if (r < 0) {
1140                 free(c);
1141                 return r;
1142         }
1143
1144         if (c->timeout != 0) {
1145                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1146                 if (r < 0) {
1147                         c->timeout = 0;
1148                         sd_bus_send_with_reply_cancel(bus, c->serial);
1149                         return r;
1150                 }
1151         }
1152
1153         r = sd_bus_send(bus, m, serial);
1154         if (r < 0) {
1155                 sd_bus_send_with_reply_cancel(bus, c->serial);
1156                 return r;
1157         }
1158
1159         return r;
1160 }
1161
1162 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1163         struct reply_callback *c;
1164
1165         if (!bus)
1166                 return -EINVAL;
1167         if (serial == 0)
1168                 return -EINVAL;
1169
1170         c = hashmap_remove(bus->reply_callbacks, &serial);
1171         if (!c)
1172                 return 0;
1173
1174         if (c->timeout != 0)
1175                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1176
1177         free(c);
1178         return 1;
1179 }
1180
1181 int bus_ensure_running(sd_bus *bus) {
1182         int r;
1183
1184         assert(bus);
1185
1186         if (bus->fd < 0)
1187                 return -ENOTCONN;
1188         if (bus->state == BUS_UNSET)
1189                 return -ENOTCONN;
1190
1191         if (bus->state == BUS_RUNNING)
1192                 return 1;
1193
1194         for (;;) {
1195                 r = sd_bus_process(bus, NULL);
1196                 if (r < 0)
1197                         return r;
1198                 if (bus->state == BUS_RUNNING)
1199                         return 1;
1200                 if (r > 0)
1201                         continue;
1202
1203                 r = sd_bus_wait(bus, (uint64_t) -1);
1204                 if (r < 0)
1205                         return r;
1206         }
1207 }
1208
1209 int sd_bus_send_with_reply_and_block(
1210                 sd_bus *bus,
1211                 sd_bus_message *m,
1212                 uint64_t usec,
1213                 sd_bus_error *error,
1214                 sd_bus_message **reply) {
1215
1216         int r;
1217         usec_t timeout;
1218         uint64_t serial;
1219         bool room = false;
1220
1221         if (!bus)
1222                 return -EINVAL;
1223         if (bus->fd < 0)
1224                 return -ENOTCONN;
1225         if (bus->state == BUS_UNSET)
1226                 return -ENOTCONN;
1227         if (!m)
1228                 return -EINVAL;
1229         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1230                 return -EINVAL;
1231         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1232                 return -EINVAL;
1233         if (bus_error_is_dirty(error))
1234                 return -EINVAL;
1235
1236         r = bus_ensure_running(bus);
1237         if (r < 0)
1238                 return r;
1239
1240         r = sd_bus_send(bus, m, &serial);
1241         if (r < 0)
1242                 return r;
1243
1244         timeout = calc_elapse(usec);
1245
1246         for (;;) {
1247                 usec_t left;
1248                 sd_bus_message *incoming = NULL;
1249
1250                 if (!room) {
1251                         sd_bus_message **q;
1252
1253                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1254                                 return -ENOBUFS;
1255
1256                         /* Make sure there's room for queuing this
1257                          * locally, before we read the message */
1258
1259                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1260                         if (!q)
1261                                 return -ENOMEM;
1262
1263                         bus->rqueue = q;
1264                         room = true;
1265                 }
1266
1267                 r = bus_socket_read_message(bus, &incoming);
1268                 if (r < 0)
1269                         return r;
1270                 if (incoming) {
1271
1272                         if (incoming->reply_serial == serial) {
1273                                 /* Found a match! */
1274
1275                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1276                                         *reply = incoming;
1277                                         return 0;
1278                                 }
1279
1280                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1281                                         int k;
1282
1283                                         r = sd_bus_error_copy(error, &incoming->error);
1284                                         if (r < 0) {
1285                                                 sd_bus_message_unref(incoming);
1286                                                 return r;
1287                                         }
1288
1289                                         k = bus_error_to_errno(&incoming->error);
1290                                         sd_bus_message_unref(incoming);
1291                                         return k;
1292                                 }
1293
1294                                 sd_bus_message_unref(incoming);
1295                                 return -EIO;
1296                         }
1297
1298                         /* There's already guaranteed to be room for
1299                          * this, so need to resize things here */
1300                         bus->rqueue[bus->rqueue_size ++] = incoming;
1301                         room = false;
1302
1303                         /* Try to read more, right-away */
1304                         continue;
1305                 }
1306                 if (r != 0)
1307                         continue;
1308
1309                 if (timeout > 0) {
1310                         usec_t n;
1311
1312                         n = now(CLOCK_MONOTONIC);
1313                         if (n >= timeout)
1314                                 return -ETIMEDOUT;
1315
1316                         left = timeout - n;
1317                 } else
1318                         left = (uint64_t) -1;
1319
1320                 r = bus_poll(bus, true, left);
1321                 if (r < 0)
1322                         return r;
1323
1324                 r = dispatch_wqueue(bus);
1325                 if (r < 0)
1326                         return r;
1327         }
1328 }
1329
1330 int sd_bus_get_fd(sd_bus *bus) {
1331         if (!bus)
1332                 return -EINVAL;
1333
1334         if (bus->fd < 0)
1335                 return -ENOTCONN;
1336
1337         return bus->fd;
1338 }
1339
1340 int sd_bus_get_events(sd_bus *bus) {
1341         int flags = 0;
1342
1343         if (!bus)
1344                 return -EINVAL;
1345         if (bus->state == BUS_UNSET)
1346                 return -ENOTCONN;
1347         if (bus->fd < 0)
1348                 return -ENOTCONN;
1349
1350         if (bus->state == BUS_OPENING)
1351                 flags |= POLLOUT;
1352         else if (bus->state == BUS_AUTHENTICATING) {
1353
1354                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1355                         flags |= POLLOUT;
1356
1357                 flags |= POLLIN;
1358
1359         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1360                 if (bus->rqueue_size <= 0)
1361                         flags |= POLLIN;
1362                 if (bus->wqueue_size > 0)
1363                         flags |= POLLOUT;
1364         }
1365
1366         return flags;
1367 }
1368
1369 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1370         struct reply_callback *c;
1371
1372         if (!bus)
1373                 return -EINVAL;
1374         if (!timeout_usec)
1375                 return -EINVAL;
1376         if (bus->state == BUS_UNSET)
1377                 return -ENOTCONN;
1378         if (bus->fd < 0)
1379                 return -ENOTCONN;
1380
1381         if (bus->state == BUS_AUTHENTICATING) {
1382                 *timeout_usec = bus->auth_timeout;
1383                 return 1;
1384         }
1385
1386         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1387                 return 0;
1388
1389         c = prioq_peek(bus->reply_callbacks_prioq);
1390         if (!c)
1391                 return 0;
1392
1393         *timeout_usec = c->timeout;
1394         return 1;
1395 }
1396
1397 static int process_timeout(sd_bus *bus) {
1398         struct reply_callback *c;
1399         usec_t n;
1400         int r;
1401
1402         assert(bus);
1403
1404         c = prioq_peek(bus->reply_callbacks_prioq);
1405         if (!c)
1406                 return 0;
1407
1408         n = now(CLOCK_MONOTONIC);
1409         if (c->timeout > n)
1410                 return 0;
1411
1412         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1413         hashmap_remove(bus->reply_callbacks, &c->serial);
1414
1415         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1416         free(c);
1417
1418         return r < 0 ? r : 1;
1419 }
1420
1421 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1422         struct reply_callback *c;
1423         int r;
1424
1425         assert(bus);
1426         assert(m);
1427
1428         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1429             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1430                 return 0;
1431
1432         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1433         if (!c)
1434                 return 0;
1435
1436         if (c->timeout != 0)
1437                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1438
1439         r = c->callback(bus, 0, m, c->userdata);
1440         free(c);
1441
1442         return r;
1443 }
1444
1445 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1446         struct filter_callback *l;
1447         int r;
1448
1449         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1450                 r = l->callback(bus, 0, m, l->userdata);
1451                 if (r != 0)
1452                         return r;
1453         }
1454
1455         return 0;
1456 }
1457
1458 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1459         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1460         int r;
1461
1462         assert(bus);
1463         assert(m);
1464
1465         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1466                 return 0;
1467
1468         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1469                 return 0;
1470
1471         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1472                 return 1;
1473
1474         if (streq_ptr(m->member, "Ping"))
1475                 r = sd_bus_message_new_method_return(bus, m, &reply);
1476         else if (streq_ptr(m->member, "GetMachineId")) {
1477                 sd_id128_t id;
1478                 char sid[33];
1479
1480                 r = sd_id128_get_machine(&id);
1481                 if (r < 0)
1482                         return r;
1483
1484                 r = sd_bus_message_new_method_return(bus, m, &reply);
1485                 if (r < 0)
1486                         return r;
1487
1488                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1489         } else {
1490                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1491
1492                 sd_bus_error_set(&error,
1493                                  "org.freedesktop.DBus.Error.UnknownMethod",
1494                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1495
1496                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1497         }
1498
1499         if (r < 0)
1500                 return r;
1501
1502         r = sd_bus_send(bus, reply, NULL);
1503         if (r < 0)
1504                 return r;
1505
1506         return 1;
1507 }
1508
1509 static int process_object(sd_bus *bus, sd_bus_message *m) {
1510         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1511         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1512         struct object_callback *c;
1513         char *p;
1514         int r;
1515         bool found = false;
1516
1517         assert(bus);
1518         assert(m);
1519
1520         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1521                 return 0;
1522
1523         if (hashmap_isempty(bus->object_callbacks))
1524                 return 0;
1525
1526         c = hashmap_get(bus->object_callbacks, m->path);
1527         if (c) {
1528                 r = c->callback(bus, 0, m, c->userdata);
1529                 if (r != 0)
1530                         return r;
1531
1532                 found = true;
1533         }
1534
1535         /* Look for fallback prefixes */
1536         p = strdupa(m->path);
1537         for (;;) {
1538                 char *e;
1539
1540                 e = strrchr(p, '/');
1541                 if (e == p || !e)
1542                         break;
1543
1544                 *e = 0;
1545
1546                 c = hashmap_get(bus->object_callbacks, p);
1547                 if (c && c->is_fallback) {
1548                         r = c->callback(bus, 0, m, c->userdata);
1549                         if (r != 0)
1550                                 return r;
1551
1552                         found = true;
1553                 }
1554         }
1555
1556         if (!found)
1557                 return 0;
1558
1559         sd_bus_error_set(&error,
1560                          "org.freedesktop.DBus.Error.UnknownMethod",
1561                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1562
1563         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1564         if (r < 0)
1565                 return r;
1566
1567         r = sd_bus_send(bus, reply, NULL);
1568         if (r < 0)
1569                 return r;
1570
1571         return 1;
1572 }
1573
1574 static int process_message(sd_bus *bus, sd_bus_message *m) {
1575         int r;
1576
1577         assert(bus);
1578         assert(m);
1579
1580         r = process_reply(bus, m);
1581         if (r != 0)
1582                 return r;
1583
1584         r = process_filter(bus, m);
1585         if (r != 0)
1586                 return r;
1587
1588         r = process_builtin(bus, m);
1589         if (r != 0)
1590                 return r;
1591
1592         return process_object(bus, m);
1593 }
1594
1595 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1596         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1597         int r;
1598
1599         assert(bus);
1600         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1601
1602         r = process_timeout(bus);
1603         if (r != 0)
1604                 goto null_message;
1605
1606         r = dispatch_wqueue(bus);
1607         if (r != 0)
1608                 goto null_message;
1609
1610         r = dispatch_rqueue(bus, &m);
1611         if (r < 0)
1612                 return r;
1613         if (!m)
1614                 goto null_message;
1615
1616         r = process_message(bus, m);
1617         if (r != 0)
1618                 goto null_message;
1619
1620         if (ret) {
1621                 *ret = m;
1622                 m = NULL;
1623                 return 1;
1624         }
1625
1626         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1627                 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1628                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1629
1630                 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1631
1632                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1633                 if (r < 0)
1634                         return r;
1635
1636                 r = sd_bus_send(bus, reply, NULL);
1637                 if (r < 0)
1638                         return r;
1639         }
1640
1641         return 1;
1642
1643 null_message:
1644         if (r >= 0 && ret)
1645                 *ret = NULL;
1646
1647         return r;
1648 }
1649
1650 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1651         int r;
1652
1653         /* Returns 0 when we didn't do anything. This should cause the
1654          * caller to invoke sd_bus_wait() before returning the next
1655          * time. Returns > 0 when we did something, which possibly
1656          * means *ret is filled in with an unprocessed message. */
1657
1658         if (!bus)
1659                 return -EINVAL;
1660         if (bus->fd < 0)
1661                 return -ENOTCONN;
1662
1663         switch (bus->state) {
1664
1665         case BUS_UNSET:
1666                 return -ENOTCONN;
1667
1668         case BUS_OPENING:
1669                 r = bus_socket_process_opening(bus);
1670                 if (r < 0)
1671                         return r;
1672                 if (ret)
1673                         *ret = NULL;
1674                 return r;
1675
1676         case BUS_AUTHENTICATING:
1677
1678                 r = bus_socket_process_authenticating(bus);
1679                 if (r < 0)
1680                         return r;
1681                 if (ret)
1682                         *ret = NULL;
1683                 return r;
1684
1685         case BUS_RUNNING:
1686         case BUS_HELLO:
1687
1688                 return process_running(bus, ret);
1689         }
1690
1691         assert_not_reached("Unknown state");
1692 }
1693
1694 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1695         struct pollfd p;
1696         int r, e;
1697         struct timespec ts;
1698         usec_t until, m;
1699
1700         assert(bus);
1701
1702         if (bus->fd < 0)
1703                 return -ENOTCONN;
1704
1705         e = sd_bus_get_events(bus);
1706         if (e < 0)
1707                 return e;
1708
1709         if (need_more)
1710                 e |= POLLIN;
1711
1712         r = sd_bus_get_timeout(bus, &until);
1713         if (r < 0)
1714                 return r;
1715         if (r == 0)
1716                 m = (uint64_t) -1;
1717         else {
1718                 usec_t n;
1719                 n = now(CLOCK_MONOTONIC);
1720                 m = until > n ? until - n : 0;
1721         }
1722
1723         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1724                 m = timeout_usec;
1725
1726         zero(p);
1727         p.fd = bus->fd;
1728         p.events = e;
1729
1730         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1731         if (r < 0)
1732                 return -errno;
1733
1734         return r > 0 ? 1 : 0;
1735 }
1736
1737 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1738
1739         if (!bus)
1740                 return -EINVAL;
1741         if (bus->state == BUS_UNSET)
1742                 return -ENOTCONN;
1743         if (bus->fd < 0)
1744                 return -ENOTCONN;
1745         if (bus->rqueue_size > 0)
1746                 return 0;
1747
1748         return bus_poll(bus, false, timeout_usec);
1749 }
1750
1751 int sd_bus_flush(sd_bus *bus) {
1752         int r;
1753
1754         if (!bus)
1755                 return -EINVAL;
1756         if (bus->state == BUS_UNSET)
1757                 return -ENOTCONN;
1758         if (bus->fd < 0)
1759                 return -ENOTCONN;
1760
1761         r = bus_ensure_running(bus);
1762         if (r < 0)
1763                 return r;
1764
1765         if (bus->wqueue_size <= 0)
1766                 return 0;
1767
1768         for (;;) {
1769                 r = dispatch_wqueue(bus);
1770                 if (r < 0)
1771                         return r;
1772
1773                 if (bus->wqueue_size <= 0)
1774                         return 0;
1775
1776                 r = bus_poll(bus, false, (uint64_t) -1);
1777                 if (r < 0)
1778                         return r;
1779         }
1780 }
1781
1782 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1783         struct filter_callback *f;
1784
1785         if (!bus)
1786                 return -EINVAL;
1787         if (!callback)
1788                 return -EINVAL;
1789
1790         f = new(struct filter_callback, 1);
1791         if (!f)
1792                 return -ENOMEM;
1793         f->callback = callback;
1794         f->userdata = userdata;
1795
1796         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1797         return 0;
1798 }
1799
1800 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1801         struct filter_callback *f;
1802
1803         if (!bus)
1804                 return -EINVAL;
1805         if (!callback)
1806                 return -EINVAL;
1807
1808         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1809                 if (f->callback == callback && f->userdata == userdata) {
1810                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1811                         free(f);
1812                         return 1;
1813                 }
1814         }
1815
1816         return 0;
1817 }
1818
1819 static int bus_add_object(
1820                 sd_bus *bus,
1821                 bool fallback,
1822                 const char *path,
1823                 sd_message_handler_t callback,
1824                 void *userdata) {
1825
1826         struct object_callback *c;
1827         int r;
1828
1829         if (!bus)
1830                 return -EINVAL;
1831         if (!path)
1832                 return -EINVAL;
1833         if (!callback)
1834                 return -EINVAL;
1835
1836         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1837         if (r < 0)
1838                 return r;
1839
1840         c = new(struct object_callback, 1);
1841         if (!c)
1842                 return -ENOMEM;
1843
1844         c->path = strdup(path);
1845         if (!path) {
1846                 free(c);
1847                 return -ENOMEM;
1848         }
1849
1850         c->callback = callback;
1851         c->userdata = userdata;
1852         c->is_fallback = fallback;
1853
1854         r = hashmap_put(bus->object_callbacks, c->path, c);
1855         if (r < 0) {
1856                 free(c->path);
1857                 free(c);
1858                 return r;
1859         }
1860
1861         return 0;
1862 }
1863
1864 static int bus_remove_object(
1865                 sd_bus *bus,
1866                 bool fallback,
1867                 const char *path,
1868                 sd_message_handler_t callback,
1869                 void *userdata) {
1870
1871         struct object_callback *c;
1872
1873         if (!bus)
1874                 return -EINVAL;
1875         if (!path)
1876                 return -EINVAL;
1877         if (!callback)
1878                 return -EINVAL;
1879
1880         c = hashmap_get(bus->object_callbacks, path);
1881         if (!c)
1882                 return 0;
1883
1884         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
1885                 return 0;
1886
1887         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
1888
1889         free(c->path);
1890         free(c);
1891
1892         return 1;
1893 }
1894
1895 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
1896         return bus_add_object(bus, false, path, callback, userdata);
1897 }
1898
1899 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
1900         return bus_remove_object(bus, false, path, callback, userdata);
1901 }
1902
1903 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
1904         return bus_add_object(bus, true, prefix, callback, userdata);
1905 }
1906
1907 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
1908         return bus_remove_object(bus, true, prefix, callback, userdata);
1909 }