chiark / gitweb /
bus: various improvements for test-bus-chat
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "strv.h"
33 #include "set.h"
34
35 #include "sd-bus.h"
36 #include "bus-internal.h"
37 #include "bus-message.h"
38 #include "bus-type.h"
39 #include "bus-socket.h"
40 #include "bus-control.h"
41
42 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
43
44 static void bus_free(sd_bus *b) {
45         struct filter_callback *f;
46         struct object_callback *c;
47         unsigned i;
48
49         assert(b);
50
51         sd_bus_close(b);
52
53         free(b->rbuffer);
54         free(b->unique_name);
55         free(b->auth_buffer);
56         free(b->address);
57
58         free(b->exec_path);
59         strv_free(b->exec_argv);
60
61         close_many(b->fds, b->n_fds);
62         free(b->fds);
63
64         for (i = 0; i < b->rqueue_size; i++)
65                 sd_bus_message_unref(b->rqueue[i]);
66         free(b->rqueue);
67
68         for (i = 0; i < b->wqueue_size; i++)
69                 sd_bus_message_unref(b->wqueue[i]);
70         free(b->wqueue);
71
72         hashmap_free_free(b->reply_callbacks);
73         prioq_free(b->reply_callbacks_prioq);
74
75         while ((f = b->filter_callbacks)) {
76                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
77                 free(f);
78         }
79
80         while ((c = hashmap_steal_first(b->object_callbacks))) {
81                 free(c->path);
82                 free(c);
83         }
84
85         hashmap_free(b->object_callbacks);
86
87         bus_match_free(&b->match_callbacks);
88
89         free(b);
90 }
91
92 int sd_bus_new(sd_bus **ret) {
93         sd_bus *r;
94
95         if (!ret)
96                 return -EINVAL;
97
98         r = new0(sd_bus, 1);
99         if (!r)
100                 return -ENOMEM;
101
102         r->n_ref = 1;
103         r->input_fd = r->output_fd = -1;
104         r->message_version = 1;
105         r->negotiate_fds = true;
106
107         /* We guarantee that wqueue always has space for at least one
108          * entry */
109         r->wqueue = new(sd_bus_message*, 1);
110         if (!r->wqueue) {
111                 free(r);
112                 return -ENOMEM;
113         }
114
115         *ret = r;
116         return 0;
117 }
118
119 int sd_bus_set_address(sd_bus *bus, const char *address) {
120         char *a;
121
122         if (!bus)
123                 return -EINVAL;
124         if (bus->state != BUS_UNSET)
125                 return -EPERM;
126         if (!address)
127                 return -EINVAL;
128
129         a = strdup(address);
130         if (!a)
131                 return -ENOMEM;
132
133         free(bus->address);
134         bus->address = a;
135
136         return 0;
137 }
138
139 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
140         if (!bus)
141                 return -EINVAL;
142         if (bus->state != BUS_UNSET)
143                 return -EPERM;
144         if (input_fd < 0)
145                 return -EINVAL;
146         if (output_fd < 0)
147                 return -EINVAL;
148
149         bus->input_fd = input_fd;
150         bus->output_fd = output_fd;
151         return 0;
152 }
153
154 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
155         char *p, **a;
156
157         if (!bus)
158                 return -EINVAL;
159         if (bus->state != BUS_UNSET)
160                 return -EPERM;
161         if (!path)
162                 return -EINVAL;
163         if (strv_isempty(argv))
164                 return -EINVAL;
165
166         p = strdup(path);
167         if (!p)
168                 return -ENOMEM;
169
170         a = strv_copy(argv);
171         if (!a) {
172                 free(p);
173                 return -ENOMEM;
174         }
175
176         free(bus->exec_path);
177         strv_free(bus->exec_argv);
178
179         bus->exec_path = p;
180         bus->exec_argv = a;
181
182         return 0;
183 }
184
185 int sd_bus_set_bus_client(sd_bus *bus, int b) {
186         if (!bus)
187                 return -EINVAL;
188         if (bus->state != BUS_UNSET)
189                 return -EPERM;
190
191         bus->bus_client = !!b;
192         return 0;
193 }
194
195 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
196         if (!bus)
197                 return -EINVAL;
198         if (bus->state != BUS_UNSET)
199                 return -EPERM;
200
201         bus->negotiate_fds = !!b;
202         return 0;
203 }
204
205 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
206         if (!bus)
207                 return -EINVAL;
208         if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
209                 return -EINVAL;
210         if (bus->state != BUS_UNSET)
211                 return -EPERM;
212
213         bus->is_server = !!b;
214         bus->server_id = server_id;
215         return 0;
216 }
217
218 int sd_bus_set_anonymous(sd_bus *bus, int b) {
219         if (!bus)
220                 return -EINVAL;
221         if (bus->state != BUS_UNSET)
222                 return -EPERM;
223
224         bus->anonymous_auth = !!b;
225         return 0;
226 }
227
228 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
229         const char *s;
230         int r;
231
232         assert(bus);
233         assert(bus->state == BUS_HELLO);
234
235         if (error != 0)
236                 return -error;
237
238         assert(reply);
239
240         r = sd_bus_message_read(reply, "s", &s);
241         if (r < 0)
242                 return r;
243
244         if (!service_name_is_valid(s) || s[0] != ':')
245                 return -EBADMSG;
246
247         bus->unique_name = strdup(s);
248         if (!bus->unique_name)
249                 return -ENOMEM;
250
251         bus->state = BUS_RUNNING;
252
253         return 1;
254 }
255
256 static int bus_send_hello(sd_bus *bus) {
257         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
258         int r;
259
260         assert(bus);
261
262         if (!bus->bus_client)
263                 return 0;
264
265         r = sd_bus_message_new_method_call(
266                         bus,
267                         "org.freedesktop.DBus",
268                         "/",
269                         "org.freedesktop.DBus",
270                         "Hello",
271                         &m);
272         if (r < 0)
273                 return r;
274
275         return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
276 }
277
278 int bus_start_running(sd_bus *bus) {
279         assert(bus);
280
281         if (bus->bus_client) {
282                 bus->state = BUS_HELLO;
283                 return 1;
284         }
285
286         bus->state = BUS_RUNNING;
287         return 1;
288 }
289
/* Parses one "key=value" item of an address string at *p.
 *
 * If `key` is non-NULL the text at *p must start with "<key>=";
 * otherwise 0 is returned and nothing is touched. If `key` is NULL
 * the text at *p is taken to be the value itself.
 *
 * The value extends up to the next ',', ';' or end of string, with
 * "%XX" sequences decoded through unhexchar(). On success *value is
 * replaced by a newly allocated string (the old one is freed), *p is
 * advanced past the value and one trailing ',', and 1 is returned.
 *
 * Errors: -EINVAL if `key` matched but *value was already set, a
 * negative unhexchar() result for a bad escape, -ENOMEM. */
static int parse_address_key(const char **p, const char *key, char **value) {
        const char *cursor;
        char *buf = NULL;
        size_t used = 0;

        assert(p);
        assert(*p);
        assert(value);

        cursor = *p;

        if (key) {
                size_t klen = strlen(key);

                /* Not our key: let the caller try another one */
                if (strncmp(cursor, key, klen) != 0 || cursor[klen] != '=')
                        return 0;

                /* Same key given twice */
                if (*value)
                        return -EINVAL;

                cursor += klen + 1;
        }

        for (;;) {
                char decoded, *grown;

                if (*cursor == ';' || *cursor == ',' || *cursor == 0)
                        break;

                if (*cursor != '%') {
                        decoded = *cursor;
                        cursor++;
                } else {
                        int hi, lo;

                        hi = unhexchar(cursor[1]);
                        if (hi < 0) {
                                free(buf);
                                return hi;
                        }

                        lo = unhexchar(cursor[2]);
                        if (lo < 0) {
                                free(buf);
                                return lo;
                        }

                        decoded = (char) ((hi << 4) | lo);
                        cursor += 3;
                }

                /* Room for the new character plus the terminator */
                grown = realloc(buf, used + 2);
                if (!grown) {
                        free(buf);
                        return -ENOMEM;
                }

                buf = grown;
                buf[used++] = decoded;
        }

        if (buf)
                buf[used] = 0;
        else {
                /* Empty value */
                buf = strdup("");
                if (!buf)
                        return -ENOMEM;
        }

        if (*cursor == ',')
                cursor++;

        *p = cursor;

        free(*value);
        *value = buf;

        return 1;
}
366
/* Advances *p past one unrecognized "key=value" item of an address.
 *
 * The item ends at the next ',' (which is consumed, so *p then points
 * at the following key), at a ';' (which is left in place, since it
 * terminates the whole address entry and the callers' loops test for
 * it), or at the end of the string.
 *
 * Stopping at ';' matters: scanning for ',' alone would run past the
 * end of the current entry and swallow the start of the next address
 * in a ';'-separated list. */
static void skip_address_key(const char **p) {
        assert(p);
        assert(*p);

        *p += strcspn(*p, ",;");

        if (**p == ',')
                (*p)++;
}
376
377 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
378         _cleanup_free_ char *path = NULL, *abstract = NULL;
379         size_t l;
380         int r;
381
382         assert(b);
383         assert(p);
384         assert(*p);
385         assert(guid);
386
387         while (**p != 0 && **p != ';') {
388                 r = parse_address_key(p, "guid", guid);
389                 if (r < 0)
390                         return r;
391                 else if (r > 0)
392                         continue;
393
394                 r = parse_address_key(p, "path", &path);
395                 if (r < 0)
396                         return r;
397                 else if (r > 0)
398                         continue;
399
400                 r = parse_address_key(p, "abstract", &abstract);
401                 if (r < 0)
402                         return r;
403                 else if (r > 0)
404                         continue;
405
406                 skip_address_key(p);
407         }
408
409         if (!path && !abstract)
410                 return -EINVAL;
411
412         if (path && abstract)
413                 return -EINVAL;
414
415         if (path) {
416                 l = strlen(path);
417                 if (l > sizeof(b->sockaddr.un.sun_path))
418                         return -E2BIG;
419
420                 b->sockaddr.un.sun_family = AF_UNIX;
421                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
422                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
423         } else if (abstract) {
424                 l = strlen(abstract);
425                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
426                         return -E2BIG;
427
428                 b->sockaddr.un.sun_family = AF_UNIX;
429                 b->sockaddr.un.sun_path[0] = 0;
430                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
431                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
432         }
433
434         return 0;
435 }
436
437 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
438         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
439         struct addrinfo hints, *result;
440         int r;
441
442         assert(b);
443         assert(p);
444         assert(*p);
445         assert(guid);
446
447         while (**p != 0 && **p != ';') {
448                 r = parse_address_key(p, "guid", guid);
449                 if (r < 0)
450                         return r;
451                 else if (r > 0)
452                         continue;
453
454                 r = parse_address_key(p, "host", &host);
455                 if (r < 0)
456                         return r;
457                 else if (r > 0)
458                         continue;
459
460                 r = parse_address_key(p, "port", &port);
461                 if (r < 0)
462                         return r;
463                 else if (r > 0)
464                         continue;
465
466                 r = parse_address_key(p, "family", &family);
467                 if (r < 0)
468                         return r;
469                 else if (r > 0)
470                         continue;
471
472                 skip_address_key(p);
473         }
474
475         if (!host || !port)
476                 return -EINVAL;
477
478         zero(hints);
479         hints.ai_socktype = SOCK_STREAM;
480         hints.ai_flags = AI_ADDRCONFIG;
481
482         if (family) {
483                 if (streq(family, "ipv4"))
484                         hints.ai_family = AF_INET;
485                 else if (streq(family, "ipv6"))
486                         hints.ai_family = AF_INET6;
487                 else
488                         return -EINVAL;
489         }
490
491         r = getaddrinfo(host, port, &hints, &result);
492         if (r == EAI_SYSTEM)
493                 return -errno;
494         else if (r != 0)
495                 return -EADDRNOTAVAIL;
496
497         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
498         b->sockaddr_size = result->ai_addrlen;
499
500         freeaddrinfo(result);
501
502         return 0;
503 }
504
505 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
506         char *path = NULL;
507         unsigned n_argv = 0, j;
508         char **argv = NULL;
509         int r;
510
511         assert(b);
512         assert(p);
513         assert(*p);
514         assert(guid);
515
516         while (**p != 0 && **p != ';') {
517                 r = parse_address_key(p, "guid", guid);
518                 if (r < 0)
519                         goto fail;
520                 else if (r > 0)
521                         continue;
522
523                 r = parse_address_key(p, "path", &path);
524                 if (r < 0)
525                         goto fail;
526                 else if (r > 0)
527                         continue;
528
529                 if (startswith(*p, "argv")) {
530                         unsigned ul;
531
532                         errno = 0;
533                         ul = strtoul(*p + 4, (char**) p, 10);
534                         if (errno > 0 || **p != '=' || ul > 256) {
535                                 r = -EINVAL;
536                                 goto fail;
537                         }
538
539                         (*p) ++;
540
541                         if (ul >= n_argv) {
542                                 char **x;
543
544                                 x = realloc(argv, sizeof(char*) * (ul + 2));
545                                 if (!x) {
546                                         r = -ENOMEM;
547                                         goto fail;
548                                 }
549
550                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
551
552                                 argv = x;
553                                 n_argv = ul + 1;
554                         }
555
556                         r = parse_address_key(p, NULL, argv + ul);
557                         if (r < 0)
558                                 goto fail;
559
560                         continue;
561                 }
562
563                 skip_address_key(p);
564         }
565
566         if (!path) {
567                 r = -EINVAL;
568                 goto fail;
569         }
570
571         /* Make sure there are no holes in the array, with the
572          * exception of argv[0] */
573         for (j = 1; j < n_argv; j++)
574                 if (!argv[j]) {
575                         r = -EINVAL;
576                         goto fail;
577                 }
578
579         if (argv && argv[0] == NULL) {
580                 argv[0] = strdup(path);
581                 if (!argv[0]) {
582                         r = -ENOMEM;
583                         goto fail;
584                 }
585         }
586
587         b->exec_path = path;
588         b->exec_argv = argv;
589         return 0;
590
591 fail:
592         for (j = 0; j < n_argv; j++)
593                 free(argv[j]);
594
595         free(argv);
596         free(path);
597         return r;
598 }
599
600 static void bus_reset_parsed_address(sd_bus *b) {
601         assert(b);
602
603         zero(b->sockaddr);
604         b->sockaddr_size = 0;
605         strv_free(b->exec_argv);
606         free(b->exec_path);
607         b->exec_path = NULL;
608         b->exec_argv = NULL;
609         b->server_id = SD_ID128_NULL;
610 }
611
612 static int bus_parse_next_address(sd_bus *b) {
613         _cleanup_free_ char *guid = NULL;
614         const char *a;
615         int r;
616
617         assert(b);
618
619         if (!b->address)
620                 return 0;
621         if (b->address[b->address_index] == 0)
622                 return 0;
623
624         bus_reset_parsed_address(b);
625
626         a = b->address + b->address_index;
627
628         while (*a != 0) {
629
630                 if (*a == ';') {
631                         a++;
632                         continue;
633                 }
634
635                 if (startswith(a, "unix:")) {
636                         a += 5;
637
638                         r = parse_unix_address(b, &a, &guid);
639                         if (r < 0)
640                                 return r;
641                         break;
642
643                 } else if (startswith(a, "tcp:")) {
644
645                         a += 4;
646                         r = parse_tcp_address(b, &a, &guid);
647                         if (r < 0)
648                                 return r;
649
650                         break;
651
652                 } else if (startswith(a, "unixexec:")) {
653
654                         a += 9;
655                         r = parse_exec_address(b, &a, &guid);
656                         if (r < 0)
657                                 return r;
658
659                         break;
660
661                 }
662
663                 a = strchr(a, ';');
664                 if (!a)
665                         return 0;
666         }
667
668         if (guid) {
669                 r = sd_id128_from_string(guid, &b->server_id);
670                 if (r < 0)
671                         return r;
672         }
673
674         b->address_index = a - b->address;
675         return 1;
676 }
677
678 static int bus_start_address(sd_bus *b) {
679         int r;
680
681         assert(b);
682
683         for (;;) {
684                 sd_bus_close(b);
685
686                 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
687
688                         r = bus_socket_connect(b);
689                         if (r >= 0)
690                                 return r;
691
692                         b->last_connect_error = -r;
693
694                 } else if (b->exec_path) {
695
696                         r = bus_socket_exec(b);
697                         if (r >= 0)
698                                 return r;
699
700                         b->last_connect_error = -r;
701                 }
702
703                 r = bus_parse_next_address(b);
704                 if (r < 0)
705                         return r;
706                 if (r == 0)
707                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
708         }
709 }
710
711 int bus_next_address(sd_bus *b) {
712         assert(b);
713
714         bus_reset_parsed_address(b);
715         return bus_start_address(b);
716 }
717
718 static int bus_start_fd(sd_bus *b) {
719         int r;
720
721         assert(b);
722         assert(b->input_fd >= 0);
723         assert(b->output_fd >= 0);
724
725         r = fd_nonblock(b->input_fd, true);
726         if (r < 0)
727                 return r;
728
729         r = fd_cloexec(b->input_fd, true);
730         if (r < 0)
731                 return r;
732
733         if (b->input_fd != b->output_fd) {
734                 r = fd_nonblock(b->output_fd, true);
735                 if (r < 0)
736                         return r;
737
738                 r = fd_cloexec(b->output_fd, true);
739                 if (r < 0)
740                         return r;
741         }
742
743         return bus_socket_take_fd(b);
744 }
745
746 int sd_bus_start(sd_bus *bus) {
747         int r;
748
749         if (!bus)
750                 return -EINVAL;
751         if (bus->state != BUS_UNSET)
752                 return -EPERM;
753
754         bus->state = BUS_OPENING;
755
756         if (bus->is_server && bus->bus_client)
757                 return -EINVAL;
758
759         if (bus->input_fd >= 0)
760                 r = bus_start_fd(bus);
761         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path)
762                 r = bus_start_address(bus);
763         else
764                 return -EINVAL;
765
766         if (r < 0)
767                 return r;
768
769         return bus_send_hello(bus);
770 }
771
772 int sd_bus_open_system(sd_bus **ret) {
773         const char *e;
774         sd_bus *b;
775         int r;
776
777         if (!ret)
778                 return -EINVAL;
779
780         r = sd_bus_new(&b);
781         if (r < 0)
782                 return r;
783
784         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
785         if (e) {
786                 r = sd_bus_set_address(b, e);
787                 if (r < 0)
788                         goto fail;
789         } else {
790                 b->sockaddr.un.sun_family = AF_UNIX;
791                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
792                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
793         }
794
795         b->bus_client = true;
796
797         r = sd_bus_start(b);
798         if (r < 0)
799                 goto fail;
800
801         *ret = b;
802         return 0;
803
804 fail:
805         bus_free(b);
806         return r;
807 }
808
809 int sd_bus_open_user(sd_bus **ret) {
810         const char *e;
811         sd_bus *b;
812         size_t l;
813         int r;
814
815         if (!ret)
816                 return -EINVAL;
817
818         r = sd_bus_new(&b);
819         if (r < 0)
820                 return r;
821
822         e = getenv("DBUS_SESSION_BUS_ADDRESS");
823         if (e) {
824                 r = sd_bus_set_address(b, e);
825                 if (r < 0)
826                         goto fail;
827         } else {
828                 e = getenv("XDG_RUNTIME_DIR");
829                 if (!e) {
830                         r = -ENOENT;
831                         goto fail;
832                 }
833
834                 l = strlen(e);
835                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
836                         r = -E2BIG;
837                         goto fail;
838                 }
839
840                 b->sockaddr.un.sun_family = AF_UNIX;
841                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
842                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
843         }
844
845         b->bus_client = true;
846
847         r = sd_bus_start(b);
848         if (r < 0)
849                 goto fail;
850
851         *ret = b;
852         return 0;
853
854 fail:
855         bus_free(b);
856         return r;
857 }
858
859 void sd_bus_close(sd_bus *bus) {
860         if (!bus)
861                 return;
862
863         if (bus->input_fd >= 0)
864                 close_nointr_nofail(bus->input_fd);
865         if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
866                 close_nointr_nofail(bus->output_fd);
867
868         bus->input_fd = bus->output_fd = -1;
869 }
870
871 sd_bus *sd_bus_ref(sd_bus *bus) {
872         if (!bus)
873                 return NULL;
874
875         assert(bus->n_ref > 0);
876
877         bus->n_ref++;
878         return bus;
879 }
880
881 sd_bus *sd_bus_unref(sd_bus *bus) {
882         if (!bus)
883                 return NULL;
884
885         assert(bus->n_ref > 0);
886         bus->n_ref--;
887
888         if (bus->n_ref <= 0)
889                 bus_free(bus);
890
891         return NULL;
892 }
893
894 int sd_bus_is_open(sd_bus *bus) {
895         if (!bus)
896                 return -EINVAL;
897
898         return bus->state != BUS_UNSET && bus->input_fd >= 0;
899 }
900
901 int sd_bus_can_send(sd_bus *bus, char type) {
902         int r;
903
904         if (!bus)
905                 return -EINVAL;
906         if (bus->output_fd < 0)
907                 return -ENOTCONN;
908
909         if (type == SD_BUS_TYPE_UNIX_FD) {
910                 if (!bus->negotiate_fds)
911                         return 0;
912
913                 r = bus_ensure_running(bus);
914                 if (r < 0)
915                         return r;
916
917                 return bus->can_fds;
918         }
919
920         return bus_type_is_valid(type);
921 }
922
923 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
924         int r;
925
926         if (!bus)
927                 return -EINVAL;
928         if (!server_id)
929                 return -EINVAL;
930
931         r = bus_ensure_running(bus);
932         if (r < 0)
933                 return r;
934
935         *server_id = bus->server_id;
936         return 0;
937 }
938
939 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
940         assert(m);
941
942         if (m->header->version > b->message_version)
943                 return -EPERM;
944
945         if (m->sealed)
946                 return 0;
947
948         return bus_message_seal(m, ++b->serial);
949 }
950
951 static int dispatch_wqueue(sd_bus *bus) {
952         int r, ret = 0;
953
954         assert(bus);
955         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
956
957         if (bus->output_fd < 0)
958                 return -ENOTCONN;
959
960         while (bus->wqueue_size > 0) {
961
962                 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
963                 if (r < 0) {
964                         sd_bus_close(bus);
965                         return r;
966                 } else if (r == 0)
967                         /* Didn't do anything this time */
968                         return ret;
969                 else if (bus->windex >= bus->wqueue[0]->size) {
970                         /* Fully written. Let's drop the entry from
971                          * the queue.
972                          *
973                          * This isn't particularly optimized, but
974                          * well, this is supposed to be our worst-case
975                          * buffer only, and the socket buffer is
976                          * supposed to be our primary buffer, and if
977                          * it got full, then all bets are off
978                          * anyway. */
979
980                         sd_bus_message_unref(bus->wqueue[0]);
981                         bus->wqueue_size --;
982                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
983                         bus->windex = 0;
984
985                         ret = 1;
986                 }
987         }
988
989         return ret;
990 }
991
992 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
993         sd_bus_message *z = NULL;
994         int r, ret = 0;
995
996         assert(bus);
997         assert(m);
998         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
999
1000         if (bus->input_fd < 0)
1001                 return -ENOTCONN;
1002
1003         if (bus->rqueue_size > 0) {
1004                 /* Dispatch a queued message */
1005
1006                 *m = bus->rqueue[0];
1007                 bus->rqueue_size --;
1008                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1009                 return 1;
1010         }
1011
1012         /* Try to read a new message */
1013         do {
1014                 r = bus_socket_read_message(bus, &z);
1015                 if (r < 0) {
1016                         sd_bus_close(bus);
1017                         return r;
1018                 }
1019                 if (r == 0)
1020                         return ret;
1021
1022                 r = 1;
1023         } while (!z);
1024
1025         *m = z;
1026         return 1;
1027 }
1028
1029 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1030         int r;
1031
1032         if (!bus)
1033                 return -EINVAL;
1034         if (bus->state == BUS_UNSET)
1035                 return -ENOTCONN;
1036         if (bus->output_fd < 0)
1037                 return -ENOTCONN;
1038         if (!m)
1039                 return -EINVAL;
1040
1041         if (m->n_fds > 0) {
1042                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1043                 if (r < 0)
1044                         return r;
1045                 if (r == 0)
1046                         return -ENOTSUP;
1047         }
1048
1049         /* If the serial number isn't kept, then we know that no reply
1050          * is expected */
1051         if (!serial && !m->sealed)
1052                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1053
1054         r = bus_seal_message(bus, m);
1055         if (r < 0)
1056                 return r;
1057
1058         /* If this is a reply and no reply was requested, then let's
1059          * suppress this, if we can */
1060         if (m->dont_send && !serial)
1061                 return 0;
1062
1063         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1064                 size_t idx = 0;
1065
1066                 r = bus_socket_write_message(bus, m, &idx);
1067                 if (r < 0) {
1068                         sd_bus_close(bus);
1069                         return r;
1070                 } else if (idx < m->size)  {
1071                         /* Wasn't fully written. So let's remember how
1072                          * much was written. Note that the first entry
1073                          * of the wqueue array is always allocated so
1074                          * that we always can remember how much was
1075                          * written. */
1076                         bus->wqueue[0] = sd_bus_message_ref(m);
1077                         bus->wqueue_size = 1;
1078                         bus->windex = idx;
1079                 }
1080         } else {
1081                 sd_bus_message **q;
1082
1083                 /* Just append it to the queue. */
1084
1085                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1086                         return -ENOBUFS;
1087
1088                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1089                 if (!q)
1090                         return -ENOMEM;
1091
1092                 bus->wqueue = q;
1093                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1094         }
1095
1096         if (serial)
1097                 *serial = BUS_MESSAGE_SERIAL(m);
1098
1099         return 0;
1100 }
1101
1102 static usec_t calc_elapse(uint64_t usec) {
1103         if (usec == (uint64_t) -1)
1104                 return 0;
1105
1106         if (usec == 0)
1107                 usec = BUS_DEFAULT_TIMEOUT;
1108
1109         return now(CLOCK_MONOTONIC) + usec;
1110 }
1111
1112 static int timeout_compare(const void *a, const void *b) {
1113         const struct reply_callback *x = a, *y = b;
1114
1115         if (x->timeout != 0 && y->timeout == 0)
1116                 return -1;
1117
1118         if (x->timeout == 0 && y->timeout != 0)
1119                 return 1;
1120
1121         if (x->timeout < y->timeout)
1122                 return -1;
1123
1124         if (x->timeout > y->timeout)
1125                 return 1;
1126
1127         return 0;
1128 }
1129
1130 int sd_bus_send_with_reply(
1131                 sd_bus *bus,
1132                 sd_bus_message *m,
1133                 sd_bus_message_handler_t callback,
1134                 void *userdata,
1135                 uint64_t usec,
1136                 uint64_t *serial) {
1137
1138         struct reply_callback *c;
1139         int r;
1140
1141         if (!bus)
1142                 return -EINVAL;
1143         if (bus->state == BUS_UNSET)
1144                 return -ENOTCONN;
1145         if (bus->output_fd < 0)
1146                 return -ENOTCONN;
1147         if (!m)
1148                 return -EINVAL;
1149         if (!callback)
1150                 return -EINVAL;
1151         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1152                 return -EINVAL;
1153         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1154                 return -EINVAL;
1155
1156         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1157         if (r < 0)
1158                 return r;
1159
1160         if (usec != (uint64_t) -1) {
1161                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1162                 if (r < 0)
1163                         return r;
1164         }
1165
1166         r = bus_seal_message(bus, m);
1167         if (r < 0)
1168                 return r;
1169
1170         c = new0(struct reply_callback, 1);
1171         if (!c)
1172                 return -ENOMEM;
1173
1174         c->callback = callback;
1175         c->userdata = userdata;
1176         c->serial = BUS_MESSAGE_SERIAL(m);
1177         c->timeout = calc_elapse(usec);
1178
1179         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1180         if (r < 0) {
1181                 free(c);
1182                 return r;
1183         }
1184
1185         if (c->timeout != 0) {
1186                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1187                 if (r < 0) {
1188                         c->timeout = 0;
1189                         sd_bus_send_with_reply_cancel(bus, c->serial);
1190                         return r;
1191                 }
1192         }
1193
1194         r = sd_bus_send(bus, m, serial);
1195         if (r < 0) {
1196                 sd_bus_send_with_reply_cancel(bus, c->serial);
1197                 return r;
1198         }
1199
1200         return r;
1201 }
1202
1203 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1204         struct reply_callback *c;
1205
1206         if (!bus)
1207                 return -EINVAL;
1208         if (serial == 0)
1209                 return -EINVAL;
1210
1211         c = hashmap_remove(bus->reply_callbacks, &serial);
1212         if (!c)
1213                 return 0;
1214
1215         if (c->timeout != 0)
1216                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1217
1218         free(c);
1219         return 1;
1220 }
1221
1222 int bus_ensure_running(sd_bus *bus) {
1223         int r;
1224
1225         assert(bus);
1226
1227         if (bus->input_fd < 0)
1228                 return -ENOTCONN;
1229         if (bus->state == BUS_UNSET)
1230                 return -ENOTCONN;
1231
1232         if (bus->state == BUS_RUNNING)
1233                 return 1;
1234
1235         for (;;) {
1236                 r = sd_bus_process(bus, NULL);
1237                 if (r < 0)
1238                         return r;
1239                 if (bus->state == BUS_RUNNING)
1240                         return 1;
1241                 if (r > 0)
1242                         continue;
1243
1244                 r = sd_bus_wait(bus, (uint64_t) -1);
1245                 if (r < 0)
1246                         return r;
1247         }
1248 }
1249
1250 int sd_bus_send_with_reply_and_block(
1251                 sd_bus *bus,
1252                 sd_bus_message *m,
1253                 uint64_t usec,
1254                 sd_bus_error *error,
1255                 sd_bus_message **reply) {
1256
1257         int r;
1258         usec_t timeout;
1259         uint64_t serial;
1260         bool room = false;
1261
1262         if (!bus)
1263                 return -EINVAL;
1264         if (bus->output_fd < 0)
1265                 return -ENOTCONN;
1266         if (bus->state == BUS_UNSET)
1267                 return -ENOTCONN;
1268         if (!m)
1269                 return -EINVAL;
1270         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1271                 return -EINVAL;
1272         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1273                 return -EINVAL;
1274         if (bus_error_is_dirty(error))
1275                 return -EINVAL;
1276
1277         r = bus_ensure_running(bus);
1278         if (r < 0)
1279                 return r;
1280
1281         r = sd_bus_send(bus, m, &serial);
1282         if (r < 0)
1283                 return r;
1284
1285         timeout = calc_elapse(usec);
1286
1287         for (;;) {
1288                 usec_t left;
1289                 sd_bus_message *incoming = NULL;
1290
1291                 if (!room) {
1292                         sd_bus_message **q;
1293
1294                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1295                                 return -ENOBUFS;
1296
1297                         /* Make sure there's room for queuing this
1298                          * locally, before we read the message */
1299
1300                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1301                         if (!q)
1302                                 return -ENOMEM;
1303
1304                         bus->rqueue = q;
1305                         room = true;
1306                 }
1307
1308                 r = bus_socket_read_message(bus, &incoming);
1309                 if (r < 0)
1310                         return r;
1311                 if (incoming) {
1312
1313                         if (incoming->reply_serial == serial) {
1314                                 /* Found a match! */
1315
1316                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1317
1318                                         if (reply)
1319                                                 *reply = incoming;
1320                                         else
1321                                                 sd_bus_message_unref(incoming);
1322
1323                                         return 0;
1324                                 }
1325
1326                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1327                                         int k;
1328
1329                                         r = sd_bus_error_copy(error, &incoming->error);
1330                                         if (r < 0) {
1331                                                 sd_bus_message_unref(incoming);
1332                                                 return r;
1333                                         }
1334
1335                                         k = bus_error_to_errno(&incoming->error);
1336                                         sd_bus_message_unref(incoming);
1337                                         return k;
1338                                 }
1339
1340                                 sd_bus_message_unref(incoming);
1341                                 return -EIO;
1342                         }
1343
1344                         /* There's already guaranteed to be room for
1345                          * this, so need to resize things here */
1346                         bus->rqueue[bus->rqueue_size ++] = incoming;
1347                         room = false;
1348
1349                         /* Try to read more, right-away */
1350                         continue;
1351                 }
1352                 if (r != 0)
1353                         continue;
1354
1355                 if (timeout > 0) {
1356                         usec_t n;
1357
1358                         n = now(CLOCK_MONOTONIC);
1359                         if (n >= timeout)
1360                                 return -ETIMEDOUT;
1361
1362                         left = timeout - n;
1363                 } else
1364                         left = (uint64_t) -1;
1365
1366                 r = bus_poll(bus, true, left);
1367                 if (r < 0)
1368                         return r;
1369
1370                 r = dispatch_wqueue(bus);
1371                 if (r < 0)
1372                         return r;
1373         }
1374 }
1375
1376 int sd_bus_get_fd(sd_bus *bus) {
1377         if (!bus)
1378                 return -EINVAL;
1379         if (bus->input_fd < 0)
1380                 return -ENOTCONN;
1381         if (bus->input_fd != bus->output_fd)
1382                 return -EPERM;
1383
1384         return bus->input_fd;
1385 }
1386
1387 int sd_bus_get_events(sd_bus *bus) {
1388         int flags = 0;
1389
1390         if (!bus)
1391                 return -EINVAL;
1392         if (bus->state == BUS_UNSET)
1393                 return -ENOTCONN;
1394         if (bus->input_fd < 0)
1395                 return -ENOTCONN;
1396
1397         if (bus->state == BUS_OPENING)
1398                 flags |= POLLOUT;
1399         else if (bus->state == BUS_AUTHENTICATING) {
1400
1401                 if (bus_socket_auth_needs_write(bus))
1402                         flags |= POLLOUT;
1403
1404                 flags |= POLLIN;
1405
1406         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1407                 if (bus->rqueue_size <= 0)
1408                         flags |= POLLIN;
1409                 if (bus->wqueue_size > 0)
1410                         flags |= POLLOUT;
1411         }
1412
1413         return flags;
1414 }
1415
1416 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1417         struct reply_callback *c;
1418
1419         if (!bus)
1420                 return -EINVAL;
1421         if (!timeout_usec)
1422                 return -EINVAL;
1423         if (bus->state == BUS_UNSET)
1424                 return -ENOTCONN;
1425         if (bus->input_fd < 0)
1426                 return -ENOTCONN;
1427
1428         if (bus->state == BUS_AUTHENTICATING) {
1429                 *timeout_usec = bus->auth_timeout;
1430                 return 1;
1431         }
1432
1433         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1434                 *timeout_usec = (uint64_t) -1;
1435                 return 0;
1436         }
1437
1438         c = prioq_peek(bus->reply_callbacks_prioq);
1439         if (!c) {
1440                 *timeout_usec = (uint64_t) -1;
1441                 return 0;
1442         }
1443
1444         *timeout_usec = c->timeout;
1445         return 1;
1446 }
1447
1448 static int process_timeout(sd_bus *bus) {
1449         struct reply_callback *c;
1450         usec_t n;
1451         int r;
1452
1453         assert(bus);
1454
1455         c = prioq_peek(bus->reply_callbacks_prioq);
1456         if (!c)
1457                 return 0;
1458
1459         n = now(CLOCK_MONOTONIC);
1460         if (c->timeout > n)
1461                 return 0;
1462
1463         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1464         hashmap_remove(bus->reply_callbacks, &c->serial);
1465
1466         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1467         free(c);
1468
1469         return r < 0 ? r : 1;
1470 }
1471
1472 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1473         assert(bus);
1474         assert(m);
1475
1476         if (bus->state != BUS_HELLO)
1477                 return 0;
1478
1479         /* Let's make sure the first message on the bus is the HELLO
1480          * reply. But note that we don't actually parse the message
1481          * here (we leave that to the usual handling), we just verify
1482          * we don't let any earlier msg through. */
1483
1484         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1485             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1486                 return -EIO;
1487
1488         if (m->reply_serial != bus->hello_serial)
1489                 return -EIO;
1490
1491         return 0;
1492 }
1493
1494 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1495         struct reply_callback *c;
1496         int r;
1497
1498         assert(bus);
1499         assert(m);
1500
1501         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1502             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1503                 return 0;
1504
1505         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1506         if (!c)
1507                 return 0;
1508
1509         if (c->timeout != 0)
1510                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1511
1512         r = c->callback(bus, 0, m, c->userdata);
1513         free(c);
1514
1515         return r;
1516 }
1517
1518 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1519         struct filter_callback *l;
1520         int r;
1521
1522         assert(bus);
1523         assert(m);
1524
1525         do {
1526                 bus->filter_callbacks_modified = false;
1527
1528                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1529
1530                         if (bus->filter_callbacks_modified)
1531                                 break;
1532
1533                         /* Don't run this more than once per iteration */
1534                         if (l->last_iteration == bus->iteration_counter)
1535                                 continue;
1536
1537                         l->last_iteration = bus->iteration_counter;
1538
1539                         r = l->callback(bus, 0, m, l->userdata);
1540                         if (r != 0)
1541                                 return r;
1542
1543                 }
1544
1545         } while (bus->filter_callbacks_modified);
1546
1547         return 0;
1548 }
1549
1550 static int process_match(sd_bus *bus, sd_bus_message *m) {
1551         int r;
1552
1553         assert(bus);
1554         assert(m);
1555
1556         do {
1557                 bus->match_callbacks_modified = false;
1558
1559                 r = bus_match_run(bus, &bus->match_callbacks, 0, m);
1560                 if (r != 0)
1561                         return r;
1562
1563         } while (bus->match_callbacks_modified);
1564
1565         return 0;
1566 }
1567
1568 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1569         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1570         int r;
1571
1572         assert(bus);
1573         assert(m);
1574
1575         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1576                 return 0;
1577
1578         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1579                 return 0;
1580
1581         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1582                 return 1;
1583
1584         if (streq_ptr(m->member, "Ping"))
1585                 r = sd_bus_message_new_method_return(bus, m, &reply);
1586         else if (streq_ptr(m->member, "GetMachineId")) {
1587                 sd_id128_t id;
1588                 char sid[33];
1589
1590                 r = sd_id128_get_machine(&id);
1591                 if (r < 0)
1592                         return r;
1593
1594                 r = sd_bus_message_new_method_return(bus, m, &reply);
1595                 if (r < 0)
1596                         return r;
1597
1598                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1599         } else {
1600                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1601
1602                 sd_bus_error_set(&error,
1603                                  "org.freedesktop.DBus.Error.UnknownMethod",
1604                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1605
1606                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1607         }
1608
1609         if (r < 0)
1610                 return r;
1611
1612         r = sd_bus_send(bus, reply, NULL);
1613         if (r < 0)
1614                 return r;
1615
1616         return 1;
1617 }
1618
1619 static int process_object(sd_bus *bus, sd_bus_message *m) {
1620         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1621         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1622         struct object_callback *c;
1623         int r;
1624         bool found = false;
1625         size_t pl;
1626
1627         assert(bus);
1628         assert(m);
1629
1630         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1631                 return 0;
1632
1633         if (hashmap_isempty(bus->object_callbacks))
1634                 return 0;
1635
1636         pl = strlen(m->path);
1637
1638         do {
1639                 char p[pl+1];
1640
1641                 bus->object_callbacks_modified = false;
1642
1643                 c = hashmap_get(bus->object_callbacks, m->path);
1644                 if (c && c->last_iteration != bus->iteration_counter) {
1645
1646                         c->last_iteration = bus->iteration_counter;
1647
1648                         r = c->callback(bus, 0, m, c->userdata);
1649                         if (r != 0)
1650                                 return r;
1651
1652                         found = true;
1653                 }
1654
1655                 /* Look for fallback prefixes */
1656                 strcpy(p, m->path);
1657                 for (;;) {
1658                         char *e;
1659
1660                         if (bus->object_callbacks_modified)
1661                                 break;
1662
1663                         e = strrchr(p, '/');
1664                         if (e == p || !e)
1665                                 break;
1666
1667                         *e = 0;
1668
1669                         c = hashmap_get(bus->object_callbacks, p);
1670                         if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1671
1672                                 c->last_iteration = bus->iteration_counter;
1673
1674                                 r = c->callback(bus, 0, m, c->userdata);
1675                                 if (r != 0)
1676                                         return r;
1677
1678                                 found = true;
1679                         }
1680                 }
1681
1682         } while (bus->object_callbacks_modified);
1683
1684         /* We found some handlers but none wanted to take this, then
1685          * return this -- with one exception, we can handle
1686          * introspection minimally ourselves */
1687         if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1688                 return 0;
1689
1690         sd_bus_error_set(&error,
1691                          "org.freedesktop.DBus.Error.UnknownMethod",
1692                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1693
1694         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1695         if (r < 0)
1696                 return r;
1697
1698         r = sd_bus_send(bus, reply, NULL);
1699         if (r < 0)
1700                 return r;
1701
1702         return 1;
1703 }
1704
1705 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1706         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1707         _cleanup_free_ char *introspection = NULL;
1708         _cleanup_set_free_free_ Set *s = NULL;
1709         _cleanup_fclose_ FILE *f = NULL;
1710         struct object_callback *c;
1711         Iterator i;
1712         size_t size = 0;
1713         char *node;
1714         int r;
1715
1716         assert(bus);
1717         assert(m);
1718
1719         if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1720                 return 0;
1721
1722         if (!m->path)
1723                 return 0;
1724
1725         s = set_new(string_hash_func, string_compare_func);
1726         if (!s)
1727                 return -ENOMEM;
1728
1729         HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1730                 const char *e;
1731                 char *a, *p;
1732
1733                 if (streq(c->path, "/"))
1734                         continue;
1735
1736                 if (streq(m->path, "/"))
1737                         e = c->path;
1738                 else {
1739                         e = startswith(c->path, m->path);
1740                         if (!e || *e != '/')
1741                                 continue;
1742                 }
1743
1744                 a = strdup(e+1);
1745                 if (!a)
1746                         return -ENOMEM;
1747
1748                 p = strchr(a, '/');
1749                 if (p)
1750                         *p = 0;
1751
1752                 r = set_put(s, a);
1753                 if (r < 0) {
1754                         free(a);
1755
1756                         if (r != -EEXIST)
1757                                 return r;
1758                 }
1759         }
1760
1761         f = open_memstream(&introspection, &size);
1762         if (!f)
1763                 return -ENOMEM;
1764
1765         fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1766         fputs("<node>\n", f);
1767         fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1768         fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1769
1770         while ((node = set_steal_first(s))) {
1771                 fprintf(f, " <node name=\"%s\"/>\n", node);
1772                 free(node);
1773         }
1774
1775         fputs("</node>\n", f);
1776
1777         fflush(f);
1778
1779         if (ferror(f))
1780                 return -ENOMEM;
1781
1782         r = sd_bus_message_new_method_return(bus, m, &reply);
1783         if (r < 0)
1784                 return r;
1785
1786         r = sd_bus_message_append(reply, "s", introspection);
1787         if (r < 0)
1788                 return r;
1789
1790         r = sd_bus_send(bus, reply, NULL);
1791         if (r < 0)
1792                 return r;
1793
1794         return 1;
1795 }
1796
1797 static int process_message(sd_bus *bus, sd_bus_message *m) {
1798         int r;
1799
1800         assert(bus);
1801         assert(m);
1802
1803         bus->iteration_counter++;
1804
1805         r = process_hello(bus, m);
1806         if (r != 0)
1807                 return r;
1808
1809         r = process_reply(bus, m);
1810         if (r != 0)
1811                 return r;
1812
1813         r = process_filter(bus, m);
1814         if (r != 0)
1815                 return r;
1816
1817         r = process_match(bus, m);
1818         if (r != 0)
1819                 return r;
1820
1821         r = process_builtin(bus, m);
1822         if (r != 0)
1823                 return r;
1824
1825         r = process_object(bus, m);
1826         if (r != 0)
1827                 return r;
1828
1829         return process_introspect(bus, m);
1830 }
1831
1832 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1833         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1834         int r;
1835
1836         assert(bus);
1837         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1838
1839         r = process_timeout(bus);
1840         if (r != 0)
1841                 goto null_message;
1842
1843         r = dispatch_wqueue(bus);
1844         if (r != 0)
1845                 goto null_message;
1846
1847         r = dispatch_rqueue(bus, &m);
1848         if (r < 0)
1849                 return r;
1850         if (!m)
1851                 goto null_message;
1852
1853         r = process_message(bus, m);
1854         if (r != 0)
1855                 goto null_message;
1856
1857         if (ret) {
1858                 *ret = m;
1859                 m = NULL;
1860                 return 1;
1861         }
1862
1863         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1864                 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1865                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_NULL;
1866
1867                 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1868
1869                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1870                 if (r < 0)
1871                         return r;
1872
1873                 r = sd_bus_send(bus, reply, NULL);
1874                 if (r < 0)
1875                         return r;
1876         }
1877
1878         return 1;
1879
1880 null_message:
1881         if (r >= 0 && ret)
1882                 *ret = NULL;
1883
1884         return r;
1885 }
1886
1887 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1888         int r;
1889
1890         /* Returns 0 when we didn't do anything. This should cause the
1891          * caller to invoke sd_bus_wait() before returning the next
1892          * time. Returns > 0 when we did something, which possibly
1893          * means *ret is filled in with an unprocessed message. */
1894
1895         if (!bus)
1896                 return -EINVAL;
1897         if (bus->input_fd < 0)
1898                 return -ENOTCONN;
1899
1900         /* We don't allow recursively invoking sd_bus_process(). */
1901         if (bus->processing)
1902                 return -EBUSY;
1903
1904         switch (bus->state) {
1905
1906         case BUS_UNSET:
1907                 return -ENOTCONN;
1908
1909         case BUS_OPENING:
1910                 r = bus_socket_process_opening(bus);
1911                 if (r < 0)
1912                         return r;
1913                 if (ret)
1914                         *ret = NULL;
1915                 return r;
1916
1917         case BUS_AUTHENTICATING:
1918
1919                 r = bus_socket_process_authenticating(bus);
1920                 if (r < 0)
1921                         return r;
1922                 if (ret)
1923                         *ret = NULL;
1924                 return r;
1925
1926         case BUS_RUNNING:
1927         case BUS_HELLO:
1928
1929                 bus->processing = true;
1930                 r = process_running(bus, ret);
1931                 bus->processing = false;
1932
1933                 return r;
1934         }
1935
1936         assert_not_reached("Unknown state");
1937 }
1938
1939 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1940         struct pollfd p[2];
1941         int r, e, n;
1942         struct timespec ts;
1943         usec_t until, m;
1944
1945         assert(bus);
1946
1947         if (bus->input_fd < 0)
1948                 return -ENOTCONN;
1949
1950         e = sd_bus_get_events(bus);
1951         if (e < 0)
1952                 return e;
1953
1954         if (need_more)
1955                 e |= POLLIN;
1956
1957         r = sd_bus_get_timeout(bus, &until);
1958         if (r < 0)
1959                 return r;
1960         if (r == 0)
1961                 m = (uint64_t) -1;
1962         else {
1963                 usec_t nw;
1964                 nw = now(CLOCK_MONOTONIC);
1965                 m = until > nw ? until - nw : 0;
1966         }
1967
1968         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1969                 m = timeout_usec;
1970
1971         zero(p);
1972         p[0].fd = bus->input_fd;
1973
1974         if (bus->output_fd == bus->input_fd) {
1975                 p[0].events = e;
1976                 n = 1;
1977         } else {
1978                 p[0].events = e & POLLIN;
1979                 p[1].fd = bus->output_fd;
1980                 p[1].events = e & POLLOUT;
1981                 n = 2;
1982         }
1983
1984         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1985         if (r < 0)
1986                 return -errno;
1987
1988         return r > 0 ? 1 : 0;
1989 }
1990
1991 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1992
1993         if (!bus)
1994                 return -EINVAL;
1995         if (bus->state == BUS_UNSET)
1996                 return -ENOTCONN;
1997         if (bus->input_fd < 0)
1998                 return -ENOTCONN;
1999         if (bus->rqueue_size > 0)
2000                 return 0;
2001
2002         return bus_poll(bus, false, timeout_usec);
2003 }
2004
2005 int sd_bus_flush(sd_bus *bus) {
2006         int r;
2007
2008         if (!bus)
2009                 return -EINVAL;
2010         if (bus->state == BUS_UNSET)
2011                 return -ENOTCONN;
2012         if (bus->output_fd < 0)
2013                 return -ENOTCONN;
2014
2015         r = bus_ensure_running(bus);
2016         if (r < 0)
2017                 return r;
2018
2019         if (bus->wqueue_size <= 0)
2020                 return 0;
2021
2022         for (;;) {
2023                 r = dispatch_wqueue(bus);
2024                 if (r < 0)
2025                         return r;
2026
2027                 if (bus->wqueue_size <= 0)
2028                         return 0;
2029
2030                 r = bus_poll(bus, false, (uint64_t) -1);
2031                 if (r < 0)
2032                         return r;
2033         }
2034 }
2035
2036 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2037         struct filter_callback *f;
2038
2039         if (!bus)
2040                 return -EINVAL;
2041         if (!callback)
2042                 return -EINVAL;
2043
2044         f = new0(struct filter_callback, 1);
2045         if (!f)
2046                 return -ENOMEM;
2047         f->callback = callback;
2048         f->userdata = userdata;
2049
2050         bus->filter_callbacks_modified = true;
2051         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
2052         return 0;
2053 }
2054
2055 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2056         struct filter_callback *f;
2057
2058         if (!bus)
2059                 return -EINVAL;
2060         if (!callback)
2061                 return -EINVAL;
2062
2063         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2064                 if (f->callback == callback && f->userdata == userdata) {
2065                         bus->filter_callbacks_modified = true;
2066                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2067                         free(f);
2068                         return 1;
2069                 }
2070         }
2071
2072         return 0;
2073 }
2074
2075 static int bus_add_object(
2076                 sd_bus *bus,
2077                 bool fallback,
2078                 const char *path,
2079                 sd_bus_message_handler_t callback,
2080                 void *userdata) {
2081
2082         struct object_callback *c;
2083         int r;
2084
2085         if (!bus)
2086                 return -EINVAL;
2087         if (!path)
2088                 return -EINVAL;
2089         if (!callback)
2090                 return -EINVAL;
2091
2092         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2093         if (r < 0)
2094                 return r;
2095
2096         c = new0(struct object_callback, 1);
2097         if (!c)
2098                 return -ENOMEM;
2099
2100         c->path = strdup(path);
2101         if (!c->path) {
2102                 free(c);
2103                 return -ENOMEM;
2104         }
2105
2106         c->callback = callback;
2107         c->userdata = userdata;
2108         c->is_fallback = fallback;
2109
2110         bus->object_callbacks_modified = true;
2111         r = hashmap_put(bus->object_callbacks, c->path, c);
2112         if (r < 0) {
2113                 free(c->path);
2114                 free(c);
2115                 return r;
2116         }
2117
2118         return 0;
2119 }
2120
2121 static int bus_remove_object(
2122                 sd_bus *bus,
2123                 bool fallback,
2124                 const char *path,
2125                 sd_bus_message_handler_t callback,
2126                 void *userdata) {
2127
2128         struct object_callback *c;
2129
2130         if (!bus)
2131                 return -EINVAL;
2132         if (!path)
2133                 return -EINVAL;
2134         if (!callback)
2135                 return -EINVAL;
2136
2137         c = hashmap_get(bus->object_callbacks, path);
2138         if (!c)
2139                 return 0;
2140
2141         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2142                 return 0;
2143
2144         bus->object_callbacks_modified = true;
2145         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2146
2147         free(c->path);
2148         free(c);
2149
2150         return 1;
2151 }
2152
2153 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2154         return bus_add_object(bus, false, path, callback, userdata);
2155 }
2156
2157 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2158         return bus_remove_object(bus, false, path, callback, userdata);
2159 }
2160
2161 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2162         return bus_add_object(bus, true, prefix, callback, userdata);
2163 }
2164
2165 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2166         return bus_remove_object(bus, true, prefix, callback, userdata);
2167 }
2168
2169 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2170         int r = 0;
2171
2172         if (!bus)
2173                 return -EINVAL;
2174         if (!match)
2175                 return -EINVAL;
2176
2177         if (bus->bus_client) {
2178                 r = bus_add_match_internal(bus, match);
2179                 if (r < 0)
2180                         return r;
2181         }
2182
2183         if (callback) {
2184                 bus->match_callbacks_modified = true;
2185                 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2186                 if (r < 0) {
2187
2188                         if (bus->bus_client)
2189                                 bus_remove_match_internal(bus, match);
2190                 }
2191         }
2192
2193         return r;
2194 }
2195
2196 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2197         int r = 0, q = 0;
2198
2199         if (!bus)
2200                 return -EINVAL;
2201         if (!match)
2202                 return -EINVAL;
2203
2204         if (bus->bus_client)
2205                 r = bus_remove_match_internal(bus, match);
2206
2207         if (callback) {
2208                 bus->match_callbacks_modified = true;
2209                 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
2210         }
2211
2212         if (r < 0)
2213                 return r;
2214         return q;
2215 }
2216
2217 int sd_bus_emit_signal(
2218                 sd_bus *bus,
2219                 const char *path,
2220                 const char *interface,
2221                 const char *member,
2222                 const char *types, ...) {
2223
2224         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2225         va_list ap;
2226         int r;
2227
2228         if (!bus)
2229                 return -EINVAL;
2230
2231         r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2232         if (r < 0)
2233                 return r;
2234
2235         va_start(ap, types);
2236         r = bus_message_append_ap(m, types, ap);
2237         va_end(ap);
2238         if (r < 0)
2239                 return r;
2240
2241         return sd_bus_send(bus, m, NULL);
2242 }
2243
2244 int sd_bus_call_method(
2245                 sd_bus *bus,
2246                 const char *destination,
2247                 const char *path,
2248                 const char *interface,
2249                 const char *member,
2250                 sd_bus_error *error,
2251                 sd_bus_message **reply,
2252                 const char *types, ...) {
2253
2254         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2255         va_list ap;
2256         int r;
2257
2258         if (!bus)
2259                 return -EINVAL;
2260
2261         r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2262         if (r < 0)
2263                 return r;
2264
2265         va_start(ap, types);
2266         r = bus_message_append_ap(m, types, ap);
2267         va_end(ap);
2268         if (r < 0)
2269                 return r;
2270
2271         return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
2272 }
2273
2274 int sd_bus_reply_method_return(
2275                 sd_bus *bus,
2276                 sd_bus_message *call,
2277                 const char *types, ...) {
2278
2279         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2280         va_list ap;
2281         int r;
2282
2283         if (!bus)
2284                 return -EINVAL;
2285         if (!call)
2286                 return -EINVAL;
2287         if (!call->sealed)
2288                 return -EPERM;
2289         if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2290                 return -EINVAL;
2291
2292         if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2293                 return 0;
2294
2295         r = sd_bus_message_new_method_return(bus, call, &m);
2296         if (r < 0)
2297                 return r;
2298
2299         va_start(ap, types);
2300         r = bus_message_append_ap(m, types, ap);
2301         va_end(ap);
2302         if (r < 0)
2303                 return r;
2304
2305         return sd_bus_send(bus, m, NULL);
2306 }
2307
2308 int sd_bus_reply_method_error(
2309                 sd_bus *bus,
2310                 sd_bus_message *call,
2311                 const sd_bus_error *e) {
2312
2313         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2314         int r;
2315
2316         if (!bus)
2317                 return -EINVAL;
2318         if (!call)
2319                 return -EINVAL;
2320         if (!call->sealed)
2321                 return -EPERM;
2322         if (call->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
2323                 return -EINVAL;
2324         if (!sd_bus_error_is_set(e))
2325                 return -EINVAL;
2326
2327         if (call->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2328                 return 0;
2329
2330         r = sd_bus_message_new_method_error(bus, call, e, &m);
2331         if (r < 0)
2332                 return r;
2333
2334         return sd_bus_send(bus, m, NULL);
2335 }