chiark / gitweb /
bus: automatically generate minimal introspection data to find installed objects
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "strv.h"
33 #include "set.h"
34
35 #include "sd-bus.h"
36 #include "bus-internal.h"
37 #include "bus-message.h"
38 #include "bus-type.h"
39 #include "bus-socket.h"
40
41 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
42
43 static void bus_free(sd_bus *b) {
44         struct filter_callback *f;
45         struct object_callback *c;
46         unsigned i;
47
48         assert(b);
49
50         if (b->fd >= 0)
51                 close_nointr_nofail(b->fd);
52
53         free(b->rbuffer);
54         free(b->unique_name);
55         free(b->auth_uid);
56         free(b->address);
57
58         free(b->exec_path);
59         strv_free(b->exec_argv);
60
61         close_many(b->fds, b->n_fds);
62         free(b->fds);
63
64         for (i = 0; i < b->rqueue_size; i++)
65                 sd_bus_message_unref(b->rqueue[i]);
66         free(b->rqueue);
67
68         for (i = 0; i < b->wqueue_size; i++)
69                 sd_bus_message_unref(b->wqueue[i]);
70         free(b->wqueue);
71
72         hashmap_free_free(b->reply_callbacks);
73         prioq_free(b->reply_callbacks_prioq);
74
75         while ((f = b->filter_callbacks)) {
76                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
77                 free(f);
78         }
79
80         while ((c = hashmap_steal_first(b->object_callbacks))) {
81                 free(c->path);
82                 free(c);
83         }
84
85         hashmap_free(b->object_callbacks);
86
87         free(b);
88 }
89
90 int sd_bus_new(sd_bus **ret) {
91         sd_bus *r;
92
93         if (!ret)
94                 return -EINVAL;
95
96         r = new0(sd_bus, 1);
97         if (!r)
98                 return -ENOMEM;
99
100         r->n_ref = 1;
101         r->fd = -1;
102         r->message_version = 1;
103         r->negotiate_fds = true;
104
105         /* We guarantee that wqueue always has space for at least one
106          * entry */
107         r->wqueue = new(sd_bus_message*, 1);
108         if (!r->wqueue) {
109                 free(r);
110                 return -ENOMEM;
111         }
112
113         *ret = r;
114         return 0;
115 }
116
117 int sd_bus_set_address(sd_bus *bus, const char *address) {
118         char *a;
119
120         if (!bus)
121                 return -EINVAL;
122         if (bus->state != BUS_UNSET)
123                 return -EPERM;
124         if (!address)
125                 return -EINVAL;
126
127         a = strdup(address);
128         if (!a)
129                 return -ENOMEM;
130
131         free(bus->address);
132         bus->address = a;
133
134         return 0;
135 }
136
137 int sd_bus_set_fd(sd_bus *bus, int fd) {
138         if (!bus)
139                 return -EINVAL;
140         if (bus->state != BUS_UNSET)
141                 return -EPERM;
142         if (fd < 0)
143                 return -EINVAL;
144
145         bus->fd = fd;
146         return 0;
147 }
148
149 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
150         char *p, **a;
151
152         if (!bus)
153                 return -EINVAL;
154         if (bus->state != BUS_UNSET)
155                 return -EPERM;
156         if (!path)
157                 return -EINVAL;
158         if (strv_isempty(argv))
159                 return -EINVAL;
160
161         p = strdup(path);
162         if (!p)
163                 return -ENOMEM;
164
165         a = strv_copy(argv);
166         if (!a) {
167                 free(p);
168                 return -ENOMEM;
169         }
170
171         free(bus->exec_path);
172         strv_free(bus->exec_argv);
173
174         bus->exec_path = p;
175         bus->exec_argv = a;
176
177         return 0;
178 }
179
180 int sd_bus_set_bus_client(sd_bus *bus, int b) {
181         if (!bus)
182                 return -EINVAL;
183         if (bus->state != BUS_UNSET)
184                 return -EPERM;
185
186         bus->bus_client = !!b;
187         return 0;
188 }
189
190 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
191         if (!bus)
192                 return -EINVAL;
193         if (bus->state != BUS_UNSET)
194                 return -EPERM;
195
196         bus->negotiate_fds = !!b;
197         return 0;
198 }
199
200 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
201         const char *s;
202         int r;
203
204         assert(bus);
205         assert(bus->state == BUS_HELLO);
206
207         if (error != 0)
208                 return -error;
209
210         assert(reply);
211
212         r = sd_bus_message_read(reply, "s", &s);
213         if (r < 0)
214                 return r;
215
216         if (!service_name_is_valid(s) || s[0] != ':')
217                 return -EBADMSG;
218
219         bus->unique_name = strdup(s);
220         if (!bus->unique_name)
221                 return -ENOMEM;
222
223         bus->state = BUS_RUNNING;
224
225         return 1;
226 }
227
228 static int bus_send_hello(sd_bus *bus) {
229         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
230         int r;
231
232         assert(bus);
233
234         if (!bus->bus_client)
235                 return 0;
236
237         r = sd_bus_message_new_method_call(
238                         bus,
239                         "org.freedesktop.DBus",
240                         "/",
241                         "org.freedesktop.DBus",
242                         "Hello",
243                         &m);
244         if (r < 0)
245                 return r;
246
247         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
248         if (r < 0)
249                 return r;
250
251         return r;
252 }
253
254 int bus_start_running(sd_bus *bus) {
255         assert(bus);
256
257         if (bus->bus_client) {
258                 bus->state = BUS_HELLO;
259                 return 1;
260         }
261
262         bus->state = BUS_RUNNING;
263         return 1;
264 }
265
/* Parses one "key=value" item at *p.
 *
 * If key is non-NULL the text at *p has to start with "<key>="; if it
 * does not, 0 is returned and nothing is changed. If key is NULL the
 * text at *p is taken as the value right away.
 *
 * The value runs up to the next ',' or ';' or the end of the string,
 * with "%XX" sequences decoded into the byte they describe. On success
 * the decoded, NUL-terminated value is stored in *value (any previous
 * string there is freed), *p is moved past the value and past a
 * trailing ',' and 1 is returned.
 *
 * Errors: -EINVAL if key is given and *value is already set, the
 * negative result of unhexchar() for a malformed escape, -ENOMEM on
 * allocation failure. */
static int parse_address_key(const char **p, const char *key, char **value) {
        const char *cursor;
        char *buf = NULL;
        size_t used = 0;

        assert(p);
        assert(*p);
        assert(value);

        cursor = *p;

        if (key) {
                size_t key_len = strlen(key);

                if (strncmp(cursor, key, key_len) != 0 || cursor[key_len] != '=')
                        return 0;

                /* A named key may be assigned only once */
                if (*value)
                        return -EINVAL;

                cursor += key_len + 1;
        }

        for (;;) {
                char decoded, *grown;

                if (*cursor == ';' || *cursor == ',' || *cursor == 0)
                        break;

                if (*cursor != '%')
                        decoded = *cursor++;
                else {
                        int hi, lo;

                        hi = unhexchar(cursor[1]);
                        if (hi < 0) {
                                free(buf);
                                return hi;
                        }

                        lo = unhexchar(cursor[2]);
                        if (lo < 0) {
                                free(buf);
                                return lo;
                        }

                        decoded = (char) ((hi << 4) | lo);
                        cursor += 3;
                }

                /* Room for the new byte plus the terminating NUL */
                grown = realloc(buf, used + 2);
                if (!grown) {
                        free(buf);
                        return -ENOMEM;
                }

                buf = grown;
                buf[used++] = decoded;
        }

        if (buf)
                buf[used] = 0;
        else {
                /* Empty value: still hand back a valid string */
                buf = strdup("");
                if (!buf)
                        return -ENOMEM;
        }

        if (*cursor == ',')
                cursor++;

        *p = cursor;

        free(*value);
        *value = buf;

        return 1;
}
342
/* Skips over one "key=value" item that none of the parsers recognized.
 *
 * *p is advanced to the end of the item. A ',' terminating the item is
 * consumed, so that *p points at the next key. A ';' terminating the
 * item is NOT consumed: it separates whole address entries, and the
 * callers' loops rely on seeing it to know that the current entry is
 * finished. Stopping at ';' as well as ',' matches what
 * parse_address_key() does for recognized keys; scanning for ',' alone
 * would run into the following address entry whenever an unknown key
 * is the last key of its entry. */
static void skip_address_key(const char **p) {
        assert(p);
        assert(*p);

        *p += strcspn(*p, ",;");

        if (**p == ',')
                (*p)++;
}
352
353 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
354         _cleanup_free_ char *path = NULL, *abstract = NULL;
355         size_t l;
356         int r;
357
358         assert(b);
359         assert(p);
360         assert(*p);
361         assert(guid);
362
363         while (**p != 0 && **p != ';') {
364                 r = parse_address_key(p, "guid", guid);
365                 if (r < 0)
366                         return r;
367                 else if (r > 0)
368                         continue;
369
370                 r = parse_address_key(p, "path", &path);
371                 if (r < 0)
372                         return r;
373                 else if (r > 0)
374                         continue;
375
376                 r = parse_address_key(p, "abstract", &abstract);
377                 if (r < 0)
378                         return r;
379                 else if (r > 0)
380                         continue;
381
382                 skip_address_key(p);
383         }
384
385         if (!path && !abstract)
386                 return -EINVAL;
387
388         if (path && abstract)
389                 return -EINVAL;
390
391         if (path) {
392                 l = strlen(path);
393                 if (l > sizeof(b->sockaddr.un.sun_path))
394                         return -E2BIG;
395
396                 b->sockaddr.un.sun_family = AF_UNIX;
397                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
398                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
399         } else if (abstract) {
400                 l = strlen(abstract);
401                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
402                         return -E2BIG;
403
404                 b->sockaddr.un.sun_family = AF_UNIX;
405                 b->sockaddr.un.sun_path[0] = 0;
406                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
407                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
408         }
409
410         return 0;
411 }
412
413 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
414         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
415         struct addrinfo hints, *result;
416         int r;
417
418         assert(b);
419         assert(p);
420         assert(*p);
421         assert(guid);
422
423         while (**p != 0 && **p != ';') {
424                 r = parse_address_key(p, "guid", guid);
425                 if (r < 0)
426                         return r;
427                 else if (r > 0)
428                         continue;
429
430                 r = parse_address_key(p, "host", &host);
431                 if (r < 0)
432                         return r;
433                 else if (r > 0)
434                         continue;
435
436                 r = parse_address_key(p, "port", &port);
437                 if (r < 0)
438                         return r;
439                 else if (r > 0)
440                         continue;
441
442                 r = parse_address_key(p, "family", &family);
443                 if (r < 0)
444                         return r;
445                 else if (r > 0)
446                         continue;
447
448                 skip_address_key(p);
449         }
450
451         if (!host || !port)
452                 return -EINVAL;
453
454         zero(hints);
455         hints.ai_socktype = SOCK_STREAM;
456         hints.ai_flags = AI_ADDRCONFIG;
457
458         if (family) {
459                 if (streq(family, "ipv4"))
460                         hints.ai_family = AF_INET;
461                 else if (streq(family, "ipv6"))
462                         hints.ai_family = AF_INET6;
463                 else
464                         return -EINVAL;
465         }
466
467         r = getaddrinfo(host, port, &hints, &result);
468         if (r == EAI_SYSTEM)
469                 return -errno;
470         else if (r != 0)
471                 return -EADDRNOTAVAIL;
472
473         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
474         b->sockaddr_size = result->ai_addrlen;
475
476         freeaddrinfo(result);
477
478         return 0;
479 }
480
481 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
482         char *path = NULL;
483         unsigned n_argv = 0, j;
484         char **argv = NULL;
485         int r;
486
487         assert(b);
488         assert(p);
489         assert(*p);
490         assert(guid);
491
492         while (**p != 0 && **p != ';') {
493                 r = parse_address_key(p, "guid", guid);
494                 if (r < 0)
495                         goto fail;
496                 else if (r > 0)
497                         continue;
498
499                 r = parse_address_key(p, "path", &path);
500                 if (r < 0)
501                         goto fail;
502                 else if (r > 0)
503                         continue;
504
505                 if (startswith(*p, "argv")) {
506                         unsigned ul;
507
508                         errno = 0;
509                         ul = strtoul(*p + 4, (char**) p, 10);
510                         if (errno != 0 || **p != '=' || ul > 256) {
511                                 r = -EINVAL;
512                                 goto fail;
513                         }
514
515                         (*p) ++;
516
517                         if (ul >= n_argv) {
518                                 char **x;
519
520                                 x = realloc(argv, sizeof(char*) * (ul + 2));
521                                 if (!x) {
522                                         r = -ENOMEM;
523                                         goto fail;
524                                 }
525
526                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
527
528                                 argv = x;
529                                 n_argv = ul + 1;
530                         }
531
532                         r = parse_address_key(p, NULL, argv + ul);
533                         if (r < 0)
534                                 goto fail;
535
536                         continue;
537                 }
538
539                 skip_address_key(p);
540         }
541
542         if (!path)
543                 goto fail;
544
545         /* Make sure there are no holes in the array, with the
546          * exception of argv[0] */
547         for (j = 1; j < n_argv; j++)
548                 if (!argv[j]) {
549                         r = -EINVAL;
550                         goto fail;
551                 }
552
553         if (argv && argv[0] == NULL) {
554                 argv[0] = strdup(path);
555                 if (!argv[0]) {
556                         r = -ENOMEM;
557                         goto fail;
558                 }
559         }
560
561         b->exec_path = path;
562         b->exec_argv = argv;
563         return 0;
564
565 fail:
566         for (j = 0; j < n_argv; j++)
567                 free(argv[j]);
568
569         free(argv);
570         free(path);
571         return r;
572 }
573
574 static void bus_reset_parsed_address(sd_bus *b) {
575         assert(b);
576
577         zero(b->sockaddr);
578         b->sockaddr_size = 0;
579         strv_free(b->exec_argv);
580         free(b->exec_path);
581         b->exec_path = NULL;
582         b->exec_argv = NULL;
583         b->peer = SD_ID128_NULL;
584 }
585
586 static int bus_parse_next_address(sd_bus *b) {
587         _cleanup_free_ char *guid = NULL;
588         const char *a;
589         int r;
590
591         assert(b);
592
593         if (!b->address)
594                 return 0;
595         if (b->address[b->address_index] == 0)
596                 return 0;
597
598         bus_reset_parsed_address(b);
599
600         a = b->address + b->address_index;
601
602         while (*a != 0) {
603
604                 if (*a == ';') {
605                         a++;
606                         continue;
607                 }
608
609                 if (startswith(a, "unix:")) {
610                         a += 5;
611
612                         r = parse_unix_address(b, &a, &guid);
613                         if (r < 0)
614                                 return r;
615                         break;
616
617                 } else if (startswith(a, "tcp:")) {
618
619                         a += 4;
620                         r = parse_tcp_address(b, &a, &guid);
621                         if (r < 0)
622                                 return r;
623
624                         break;
625
626                 } else if (startswith(a, "unixexec:")) {
627
628                         a += 9;
629                         r = parse_exec_address(b, &a, &guid);
630                         if (r < 0)
631                                 return r;
632
633                         break;
634
635                 }
636
637                 a = strchr(a, ';');
638                 if (!a)
639                         return 0;
640         }
641
642         if (guid) {
643                 r = sd_id128_from_string(guid, &b->peer);
644                 if (r < 0)
645                         return r;
646         }
647
648         b->address_index = a - b->address;
649         return 1;
650 }
651
652 static int bus_start_address(sd_bus *b) {
653         int r;
654
655         assert(b);
656
657         for (;;) {
658                 if (b->fd >= 0) {
659                         close_nointr_nofail(b->fd);
660                         b->fd = -1;
661                 }
662
663                 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
664
665                         r = bus_socket_connect(b);
666                         if (r >= 0)
667                                 return r;
668
669                         b->last_connect_error = -r;
670
671                 } else if (b->exec_path) {
672
673                         r = bus_socket_exec(b);
674                         if (r >= 0)
675                                 return r;
676
677                         b->last_connect_error = -r;
678                 }
679
680                 r = bus_parse_next_address(b);
681                 if (r < 0)
682                         return r;
683                 if (r == 0)
684                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
685         }
686 }
687
688 int bus_next_address(sd_bus *b) {
689         assert(b);
690
691         bus_reset_parsed_address(b);
692         return bus_start_address(b);
693 }
694
695 static int bus_start_fd(sd_bus *b) {
696         int r;
697
698         assert(b);
699
700         r = fd_nonblock(b->fd, true);
701         if (r < 0)
702                 return r;
703
704         r = fd_cloexec(b->fd, true);
705         if (r < 0)
706                 return r;
707
708         return bus_socket_take_fd(b);
709 }
710
711 int sd_bus_start(sd_bus *bus) {
712         int r;
713
714         if (!bus)
715                 return -EINVAL;
716         if (bus->state != BUS_UNSET)
717                 return -EPERM;
718
719         bus->state = BUS_OPENING;
720
721         if (bus->fd >= 0)
722                 r = bus_start_fd(bus);
723         else if (bus->address)
724                 r = bus_start_address(bus);
725         else
726                 return -EINVAL;
727
728         if (r < 0)
729                 return r;
730
731         return bus_send_hello(bus);
732 }
733
734 int sd_bus_open_system(sd_bus **ret) {
735         const char *e;
736         sd_bus *b;
737         int r;
738
739         if (!ret)
740                 return -EINVAL;
741
742         r = sd_bus_new(&b);
743         if (r < 0)
744                 return r;
745
746         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
747         if (e) {
748                 r = sd_bus_set_address(b, e);
749                 if (r < 0)
750                         goto fail;
751         } else {
752                 b->sockaddr.un.sun_family = AF_UNIX;
753                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
754                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
755         }
756
757         b->bus_client = true;
758
759         r = sd_bus_start(b);
760         if (r < 0)
761                 goto fail;
762
763         *ret = b;
764         return 0;
765
766 fail:
767         bus_free(b);
768         return r;
769 }
770
771 int sd_bus_open_user(sd_bus **ret) {
772         const char *e;
773         sd_bus *b;
774         size_t l;
775         int r;
776
777         if (!ret)
778                 return -EINVAL;
779
780         r = sd_bus_new(&b);
781         if (r < 0)
782                 return r;
783
784         e = getenv("DBUS_SESSION_BUS_ADDRESS");
785         if (e) {
786                 r = sd_bus_set_address(b, e);
787                 if (r < 0)
788                         goto fail;
789         } else {
790                 e = getenv("XDG_RUNTIME_DIR");
791                 if (!e) {
792                         r = -ENOENT;
793                         goto fail;
794                 }
795
796                 l = strlen(e);
797                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
798                         r = -E2BIG;
799                         goto fail;
800                 }
801
802                 b->sockaddr.un.sun_family = AF_UNIX;
803                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
804                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
805         }
806
807         b->bus_client = true;
808
809         r = sd_bus_start(b);
810         if (r < 0)
811                 goto fail;
812
813         *ret = b;
814         return 0;
815
816 fail:
817         bus_free(b);
818         return r;
819 }
820
821 void sd_bus_close(sd_bus *bus) {
822         if (!bus)
823                 return;
824         if (bus->fd < 0)
825                 return;
826
827         close_nointr_nofail(bus->fd);
828         bus->fd = -1;
829 }
830
831 sd_bus *sd_bus_ref(sd_bus *bus) {
832         if (!bus)
833                 return NULL;
834
835         assert(bus->n_ref > 0);
836
837         bus->n_ref++;
838         return bus;
839 }
840
841 sd_bus *sd_bus_unref(sd_bus *bus) {
842         if (!bus)
843                 return NULL;
844
845         assert(bus->n_ref > 0);
846         bus->n_ref--;
847
848         if (bus->n_ref <= 0)
849                 bus_free(bus);
850
851         return NULL;
852 }
853
854 int sd_bus_is_open(sd_bus *bus) {
855         if (!bus)
856                 return -EINVAL;
857
858         return bus->state != BUS_UNSET && bus->fd >= 0;
859 }
860
861 int sd_bus_can_send(sd_bus *bus, char type) {
862         int r;
863
864         if (!bus)
865                 return -EINVAL;
866         if (bus->fd < 0)
867                 return -ENOTCONN;
868
869         if (type == SD_BUS_TYPE_UNIX_FD) {
870                 if (!bus->negotiate_fds)
871                         return 0;
872
873                 r = bus_ensure_running(bus);
874                 if (r < 0)
875                         return r;
876
877                 return bus->can_fds;
878         }
879
880         return bus_type_is_valid(type);
881 }
882
883 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
884         int r;
885
886         if (!bus)
887                 return -EINVAL;
888         if (!peer)
889                 return -EINVAL;
890
891         r = bus_ensure_running(bus);
892         if (r < 0)
893                 return r;
894
895         *peer = bus->peer;
896         return 0;
897 }
898
899 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
900         assert(m);
901
902         if (m->header->version > b->message_version)
903                 return -EPERM;
904
905         if (m->sealed)
906                 return 0;
907
908         return bus_message_seal(m, ++b->serial);
909 }
910
911 static int dispatch_wqueue(sd_bus *bus) {
912         int r, ret = 0;
913
914         assert(bus);
915         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
916
917         if (bus->fd < 0)
918                 return -ENOTCONN;
919
920         while (bus->wqueue_size > 0) {
921
922                 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
923                 if (r < 0) {
924                         sd_bus_close(bus);
925                         return r;
926                 } else if (r == 0)
927                         /* Didn't do anything this time */
928                         return ret;
929                 else if (bus->windex >= bus->wqueue[0]->size) {
930                         /* Fully written. Let's drop the entry from
931                          * the queue.
932                          *
933                          * This isn't particularly optimized, but
934                          * well, this is supposed to be our worst-case
935                          * buffer only, and the socket buffer is
936                          * supposed to be our primary buffer, and if
937                          * it got full, then all bets are off
938                          * anyway. */
939
940                         sd_bus_message_unref(bus->wqueue[0]);
941                         bus->wqueue_size --;
942                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
943                         bus->windex = 0;
944
945                         ret = 1;
946                 }
947         }
948
949         return ret;
950 }
951
952 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
953         sd_bus_message *z = NULL;
954         int r, ret = 0;
955
956         assert(bus);
957         assert(m);
958         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
959
960         if (bus->fd < 0)
961                 return -ENOTCONN;
962
963         if (bus->rqueue_size > 0) {
964                 /* Dispatch a queued message */
965
966                 *m = bus->rqueue[0];
967                 bus->rqueue_size --;
968                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
969                 return 1;
970         }
971
972         /* Try to read a new message */
973         do {
974                 r = bus_socket_read_message(bus, &z);
975                 if (r < 0) {
976                         sd_bus_close(bus);
977                         return r;
978                 }
979                 if (r == 0)
980                         return ret;
981
982                 r = 1;
983         } while (!z);
984
985         *m = z;
986         return 1;
987 }
988
989 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
990         int r;
991
992         if (!bus)
993                 return -EINVAL;
994         if (bus->state == BUS_UNSET)
995                 return -ENOTCONN;
996         if (bus->fd < 0)
997                 return -ENOTCONN;
998         if (!m)
999                 return -EINVAL;
1000
1001         if (m->n_fds > 0) {
1002                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1003                 if (r < 0)
1004                         return r;
1005                 if (r == 0)
1006                         return -ENOTSUP;
1007         }
1008
1009         /* If the serial number isn't kept, then we know that no reply
1010          * is expected */
1011         if (!serial && !m->sealed)
1012                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1013
1014         r = bus_seal_message(bus, m);
1015         if (r < 0)
1016                 return r;
1017
1018         /* If this is a reply and no reply was requested, then let's
1019          * suppress this, if we can */
1020         if (m->dont_send && !serial)
1021                 return 0;
1022
1023         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1024                 size_t idx = 0;
1025
1026                 r = bus_socket_write_message(bus, m, &idx);
1027                 if (r < 0) {
1028                         sd_bus_close(bus);
1029                         return r;
1030                 } else if (idx < m->size)  {
1031                         /* Wasn't fully written. So let's remember how
1032                          * much was written. Note that the first entry
1033                          * of the wqueue array is always allocated so
1034                          * that we always can remember how much was
1035                          * written. */
1036                         bus->wqueue[0] = sd_bus_message_ref(m);
1037                         bus->wqueue_size = 1;
1038                         bus->windex = idx;
1039                 }
1040         } else {
1041                 sd_bus_message **q;
1042
1043                 /* Just append it to the queue. */
1044
1045                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1046                         return -ENOBUFS;
1047
1048                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1049                 if (!q)
1050                         return -ENOMEM;
1051
1052                 bus->wqueue = q;
1053                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1054         }
1055
1056         if (serial)
1057                 *serial = BUS_MESSAGE_SERIAL(m);
1058
1059         return 0;
1060 }
1061
1062 static usec_t calc_elapse(uint64_t usec) {
1063         if (usec == (uint64_t) -1)
1064                 return 0;
1065
1066         if (usec == 0)
1067                 usec = BUS_DEFAULT_TIMEOUT;
1068
1069         return now(CLOCK_MONOTONIC) + usec;
1070 }
1071
1072 static int timeout_compare(const void *a, const void *b) {
1073         const struct reply_callback *x = a, *y = b;
1074
1075         if (x->timeout != 0 && y->timeout == 0)
1076                 return -1;
1077
1078         if (x->timeout == 0 && y->timeout != 0)
1079                 return 1;
1080
1081         if (x->timeout < y->timeout)
1082                 return -1;
1083
1084         if (x->timeout > y->timeout)
1085                 return 1;
1086
1087         return 0;
1088 }
1089
1090 int sd_bus_send_with_reply(
1091                 sd_bus *bus,
1092                 sd_bus_message *m,
1093                 sd_message_handler_t callback,
1094                 void *userdata,
1095                 uint64_t usec,
1096                 uint64_t *serial) {
1097
1098         struct reply_callback *c;
1099         int r;
1100
1101         if (!bus)
1102                 return -EINVAL;
1103         if (bus->state == BUS_UNSET)
1104                 return -ENOTCONN;
1105         if (bus->fd < 0)
1106                 return -ENOTCONN;
1107         if (!m)
1108                 return -EINVAL;
1109         if (!callback)
1110                 return -EINVAL;
1111         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1112                 return -EINVAL;
1113         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1114                 return -EINVAL;
1115
1116         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1117         if (r < 0)
1118                 return r;
1119
1120         if (usec != (uint64_t) -1) {
1121                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1122                 if (r < 0)
1123                         return r;
1124         }
1125
1126         r = bus_seal_message(bus, m);
1127         if (r < 0)
1128                 return r;
1129
1130         c = new(struct reply_callback, 1);
1131         if (!c)
1132                 return -ENOMEM;
1133
1134         c->callback = callback;
1135         c->userdata = userdata;
1136         c->serial = BUS_MESSAGE_SERIAL(m);
1137         c->timeout = calc_elapse(usec);
1138
1139         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1140         if (r < 0) {
1141                 free(c);
1142                 return r;
1143         }
1144
1145         if (c->timeout != 0) {
1146                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1147                 if (r < 0) {
1148                         c->timeout = 0;
1149                         sd_bus_send_with_reply_cancel(bus, c->serial);
1150                         return r;
1151                 }
1152         }
1153
1154         r = sd_bus_send(bus, m, serial);
1155         if (r < 0) {
1156                 sd_bus_send_with_reply_cancel(bus, c->serial);
1157                 return r;
1158         }
1159
1160         return r;
1161 }
1162
1163 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1164         struct reply_callback *c;
1165
1166         if (!bus)
1167                 return -EINVAL;
1168         if (serial == 0)
1169                 return -EINVAL;
1170
1171         c = hashmap_remove(bus->reply_callbacks, &serial);
1172         if (!c)
1173                 return 0;
1174
1175         if (c->timeout != 0)
1176                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1177
1178         free(c);
1179         return 1;
1180 }
1181
1182 int bus_ensure_running(sd_bus *bus) {
1183         int r;
1184
1185         assert(bus);
1186
1187         if (bus->fd < 0)
1188                 return -ENOTCONN;
1189         if (bus->state == BUS_UNSET)
1190                 return -ENOTCONN;
1191
1192         if (bus->state == BUS_RUNNING)
1193                 return 1;
1194
1195         for (;;) {
1196                 r = sd_bus_process(bus, NULL);
1197                 if (r < 0)
1198                         return r;
1199                 if (bus->state == BUS_RUNNING)
1200                         return 1;
1201                 if (r > 0)
1202                         continue;
1203
1204                 r = sd_bus_wait(bus, (uint64_t) -1);
1205                 if (r < 0)
1206                         return r;
1207         }
1208 }
1209
1210 int sd_bus_send_with_reply_and_block(
1211                 sd_bus *bus,
1212                 sd_bus_message *m,
1213                 uint64_t usec,
1214                 sd_bus_error *error,
1215                 sd_bus_message **reply) {
1216
1217         int r;
1218         usec_t timeout;
1219         uint64_t serial;
1220         bool room = false;
1221
1222         if (!bus)
1223                 return -EINVAL;
1224         if (bus->fd < 0)
1225                 return -ENOTCONN;
1226         if (bus->state == BUS_UNSET)
1227                 return -ENOTCONN;
1228         if (!m)
1229                 return -EINVAL;
1230         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1231                 return -EINVAL;
1232         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1233                 return -EINVAL;
1234         if (bus_error_is_dirty(error))
1235                 return -EINVAL;
1236
1237         r = bus_ensure_running(bus);
1238         if (r < 0)
1239                 return r;
1240
1241         r = sd_bus_send(bus, m, &serial);
1242         if (r < 0)
1243                 return r;
1244
1245         timeout = calc_elapse(usec);
1246
1247         for (;;) {
1248                 usec_t left;
1249                 sd_bus_message *incoming = NULL;
1250
1251                 if (!room) {
1252                         sd_bus_message **q;
1253
1254                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1255                                 return -ENOBUFS;
1256
1257                         /* Make sure there's room for queuing this
1258                          * locally, before we read the message */
1259
1260                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1261                         if (!q)
1262                                 return -ENOMEM;
1263
1264                         bus->rqueue = q;
1265                         room = true;
1266                 }
1267
1268                 r = bus_socket_read_message(bus, &incoming);
1269                 if (r < 0)
1270                         return r;
1271                 if (incoming) {
1272
1273                         if (incoming->reply_serial == serial) {
1274                                 /* Found a match! */
1275
1276                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1277                                         *reply = incoming;
1278                                         return 0;
1279                                 }
1280
1281                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1282                                         int k;
1283
1284                                         r = sd_bus_error_copy(error, &incoming->error);
1285                                         if (r < 0) {
1286                                                 sd_bus_message_unref(incoming);
1287                                                 return r;
1288                                         }
1289
1290                                         k = bus_error_to_errno(&incoming->error);
1291                                         sd_bus_message_unref(incoming);
1292                                         return k;
1293                                 }
1294
1295                                 sd_bus_message_unref(incoming);
1296                                 return -EIO;
1297                         }
1298
1299                         /* There's already guaranteed to be room for
1300                          * this, so need to resize things here */
1301                         bus->rqueue[bus->rqueue_size ++] = incoming;
1302                         room = false;
1303
1304                         /* Try to read more, right-away */
1305                         continue;
1306                 }
1307                 if (r != 0)
1308                         continue;
1309
1310                 if (timeout > 0) {
1311                         usec_t n;
1312
1313                         n = now(CLOCK_MONOTONIC);
1314                         if (n >= timeout)
1315                                 return -ETIMEDOUT;
1316
1317                         left = timeout - n;
1318                 } else
1319                         left = (uint64_t) -1;
1320
1321                 r = bus_poll(bus, true, left);
1322                 if (r < 0)
1323                         return r;
1324
1325                 r = dispatch_wqueue(bus);
1326                 if (r < 0)
1327                         return r;
1328         }
1329 }
1330
1331 int sd_bus_get_fd(sd_bus *bus) {
1332         if (!bus)
1333                 return -EINVAL;
1334
1335         if (bus->fd < 0)
1336                 return -ENOTCONN;
1337
1338         return bus->fd;
1339 }
1340
1341 int sd_bus_get_events(sd_bus *bus) {
1342         int flags = 0;
1343
1344         if (!bus)
1345                 return -EINVAL;
1346         if (bus->state == BUS_UNSET)
1347                 return -ENOTCONN;
1348         if (bus->fd < 0)
1349                 return -ENOTCONN;
1350
1351         if (bus->state == BUS_OPENING)
1352                 flags |= POLLOUT;
1353         else if (bus->state == BUS_AUTHENTICATING) {
1354
1355                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1356                         flags |= POLLOUT;
1357
1358                 flags |= POLLIN;
1359
1360         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1361                 if (bus->rqueue_size <= 0)
1362                         flags |= POLLIN;
1363                 if (bus->wqueue_size > 0)
1364                         flags |= POLLOUT;
1365         }
1366
1367         return flags;
1368 }
1369
1370 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1371         struct reply_callback *c;
1372
1373         if (!bus)
1374                 return -EINVAL;
1375         if (!timeout_usec)
1376                 return -EINVAL;
1377         if (bus->state == BUS_UNSET)
1378                 return -ENOTCONN;
1379         if (bus->fd < 0)
1380                 return -ENOTCONN;
1381
1382         if (bus->state == BUS_AUTHENTICATING) {
1383                 *timeout_usec = bus->auth_timeout;
1384                 return 1;
1385         }
1386
1387         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1388                 return 0;
1389
1390         c = prioq_peek(bus->reply_callbacks_prioq);
1391         if (!c)
1392                 return 0;
1393
1394         *timeout_usec = c->timeout;
1395         return 1;
1396 }
1397
1398 static int process_timeout(sd_bus *bus) {
1399         struct reply_callback *c;
1400         usec_t n;
1401         int r;
1402
1403         assert(bus);
1404
1405         c = prioq_peek(bus->reply_callbacks_prioq);
1406         if (!c)
1407                 return 0;
1408
1409         n = now(CLOCK_MONOTONIC);
1410         if (c->timeout > n)
1411                 return 0;
1412
1413         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1414         hashmap_remove(bus->reply_callbacks, &c->serial);
1415
1416         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1417         free(c);
1418
1419         return r < 0 ? r : 1;
1420 }
1421
1422 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1423         struct reply_callback *c;
1424         int r;
1425
1426         assert(bus);
1427         assert(m);
1428
1429         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1430             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1431                 return 0;
1432
1433         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1434         if (!c)
1435                 return 0;
1436
1437         if (c->timeout != 0)
1438                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1439
1440         r = c->callback(bus, 0, m, c->userdata);
1441         free(c);
1442
1443         return r;
1444 }
1445
1446 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1447         struct filter_callback *l;
1448         int r;
1449
1450         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1451                 r = l->callback(bus, 0, m, l->userdata);
1452                 if (r != 0)
1453                         return r;
1454         }
1455
1456         return 0;
1457 }
1458
1459 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1460         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1461         int r;
1462
1463         assert(bus);
1464         assert(m);
1465
1466         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1467                 return 0;
1468
1469         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1470                 return 0;
1471
1472         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1473                 return 1;
1474
1475         if (streq_ptr(m->member, "Ping"))
1476                 r = sd_bus_message_new_method_return(bus, m, &reply);
1477         else if (streq_ptr(m->member, "GetMachineId")) {
1478                 sd_id128_t id;
1479                 char sid[33];
1480
1481                 r = sd_id128_get_machine(&id);
1482                 if (r < 0)
1483                         return r;
1484
1485                 r = sd_bus_message_new_method_return(bus, m, &reply);
1486                 if (r < 0)
1487                         return r;
1488
1489                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1490         } else {
1491                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1492
1493                 sd_bus_error_set(&error,
1494                                  "org.freedesktop.DBus.Error.UnknownMethod",
1495                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1496
1497                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1498         }
1499
1500         if (r < 0)
1501                 return r;
1502
1503         r = sd_bus_send(bus, reply, NULL);
1504         if (r < 0)
1505                 return r;
1506
1507         return 1;
1508 }
1509
1510 static int process_object(sd_bus *bus, sd_bus_message *m) {
1511         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1512         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1513         struct object_callback *c;
1514         char *p;
1515         int r;
1516         bool found = false;
1517
1518         assert(bus);
1519         assert(m);
1520
1521         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1522                 return 0;
1523
1524         if (hashmap_isempty(bus->object_callbacks))
1525                 return 0;
1526
1527         c = hashmap_get(bus->object_callbacks, m->path);
1528         if (c) {
1529                 r = c->callback(bus, 0, m, c->userdata);
1530                 if (r != 0)
1531                         return r;
1532
1533                 found = true;
1534         }
1535
1536         /* Look for fallback prefixes */
1537         p = strdupa(m->path);
1538         for (;;) {
1539                 char *e;
1540
1541                 e = strrchr(p, '/');
1542                 if (e == p || !e)
1543                         break;
1544
1545                 *e = 0;
1546
1547                 c = hashmap_get(bus->object_callbacks, p);
1548                 if (c && c->is_fallback) {
1549                         r = c->callback(bus, 0, m, c->userdata);
1550                         if (r != 0)
1551                                 return r;
1552
1553                         found = true;
1554                 }
1555         }
1556
1557         /* We found some handlers but none wanted to take this, then
1558          * return this -- with one exception, we can handle
1559          * introspection minimally ourselves */
1560         if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1561                 return 0;
1562
1563         sd_bus_error_set(&error,
1564                          "org.freedesktop.DBus.Error.UnknownMethod",
1565                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1566
1567         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1568         if (r < 0)
1569                 return r;
1570
1571         r = sd_bus_send(bus, reply, NULL);
1572         if (r < 0)
1573                 return r;
1574
1575         return 1;
1576 }
1577
1578 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1579         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1580         _cleanup_free_ char *introspection = NULL;
1581         _cleanup_set_free_free_ Set *s = NULL;
1582         _cleanup_fclose_ FILE *f = NULL;
1583         struct object_callback *c;
1584         Iterator i;
1585         size_t size = 0;
1586         char *node;
1587         int r;
1588
1589         assert(bus);
1590         assert(m);
1591
1592         if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1593                 return 0;
1594
1595         if (!m->path)
1596                 return 0;
1597
1598         s = set_new(string_hash_func, string_compare_func);
1599         if (!s)
1600                 return -ENOMEM;
1601
1602         HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1603                 const char *e;
1604                 char *a, *p;
1605
1606                 if (streq(c->path, "/"))
1607                         continue;
1608
1609                 if (streq(m->path, "/"))
1610                         e = c->path;
1611                 else {
1612                         e = startswith(c->path, m->path);
1613                         if (!e || *e != '/')
1614                                 continue;
1615                 }
1616
1617                 a = strdup(e+1);
1618                 if (!a)
1619                         return -ENOMEM;
1620
1621                 p = strchr(a, '/');
1622                 if (p)
1623                         *p = 0;
1624
1625                 r = set_put(s, a);
1626                 if (r < 0) {
1627                         free(a);
1628
1629                         if (r != -EEXIST)
1630                                 return r;
1631                 }
1632         }
1633
1634         f = open_memstream(&introspection, &size);
1635         if (!f)
1636                 return -ENOMEM;
1637
1638         fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1639         fputs("<node>\n", f);
1640         fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1641         fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1642
1643         while ((node = set_steal_first(s))) {
1644                 fprintf(f, " <node name=\"%s\"/>\n", node);
1645                 free(node);
1646         }
1647
1648         fputs("</node>\n", f);
1649
1650         fflush(f);
1651
1652         if (ferror(f))
1653                 return -ENOMEM;
1654
1655         r = sd_bus_message_new_method_return(bus, m, &reply);
1656         if (r < 0)
1657                 return r;
1658
1659         r = sd_bus_message_append(reply, "s", introspection);
1660         if (r < 0)
1661                 return r;
1662
1663         r = sd_bus_send(bus, reply, NULL);
1664         if (r < 0)
1665                 return r;
1666
1667         return 1;
1668 }
1669
1670 static int process_message(sd_bus *bus, sd_bus_message *m) {
1671         int r;
1672
1673         assert(bus);
1674         assert(m);
1675
1676         r = process_reply(bus, m);
1677         if (r != 0)
1678                 return r;
1679
1680         r = process_filter(bus, m);
1681         if (r != 0)
1682                 return r;
1683
1684         r = process_builtin(bus, m);
1685         if (r != 0)
1686                 return r;
1687
1688         r = process_object(bus, m);
1689         if (r != 0)
1690                 return r;
1691
1692         return process_introspect(bus, m);
1693 }
1694
1695 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1696         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1697         int r;
1698
1699         assert(bus);
1700         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1701
1702         r = process_timeout(bus);
1703         if (r != 0)
1704                 goto null_message;
1705
1706         r = dispatch_wqueue(bus);
1707         if (r != 0)
1708                 goto null_message;
1709
1710         r = dispatch_rqueue(bus, &m);
1711         if (r < 0)
1712                 return r;
1713         if (!m)
1714                 goto null_message;
1715
1716         r = process_message(bus, m);
1717         if (r != 0)
1718                 goto null_message;
1719
1720         if (ret) {
1721                 *ret = m;
1722                 m = NULL;
1723                 return 1;
1724         }
1725
1726         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1727                 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1728                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1729
1730                 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1731
1732                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1733                 if (r < 0)
1734                         return r;
1735
1736                 r = sd_bus_send(bus, reply, NULL);
1737                 if (r < 0)
1738                         return r;
1739         }
1740
1741         return 1;
1742
1743 null_message:
1744         if (r >= 0 && ret)
1745                 *ret = NULL;
1746
1747         return r;
1748 }
1749
1750 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1751         int r;
1752
1753         /* Returns 0 when we didn't do anything. This should cause the
1754          * caller to invoke sd_bus_wait() before returning the next
1755          * time. Returns > 0 when we did something, which possibly
1756          * means *ret is filled in with an unprocessed message. */
1757
1758         if (!bus)
1759                 return -EINVAL;
1760         if (bus->fd < 0)
1761                 return -ENOTCONN;
1762
1763         switch (bus->state) {
1764
1765         case BUS_UNSET:
1766                 return -ENOTCONN;
1767
1768         case BUS_OPENING:
1769                 r = bus_socket_process_opening(bus);
1770                 if (r < 0)
1771                         return r;
1772                 if (ret)
1773                         *ret = NULL;
1774                 return r;
1775
1776         case BUS_AUTHENTICATING:
1777
1778                 r = bus_socket_process_authenticating(bus);
1779                 if (r < 0)
1780                         return r;
1781                 if (ret)
1782                         *ret = NULL;
1783                 return r;
1784
1785         case BUS_RUNNING:
1786         case BUS_HELLO:
1787
1788                 return process_running(bus, ret);
1789         }
1790
1791         assert_not_reached("Unknown state");
1792 }
1793
1794 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1795         struct pollfd p;
1796         int r, e;
1797         struct timespec ts;
1798         usec_t until, m;
1799
1800         assert(bus);
1801
1802         if (bus->fd < 0)
1803                 return -ENOTCONN;
1804
1805         e = sd_bus_get_events(bus);
1806         if (e < 0)
1807                 return e;
1808
1809         if (need_more)
1810                 e |= POLLIN;
1811
1812         r = sd_bus_get_timeout(bus, &until);
1813         if (r < 0)
1814                 return r;
1815         if (r == 0)
1816                 m = (uint64_t) -1;
1817         else {
1818                 usec_t n;
1819                 n = now(CLOCK_MONOTONIC);
1820                 m = until > n ? until - n : 0;
1821         }
1822
1823         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1824                 m = timeout_usec;
1825
1826         zero(p);
1827         p.fd = bus->fd;
1828         p.events = e;
1829
1830         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1831         if (r < 0)
1832                 return -errno;
1833
1834         return r > 0 ? 1 : 0;
1835 }
1836
1837 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1838
1839         if (!bus)
1840                 return -EINVAL;
1841         if (bus->state == BUS_UNSET)
1842                 return -ENOTCONN;
1843         if (bus->fd < 0)
1844                 return -ENOTCONN;
1845         if (bus->rqueue_size > 0)
1846                 return 0;
1847
1848         return bus_poll(bus, false, timeout_usec);
1849 }
1850
1851 int sd_bus_flush(sd_bus *bus) {
1852         int r;
1853
1854         if (!bus)
1855                 return -EINVAL;
1856         if (bus->state == BUS_UNSET)
1857                 return -ENOTCONN;
1858         if (bus->fd < 0)
1859                 return -ENOTCONN;
1860
1861         r = bus_ensure_running(bus);
1862         if (r < 0)
1863                 return r;
1864
1865         if (bus->wqueue_size <= 0)
1866                 return 0;
1867
1868         for (;;) {
1869                 r = dispatch_wqueue(bus);
1870                 if (r < 0)
1871                         return r;
1872
1873                 if (bus->wqueue_size <= 0)
1874                         return 0;
1875
1876                 r = bus_poll(bus, false, (uint64_t) -1);
1877                 if (r < 0)
1878                         return r;
1879         }
1880 }
1881
1882 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1883         struct filter_callback *f;
1884
1885         if (!bus)
1886                 return -EINVAL;
1887         if (!callback)
1888                 return -EINVAL;
1889
1890         f = new(struct filter_callback, 1);
1891         if (!f)
1892                 return -ENOMEM;
1893         f->callback = callback;
1894         f->userdata = userdata;
1895
1896         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1897         return 0;
1898 }
1899
1900 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1901         struct filter_callback *f;
1902
1903         if (!bus)
1904                 return -EINVAL;
1905         if (!callback)
1906                 return -EINVAL;
1907
1908         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1909                 if (f->callback == callback && f->userdata == userdata) {
1910                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1911                         free(f);
1912                         return 1;
1913                 }
1914         }
1915
1916         return 0;
1917 }
1918
1919 static int bus_add_object(
1920                 sd_bus *bus,
1921                 bool fallback,
1922                 const char *path,
1923                 sd_message_handler_t callback,
1924                 void *userdata) {
1925
1926         struct object_callback *c;
1927         int r;
1928
1929         if (!bus)
1930                 return -EINVAL;
1931         if (!path)
1932                 return -EINVAL;
1933         if (!callback)
1934                 return -EINVAL;
1935
1936         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1937         if (r < 0)
1938                 return r;
1939
1940         c = new(struct object_callback, 1);
1941         if (!c)
1942                 return -ENOMEM;
1943
1944         c->path = strdup(path);
1945         if (!path) {
1946                 free(c);
1947                 return -ENOMEM;
1948         }
1949
1950         c->callback = callback;
1951         c->userdata = userdata;
1952         c->is_fallback = fallback;
1953
1954         r = hashmap_put(bus->object_callbacks, c->path, c);
1955         if (r < 0) {
1956                 free(c->path);
1957                 free(c);
1958                 return r;
1959         }
1960
1961         return 0;
1962 }
1963
1964 static int bus_remove_object(
1965                 sd_bus *bus,
1966                 bool fallback,
1967                 const char *path,
1968                 sd_message_handler_t callback,
1969                 void *userdata) {
1970
1971         struct object_callback *c;
1972
1973         if (!bus)
1974                 return -EINVAL;
1975         if (!path)
1976                 return -EINVAL;
1977         if (!callback)
1978                 return -EINVAL;
1979
1980         c = hashmap_get(bus->object_callbacks, path);
1981         if (!c)
1982                 return 0;
1983
1984         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
1985                 return 0;
1986
1987         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
1988
1989         free(c->path);
1990         free(c);
1991
1992         return 1;
1993 }
1994
1995 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
1996         return bus_add_object(bus, false, path, callback, userdata);
1997 }
1998
1999 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2000         return bus_remove_object(bus, false, path, callback, userdata);
2001 }
2002
2003 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2004         return bus_add_object(bus, true, prefix, callback, userdata);
2005 }
2006
2007 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2008         return bus_remove_object(bus, true, prefix, callback, userdata);
2009 }