chiark / gitweb /
bus: add convenience functions for constructing and sending method calls/signals...
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "strv.h"
33 #include "set.h"
34
35 #include "sd-bus.h"
36 #include "bus-internal.h"
37 #include "bus-message.h"
38 #include "bus-type.h"
39 #include "bus-socket.h"
40 #include "bus-control.h"
41
42 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
43
44 static void bus_free(sd_bus *b) {
45         struct filter_callback *f;
46         struct object_callback *c;
47         unsigned i;
48
49         assert(b);
50
51         sd_bus_close(b);
52
53         free(b->rbuffer);
54         free(b->unique_name);
55         free(b->auth_buffer);
56         free(b->address);
57
58         free(b->exec_path);
59         strv_free(b->exec_argv);
60
61         close_many(b->fds, b->n_fds);
62         free(b->fds);
63
64         for (i = 0; i < b->rqueue_size; i++)
65                 sd_bus_message_unref(b->rqueue[i]);
66         free(b->rqueue);
67
68         for (i = 0; i < b->wqueue_size; i++)
69                 sd_bus_message_unref(b->wqueue[i]);
70         free(b->wqueue);
71
72         hashmap_free_free(b->reply_callbacks);
73         prioq_free(b->reply_callbacks_prioq);
74
75         while ((f = b->filter_callbacks)) {
76                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
77                 free(f);
78         }
79
80         while ((c = hashmap_steal_first(b->object_callbacks))) {
81                 free(c->path);
82                 free(c);
83         }
84
85         hashmap_free(b->object_callbacks);
86
87         bus_match_free(&b->match_callbacks);
88
89         free(b);
90 }
91
92 int sd_bus_new(sd_bus **ret) {
93         sd_bus *r;
94
95         if (!ret)
96                 return -EINVAL;
97
98         r = new0(sd_bus, 1);
99         if (!r)
100                 return -ENOMEM;
101
102         r->n_ref = 1;
103         r->input_fd = r->output_fd = -1;
104         r->message_version = 1;
105         r->negotiate_fds = true;
106
107         /* We guarantee that wqueue always has space for at least one
108          * entry */
109         r->wqueue = new(sd_bus_message*, 1);
110         if (!r->wqueue) {
111                 free(r);
112                 return -ENOMEM;
113         }
114
115         *ret = r;
116         return 0;
117 }
118
119 int sd_bus_set_address(sd_bus *bus, const char *address) {
120         char *a;
121
122         if (!bus)
123                 return -EINVAL;
124         if (bus->state != BUS_UNSET)
125                 return -EPERM;
126         if (!address)
127                 return -EINVAL;
128
129         a = strdup(address);
130         if (!a)
131                 return -ENOMEM;
132
133         free(bus->address);
134         bus->address = a;
135
136         return 0;
137 }
138
139 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
140         if (!bus)
141                 return -EINVAL;
142         if (bus->state != BUS_UNSET)
143                 return -EPERM;
144         if (input_fd < 0)
145                 return -EINVAL;
146         if (output_fd < 0)
147                 return -EINVAL;
148
149         bus->input_fd = input_fd;
150         bus->output_fd = output_fd;
151         return 0;
152 }
153
154 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
155         char *p, **a;
156
157         if (!bus)
158                 return -EINVAL;
159         if (bus->state != BUS_UNSET)
160                 return -EPERM;
161         if (!path)
162                 return -EINVAL;
163         if (strv_isempty(argv))
164                 return -EINVAL;
165
166         p = strdup(path);
167         if (!p)
168                 return -ENOMEM;
169
170         a = strv_copy(argv);
171         if (!a) {
172                 free(p);
173                 return -ENOMEM;
174         }
175
176         free(bus->exec_path);
177         strv_free(bus->exec_argv);
178
179         bus->exec_path = p;
180         bus->exec_argv = a;
181
182         return 0;
183 }
184
185 int sd_bus_set_bus_client(sd_bus *bus, int b) {
186         if (!bus)
187                 return -EINVAL;
188         if (bus->state != BUS_UNSET)
189                 return -EPERM;
190
191         bus->bus_client = !!b;
192         return 0;
193 }
194
195 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
196         if (!bus)
197                 return -EINVAL;
198         if (bus->state != BUS_UNSET)
199                 return -EPERM;
200
201         bus->negotiate_fds = !!b;
202         return 0;
203 }
204
205 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
206         if (!bus)
207                 return -EINVAL;
208         if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
209                 return -EINVAL;
210         if (bus->state != BUS_UNSET)
211                 return -EPERM;
212
213         bus->is_server = !!b;
214         bus->server_id = server_id;
215         return 0;
216 }
217
218 int sd_bus_set_anonymous(sd_bus *bus, int b) {
219         if (!bus)
220                 return -EINVAL;
221         if (bus->state != BUS_UNSET)
222                 return -EPERM;
223
224         bus->anonymous_auth = !!b;
225         return 0;
226 }
227
228 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
229         const char *s;
230         int r;
231
232         assert(bus);
233         assert(bus->state == BUS_HELLO);
234
235         if (error != 0)
236                 return -error;
237
238         assert(reply);
239
240         r = sd_bus_message_read(reply, "s", &s);
241         if (r < 0)
242                 return r;
243
244         if (!service_name_is_valid(s) || s[0] != ':')
245                 return -EBADMSG;
246
247         bus->unique_name = strdup(s);
248         if (!bus->unique_name)
249                 return -ENOMEM;
250
251         bus->state = BUS_RUNNING;
252
253         return 1;
254 }
255
256 static int bus_send_hello(sd_bus *bus) {
257         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
258         int r;
259
260         assert(bus);
261
262         if (!bus->bus_client)
263                 return 0;
264
265         r = sd_bus_message_new_method_call(
266                         bus,
267                         "org.freedesktop.DBus",
268                         "/",
269                         "org.freedesktop.DBus",
270                         "Hello",
271                         &m);
272         if (r < 0)
273                 return r;
274
275         return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
276 }
277
278 int bus_start_running(sd_bus *bus) {
279         assert(bus);
280
281         if (bus->bus_client) {
282                 bus->state = BUS_HELLO;
283                 return 1;
284         }
285
286         bus->state = BUS_RUNNING;
287         return 1;
288 }
289
290 static int parse_address_key(const char **p, const char *key, char **value) {
291         size_t l, n = 0;
292         const char *a;
293         char *r = NULL;
294
295         assert(p);
296         assert(*p);
297         assert(value);
298
299         if (key) {
300                 l = strlen(key);
301                 if (strncmp(*p, key, l) != 0)
302                         return 0;
303
304                 if ((*p)[l] != '=')
305                         return 0;
306
307                 if (*value)
308                         return -EINVAL;
309
310                 a = *p + l + 1;
311         } else
312                 a = *p;
313
314         while (*a != ';' && *a != ',' && *a != 0) {
315                 char c, *t;
316
317                 if (*a == '%') {
318                         int x, y;
319
320                         x = unhexchar(a[1]);
321                         if (x < 0) {
322                                 free(r);
323                                 return x;
324                         }
325
326                         y = unhexchar(a[2]);
327                         if (y < 0) {
328                                 free(r);
329                                 return y;
330                         }
331
332                         c = (char) ((x << 4) | y);
333                         a += 3;
334                 } else {
335                         c = *a;
336                         a++;
337                 }
338
339                 t = realloc(r, n + 2);
340                 if (!t) {
341                         free(r);
342                         return -ENOMEM;
343                 }
344
345                 r = t;
346                 r[n++] = c;
347         }
348
349         if (!r) {
350                 r = strdup("");
351                 if (!r)
352                         return -ENOMEM;
353         } else
354                 r[n] = 0;
355
356         if (*a == ',')
357                 a++;
358
359         *p = a;
360
361         free(*value);
362         *value = r;
363
364         return 1;
365 }
366
367 static void skip_address_key(const char **p) {
368         assert(p);
369         assert(*p);
370
371         *p += strcspn(*p, ",");
372
373         if (**p == ',')
374                 (*p) ++;
375 }
376
377 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
378         _cleanup_free_ char *path = NULL, *abstract = NULL;
379         size_t l;
380         int r;
381
382         assert(b);
383         assert(p);
384         assert(*p);
385         assert(guid);
386
387         while (**p != 0 && **p != ';') {
388                 r = parse_address_key(p, "guid", guid);
389                 if (r < 0)
390                         return r;
391                 else if (r > 0)
392                         continue;
393
394                 r = parse_address_key(p, "path", &path);
395                 if (r < 0)
396                         return r;
397                 else if (r > 0)
398                         continue;
399
400                 r = parse_address_key(p, "abstract", &abstract);
401                 if (r < 0)
402                         return r;
403                 else if (r > 0)
404                         continue;
405
406                 skip_address_key(p);
407         }
408
409         if (!path && !abstract)
410                 return -EINVAL;
411
412         if (path && abstract)
413                 return -EINVAL;
414
415         if (path) {
416                 l = strlen(path);
417                 if (l > sizeof(b->sockaddr.un.sun_path))
418                         return -E2BIG;
419
420                 b->sockaddr.un.sun_family = AF_UNIX;
421                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
422                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
423         } else if (abstract) {
424                 l = strlen(abstract);
425                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
426                         return -E2BIG;
427
428                 b->sockaddr.un.sun_family = AF_UNIX;
429                 b->sockaddr.un.sun_path[0] = 0;
430                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
431                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
432         }
433
434         return 0;
435 }
436
437 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
438         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
439         struct addrinfo hints, *result;
440         int r;
441
442         assert(b);
443         assert(p);
444         assert(*p);
445         assert(guid);
446
447         while (**p != 0 && **p != ';') {
448                 r = parse_address_key(p, "guid", guid);
449                 if (r < 0)
450                         return r;
451                 else if (r > 0)
452                         continue;
453
454                 r = parse_address_key(p, "host", &host);
455                 if (r < 0)
456                         return r;
457                 else if (r > 0)
458                         continue;
459
460                 r = parse_address_key(p, "port", &port);
461                 if (r < 0)
462                         return r;
463                 else if (r > 0)
464                         continue;
465
466                 r = parse_address_key(p, "family", &family);
467                 if (r < 0)
468                         return r;
469                 else if (r > 0)
470                         continue;
471
472                 skip_address_key(p);
473         }
474
475         if (!host || !port)
476                 return -EINVAL;
477
478         zero(hints);
479         hints.ai_socktype = SOCK_STREAM;
480         hints.ai_flags = AI_ADDRCONFIG;
481
482         if (family) {
483                 if (streq(family, "ipv4"))
484                         hints.ai_family = AF_INET;
485                 else if (streq(family, "ipv6"))
486                         hints.ai_family = AF_INET6;
487                 else
488                         return -EINVAL;
489         }
490
491         r = getaddrinfo(host, port, &hints, &result);
492         if (r == EAI_SYSTEM)
493                 return -errno;
494         else if (r != 0)
495                 return -EADDRNOTAVAIL;
496
497         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
498         b->sockaddr_size = result->ai_addrlen;
499
500         freeaddrinfo(result);
501
502         return 0;
503 }
504
505 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
506         char *path = NULL;
507         unsigned n_argv = 0, j;
508         char **argv = NULL;
509         int r;
510
511         assert(b);
512         assert(p);
513         assert(*p);
514         assert(guid);
515
516         while (**p != 0 && **p != ';') {
517                 r = parse_address_key(p, "guid", guid);
518                 if (r < 0)
519                         goto fail;
520                 else if (r > 0)
521                         continue;
522
523                 r = parse_address_key(p, "path", &path);
524                 if (r < 0)
525                         goto fail;
526                 else if (r > 0)
527                         continue;
528
529                 if (startswith(*p, "argv")) {
530                         unsigned ul;
531
532                         errno = 0;
533                         ul = strtoul(*p + 4, (char**) p, 10);
534                         if (errno > 0 || **p != '=' || ul > 256) {
535                                 r = -EINVAL;
536                                 goto fail;
537                         }
538
539                         (*p) ++;
540
541                         if (ul >= n_argv) {
542                                 char **x;
543
544                                 x = realloc(argv, sizeof(char*) * (ul + 2));
545                                 if (!x) {
546                                         r = -ENOMEM;
547                                         goto fail;
548                                 }
549
550                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
551
552                                 argv = x;
553                                 n_argv = ul + 1;
554                         }
555
556                         r = parse_address_key(p, NULL, argv + ul);
557                         if (r < 0)
558                                 goto fail;
559
560                         continue;
561                 }
562
563                 skip_address_key(p);
564         }
565
566         if (!path) {
567                 r = -EINVAL;
568                 goto fail;
569         }
570
571         /* Make sure there are no holes in the array, with the
572          * exception of argv[0] */
573         for (j = 1; j < n_argv; j++)
574                 if (!argv[j]) {
575                         r = -EINVAL;
576                         goto fail;
577                 }
578
579         if (argv && argv[0] == NULL) {
580                 argv[0] = strdup(path);
581                 if (!argv[0]) {
582                         r = -ENOMEM;
583                         goto fail;
584                 }
585         }
586
587         b->exec_path = path;
588         b->exec_argv = argv;
589         return 0;
590
591 fail:
592         for (j = 0; j < n_argv; j++)
593                 free(argv[j]);
594
595         free(argv);
596         free(path);
597         return r;
598 }
599
600 static void bus_reset_parsed_address(sd_bus *b) {
601         assert(b);
602
603         zero(b->sockaddr);
604         b->sockaddr_size = 0;
605         strv_free(b->exec_argv);
606         free(b->exec_path);
607         b->exec_path = NULL;
608         b->exec_argv = NULL;
609         b->server_id = SD_ID128_NULL;
610 }
611
612 static int bus_parse_next_address(sd_bus *b) {
613         _cleanup_free_ char *guid = NULL;
614         const char *a;
615         int r;
616
617         assert(b);
618
619         if (!b->address)
620                 return 0;
621         if (b->address[b->address_index] == 0)
622                 return 0;
623
624         bus_reset_parsed_address(b);
625
626         a = b->address + b->address_index;
627
628         while (*a != 0) {
629
630                 if (*a == ';') {
631                         a++;
632                         continue;
633                 }
634
635                 if (startswith(a, "unix:")) {
636                         a += 5;
637
638                         r = parse_unix_address(b, &a, &guid);
639                         if (r < 0)
640                                 return r;
641                         break;
642
643                 } else if (startswith(a, "tcp:")) {
644
645                         a += 4;
646                         r = parse_tcp_address(b, &a, &guid);
647                         if (r < 0)
648                                 return r;
649
650                         break;
651
652                 } else if (startswith(a, "unixexec:")) {
653
654                         a += 9;
655                         r = parse_exec_address(b, &a, &guid);
656                         if (r < 0)
657                                 return r;
658
659                         break;
660
661                 }
662
663                 a = strchr(a, ';');
664                 if (!a)
665                         return 0;
666         }
667
668         if (guid) {
669                 r = sd_id128_from_string(guid, &b->server_id);
670                 if (r < 0)
671                         return r;
672         }
673
674         b->address_index = a - b->address;
675         return 1;
676 }
677
678 static int bus_start_address(sd_bus *b) {
679         int r;
680
681         assert(b);
682
683         for (;;) {
684                 sd_bus_close(b);
685
686                 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
687
688                         r = bus_socket_connect(b);
689                         if (r >= 0)
690                                 return r;
691
692                         b->last_connect_error = -r;
693
694                 } else if (b->exec_path) {
695
696                         r = bus_socket_exec(b);
697                         if (r >= 0)
698                                 return r;
699
700                         b->last_connect_error = -r;
701                 }
702
703                 r = bus_parse_next_address(b);
704                 if (r < 0)
705                         return r;
706                 if (r == 0)
707                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
708         }
709 }
710
711 int bus_next_address(sd_bus *b) {
712         assert(b);
713
714         bus_reset_parsed_address(b);
715         return bus_start_address(b);
716 }
717
718 static int bus_start_fd(sd_bus *b) {
719         int r;
720
721         assert(b);
722         assert(b->input_fd >= 0);
723         assert(b->output_fd >= 0);
724
725         r = fd_nonblock(b->input_fd, true);
726         if (r < 0)
727                 return r;
728
729         r = fd_cloexec(b->input_fd, true);
730         if (r < 0)
731                 return r;
732
733         if (b->input_fd != b->output_fd) {
734                 r = fd_nonblock(b->output_fd, true);
735                 if (r < 0)
736                         return r;
737
738                 r = fd_cloexec(b->output_fd, true);
739                 if (r < 0)
740                         return r;
741         }
742
743         return bus_socket_take_fd(b);
744 }
745
746 int sd_bus_start(sd_bus *bus) {
747         int r;
748
749         if (!bus)
750                 return -EINVAL;
751         if (bus->state != BUS_UNSET)
752                 return -EPERM;
753
754         bus->state = BUS_OPENING;
755
756         if (bus->is_server && bus->bus_client)
757                 return -EINVAL;
758
759         if (bus->input_fd >= 0)
760                 r = bus_start_fd(bus);
761         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path)
762                 r = bus_start_address(bus);
763         else
764                 return -EINVAL;
765
766         if (r < 0)
767                 return r;
768
769         return bus_send_hello(bus);
770 }
771
772 int sd_bus_open_system(sd_bus **ret) {
773         const char *e;
774         sd_bus *b;
775         int r;
776
777         if (!ret)
778                 return -EINVAL;
779
780         r = sd_bus_new(&b);
781         if (r < 0)
782                 return r;
783
784         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
785         if (e) {
786                 r = sd_bus_set_address(b, e);
787                 if (r < 0)
788                         goto fail;
789         } else {
790                 b->sockaddr.un.sun_family = AF_UNIX;
791                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
792                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
793         }
794
795         b->bus_client = true;
796
797         r = sd_bus_start(b);
798         if (r < 0)
799                 goto fail;
800
801         *ret = b;
802         return 0;
803
804 fail:
805         bus_free(b);
806         return r;
807 }
808
809 int sd_bus_open_user(sd_bus **ret) {
810         const char *e;
811         sd_bus *b;
812         size_t l;
813         int r;
814
815         if (!ret)
816                 return -EINVAL;
817
818         r = sd_bus_new(&b);
819         if (r < 0)
820                 return r;
821
822         e = getenv("DBUS_SESSION_BUS_ADDRESS");
823         if (e) {
824                 r = sd_bus_set_address(b, e);
825                 if (r < 0)
826                         goto fail;
827         } else {
828                 e = getenv("XDG_RUNTIME_DIR");
829                 if (!e) {
830                         r = -ENOENT;
831                         goto fail;
832                 }
833
834                 l = strlen(e);
835                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
836                         r = -E2BIG;
837                         goto fail;
838                 }
839
840                 b->sockaddr.un.sun_family = AF_UNIX;
841                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
842                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
843         }
844
845         b->bus_client = true;
846
847         r = sd_bus_start(b);
848         if (r < 0)
849                 goto fail;
850
851         *ret = b;
852         return 0;
853
854 fail:
855         bus_free(b);
856         return r;
857 }
858
859 void sd_bus_close(sd_bus *bus) {
860         if (!bus)
861                 return;
862
863         if (bus->input_fd >= 0)
864                 close_nointr_nofail(bus->input_fd);
865         if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
866                 close_nointr_nofail(bus->output_fd);
867
868         bus->input_fd = bus->output_fd = -1;
869 }
870
871 sd_bus *sd_bus_ref(sd_bus *bus) {
872         if (!bus)
873                 return NULL;
874
875         assert(bus->n_ref > 0);
876
877         bus->n_ref++;
878         return bus;
879 }
880
881 sd_bus *sd_bus_unref(sd_bus *bus) {
882         if (!bus)
883                 return NULL;
884
885         assert(bus->n_ref > 0);
886         bus->n_ref--;
887
888         if (bus->n_ref <= 0)
889                 bus_free(bus);
890
891         return NULL;
892 }
893
894 int sd_bus_is_open(sd_bus *bus) {
895         if (!bus)
896                 return -EINVAL;
897
898         return bus->state != BUS_UNSET && bus->input_fd >= 0;
899 }
900
901 int sd_bus_can_send(sd_bus *bus, char type) {
902         int r;
903
904         if (!bus)
905                 return -EINVAL;
906         if (bus->output_fd < 0)
907                 return -ENOTCONN;
908
909         if (type == SD_BUS_TYPE_UNIX_FD) {
910                 if (!bus->negotiate_fds)
911                         return 0;
912
913                 r = bus_ensure_running(bus);
914                 if (r < 0)
915                         return r;
916
917                 return bus->can_fds;
918         }
919
920         return bus_type_is_valid(type);
921 }
922
923 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
924         int r;
925
926         if (!bus)
927                 return -EINVAL;
928         if (!server_id)
929                 return -EINVAL;
930
931         r = bus_ensure_running(bus);
932         if (r < 0)
933                 return r;
934
935         *server_id = bus->server_id;
936         return 0;
937 }
938
939 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
940         assert(m);
941
942         if (m->header->version > b->message_version)
943                 return -EPERM;
944
945         if (m->sealed)
946                 return 0;
947
948         return bus_message_seal(m, ++b->serial);
949 }
950
951 static int dispatch_wqueue(sd_bus *bus) {
952         int r, ret = 0;
953
954         assert(bus);
955         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
956
957         if (bus->output_fd < 0)
958                 return -ENOTCONN;
959
960         while (bus->wqueue_size > 0) {
961
962                 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
963                 if (r < 0) {
964                         sd_bus_close(bus);
965                         return r;
966                 } else if (r == 0)
967                         /* Didn't do anything this time */
968                         return ret;
969                 else if (bus->windex >= bus->wqueue[0]->size) {
970                         /* Fully written. Let's drop the entry from
971                          * the queue.
972                          *
973                          * This isn't particularly optimized, but
974                          * well, this is supposed to be our worst-case
975                          * buffer only, and the socket buffer is
976                          * supposed to be our primary buffer, and if
977                          * it got full, then all bets are off
978                          * anyway. */
979
980                         sd_bus_message_unref(bus->wqueue[0]);
981                         bus->wqueue_size --;
982                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
983                         bus->windex = 0;
984
985                         ret = 1;
986                 }
987         }
988
989         return ret;
990 }
991
992 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
993         sd_bus_message *z = NULL;
994         int r, ret = 0;
995
996         assert(bus);
997         assert(m);
998         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
999
1000         if (bus->input_fd < 0)
1001                 return -ENOTCONN;
1002
1003         if (bus->rqueue_size > 0) {
1004                 /* Dispatch a queued message */
1005
1006                 *m = bus->rqueue[0];
1007                 bus->rqueue_size --;
1008                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1009                 return 1;
1010         }
1011
1012         /* Try to read a new message */
1013         do {
1014                 r = bus_socket_read_message(bus, &z);
1015                 if (r < 0) {
1016                         sd_bus_close(bus);
1017                         return r;
1018                 }
1019                 if (r == 0)
1020                         return ret;
1021
1022                 r = 1;
1023         } while (!z);
1024
1025         *m = z;
1026         return 1;
1027 }
1028
1029 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1030         int r;
1031
1032         if (!bus)
1033                 return -EINVAL;
1034         if (bus->state == BUS_UNSET)
1035                 return -ENOTCONN;
1036         if (bus->output_fd < 0)
1037                 return -ENOTCONN;
1038         if (!m)
1039                 return -EINVAL;
1040
1041         if (m->n_fds > 0) {
1042                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1043                 if (r < 0)
1044                         return r;
1045                 if (r == 0)
1046                         return -ENOTSUP;
1047         }
1048
1049         /* If the serial number isn't kept, then we know that no reply
1050          * is expected */
1051         if (!serial && !m->sealed)
1052                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1053
1054         r = bus_seal_message(bus, m);
1055         if (r < 0)
1056                 return r;
1057
1058         /* If this is a reply and no reply was requested, then let's
1059          * suppress this, if we can */
1060         if (m->dont_send && !serial)
1061                 return 0;
1062
1063         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1064                 size_t idx = 0;
1065
1066                 r = bus_socket_write_message(bus, m, &idx);
1067                 if (r < 0) {
1068                         sd_bus_close(bus);
1069                         return r;
1070                 } else if (idx < m->size)  {
1071                         /* Wasn't fully written. So let's remember how
1072                          * much was written. Note that the first entry
1073                          * of the wqueue array is always allocated so
1074                          * that we always can remember how much was
1075                          * written. */
1076                         bus->wqueue[0] = sd_bus_message_ref(m);
1077                         bus->wqueue_size = 1;
1078                         bus->windex = idx;
1079                 }
1080         } else {
1081                 sd_bus_message **q;
1082
1083                 /* Just append it to the queue. */
1084
1085                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1086                         return -ENOBUFS;
1087
1088                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1089                 if (!q)
1090                         return -ENOMEM;
1091
1092                 bus->wqueue = q;
1093                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1094         }
1095
1096         if (serial)
1097                 *serial = BUS_MESSAGE_SERIAL(m);
1098
1099         return 0;
1100 }
1101
1102 static usec_t calc_elapse(uint64_t usec) {
1103         if (usec == (uint64_t) -1)
1104                 return 0;
1105
1106         if (usec == 0)
1107                 usec = BUS_DEFAULT_TIMEOUT;
1108
1109         return now(CLOCK_MONOTONIC) + usec;
1110 }
1111
1112 static int timeout_compare(const void *a, const void *b) {
1113         const struct reply_callback *x = a, *y = b;
1114
1115         if (x->timeout != 0 && y->timeout == 0)
1116                 return -1;
1117
1118         if (x->timeout == 0 && y->timeout != 0)
1119                 return 1;
1120
1121         if (x->timeout < y->timeout)
1122                 return -1;
1123
1124         if (x->timeout > y->timeout)
1125                 return 1;
1126
1127         return 0;
1128 }
1129
1130 int sd_bus_send_with_reply(
1131                 sd_bus *bus,
1132                 sd_bus_message *m,
1133                 sd_bus_message_handler_t callback,
1134                 void *userdata,
1135                 uint64_t usec,
1136                 uint64_t *serial) {
1137
1138         struct reply_callback *c;
1139         int r;
1140
1141         if (!bus)
1142                 return -EINVAL;
1143         if (bus->state == BUS_UNSET)
1144                 return -ENOTCONN;
1145         if (bus->output_fd < 0)
1146                 return -ENOTCONN;
1147         if (!m)
1148                 return -EINVAL;
1149         if (!callback)
1150                 return -EINVAL;
1151         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1152                 return -EINVAL;
1153         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1154                 return -EINVAL;
1155
1156         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1157         if (r < 0)
1158                 return r;
1159
1160         if (usec != (uint64_t) -1) {
1161                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1162                 if (r < 0)
1163                         return r;
1164         }
1165
1166         r = bus_seal_message(bus, m);
1167         if (r < 0)
1168                 return r;
1169
1170         c = new(struct reply_callback, 1);
1171         if (!c)
1172                 return -ENOMEM;
1173
1174         c->callback = callback;
1175         c->userdata = userdata;
1176         c->serial = BUS_MESSAGE_SERIAL(m);
1177         c->timeout = calc_elapse(usec);
1178
1179         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1180         if (r < 0) {
1181                 free(c);
1182                 return r;
1183         }
1184
1185         if (c->timeout != 0) {
1186                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1187                 if (r < 0) {
1188                         c->timeout = 0;
1189                         sd_bus_send_with_reply_cancel(bus, c->serial);
1190                         return r;
1191                 }
1192         }
1193
1194         r = sd_bus_send(bus, m, serial);
1195         if (r < 0) {
1196                 sd_bus_send_with_reply_cancel(bus, c->serial);
1197                 return r;
1198         }
1199
1200         return r;
1201 }
1202
1203 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1204         struct reply_callback *c;
1205
1206         if (!bus)
1207                 return -EINVAL;
1208         if (serial == 0)
1209                 return -EINVAL;
1210
1211         c = hashmap_remove(bus->reply_callbacks, &serial);
1212         if (!c)
1213                 return 0;
1214
1215         if (c->timeout != 0)
1216                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1217
1218         free(c);
1219         return 1;
1220 }
1221
1222 int bus_ensure_running(sd_bus *bus) {
1223         int r;
1224
1225         assert(bus);
1226
1227         if (bus->input_fd < 0)
1228                 return -ENOTCONN;
1229         if (bus->state == BUS_UNSET)
1230                 return -ENOTCONN;
1231
1232         if (bus->state == BUS_RUNNING)
1233                 return 1;
1234
1235         for (;;) {
1236                 r = sd_bus_process(bus, NULL);
1237                 if (r < 0)
1238                         return r;
1239                 if (bus->state == BUS_RUNNING)
1240                         return 1;
1241                 if (r > 0)
1242                         continue;
1243
1244                 r = sd_bus_wait(bus, (uint64_t) -1);
1245                 if (r < 0)
1246                         return r;
1247         }
1248 }
1249
1250 int sd_bus_send_with_reply_and_block(
1251                 sd_bus *bus,
1252                 sd_bus_message *m,
1253                 uint64_t usec,
1254                 sd_bus_error *error,
1255                 sd_bus_message **reply) {
1256
1257         int r;
1258         usec_t timeout;
1259         uint64_t serial;
1260         bool room = false;
1261
1262         if (!bus)
1263                 return -EINVAL;
1264         if (bus->output_fd < 0)
1265                 return -ENOTCONN;
1266         if (bus->state == BUS_UNSET)
1267                 return -ENOTCONN;
1268         if (!m)
1269                 return -EINVAL;
1270         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1271                 return -EINVAL;
1272         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1273                 return -EINVAL;
1274         if (bus_error_is_dirty(error))
1275                 return -EINVAL;
1276
1277         r = bus_ensure_running(bus);
1278         if (r < 0)
1279                 return r;
1280
1281         r = sd_bus_send(bus, m, &serial);
1282         if (r < 0)
1283                 return r;
1284
1285         timeout = calc_elapse(usec);
1286
1287         for (;;) {
1288                 usec_t left;
1289                 sd_bus_message *incoming = NULL;
1290
1291                 if (!room) {
1292                         sd_bus_message **q;
1293
1294                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1295                                 return -ENOBUFS;
1296
1297                         /* Make sure there's room for queuing this
1298                          * locally, before we read the message */
1299
1300                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1301                         if (!q)
1302                                 return -ENOMEM;
1303
1304                         bus->rqueue = q;
1305                         room = true;
1306                 }
1307
1308                 r = bus_socket_read_message(bus, &incoming);
1309                 if (r < 0)
1310                         return r;
1311                 if (incoming) {
1312
1313                         if (incoming->reply_serial == serial) {
1314                                 /* Found a match! */
1315
1316                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1317                                         *reply = incoming;
1318                                         return 0;
1319                                 }
1320
1321                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1322                                         int k;
1323
1324                                         r = sd_bus_error_copy(error, &incoming->error);
1325                                         if (r < 0) {
1326                                                 sd_bus_message_unref(incoming);
1327                                                 return r;
1328                                         }
1329
1330                                         k = bus_error_to_errno(&incoming->error);
1331                                         sd_bus_message_unref(incoming);
1332                                         return k;
1333                                 }
1334
1335                                 sd_bus_message_unref(incoming);
1336                                 return -EIO;
1337                         }
1338
1339                         /* There's already guaranteed to be room for
1340                          * this, so need to resize things here */
1341                         bus->rqueue[bus->rqueue_size ++] = incoming;
1342                         room = false;
1343
1344                         /* Try to read more, right-away */
1345                         continue;
1346                 }
1347                 if (r != 0)
1348                         continue;
1349
1350                 if (timeout > 0) {
1351                         usec_t n;
1352
1353                         n = now(CLOCK_MONOTONIC);
1354                         if (n >= timeout)
1355                                 return -ETIMEDOUT;
1356
1357                         left = timeout - n;
1358                 } else
1359                         left = (uint64_t) -1;
1360
1361                 r = bus_poll(bus, true, left);
1362                 if (r < 0)
1363                         return r;
1364
1365                 r = dispatch_wqueue(bus);
1366                 if (r < 0)
1367                         return r;
1368         }
1369 }
1370
1371 int sd_bus_get_fd(sd_bus *bus) {
1372         if (!bus)
1373                 return -EINVAL;
1374         if (bus->input_fd < 0)
1375                 return -ENOTCONN;
1376         if (bus->input_fd != bus->output_fd)
1377                 return -EPERM;
1378
1379         return bus->input_fd;
1380 }
1381
1382 int sd_bus_get_events(sd_bus *bus) {
1383         int flags = 0;
1384
1385         if (!bus)
1386                 return -EINVAL;
1387         if (bus->state == BUS_UNSET)
1388                 return -ENOTCONN;
1389         if (bus->input_fd < 0)
1390                 return -ENOTCONN;
1391
1392         if (bus->state == BUS_OPENING)
1393                 flags |= POLLOUT;
1394         else if (bus->state == BUS_AUTHENTICATING) {
1395
1396                 if (bus_socket_auth_needs_write(bus))
1397                         flags |= POLLOUT;
1398
1399                 flags |= POLLIN;
1400
1401         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1402                 if (bus->rqueue_size <= 0)
1403                         flags |= POLLIN;
1404                 if (bus->wqueue_size > 0)
1405                         flags |= POLLOUT;
1406         }
1407
1408         return flags;
1409 }
1410
1411 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1412         struct reply_callback *c;
1413
1414         if (!bus)
1415                 return -EINVAL;
1416         if (!timeout_usec)
1417                 return -EINVAL;
1418         if (bus->state == BUS_UNSET)
1419                 return -ENOTCONN;
1420         if (bus->input_fd < 0)
1421                 return -ENOTCONN;
1422
1423         if (bus->state == BUS_AUTHENTICATING) {
1424                 *timeout_usec = bus->auth_timeout;
1425                 return 1;
1426         }
1427
1428         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1429                 *timeout_usec = (uint64_t) -1;
1430                 return 0;
1431         }
1432
1433         c = prioq_peek(bus->reply_callbacks_prioq);
1434         if (!c) {
1435                 *timeout_usec = (uint64_t) -1;
1436                 return 0;
1437         }
1438
1439         *timeout_usec = c->timeout;
1440         return 1;
1441 }
1442
1443 static int process_timeout(sd_bus *bus) {
1444         struct reply_callback *c;
1445         usec_t n;
1446         int r;
1447
1448         assert(bus);
1449
1450         c = prioq_peek(bus->reply_callbacks_prioq);
1451         if (!c)
1452                 return 0;
1453
1454         n = now(CLOCK_MONOTONIC);
1455         if (c->timeout > n)
1456                 return 0;
1457
1458         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1459         hashmap_remove(bus->reply_callbacks, &c->serial);
1460
1461         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1462         free(c);
1463
1464         return r < 0 ? r : 1;
1465 }
1466
1467 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1468         assert(bus);
1469         assert(m);
1470
1471         if (bus->state != BUS_HELLO)
1472                 return 0;
1473
1474         /* Let's make sure the first message on the bus is the HELLO
1475          * reply. But note that we don't actually parse the message
1476          * here (we leave that to the usual handling), we just verify
1477          * we don't let any earlier msg through. */
1478
1479         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1480             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1481                 return -EIO;
1482
1483         if (m->reply_serial != bus->hello_serial)
1484                 return -EIO;
1485
1486         return 0;
1487 }
1488
1489 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1490         struct reply_callback *c;
1491         int r;
1492
1493         assert(bus);
1494         assert(m);
1495
1496         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1497             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1498                 return 0;
1499
1500         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1501         if (!c)
1502                 return 0;
1503
1504         if (c->timeout != 0)
1505                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1506
1507         r = c->callback(bus, 0, m, c->userdata);
1508         free(c);
1509
1510         return r;
1511 }
1512
1513 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1514         struct filter_callback *l;
1515         int r;
1516
1517         assert(bus);
1518         assert(m);
1519
1520         do {
1521                 bus->filter_callbacks_modified = false;
1522
1523                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1524
1525                         if (bus->filter_callbacks_modified)
1526                                 break;
1527
1528                         /* Don't run this more than once per iteration */
1529                         if (l->last_iteration == bus->iteration_counter)
1530                                 continue;
1531
1532                         l->last_iteration = bus->iteration_counter;
1533
1534                         r = l->callback(bus, 0, m, l->userdata);
1535                         if (r != 0)
1536                                 return r;
1537
1538                 }
1539
1540         } while (bus->filter_callbacks_modified);
1541
1542         return 0;
1543 }
1544
1545 static int process_match(sd_bus *bus, sd_bus_message *m) {
1546         int r;
1547
1548         assert(bus);
1549         assert(m);
1550
1551         do {
1552                 bus->match_callbacks_modified = false;
1553
1554                 r = bus_match_run(bus, &bus->match_callbacks, 0, m);
1555                 if (r != 0)
1556                         return r;
1557
1558         } while (bus->match_callbacks_modified);
1559
1560         return 0;
1561 }
1562
1563 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1564         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1565         int r;
1566
1567         assert(bus);
1568         assert(m);
1569
1570         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1571                 return 0;
1572
1573         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1574                 return 0;
1575
1576         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1577                 return 1;
1578
1579         if (streq_ptr(m->member, "Ping"))
1580                 r = sd_bus_message_new_method_return(bus, m, &reply);
1581         else if (streq_ptr(m->member, "GetMachineId")) {
1582                 sd_id128_t id;
1583                 char sid[33];
1584
1585                 r = sd_id128_get_machine(&id);
1586                 if (r < 0)
1587                         return r;
1588
1589                 r = sd_bus_message_new_method_return(bus, m, &reply);
1590                 if (r < 0)
1591                         return r;
1592
1593                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1594         } else {
1595                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1596
1597                 sd_bus_error_set(&error,
1598                                  "org.freedesktop.DBus.Error.UnknownMethod",
1599                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1600
1601                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1602         }
1603
1604         if (r < 0)
1605                 return r;
1606
1607         r = sd_bus_send(bus, reply, NULL);
1608         if (r < 0)
1609                 return r;
1610
1611         return 1;
1612 }
1613
1614 static int process_object(sd_bus *bus, sd_bus_message *m) {
1615         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1616         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1617         struct object_callback *c;
1618         int r;
1619         bool found = false;
1620         size_t pl;
1621
1622         assert(bus);
1623         assert(m);
1624
1625         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1626                 return 0;
1627
1628         if (hashmap_isempty(bus->object_callbacks))
1629                 return 0;
1630
1631         pl = strlen(m->path);
1632
1633         do {
1634                 char p[pl+1];
1635
1636                 bus->object_callbacks_modified = false;
1637
1638                 c = hashmap_get(bus->object_callbacks, m->path);
1639                 if (c && c->last_iteration != bus->iteration_counter) {
1640
1641                         c->last_iteration = bus->iteration_counter;
1642
1643                         r = c->callback(bus, 0, m, c->userdata);
1644                         if (r != 0)
1645                                 return r;
1646
1647                         found = true;
1648                 }
1649
1650                 /* Look for fallback prefixes */
1651                 strcpy(p, m->path);
1652                 for (;;) {
1653                         char *e;
1654
1655                         if (bus->object_callbacks_modified)
1656                                 break;
1657
1658                         e = strrchr(p, '/');
1659                         if (e == p || !e)
1660                                 break;
1661
1662                         *e = 0;
1663
1664                         c = hashmap_get(bus->object_callbacks, p);
1665                         if (c && c->last_iteration != bus->iteration_counter && c->is_fallback) {
1666
1667                                 c->last_iteration = bus->iteration_counter;
1668
1669                                 r = c->callback(bus, 0, m, c->userdata);
1670                                 if (r != 0)
1671                                         return r;
1672
1673                                 found = true;
1674                         }
1675                 }
1676
1677         } while (bus->object_callbacks_modified);
1678
1679         /* We found some handlers but none wanted to take this, then
1680          * return this -- with one exception, we can handle
1681          * introspection minimally ourselves */
1682         if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1683                 return 0;
1684
1685         sd_bus_error_set(&error,
1686                          "org.freedesktop.DBus.Error.UnknownMethod",
1687                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1688
1689         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1690         if (r < 0)
1691                 return r;
1692
1693         r = sd_bus_send(bus, reply, NULL);
1694         if (r < 0)
1695                 return r;
1696
1697         return 1;
1698 }
1699
1700 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1701         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1702         _cleanup_free_ char *introspection = NULL;
1703         _cleanup_set_free_free_ Set *s = NULL;
1704         _cleanup_fclose_ FILE *f = NULL;
1705         struct object_callback *c;
1706         Iterator i;
1707         size_t size = 0;
1708         char *node;
1709         int r;
1710
1711         assert(bus);
1712         assert(m);
1713
1714         if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1715                 return 0;
1716
1717         if (!m->path)
1718                 return 0;
1719
1720         s = set_new(string_hash_func, string_compare_func);
1721         if (!s)
1722                 return -ENOMEM;
1723
1724         HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1725                 const char *e;
1726                 char *a, *p;
1727
1728                 if (streq(c->path, "/"))
1729                         continue;
1730
1731                 if (streq(m->path, "/"))
1732                         e = c->path;
1733                 else {
1734                         e = startswith(c->path, m->path);
1735                         if (!e || *e != '/')
1736                                 continue;
1737                 }
1738
1739                 a = strdup(e+1);
1740                 if (!a)
1741                         return -ENOMEM;
1742
1743                 p = strchr(a, '/');
1744                 if (p)
1745                         *p = 0;
1746
1747                 r = set_put(s, a);
1748                 if (r < 0) {
1749                         free(a);
1750
1751                         if (r != -EEXIST)
1752                                 return r;
1753                 }
1754         }
1755
1756         f = open_memstream(&introspection, &size);
1757         if (!f)
1758                 return -ENOMEM;
1759
1760         fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1761         fputs("<node>\n", f);
1762         fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1763         fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1764
1765         while ((node = set_steal_first(s))) {
1766                 fprintf(f, " <node name=\"%s\"/>\n", node);
1767                 free(node);
1768         }
1769
1770         fputs("</node>\n", f);
1771
1772         fflush(f);
1773
1774         if (ferror(f))
1775                 return -ENOMEM;
1776
1777         r = sd_bus_message_new_method_return(bus, m, &reply);
1778         if (r < 0)
1779                 return r;
1780
1781         r = sd_bus_message_append(reply, "s", introspection);
1782         if (r < 0)
1783                 return r;
1784
1785         r = sd_bus_send(bus, reply, NULL);
1786         if (r < 0)
1787                 return r;
1788
1789         return 1;
1790 }
1791
1792 static int process_message(sd_bus *bus, sd_bus_message *m) {
1793         int r;
1794
1795         assert(bus);
1796         assert(m);
1797
1798         bus->iteration_counter++;
1799
1800         r = process_hello(bus, m);
1801         if (r != 0)
1802                 return r;
1803
1804         r = process_reply(bus, m);
1805         if (r != 0)
1806                 return r;
1807
1808         r = process_filter(bus, m);
1809         if (r != 0)
1810                 return r;
1811
1812         r = process_match(bus, m);
1813         if (r != 0)
1814                 return r;
1815
1816         r = process_builtin(bus, m);
1817         if (r != 0)
1818                 return r;
1819
1820         r = process_object(bus, m);
1821         if (r != 0)
1822                 return r;
1823
1824         return process_introspect(bus, m);
1825 }
1826
1827 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1828         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1829         int r;
1830
1831         assert(bus);
1832         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1833
1834         r = process_timeout(bus);
1835         if (r != 0)
1836                 goto null_message;
1837
1838         r = dispatch_wqueue(bus);
1839         if (r != 0)
1840                 goto null_message;
1841
1842         r = dispatch_rqueue(bus, &m);
1843         if (r < 0)
1844                 return r;
1845         if (!m)
1846                 goto null_message;
1847
1848         r = process_message(bus, m);
1849         if (r != 0)
1850                 goto null_message;
1851
1852         if (ret) {
1853                 *ret = m;
1854                 m = NULL;
1855                 return 1;
1856         }
1857
1858         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1859                 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1860                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1861
1862                 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1863
1864                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1865                 if (r < 0)
1866                         return r;
1867
1868                 r = sd_bus_send(bus, reply, NULL);
1869                 if (r < 0)
1870                         return r;
1871         }
1872
1873         return 1;
1874
1875 null_message:
1876         if (r >= 0 && ret)
1877                 *ret = NULL;
1878
1879         return r;
1880 }
1881
1882 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1883         int r;
1884
1885         /* Returns 0 when we didn't do anything. This should cause the
1886          * caller to invoke sd_bus_wait() before returning the next
1887          * time. Returns > 0 when we did something, which possibly
1888          * means *ret is filled in with an unprocessed message. */
1889
1890         if (!bus)
1891                 return -EINVAL;
1892         if (bus->input_fd < 0)
1893                 return -ENOTCONN;
1894
1895         /* We don't allow recursively invoking sd_bus_process(). */
1896         if (bus->processing)
1897                 return -EBUSY;
1898
1899         switch (bus->state) {
1900
1901         case BUS_UNSET:
1902                 return -ENOTCONN;
1903
1904         case BUS_OPENING:
1905                 r = bus_socket_process_opening(bus);
1906                 if (r < 0)
1907                         return r;
1908                 if (ret)
1909                         *ret = NULL;
1910                 return r;
1911
1912         case BUS_AUTHENTICATING:
1913
1914                 r = bus_socket_process_authenticating(bus);
1915                 if (r < 0)
1916                         return r;
1917                 if (ret)
1918                         *ret = NULL;
1919                 return r;
1920
1921         case BUS_RUNNING:
1922         case BUS_HELLO:
1923
1924                 bus->processing = true;
1925                 r = process_running(bus, ret);
1926                 bus->processing = false;
1927
1928                 return r;
1929         }
1930
1931         assert_not_reached("Unknown state");
1932 }
1933
1934 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1935         struct pollfd p[2];
1936         int r, e, n;
1937         struct timespec ts;
1938         usec_t until, m;
1939
1940         assert(bus);
1941
1942         if (bus->input_fd < 0)
1943                 return -ENOTCONN;
1944
1945         e = sd_bus_get_events(bus);
1946         if (e < 0)
1947                 return e;
1948
1949         if (need_more)
1950                 e |= POLLIN;
1951
1952         r = sd_bus_get_timeout(bus, &until);
1953         if (r < 0)
1954                 return r;
1955         if (r == 0)
1956                 m = (uint64_t) -1;
1957         else {
1958                 usec_t nw;
1959                 nw = now(CLOCK_MONOTONIC);
1960                 m = until > nw ? until - nw : 0;
1961         }
1962
1963         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1964                 m = timeout_usec;
1965
1966         zero(p);
1967         p[0].fd = bus->input_fd;
1968
1969         if (bus->output_fd == bus->input_fd) {
1970                 p[0].events = e;
1971                 n = 1;
1972         } else {
1973                 p[0].events = e & POLLIN;
1974                 p[1].fd = bus->output_fd;
1975                 p[1].events = e & POLLOUT;
1976                 n = 2;
1977         }
1978
1979         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1980         if (r < 0)
1981                 return -errno;
1982
1983         return r > 0 ? 1 : 0;
1984 }
1985
1986 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1987
1988         if (!bus)
1989                 return -EINVAL;
1990         if (bus->state == BUS_UNSET)
1991                 return -ENOTCONN;
1992         if (bus->input_fd < 0)
1993                 return -ENOTCONN;
1994         if (bus->rqueue_size > 0)
1995                 return 0;
1996
1997         return bus_poll(bus, false, timeout_usec);
1998 }
1999
2000 int sd_bus_flush(sd_bus *bus) {
2001         int r;
2002
2003         if (!bus)
2004                 return -EINVAL;
2005         if (bus->state == BUS_UNSET)
2006                 return -ENOTCONN;
2007         if (bus->output_fd < 0)
2008                 return -ENOTCONN;
2009
2010         r = bus_ensure_running(bus);
2011         if (r < 0)
2012                 return r;
2013
2014         if (bus->wqueue_size <= 0)
2015                 return 0;
2016
2017         for (;;) {
2018                 r = dispatch_wqueue(bus);
2019                 if (r < 0)
2020                         return r;
2021
2022                 if (bus->wqueue_size <= 0)
2023                         return 0;
2024
2025                 r = bus_poll(bus, false, (uint64_t) -1);
2026                 if (r < 0)
2027                         return r;
2028         }
2029 }
2030
2031 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2032         struct filter_callback *f;
2033
2034         if (!bus)
2035                 return -EINVAL;
2036         if (!callback)
2037                 return -EINVAL;
2038
2039         f = new(struct filter_callback, 1);
2040         if (!f)
2041                 return -ENOMEM;
2042         f->callback = callback;
2043         f->userdata = userdata;
2044
2045         bus->filter_callbacks_modified = true;
2046         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
2047         return 0;
2048 }
2049
2050 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2051         struct filter_callback *f;
2052
2053         if (!bus)
2054                 return -EINVAL;
2055         if (!callback)
2056                 return -EINVAL;
2057
2058         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2059                 if (f->callback == callback && f->userdata == userdata) {
2060                         bus->filter_callbacks_modified = true;
2061                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2062                         free(f);
2063                         return 1;
2064                 }
2065         }
2066
2067         return 0;
2068 }
2069
2070 static int bus_add_object(
2071                 sd_bus *bus,
2072                 bool fallback,
2073                 const char *path,
2074                 sd_bus_message_handler_t callback,
2075                 void *userdata) {
2076
2077         struct object_callback *c;
2078         int r;
2079
2080         if (!bus)
2081                 return -EINVAL;
2082         if (!path)
2083                 return -EINVAL;
2084         if (!callback)
2085                 return -EINVAL;
2086
2087         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2088         if (r < 0)
2089                 return r;
2090
2091         c = new(struct object_callback, 1);
2092         if (!c)
2093                 return -ENOMEM;
2094
2095         c->path = strdup(path);
2096         if (!c->path) {
2097                 free(c);
2098                 return -ENOMEM;
2099         }
2100
2101         c->callback = callback;
2102         c->userdata = userdata;
2103         c->is_fallback = fallback;
2104
2105         bus->object_callbacks_modified = true;
2106         r = hashmap_put(bus->object_callbacks, c->path, c);
2107         if (r < 0) {
2108                 free(c->path);
2109                 free(c);
2110                 return r;
2111         }
2112
2113         return 0;
2114 }
2115
2116 static int bus_remove_object(
2117                 sd_bus *bus,
2118                 bool fallback,
2119                 const char *path,
2120                 sd_bus_message_handler_t callback,
2121                 void *userdata) {
2122
2123         struct object_callback *c;
2124
2125         if (!bus)
2126                 return -EINVAL;
2127         if (!path)
2128                 return -EINVAL;
2129         if (!callback)
2130                 return -EINVAL;
2131
2132         c = hashmap_get(bus->object_callbacks, path);
2133         if (!c)
2134                 return 0;
2135
2136         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2137                 return 0;
2138
2139         bus->object_callbacks_modified = true;
2140         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2141
2142         free(c->path);
2143         free(c);
2144
2145         return 1;
2146 }
2147
2148 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2149         return bus_add_object(bus, false, path, callback, userdata);
2150 }
2151
2152 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2153         return bus_remove_object(bus, false, path, callback, userdata);
2154 }
2155
2156 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2157         return bus_add_object(bus, true, prefix, callback, userdata);
2158 }
2159
2160 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2161         return bus_remove_object(bus, true, prefix, callback, userdata);
2162 }
2163
2164 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2165         int r = 0;
2166
2167         if (!bus)
2168                 return -EINVAL;
2169         if (!match)
2170                 return -EINVAL;
2171
2172         if (bus->bus_client) {
2173                 r = bus_add_match_internal(bus, match);
2174                 if (r < 0)
2175                         return r;
2176         }
2177
2178         if (callback) {
2179                 bus->match_callbacks_modified = true;
2180                 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2181                 if (r < 0) {
2182
2183                         if (bus->bus_client)
2184                                 bus_remove_match_internal(bus, match);
2185                 }
2186         }
2187
2188         return r;
2189 }
2190
2191 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2192         int r = 0, q = 0;
2193
2194         if (!bus)
2195                 return -EINVAL;
2196         if (!match)
2197                 return -EINVAL;
2198
2199         if (bus->bus_client)
2200                 r = bus_remove_match_internal(bus, match);
2201
2202         if (callback) {
2203                 bus->match_callbacks_modified = true;
2204                 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
2205         }
2206
2207         if (r < 0)
2208                 return r;
2209         return q;
2210 }
2211
2212 int sd_bus_emit_signal(
2213                 sd_bus *bus,
2214                 const char *path,
2215                 const char *interface,
2216                 const char *member,
2217                 const char *types, ...) {
2218
2219         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2220         va_list ap;
2221         int r;
2222
2223         if (!bus)
2224                 return -EINVAL;
2225
2226         r = sd_bus_message_new_signal(bus, path, interface, member, &m);
2227         if (r < 0)
2228                 return r;
2229
2230         va_start(ap, types);
2231         r = bus_message_append_ap(m, types, ap);
2232         va_end(ap);
2233         if (r < 0)
2234                 return r;
2235
2236         return sd_bus_send(bus, m, NULL);
2237 }
2238
2239 int sd_bus_call_method(
2240                 sd_bus *bus,
2241                 const char *destination,
2242                 const char *path,
2243                 const char *interface,
2244                 const char *member,
2245                 sd_bus_error *error,
2246                 sd_bus_message **reply,
2247                 const char *types, ...) {
2248
2249         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2250         va_list ap;
2251         int r;
2252
2253         if (!bus)
2254                 return -EINVAL;
2255
2256         r = sd_bus_message_new_method_call(bus, destination, path, interface, member, &m);
2257         if (r < 0)
2258                 return r;
2259
2260         va_start(ap, types);
2261         r = bus_message_append_ap(m, types, ap);
2262         va_end(ap);
2263         if (r < 0)
2264                 return r;
2265
2266         return sd_bus_send_with_reply_and_block(bus, m, 0, error, reply);
2267 }