chiark / gitweb /
870d617ac1e2b962f506f9c34ef9760333c5f641
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "strv.h"
33 #include "set.h"
34
35 #include "sd-bus.h"
36 #include "bus-internal.h"
37 #include "bus-message.h"
38 #include "bus-type.h"
39 #include "bus-socket.h"
40
41 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
42
43 static void bus_free(sd_bus *b) {
44         struct filter_callback *f;
45         struct object_callback *c;
46         unsigned i;
47
48         assert(b);
49
50         if (b->fd >= 0)
51                 close_nointr_nofail(b->fd);
52
53         free(b->rbuffer);
54         free(b->unique_name);
55         free(b->auth_uid);
56         free(b->address);
57
58         free(b->exec_path);
59         strv_free(b->exec_argv);
60
61         close_many(b->fds, b->n_fds);
62         free(b->fds);
63
64         for (i = 0; i < b->rqueue_size; i++)
65                 sd_bus_message_unref(b->rqueue[i]);
66         free(b->rqueue);
67
68         for (i = 0; i < b->wqueue_size; i++)
69                 sd_bus_message_unref(b->wqueue[i]);
70         free(b->wqueue);
71
72         hashmap_free_free(b->reply_callbacks);
73         prioq_free(b->reply_callbacks_prioq);
74
75         while ((f = b->filter_callbacks)) {
76                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
77                 free(f);
78         }
79
80         while ((c = hashmap_steal_first(b->object_callbacks))) {
81                 free(c->path);
82                 free(c);
83         }
84
85         hashmap_free(b->object_callbacks);
86
87         free(b);
88 }
89
90 int sd_bus_new(sd_bus **ret) {
91         sd_bus *r;
92
93         if (!ret)
94                 return -EINVAL;
95
96         r = new0(sd_bus, 1);
97         if (!r)
98                 return -ENOMEM;
99
100         r->n_ref = 1;
101         r->fd = -1;
102         r->message_version = 1;
103         r->negotiate_fds = true;
104
105         /* We guarantee that wqueue always has space for at least one
106          * entry */
107         r->wqueue = new(sd_bus_message*, 1);
108         if (!r->wqueue) {
109                 free(r);
110                 return -ENOMEM;
111         }
112
113         *ret = r;
114         return 0;
115 }
116
117 int sd_bus_set_address(sd_bus *bus, const char *address) {
118         char *a;
119
120         if (!bus)
121                 return -EINVAL;
122         if (bus->state != BUS_UNSET)
123                 return -EPERM;
124         if (!address)
125                 return -EINVAL;
126
127         a = strdup(address);
128         if (!a)
129                 return -ENOMEM;
130
131         free(bus->address);
132         bus->address = a;
133
134         return 0;
135 }
136
137 int sd_bus_set_fd(sd_bus *bus, int fd) {
138         if (!bus)
139                 return -EINVAL;
140         if (bus->state != BUS_UNSET)
141                 return -EPERM;
142         if (fd < 0)
143                 return -EINVAL;
144
145         bus->fd = fd;
146         return 0;
147 }
148
149 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
150         char *p, **a;
151
152         if (!bus)
153                 return -EINVAL;
154         if (bus->state != BUS_UNSET)
155                 return -EPERM;
156         if (!path)
157                 return -EINVAL;
158         if (strv_isempty(argv))
159                 return -EINVAL;
160
161         p = strdup(path);
162         if (!p)
163                 return -ENOMEM;
164
165         a = strv_copy(argv);
166         if (!a) {
167                 free(p);
168                 return -ENOMEM;
169         }
170
171         free(bus->exec_path);
172         strv_free(bus->exec_argv);
173
174         bus->exec_path = p;
175         bus->exec_argv = a;
176
177         return 0;
178 }
179
180 int sd_bus_set_bus_client(sd_bus *bus, int b) {
181         if (!bus)
182                 return -EINVAL;
183         if (bus->state != BUS_UNSET)
184                 return -EPERM;
185
186         bus->bus_client = !!b;
187         return 0;
188 }
189
190 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
191         if (!bus)
192                 return -EINVAL;
193         if (bus->state != BUS_UNSET)
194                 return -EPERM;
195
196         bus->negotiate_fds = !!b;
197         return 0;
198 }
199
200 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
201         const char *s;
202         int r;
203
204         assert(bus);
205         assert(bus->state == BUS_HELLO);
206
207         if (error != 0)
208                 return -error;
209
210         assert(reply);
211
212         r = sd_bus_message_read(reply, "s", &s);
213         if (r < 0)
214                 return r;
215
216         if (!service_name_is_valid(s) || s[0] != ':')
217                 return -EBADMSG;
218
219         bus->unique_name = strdup(s);
220         if (!bus->unique_name)
221                 return -ENOMEM;
222
223         bus->state = BUS_RUNNING;
224
225         return 1;
226 }
227
228 static int bus_send_hello(sd_bus *bus) {
229         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
230         int r;
231
232         assert(bus);
233
234         if (!bus->bus_client)
235                 return 0;
236
237         r = sd_bus_message_new_method_call(
238                         bus,
239                         "org.freedesktop.DBus",
240                         "/",
241                         "org.freedesktop.DBus",
242                         "Hello",
243                         &m);
244         if (r < 0)
245                 return r;
246
247         return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
248 }
249
250 int bus_start_running(sd_bus *bus) {
251         assert(bus);
252
253         if (bus->bus_client) {
254                 bus->state = BUS_HELLO;
255                 return 1;
256         }
257
258         bus->state = BUS_RUNNING;
259         return 1;
260 }
261
262 static int parse_address_key(const char **p, const char *key, char **value) {
263         size_t l, n = 0;
264         const char *a;
265         char *r = NULL;
266
267         assert(p);
268         assert(*p);
269         assert(value);
270
271         if (key) {
272                 l = strlen(key);
273                 if (strncmp(*p, key, l) != 0)
274                         return 0;
275
276                 if ((*p)[l] != '=')
277                         return 0;
278
279                 if (*value)
280                         return -EINVAL;
281
282                 a = *p + l + 1;
283         } else
284                 a = *p;
285
286         while (*a != ';' && *a != ',' && *a != 0) {
287                 char c, *t;
288
289                 if (*a == '%') {
290                         int x, y;
291
292                         x = unhexchar(a[1]);
293                         if (x < 0) {
294                                 free(r);
295                                 return x;
296                         }
297
298                         y = unhexchar(a[2]);
299                         if (y < 0) {
300                                 free(r);
301                                 return y;
302                         }
303
304                         c = (char) ((x << 4) | y);
305                         a += 3;
306                 } else {
307                         c = *a;
308                         a++;
309                 }
310
311                 t = realloc(r, n + 2);
312                 if (!t) {
313                         free(r);
314                         return -ENOMEM;
315                 }
316
317                 r = t;
318                 r[n++] = c;
319         }
320
321         if (!r) {
322                 r = strdup("");
323                 if (!r)
324                         return -ENOMEM;
325         } else
326                 r[n] = 0;
327
328         if (*a == ',')
329                 a++;
330
331         *p = a;
332
333         free(*value);
334         *value = r;
335
336         return 1;
337 }
338
339 static void skip_address_key(const char **p) {
340         assert(p);
341         assert(*p);
342
343         *p += strcspn(*p, ",");
344
345         if (**p == ',')
346                 (*p) ++;
347 }
348
349 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
350         _cleanup_free_ char *path = NULL, *abstract = NULL;
351         size_t l;
352         int r;
353
354         assert(b);
355         assert(p);
356         assert(*p);
357         assert(guid);
358
359         while (**p != 0 && **p != ';') {
360                 r = parse_address_key(p, "guid", guid);
361                 if (r < 0)
362                         return r;
363                 else if (r > 0)
364                         continue;
365
366                 r = parse_address_key(p, "path", &path);
367                 if (r < 0)
368                         return r;
369                 else if (r > 0)
370                         continue;
371
372                 r = parse_address_key(p, "abstract", &abstract);
373                 if (r < 0)
374                         return r;
375                 else if (r > 0)
376                         continue;
377
378                 skip_address_key(p);
379         }
380
381         if (!path && !abstract)
382                 return -EINVAL;
383
384         if (path && abstract)
385                 return -EINVAL;
386
387         if (path) {
388                 l = strlen(path);
389                 if (l > sizeof(b->sockaddr.un.sun_path))
390                         return -E2BIG;
391
392                 b->sockaddr.un.sun_family = AF_UNIX;
393                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
394                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
395         } else if (abstract) {
396                 l = strlen(abstract);
397                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
398                         return -E2BIG;
399
400                 b->sockaddr.un.sun_family = AF_UNIX;
401                 b->sockaddr.un.sun_path[0] = 0;
402                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
403                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
404         }
405
406         return 0;
407 }
408
409 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
410         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
411         struct addrinfo hints, *result;
412         int r;
413
414         assert(b);
415         assert(p);
416         assert(*p);
417         assert(guid);
418
419         while (**p != 0 && **p != ';') {
420                 r = parse_address_key(p, "guid", guid);
421                 if (r < 0)
422                         return r;
423                 else if (r > 0)
424                         continue;
425
426                 r = parse_address_key(p, "host", &host);
427                 if (r < 0)
428                         return r;
429                 else if (r > 0)
430                         continue;
431
432                 r = parse_address_key(p, "port", &port);
433                 if (r < 0)
434                         return r;
435                 else if (r > 0)
436                         continue;
437
438                 r = parse_address_key(p, "family", &family);
439                 if (r < 0)
440                         return r;
441                 else if (r > 0)
442                         continue;
443
444                 skip_address_key(p);
445         }
446
447         if (!host || !port)
448                 return -EINVAL;
449
450         zero(hints);
451         hints.ai_socktype = SOCK_STREAM;
452         hints.ai_flags = AI_ADDRCONFIG;
453
454         if (family) {
455                 if (streq(family, "ipv4"))
456                         hints.ai_family = AF_INET;
457                 else if (streq(family, "ipv6"))
458                         hints.ai_family = AF_INET6;
459                 else
460                         return -EINVAL;
461         }
462
463         r = getaddrinfo(host, port, &hints, &result);
464         if (r == EAI_SYSTEM)
465                 return -errno;
466         else if (r != 0)
467                 return -EADDRNOTAVAIL;
468
469         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
470         b->sockaddr_size = result->ai_addrlen;
471
472         freeaddrinfo(result);
473
474         return 0;
475 }
476
477 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
478         char *path = NULL;
479         unsigned n_argv = 0, j;
480         char **argv = NULL;
481         int r;
482
483         assert(b);
484         assert(p);
485         assert(*p);
486         assert(guid);
487
488         while (**p != 0 && **p != ';') {
489                 r = parse_address_key(p, "guid", guid);
490                 if (r < 0)
491                         goto fail;
492                 else if (r > 0)
493                         continue;
494
495                 r = parse_address_key(p, "path", &path);
496                 if (r < 0)
497                         goto fail;
498                 else if (r > 0)
499                         continue;
500
501                 if (startswith(*p, "argv")) {
502                         unsigned ul;
503
504                         errno = 0;
505                         ul = strtoul(*p + 4, (char**) p, 10);
506                         if (errno > 0 || **p != '=' || ul > 256) {
507                                 r = -EINVAL;
508                                 goto fail;
509                         }
510
511                         (*p) ++;
512
513                         if (ul >= n_argv) {
514                                 char **x;
515
516                                 x = realloc(argv, sizeof(char*) * (ul + 2));
517                                 if (!x) {
518                                         r = -ENOMEM;
519                                         goto fail;
520                                 }
521
522                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
523
524                                 argv = x;
525                                 n_argv = ul + 1;
526                         }
527
528                         r = parse_address_key(p, NULL, argv + ul);
529                         if (r < 0)
530                                 goto fail;
531
532                         continue;
533                 }
534
535                 skip_address_key(p);
536         }
537
538         if (!path) {
539                 r = -EINVAL;
540                 goto fail;
541         }
542
543         /* Make sure there are no holes in the array, with the
544          * exception of argv[0] */
545         for (j = 1; j < n_argv; j++)
546                 if (!argv[j]) {
547                         r = -EINVAL;
548                         goto fail;
549                 }
550
551         if (argv && argv[0] == NULL) {
552                 argv[0] = strdup(path);
553                 if (!argv[0]) {
554                         r = -ENOMEM;
555                         goto fail;
556                 }
557         }
558
559         b->exec_path = path;
560         b->exec_argv = argv;
561         return 0;
562
563 fail:
564         for (j = 0; j < n_argv; j++)
565                 free(argv[j]);
566
567         free(argv);
568         free(path);
569         return r;
570 }
571
572 static void bus_reset_parsed_address(sd_bus *b) {
573         assert(b);
574
575         zero(b->sockaddr);
576         b->sockaddr_size = 0;
577         strv_free(b->exec_argv);
578         free(b->exec_path);
579         b->exec_path = NULL;
580         b->exec_argv = NULL;
581         b->peer = SD_ID128_NULL;
582 }
583
584 static int bus_parse_next_address(sd_bus *b) {
585         _cleanup_free_ char *guid = NULL;
586         const char *a;
587         int r;
588
589         assert(b);
590
591         if (!b->address)
592                 return 0;
593         if (b->address[b->address_index] == 0)
594                 return 0;
595
596         bus_reset_parsed_address(b);
597
598         a = b->address + b->address_index;
599
600         while (*a != 0) {
601
602                 if (*a == ';') {
603                         a++;
604                         continue;
605                 }
606
607                 if (startswith(a, "unix:")) {
608                         a += 5;
609
610                         r = parse_unix_address(b, &a, &guid);
611                         if (r < 0)
612                                 return r;
613                         break;
614
615                 } else if (startswith(a, "tcp:")) {
616
617                         a += 4;
618                         r = parse_tcp_address(b, &a, &guid);
619                         if (r < 0)
620                                 return r;
621
622                         break;
623
624                 } else if (startswith(a, "unixexec:")) {
625
626                         a += 9;
627                         r = parse_exec_address(b, &a, &guid);
628                         if (r < 0)
629                                 return r;
630
631                         break;
632
633                 }
634
635                 a = strchr(a, ';');
636                 if (!a)
637                         return 0;
638         }
639
640         if (guid) {
641                 r = sd_id128_from_string(guid, &b->peer);
642                 if (r < 0)
643                         return r;
644         }
645
646         b->address_index = a - b->address;
647         return 1;
648 }
649
650 static int bus_start_address(sd_bus *b) {
651         int r;
652
653         assert(b);
654
655         for (;;) {
656                 if (b->fd >= 0) {
657                         close_nointr_nofail(b->fd);
658                         b->fd = -1;
659                 }
660
661                 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
662
663                         r = bus_socket_connect(b);
664                         if (r >= 0)
665                                 return r;
666
667                         b->last_connect_error = -r;
668
669                 } else if (b->exec_path) {
670
671                         r = bus_socket_exec(b);
672                         if (r >= 0)
673                                 return r;
674
675                         b->last_connect_error = -r;
676                 }
677
678                 r = bus_parse_next_address(b);
679                 if (r < 0)
680                         return r;
681                 if (r == 0)
682                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
683         }
684 }
685
686 int bus_next_address(sd_bus *b) {
687         assert(b);
688
689         bus_reset_parsed_address(b);
690         return bus_start_address(b);
691 }
692
693 static int bus_start_fd(sd_bus *b) {
694         int r;
695
696         assert(b);
697
698         r = fd_nonblock(b->fd, true);
699         if (r < 0)
700                 return r;
701
702         r = fd_cloexec(b->fd, true);
703         if (r < 0)
704                 return r;
705
706         return bus_socket_take_fd(b);
707 }
708
709 int sd_bus_start(sd_bus *bus) {
710         int r;
711
712         if (!bus)
713                 return -EINVAL;
714         if (bus->state != BUS_UNSET)
715                 return -EPERM;
716
717         bus->state = BUS_OPENING;
718
719         if (bus->fd >= 0)
720                 r = bus_start_fd(bus);
721         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path)
722                 r = bus_start_address(bus);
723         else
724                 return -EINVAL;
725
726         if (r < 0)
727                 return r;
728
729         return bus_send_hello(bus);
730 }
731
732 int sd_bus_open_system(sd_bus **ret) {
733         const char *e;
734         sd_bus *b;
735         int r;
736
737         if (!ret)
738                 return -EINVAL;
739
740         r = sd_bus_new(&b);
741         if (r < 0)
742                 return r;
743
744         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
745         if (e) {
746                 r = sd_bus_set_address(b, e);
747                 if (r < 0)
748                         goto fail;
749         } else {
750                 b->sockaddr.un.sun_family = AF_UNIX;
751                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
752                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
753         }
754
755         b->bus_client = true;
756
757         r = sd_bus_start(b);
758         if (r < 0)
759                 goto fail;
760
761         *ret = b;
762         return 0;
763
764 fail:
765         bus_free(b);
766         return r;
767 }
768
769 int sd_bus_open_user(sd_bus **ret) {
770         const char *e;
771         sd_bus *b;
772         size_t l;
773         int r;
774
775         if (!ret)
776                 return -EINVAL;
777
778         r = sd_bus_new(&b);
779         if (r < 0)
780                 return r;
781
782         e = getenv("DBUS_SESSION_BUS_ADDRESS");
783         if (e) {
784                 r = sd_bus_set_address(b, e);
785                 if (r < 0)
786                         goto fail;
787         } else {
788                 e = getenv("XDG_RUNTIME_DIR");
789                 if (!e) {
790                         r = -ENOENT;
791                         goto fail;
792                 }
793
794                 l = strlen(e);
795                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
796                         r = -E2BIG;
797                         goto fail;
798                 }
799
800                 b->sockaddr.un.sun_family = AF_UNIX;
801                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
802                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
803         }
804
805         b->bus_client = true;
806
807         r = sd_bus_start(b);
808         if (r < 0)
809                 goto fail;
810
811         *ret = b;
812         return 0;
813
814 fail:
815         bus_free(b);
816         return r;
817 }
818
819 void sd_bus_close(sd_bus *bus) {
820         if (!bus)
821                 return;
822         if (bus->fd < 0)
823                 return;
824
825         close_nointr_nofail(bus->fd);
826         bus->fd = -1;
827 }
828
829 sd_bus *sd_bus_ref(sd_bus *bus) {
830         if (!bus)
831                 return NULL;
832
833         assert(bus->n_ref > 0);
834
835         bus->n_ref++;
836         return bus;
837 }
838
839 sd_bus *sd_bus_unref(sd_bus *bus) {
840         if (!bus)
841                 return NULL;
842
843         assert(bus->n_ref > 0);
844         bus->n_ref--;
845
846         if (bus->n_ref <= 0)
847                 bus_free(bus);
848
849         return NULL;
850 }
851
852 int sd_bus_is_open(sd_bus *bus) {
853         if (!bus)
854                 return -EINVAL;
855
856         return bus->state != BUS_UNSET && bus->fd >= 0;
857 }
858
859 int sd_bus_can_send(sd_bus *bus, char type) {
860         int r;
861
862         if (!bus)
863                 return -EINVAL;
864         if (bus->fd < 0)
865                 return -ENOTCONN;
866
867         if (type == SD_BUS_TYPE_UNIX_FD) {
868                 if (!bus->negotiate_fds)
869                         return 0;
870
871                 r = bus_ensure_running(bus);
872                 if (r < 0)
873                         return r;
874
875                 return bus->can_fds;
876         }
877
878         return bus_type_is_valid(type);
879 }
880
881 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
882         int r;
883
884         if (!bus)
885                 return -EINVAL;
886         if (!peer)
887                 return -EINVAL;
888
889         r = bus_ensure_running(bus);
890         if (r < 0)
891                 return r;
892
893         *peer = bus->peer;
894         return 0;
895 }
896
897 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
898         assert(m);
899
900         if (m->header->version > b->message_version)
901                 return -EPERM;
902
903         if (m->sealed)
904                 return 0;
905
906         return bus_message_seal(m, ++b->serial);
907 }
908
909 static int dispatch_wqueue(sd_bus *bus) {
910         int r, ret = 0;
911
912         assert(bus);
913         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
914
915         if (bus->fd < 0)
916                 return -ENOTCONN;
917
918         while (bus->wqueue_size > 0) {
919
920                 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
921                 if (r < 0) {
922                         sd_bus_close(bus);
923                         return r;
924                 } else if (r == 0)
925                         /* Didn't do anything this time */
926                         return ret;
927                 else if (bus->windex >= bus->wqueue[0]->size) {
928                         /* Fully written. Let's drop the entry from
929                          * the queue.
930                          *
931                          * This isn't particularly optimized, but
932                          * well, this is supposed to be our worst-case
933                          * buffer only, and the socket buffer is
934                          * supposed to be our primary buffer, and if
935                          * it got full, then all bets are off
936                          * anyway. */
937
938                         sd_bus_message_unref(bus->wqueue[0]);
939                         bus->wqueue_size --;
940                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
941                         bus->windex = 0;
942
943                         ret = 1;
944                 }
945         }
946
947         return ret;
948 }
949
950 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
951         sd_bus_message *z = NULL;
952         int r, ret = 0;
953
954         assert(bus);
955         assert(m);
956         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
957
958         if (bus->fd < 0)
959                 return -ENOTCONN;
960
961         if (bus->rqueue_size > 0) {
962                 /* Dispatch a queued message */
963
964                 *m = bus->rqueue[0];
965                 bus->rqueue_size --;
966                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
967                 return 1;
968         }
969
970         /* Try to read a new message */
971         do {
972                 r = bus_socket_read_message(bus, &z);
973                 if (r < 0) {
974                         sd_bus_close(bus);
975                         return r;
976                 }
977                 if (r == 0)
978                         return ret;
979
980                 r = 1;
981         } while (!z);
982
983         *m = z;
984         return 1;
985 }
986
987 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
988         int r;
989
990         if (!bus)
991                 return -EINVAL;
992         if (bus->state == BUS_UNSET)
993                 return -ENOTCONN;
994         if (bus->fd < 0)
995                 return -ENOTCONN;
996         if (!m)
997                 return -EINVAL;
998
999         if (m->n_fds > 0) {
1000                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1001                 if (r < 0)
1002                         return r;
1003                 if (r == 0)
1004                         return -ENOTSUP;
1005         }
1006
1007         /* If the serial number isn't kept, then we know that no reply
1008          * is expected */
1009         if (!serial && !m->sealed)
1010                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1011
1012         r = bus_seal_message(bus, m);
1013         if (r < 0)
1014                 return r;
1015
1016         /* If this is a reply and no reply was requested, then let's
1017          * suppress this, if we can */
1018         if (m->dont_send && !serial)
1019                 return 0;
1020
1021         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1022                 size_t idx = 0;
1023
1024                 r = bus_socket_write_message(bus, m, &idx);
1025                 if (r < 0) {
1026                         sd_bus_close(bus);
1027                         return r;
1028                 } else if (idx < m->size)  {
1029                         /* Wasn't fully written. So let's remember how
1030                          * much was written. Note that the first entry
1031                          * of the wqueue array is always allocated so
1032                          * that we always can remember how much was
1033                          * written. */
1034                         bus->wqueue[0] = sd_bus_message_ref(m);
1035                         bus->wqueue_size = 1;
1036                         bus->windex = idx;
1037                 }
1038         } else {
1039                 sd_bus_message **q;
1040
1041                 /* Just append it to the queue. */
1042
1043                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1044                         return -ENOBUFS;
1045
1046                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1047                 if (!q)
1048                         return -ENOMEM;
1049
1050                 bus->wqueue = q;
1051                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1052         }
1053
1054         if (serial)
1055                 *serial = BUS_MESSAGE_SERIAL(m);
1056
1057         return 0;
1058 }
1059
1060 static usec_t calc_elapse(uint64_t usec) {
1061         if (usec == (uint64_t) -1)
1062                 return 0;
1063
1064         if (usec == 0)
1065                 usec = BUS_DEFAULT_TIMEOUT;
1066
1067         return now(CLOCK_MONOTONIC) + usec;
1068 }
1069
1070 static int timeout_compare(const void *a, const void *b) {
1071         const struct reply_callback *x = a, *y = b;
1072
1073         if (x->timeout != 0 && y->timeout == 0)
1074                 return -1;
1075
1076         if (x->timeout == 0 && y->timeout != 0)
1077                 return 1;
1078
1079         if (x->timeout < y->timeout)
1080                 return -1;
1081
1082         if (x->timeout > y->timeout)
1083                 return 1;
1084
1085         return 0;
1086 }
1087
1088 int sd_bus_send_with_reply(
1089                 sd_bus *bus,
1090                 sd_bus_message *m,
1091                 sd_message_handler_t callback,
1092                 void *userdata,
1093                 uint64_t usec,
1094                 uint64_t *serial) {
1095
1096         struct reply_callback *c;
1097         int r;
1098
1099         if (!bus)
1100                 return -EINVAL;
1101         if (bus->state == BUS_UNSET)
1102                 return -ENOTCONN;
1103         if (bus->fd < 0)
1104                 return -ENOTCONN;
1105         if (!m)
1106                 return -EINVAL;
1107         if (!callback)
1108                 return -EINVAL;
1109         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1110                 return -EINVAL;
1111         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1112                 return -EINVAL;
1113
1114         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1115         if (r < 0)
1116                 return r;
1117
1118         if (usec != (uint64_t) -1) {
1119                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1120                 if (r < 0)
1121                         return r;
1122         }
1123
1124         r = bus_seal_message(bus, m);
1125         if (r < 0)
1126                 return r;
1127
1128         c = new(struct reply_callback, 1);
1129         if (!c)
1130                 return -ENOMEM;
1131
1132         c->callback = callback;
1133         c->userdata = userdata;
1134         c->serial = BUS_MESSAGE_SERIAL(m);
1135         c->timeout = calc_elapse(usec);
1136
1137         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1138         if (r < 0) {
1139                 free(c);
1140                 return r;
1141         }
1142
1143         if (c->timeout != 0) {
1144                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1145                 if (r < 0) {
1146                         c->timeout = 0;
1147                         sd_bus_send_with_reply_cancel(bus, c->serial);
1148                         return r;
1149                 }
1150         }
1151
1152         r = sd_bus_send(bus, m, serial);
1153         if (r < 0) {
1154                 sd_bus_send_with_reply_cancel(bus, c->serial);
1155                 return r;
1156         }
1157
1158         return r;
1159 }
1160
1161 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1162         struct reply_callback *c;
1163
1164         if (!bus)
1165                 return -EINVAL;
1166         if (serial == 0)
1167                 return -EINVAL;
1168
1169         c = hashmap_remove(bus->reply_callbacks, &serial);
1170         if (!c)
1171                 return 0;
1172
1173         if (c->timeout != 0)
1174                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1175
1176         free(c);
1177         return 1;
1178 }
1179
1180 int bus_ensure_running(sd_bus *bus) {
1181         int r;
1182
1183         assert(bus);
1184
1185         if (bus->fd < 0)
1186                 return -ENOTCONN;
1187         if (bus->state == BUS_UNSET)
1188                 return -ENOTCONN;
1189
1190         if (bus->state == BUS_RUNNING)
1191                 return 1;
1192
1193         for (;;) {
1194                 r = sd_bus_process(bus, NULL);
1195                 if (r < 0)
1196                         return r;
1197                 if (bus->state == BUS_RUNNING)
1198                         return 1;
1199                 if (r > 0)
1200                         continue;
1201
1202                 r = sd_bus_wait(bus, (uint64_t) -1);
1203                 if (r < 0)
1204                         return r;
1205         }
1206 }
1207
1208 int sd_bus_send_with_reply_and_block(
1209                 sd_bus *bus,
1210                 sd_bus_message *m,
1211                 uint64_t usec,
1212                 sd_bus_error *error,
1213                 sd_bus_message **reply) {
1214
1215         int r;
1216         usec_t timeout;
1217         uint64_t serial;
1218         bool room = false;
1219
1220         if (!bus)
1221                 return -EINVAL;
1222         if (bus->fd < 0)
1223                 return -ENOTCONN;
1224         if (bus->state == BUS_UNSET)
1225                 return -ENOTCONN;
1226         if (!m)
1227                 return -EINVAL;
1228         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1229                 return -EINVAL;
1230         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1231                 return -EINVAL;
1232         if (bus_error_is_dirty(error))
1233                 return -EINVAL;
1234
1235         r = bus_ensure_running(bus);
1236         if (r < 0)
1237                 return r;
1238
1239         r = sd_bus_send(bus, m, &serial);
1240         if (r < 0)
1241                 return r;
1242
1243         timeout = calc_elapse(usec);
1244
1245         for (;;) {
1246                 usec_t left;
1247                 sd_bus_message *incoming = NULL;
1248
1249                 if (!room) {
1250                         sd_bus_message **q;
1251
1252                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1253                                 return -ENOBUFS;
1254
1255                         /* Make sure there's room for queuing this
1256                          * locally, before we read the message */
1257
1258                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1259                         if (!q)
1260                                 return -ENOMEM;
1261
1262                         bus->rqueue = q;
1263                         room = true;
1264                 }
1265
1266                 r = bus_socket_read_message(bus, &incoming);
1267                 if (r < 0)
1268                         return r;
1269                 if (incoming) {
1270
1271                         if (incoming->reply_serial == serial) {
1272                                 /* Found a match! */
1273
1274                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1275                                         *reply = incoming;
1276                                         return 0;
1277                                 }
1278
1279                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1280                                         int k;
1281
1282                                         r = sd_bus_error_copy(error, &incoming->error);
1283                                         if (r < 0) {
1284                                                 sd_bus_message_unref(incoming);
1285                                                 return r;
1286                                         }
1287
1288                                         k = bus_error_to_errno(&incoming->error);
1289                                         sd_bus_message_unref(incoming);
1290                                         return k;
1291                                 }
1292
1293                                 sd_bus_message_unref(incoming);
1294                                 return -EIO;
1295                         }
1296
1297                         /* There's already guaranteed to be room for
1298                          * this, so need to resize things here */
1299                         bus->rqueue[bus->rqueue_size ++] = incoming;
1300                         room = false;
1301
1302                         /* Try to read more, right-away */
1303                         continue;
1304                 }
1305                 if (r != 0)
1306                         continue;
1307
1308                 if (timeout > 0) {
1309                         usec_t n;
1310
1311                         n = now(CLOCK_MONOTONIC);
1312                         if (n >= timeout)
1313                                 return -ETIMEDOUT;
1314
1315                         left = timeout - n;
1316                 } else
1317                         left = (uint64_t) -1;
1318
1319                 r = bus_poll(bus, true, left);
1320                 if (r < 0)
1321                         return r;
1322
1323                 r = dispatch_wqueue(bus);
1324                 if (r < 0)
1325                         return r;
1326         }
1327 }
1328
1329 int sd_bus_get_fd(sd_bus *bus) {
1330         if (!bus)
1331                 return -EINVAL;
1332
1333         if (bus->fd < 0)
1334                 return -ENOTCONN;
1335
1336         return bus->fd;
1337 }
1338
1339 int sd_bus_get_events(sd_bus *bus) {
1340         int flags = 0;
1341
1342         if (!bus)
1343                 return -EINVAL;
1344         if (bus->state == BUS_UNSET)
1345                 return -ENOTCONN;
1346         if (bus->fd < 0)
1347                 return -ENOTCONN;
1348
1349         if (bus->state == BUS_OPENING)
1350                 flags |= POLLOUT;
1351         else if (bus->state == BUS_AUTHENTICATING) {
1352
1353                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1354                         flags |= POLLOUT;
1355
1356                 flags |= POLLIN;
1357
1358         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1359                 if (bus->rqueue_size <= 0)
1360                         flags |= POLLIN;
1361                 if (bus->wqueue_size > 0)
1362                         flags |= POLLOUT;
1363         }
1364
1365         return flags;
1366 }
1367
1368 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1369         struct reply_callback *c;
1370
1371         if (!bus)
1372                 return -EINVAL;
1373         if (!timeout_usec)
1374                 return -EINVAL;
1375         if (bus->state == BUS_UNSET)
1376                 return -ENOTCONN;
1377         if (bus->fd < 0)
1378                 return -ENOTCONN;
1379
1380         if (bus->state == BUS_AUTHENTICATING) {
1381                 *timeout_usec = bus->auth_timeout;
1382                 return 1;
1383         }
1384
1385         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1386                 return 0;
1387
1388         c = prioq_peek(bus->reply_callbacks_prioq);
1389         if (!c)
1390                 return 0;
1391
1392         *timeout_usec = c->timeout;
1393         return 1;
1394 }
1395
1396 static int process_timeout(sd_bus *bus) {
1397         struct reply_callback *c;
1398         usec_t n;
1399         int r;
1400
1401         assert(bus);
1402
1403         c = prioq_peek(bus->reply_callbacks_prioq);
1404         if (!c)
1405                 return 0;
1406
1407         n = now(CLOCK_MONOTONIC);
1408         if (c->timeout > n)
1409                 return 0;
1410
1411         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1412         hashmap_remove(bus->reply_callbacks, &c->serial);
1413
1414         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1415         free(c);
1416
1417         return r < 0 ? r : 1;
1418 }
1419
1420 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1421         assert(bus);
1422         assert(m);
1423
1424         if (bus->state != BUS_HELLO)
1425                 return 0;
1426
1427         /* Let's make sure the first message on the bus is the HELLO
1428          * reply. But note that we don't actually parse the message
1429          * here (we leave that to the usual reply handling), we just
1430          * verify we don't let any earlier msg through. */
1431
1432         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1433             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1434                 return -EIO;
1435
1436         if (m->reply_serial != bus->hello_serial)
1437                 return -EIO;
1438
1439         return 0;
1440 }
1441
1442 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1443         struct reply_callback *c;
1444         int r;
1445
1446         assert(bus);
1447         assert(m);
1448
1449         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1450             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1451                 return 0;
1452
1453         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1454         if (!c)
1455                 return 0;
1456
1457         if (c->timeout != 0)
1458                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1459
1460         r = c->callback(bus, 0, m, c->userdata);
1461         free(c);
1462
1463         return r;
1464 }
1465
1466 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1467         struct filter_callback *l;
1468         int r;
1469
1470         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1471                 r = l->callback(bus, 0, m, l->userdata);
1472                 if (r != 0)
1473                         return r;
1474         }
1475
1476         return 0;
1477 }
1478
1479 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1480         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1481         int r;
1482
1483         assert(bus);
1484         assert(m);
1485
1486         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1487                 return 0;
1488
1489         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1490                 return 0;
1491
1492         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1493                 return 1;
1494
1495         if (streq_ptr(m->member, "Ping"))
1496                 r = sd_bus_message_new_method_return(bus, m, &reply);
1497         else if (streq_ptr(m->member, "GetMachineId")) {
1498                 sd_id128_t id;
1499                 char sid[33];
1500
1501                 r = sd_id128_get_machine(&id);
1502                 if (r < 0)
1503                         return r;
1504
1505                 r = sd_bus_message_new_method_return(bus, m, &reply);
1506                 if (r < 0)
1507                         return r;
1508
1509                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1510         } else {
1511                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1512
1513                 sd_bus_error_set(&error,
1514                                  "org.freedesktop.DBus.Error.UnknownMethod",
1515                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1516
1517                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1518         }
1519
1520         if (r < 0)
1521                 return r;
1522
1523         r = sd_bus_send(bus, reply, NULL);
1524         if (r < 0)
1525                 return r;
1526
1527         return 1;
1528 }
1529
1530 static int process_object(sd_bus *bus, sd_bus_message *m) {
1531         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1532         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1533         struct object_callback *c;
1534         char *p;
1535         int r;
1536         bool found = false;
1537
1538         assert(bus);
1539         assert(m);
1540
1541         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1542                 return 0;
1543
1544         if (hashmap_isempty(bus->object_callbacks))
1545                 return 0;
1546
1547         c = hashmap_get(bus->object_callbacks, m->path);
1548         if (c) {
1549                 r = c->callback(bus, 0, m, c->userdata);
1550                 if (r != 0)
1551                         return r;
1552
1553                 found = true;
1554         }
1555
1556         /* Look for fallback prefixes */
1557         p = strdupa(m->path);
1558         for (;;) {
1559                 char *e;
1560
1561                 e = strrchr(p, '/');
1562                 if (e == p || !e)
1563                         break;
1564
1565                 *e = 0;
1566
1567                 c = hashmap_get(bus->object_callbacks, p);
1568                 if (c && c->is_fallback) {
1569                         r = c->callback(bus, 0, m, c->userdata);
1570                         if (r != 0)
1571                                 return r;
1572
1573                         found = true;
1574                 }
1575         }
1576
1577         /* We found some handlers but none wanted to take this, then
1578          * return this -- with one exception, we can handle
1579          * introspection minimally ourselves */
1580         if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1581                 return 0;
1582
1583         sd_bus_error_set(&error,
1584                          "org.freedesktop.DBus.Error.UnknownMethod",
1585                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1586
1587         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1588         if (r < 0)
1589                 return r;
1590
1591         r = sd_bus_send(bus, reply, NULL);
1592         if (r < 0)
1593                 return r;
1594
1595         return 1;
1596 }
1597
1598 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1599         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1600         _cleanup_free_ char *introspection = NULL;
1601         _cleanup_set_free_free_ Set *s = NULL;
1602         _cleanup_fclose_ FILE *f = NULL;
1603         struct object_callback *c;
1604         Iterator i;
1605         size_t size = 0;
1606         char *node;
1607         int r;
1608
1609         assert(bus);
1610         assert(m);
1611
1612         if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1613                 return 0;
1614
1615         if (!m->path)
1616                 return 0;
1617
1618         s = set_new(string_hash_func, string_compare_func);
1619         if (!s)
1620                 return -ENOMEM;
1621
1622         HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1623                 const char *e;
1624                 char *a, *p;
1625
1626                 if (streq(c->path, "/"))
1627                         continue;
1628
1629                 if (streq(m->path, "/"))
1630                         e = c->path;
1631                 else {
1632                         e = startswith(c->path, m->path);
1633                         if (!e || *e != '/')
1634                                 continue;
1635                 }
1636
1637                 a = strdup(e+1);
1638                 if (!a)
1639                         return -ENOMEM;
1640
1641                 p = strchr(a, '/');
1642                 if (p)
1643                         *p = 0;
1644
1645                 r = set_put(s, a);
1646                 if (r < 0) {
1647                         free(a);
1648
1649                         if (r != -EEXIST)
1650                                 return r;
1651                 }
1652         }
1653
1654         f = open_memstream(&introspection, &size);
1655         if (!f)
1656                 return -ENOMEM;
1657
1658         fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1659         fputs("<node>\n", f);
1660         fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1661         fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1662
1663         while ((node = set_steal_first(s))) {
1664                 fprintf(f, " <node name=\"%s\"/>\n", node);
1665                 free(node);
1666         }
1667
1668         fputs("</node>\n", f);
1669
1670         fflush(f);
1671
1672         if (ferror(f))
1673                 return -ENOMEM;
1674
1675         r = sd_bus_message_new_method_return(bus, m, &reply);
1676         if (r < 0)
1677                 return r;
1678
1679         r = sd_bus_message_append(reply, "s", introspection);
1680         if (r < 0)
1681                 return r;
1682
1683         r = sd_bus_send(bus, reply, NULL);
1684         if (r < 0)
1685                 return r;
1686
1687         return 1;
1688 }
1689
1690 static int process_message(sd_bus *bus, sd_bus_message *m) {
1691         int r;
1692
1693         assert(bus);
1694         assert(m);
1695
1696         r = process_hello(bus, m);
1697         if (r != 0)
1698                 return r;
1699
1700         r = process_reply(bus, m);
1701         if (r != 0)
1702                 return r;
1703
1704         r = process_filter(bus, m);
1705         if (r != 0)
1706                 return r;
1707
1708         r = process_builtin(bus, m);
1709         if (r != 0)
1710                 return r;
1711
1712         r = process_object(bus, m);
1713         if (r != 0)
1714                 return r;
1715
1716         return process_introspect(bus, m);
1717 }
1718
1719 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1720         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1721         int r;
1722
1723         assert(bus);
1724         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1725
1726         r = process_timeout(bus);
1727         if (r != 0)
1728                 goto null_message;
1729
1730         r = dispatch_wqueue(bus);
1731         if (r != 0)
1732                 goto null_message;
1733
1734         r = dispatch_rqueue(bus, &m);
1735         if (r < 0)
1736                 return r;
1737         if (!m)
1738                 goto null_message;
1739
1740         r = process_message(bus, m);
1741         if (r != 0)
1742                 goto null_message;
1743
1744         if (ret) {
1745                 *ret = m;
1746                 m = NULL;
1747                 return 1;
1748         }
1749
1750         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1751                 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1752                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1753
1754                 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1755
1756                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1757                 if (r < 0)
1758                         return r;
1759
1760                 r = sd_bus_send(bus, reply, NULL);
1761                 if (r < 0)
1762                         return r;
1763         }
1764
1765         return 1;
1766
1767 null_message:
1768         if (r >= 0 && ret)
1769                 *ret = NULL;
1770
1771         return r;
1772 }
1773
1774 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1775         int r;
1776
1777         /* Returns 0 when we didn't do anything. This should cause the
1778          * caller to invoke sd_bus_wait() before returning the next
1779          * time. Returns > 0 when we did something, which possibly
1780          * means *ret is filled in with an unprocessed message. */
1781
1782         if (!bus)
1783                 return -EINVAL;
1784         if (bus->fd < 0)
1785                 return -ENOTCONN;
1786
1787         switch (bus->state) {
1788
1789         case BUS_UNSET:
1790                 return -ENOTCONN;
1791
1792         case BUS_OPENING:
1793                 r = bus_socket_process_opening(bus);
1794                 if (r < 0)
1795                         return r;
1796                 if (ret)
1797                         *ret = NULL;
1798                 return r;
1799
1800         case BUS_AUTHENTICATING:
1801
1802                 r = bus_socket_process_authenticating(bus);
1803                 if (r < 0)
1804                         return r;
1805                 if (ret)
1806                         *ret = NULL;
1807                 return r;
1808
1809         case BUS_RUNNING:
1810         case BUS_HELLO:
1811
1812                 return process_running(bus, ret);
1813         }
1814
1815         assert_not_reached("Unknown state");
1816 }
1817
1818 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1819         struct pollfd p;
1820         int r, e;
1821         struct timespec ts;
1822         usec_t until, m;
1823
1824         assert(bus);
1825
1826         if (bus->fd < 0)
1827                 return -ENOTCONN;
1828
1829         e = sd_bus_get_events(bus);
1830         if (e < 0)
1831                 return e;
1832
1833         if (need_more)
1834                 e |= POLLIN;
1835
1836         r = sd_bus_get_timeout(bus, &until);
1837         if (r < 0)
1838                 return r;
1839         if (r == 0)
1840                 m = (uint64_t) -1;
1841         else {
1842                 usec_t n;
1843                 n = now(CLOCK_MONOTONIC);
1844                 m = until > n ? until - n : 0;
1845         }
1846
1847         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1848                 m = timeout_usec;
1849
1850         zero(p);
1851         p.fd = bus->fd;
1852         p.events = e;
1853
1854         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1855         if (r < 0)
1856                 return -errno;
1857
1858         return r > 0 ? 1 : 0;
1859 }
1860
1861 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1862
1863         if (!bus)
1864                 return -EINVAL;
1865         if (bus->state == BUS_UNSET)
1866                 return -ENOTCONN;
1867         if (bus->fd < 0)
1868                 return -ENOTCONN;
1869         if (bus->rqueue_size > 0)
1870                 return 0;
1871
1872         return bus_poll(bus, false, timeout_usec);
1873 }
1874
1875 int sd_bus_flush(sd_bus *bus) {
1876         int r;
1877
1878         if (!bus)
1879                 return -EINVAL;
1880         if (bus->state == BUS_UNSET)
1881                 return -ENOTCONN;
1882         if (bus->fd < 0)
1883                 return -ENOTCONN;
1884
1885         r = bus_ensure_running(bus);
1886         if (r < 0)
1887                 return r;
1888
1889         if (bus->wqueue_size <= 0)
1890                 return 0;
1891
1892         for (;;) {
1893                 r = dispatch_wqueue(bus);
1894                 if (r < 0)
1895                         return r;
1896
1897                 if (bus->wqueue_size <= 0)
1898                         return 0;
1899
1900                 r = bus_poll(bus, false, (uint64_t) -1);
1901                 if (r < 0)
1902                         return r;
1903         }
1904 }
1905
1906 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1907         struct filter_callback *f;
1908
1909         if (!bus)
1910                 return -EINVAL;
1911         if (!callback)
1912                 return -EINVAL;
1913
1914         f = new(struct filter_callback, 1);
1915         if (!f)
1916                 return -ENOMEM;
1917         f->callback = callback;
1918         f->userdata = userdata;
1919
1920         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1921         return 0;
1922 }
1923
1924 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1925         struct filter_callback *f;
1926
1927         if (!bus)
1928                 return -EINVAL;
1929         if (!callback)
1930                 return -EINVAL;
1931
1932         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1933                 if (f->callback == callback && f->userdata == userdata) {
1934                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1935                         free(f);
1936                         return 1;
1937                 }
1938         }
1939
1940         return 0;
1941 }
1942
1943 static int bus_add_object(
1944                 sd_bus *bus,
1945                 bool fallback,
1946                 const char *path,
1947                 sd_message_handler_t callback,
1948                 void *userdata) {
1949
1950         struct object_callback *c;
1951         int r;
1952
1953         if (!bus)
1954                 return -EINVAL;
1955         if (!path)
1956                 return -EINVAL;
1957         if (!callback)
1958                 return -EINVAL;
1959
1960         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1961         if (r < 0)
1962                 return r;
1963
1964         c = new(struct object_callback, 1);
1965         if (!c)
1966                 return -ENOMEM;
1967
1968         c->path = strdup(path);
1969         if (!c->path) {
1970                 free(c);
1971                 return -ENOMEM;
1972         }
1973
1974         c->callback = callback;
1975         c->userdata = userdata;
1976         c->is_fallback = fallback;
1977
1978         r = hashmap_put(bus->object_callbacks, c->path, c);
1979         if (r < 0) {
1980                 free(c->path);
1981                 free(c);
1982                 return r;
1983         }
1984
1985         return 0;
1986 }
1987
1988 static int bus_remove_object(
1989                 sd_bus *bus,
1990                 bool fallback,
1991                 const char *path,
1992                 sd_message_handler_t callback,
1993                 void *userdata) {
1994
1995         struct object_callback *c;
1996
1997         if (!bus)
1998                 return -EINVAL;
1999         if (!path)
2000                 return -EINVAL;
2001         if (!callback)
2002                 return -EINVAL;
2003
2004         c = hashmap_get(bus->object_callbacks, path);
2005         if (!c)
2006                 return 0;
2007
2008         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2009                 return 0;
2010
2011         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2012
2013         free(c->path);
2014         free(c);
2015
2016         return 1;
2017 }
2018
2019 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2020         return bus_add_object(bus, false, path, callback, userdata);
2021 }
2022
2023 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2024         return bus_remove_object(bus, false, path, callback, userdata);
2025 }
2026
2027 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2028         return bus_add_object(bus, true, prefix, callback, userdata);
2029 }
2030
2031 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2032         return bus_remove_object(bus, true, prefix, callback, userdata);
2033 }