chiark / gitweb /
0964649e39d675c1df423591aca7913b4e0d66ed
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "strv.h"
33 #include "set.h"
34
35 #include "sd-bus.h"
36 #include "bus-internal.h"
37 #include "bus-message.h"
38 #include "bus-type.h"
39 #include "bus-socket.h"
40 #include "bus-control.h"
41
42 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
43
44 static void bus_free(sd_bus *b) {
45         struct filter_callback *f;
46         struct object_callback *c;
47         unsigned i;
48
49         assert(b);
50
51         sd_bus_close(b);
52
53         free(b->rbuffer);
54         free(b->unique_name);
55         free(b->auth_buffer);
56         free(b->address);
57
58         free(b->exec_path);
59         strv_free(b->exec_argv);
60
61         close_many(b->fds, b->n_fds);
62         free(b->fds);
63
64         for (i = 0; i < b->rqueue_size; i++)
65                 sd_bus_message_unref(b->rqueue[i]);
66         free(b->rqueue);
67
68         for (i = 0; i < b->wqueue_size; i++)
69                 sd_bus_message_unref(b->wqueue[i]);
70         free(b->wqueue);
71
72         hashmap_free_free(b->reply_callbacks);
73         prioq_free(b->reply_callbacks_prioq);
74
75         while ((f = b->filter_callbacks)) {
76                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
77                 free(f);
78         }
79
80         while ((c = hashmap_steal_first(b->object_callbacks))) {
81                 free(c->path);
82                 free(c);
83         }
84
85         hashmap_free(b->object_callbacks);
86
87         bus_match_free(&b->match_callbacks);
88
89         free(b);
90 }
91
92 int sd_bus_new(sd_bus **ret) {
93         sd_bus *r;
94
95         if (!ret)
96                 return -EINVAL;
97
98         r = new0(sd_bus, 1);
99         if (!r)
100                 return -ENOMEM;
101
102         r->n_ref = 1;
103         r->input_fd = r->output_fd = -1;
104         r->message_version = 1;
105         r->negotiate_fds = true;
106
107         /* We guarantee that wqueue always has space for at least one
108          * entry */
109         r->wqueue = new(sd_bus_message*, 1);
110         if (!r->wqueue) {
111                 free(r);
112                 return -ENOMEM;
113         }
114
115         *ret = r;
116         return 0;
117 }
118
119 int sd_bus_set_address(sd_bus *bus, const char *address) {
120         char *a;
121
122         if (!bus)
123                 return -EINVAL;
124         if (bus->state != BUS_UNSET)
125                 return -EPERM;
126         if (!address)
127                 return -EINVAL;
128
129         a = strdup(address);
130         if (!a)
131                 return -ENOMEM;
132
133         free(bus->address);
134         bus->address = a;
135
136         return 0;
137 }
138
139 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
140         if (!bus)
141                 return -EINVAL;
142         if (bus->state != BUS_UNSET)
143                 return -EPERM;
144         if (input_fd < 0)
145                 return -EINVAL;
146         if (output_fd < 0)
147                 return -EINVAL;
148
149         bus->input_fd = input_fd;
150         bus->output_fd = output_fd;
151         return 0;
152 }
153
154 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
155         char *p, **a;
156
157         if (!bus)
158                 return -EINVAL;
159         if (bus->state != BUS_UNSET)
160                 return -EPERM;
161         if (!path)
162                 return -EINVAL;
163         if (strv_isempty(argv))
164                 return -EINVAL;
165
166         p = strdup(path);
167         if (!p)
168                 return -ENOMEM;
169
170         a = strv_copy(argv);
171         if (!a) {
172                 free(p);
173                 return -ENOMEM;
174         }
175
176         free(bus->exec_path);
177         strv_free(bus->exec_argv);
178
179         bus->exec_path = p;
180         bus->exec_argv = a;
181
182         return 0;
183 }
184
185 int sd_bus_set_bus_client(sd_bus *bus, int b) {
186         if (!bus)
187                 return -EINVAL;
188         if (bus->state != BUS_UNSET)
189                 return -EPERM;
190
191         bus->bus_client = !!b;
192         return 0;
193 }
194
195 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
196         if (!bus)
197                 return -EINVAL;
198         if (bus->state != BUS_UNSET)
199                 return -EPERM;
200
201         bus->negotiate_fds = !!b;
202         return 0;
203 }
204
205 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
206         if (!bus)
207                 return -EINVAL;
208         if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
209                 return -EINVAL;
210         if (bus->state != BUS_UNSET)
211                 return -EPERM;
212
213         bus->is_server = !!b;
214         bus->server_id = server_id;
215         return 0;
216 }
217
218 int sd_bus_set_anonymous(sd_bus *bus, int b) {
219         if (!bus)
220                 return -EINVAL;
221         if (bus->state != BUS_UNSET)
222                 return -EPERM;
223
224         bus->anonymous_auth = !!b;
225         return 0;
226 }
227
228 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
229         const char *s;
230         int r;
231
232         assert(bus);
233         assert(bus->state == BUS_HELLO);
234
235         if (error != 0)
236                 return -error;
237
238         assert(reply);
239
240         r = sd_bus_message_read(reply, "s", &s);
241         if (r < 0)
242                 return r;
243
244         if (!service_name_is_valid(s) || s[0] != ':')
245                 return -EBADMSG;
246
247         bus->unique_name = strdup(s);
248         if (!bus->unique_name)
249                 return -ENOMEM;
250
251         bus->state = BUS_RUNNING;
252
253         return 1;
254 }
255
256 static int bus_send_hello(sd_bus *bus) {
257         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
258         int r;
259
260         assert(bus);
261
262         if (!bus->bus_client)
263                 return 0;
264
265         r = sd_bus_message_new_method_call(
266                         bus,
267                         "org.freedesktop.DBus",
268                         "/",
269                         "org.freedesktop.DBus",
270                         "Hello",
271                         &m);
272         if (r < 0)
273                 return r;
274
275         return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
276 }
277
278 int bus_start_running(sd_bus *bus) {
279         assert(bus);
280
281         if (bus->bus_client) {
282                 bus->state = BUS_HELLO;
283                 return 1;
284         }
285
286         bus->state = BUS_RUNNING;
287         return 1;
288 }
289
/* Parses one "key=value" item of a bus address at *p.
 *
 * If key is non-NULL the item must be introduced by "<key>=", otherwise 0
 * is returned and nothing is consumed. If key is NULL, parsing starts at
 * the value right away. The value extends up to the next ',', ';' or the
 * end of the string; "%XX" sequences are decoded into a single byte.
 *
 * On success the decoded value is stored in *value (replacing whatever was
 * there), *p is advanced past the value and a trailing ',', and 1 is
 * returned. With a key, an already set *value yields -EINVAL. Other
 * failures return a negative error code. */
static int parse_address_key(const char **p, const char *key, char **value) {
        const char *cur;
        char *buf = NULL;
        size_t len = 0;

        assert(p);
        assert(*p);
        assert(value);

        cur = *p;

        if (key) {
                size_t key_len = strlen(key);

                /* Not our key? Then leave it to somebody else */
                if (strncmp(cur, key, key_len) != 0 || cur[key_len] != '=')
                        return 0;

                /* The same key may not be specified twice */
                if (*value)
                        return -EINVAL;

                cur += key_len + 1;
        }

        for (;;) {
                char ch, *grown;

                if (*cur == ';' || *cur == ',' || *cur == 0)
                        break;

                if (*cur == '%') {
                        int hi, lo;

                        /* Percent escape: two hex digits form one byte */
                        hi = unhexchar(cur[1]);
                        if (hi < 0) {
                                free(buf);
                                return hi;
                        }

                        lo = unhexchar(cur[2]);
                        if (lo < 0) {
                                free(buf);
                                return lo;
                        }

                        ch = (char) ((hi << 4) | lo);
                        cur += 3;
                } else
                        ch = *(cur++);

                /* Always keep room for the new byte plus the trailing NUL */
                grown = realloc(buf, len + 2);
                if (!grown) {
                        free(buf);
                        return -ENOMEM;
                }

                buf = grown;
                buf[len++] = ch;
        }

        if (buf)
                buf[len] = 0;
        else {
                /* Empty value, still return an allocated string */
                buf = strdup("");
                if (!buf)
                        return -ENOMEM;
        }

        if (*cur == ',')
                cur++;

        *p = cur;

        free(*value);
        *value = buf;

        return 1;
}
366
/* Skips over one (unrecognized) "key=value" item of a bus address.
 *
 * *p is advanced to the end of the current item. If the item is followed
 * by ',' the separator is consumed as well, so that *p points to the next
 * item. A ';' (which separates whole addresses) or the end of the string
 * is never skipped: the callers' parsing loops terminate on those, and
 * running past a ';' would swallow the beginning of the next address. */
static void skip_address_key(const char **p) {
        assert(p);
        assert(*p);

        *p += strcspn(*p, ",;");

        if (**p == ',')
                (*p) ++;
}
376
377 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
378         _cleanup_free_ char *path = NULL, *abstract = NULL;
379         size_t l;
380         int r;
381
382         assert(b);
383         assert(p);
384         assert(*p);
385         assert(guid);
386
387         while (**p != 0 && **p != ';') {
388                 r = parse_address_key(p, "guid", guid);
389                 if (r < 0)
390                         return r;
391                 else if (r > 0)
392                         continue;
393
394                 r = parse_address_key(p, "path", &path);
395                 if (r < 0)
396                         return r;
397                 else if (r > 0)
398                         continue;
399
400                 r = parse_address_key(p, "abstract", &abstract);
401                 if (r < 0)
402                         return r;
403                 else if (r > 0)
404                         continue;
405
406                 skip_address_key(p);
407         }
408
409         if (!path && !abstract)
410                 return -EINVAL;
411
412         if (path && abstract)
413                 return -EINVAL;
414
415         if (path) {
416                 l = strlen(path);
417                 if (l > sizeof(b->sockaddr.un.sun_path))
418                         return -E2BIG;
419
420                 b->sockaddr.un.sun_family = AF_UNIX;
421                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
422                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
423         } else if (abstract) {
424                 l = strlen(abstract);
425                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
426                         return -E2BIG;
427
428                 b->sockaddr.un.sun_family = AF_UNIX;
429                 b->sockaddr.un.sun_path[0] = 0;
430                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
431                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
432         }
433
434         return 0;
435 }
436
437 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
438         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
439         struct addrinfo hints, *result;
440         int r;
441
442         assert(b);
443         assert(p);
444         assert(*p);
445         assert(guid);
446
447         while (**p != 0 && **p != ';') {
448                 r = parse_address_key(p, "guid", guid);
449                 if (r < 0)
450                         return r;
451                 else if (r > 0)
452                         continue;
453
454                 r = parse_address_key(p, "host", &host);
455                 if (r < 0)
456                         return r;
457                 else if (r > 0)
458                         continue;
459
460                 r = parse_address_key(p, "port", &port);
461                 if (r < 0)
462                         return r;
463                 else if (r > 0)
464                         continue;
465
466                 r = parse_address_key(p, "family", &family);
467                 if (r < 0)
468                         return r;
469                 else if (r > 0)
470                         continue;
471
472                 skip_address_key(p);
473         }
474
475         if (!host || !port)
476                 return -EINVAL;
477
478         zero(hints);
479         hints.ai_socktype = SOCK_STREAM;
480         hints.ai_flags = AI_ADDRCONFIG;
481
482         if (family) {
483                 if (streq(family, "ipv4"))
484                         hints.ai_family = AF_INET;
485                 else if (streq(family, "ipv6"))
486                         hints.ai_family = AF_INET6;
487                 else
488                         return -EINVAL;
489         }
490
491         r = getaddrinfo(host, port, &hints, &result);
492         if (r == EAI_SYSTEM)
493                 return -errno;
494         else if (r != 0)
495                 return -EADDRNOTAVAIL;
496
497         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
498         b->sockaddr_size = result->ai_addrlen;
499
500         freeaddrinfo(result);
501
502         return 0;
503 }
504
505 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
506         char *path = NULL;
507         unsigned n_argv = 0, j;
508         char **argv = NULL;
509         int r;
510
511         assert(b);
512         assert(p);
513         assert(*p);
514         assert(guid);
515
516         while (**p != 0 && **p != ';') {
517                 r = parse_address_key(p, "guid", guid);
518                 if (r < 0)
519                         goto fail;
520                 else if (r > 0)
521                         continue;
522
523                 r = parse_address_key(p, "path", &path);
524                 if (r < 0)
525                         goto fail;
526                 else if (r > 0)
527                         continue;
528
529                 if (startswith(*p, "argv")) {
530                         unsigned ul;
531
532                         errno = 0;
533                         ul = strtoul(*p + 4, (char**) p, 10);
534                         if (errno > 0 || **p != '=' || ul > 256) {
535                                 r = -EINVAL;
536                                 goto fail;
537                         }
538
539                         (*p) ++;
540
541                         if (ul >= n_argv) {
542                                 char **x;
543
544                                 x = realloc(argv, sizeof(char*) * (ul + 2));
545                                 if (!x) {
546                                         r = -ENOMEM;
547                                         goto fail;
548                                 }
549
550                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
551
552                                 argv = x;
553                                 n_argv = ul + 1;
554                         }
555
556                         r = parse_address_key(p, NULL, argv + ul);
557                         if (r < 0)
558                                 goto fail;
559
560                         continue;
561                 }
562
563                 skip_address_key(p);
564         }
565
566         if (!path) {
567                 r = -EINVAL;
568                 goto fail;
569         }
570
571         /* Make sure there are no holes in the array, with the
572          * exception of argv[0] */
573         for (j = 1; j < n_argv; j++)
574                 if (!argv[j]) {
575                         r = -EINVAL;
576                         goto fail;
577                 }
578
579         if (argv && argv[0] == NULL) {
580                 argv[0] = strdup(path);
581                 if (!argv[0]) {
582                         r = -ENOMEM;
583                         goto fail;
584                 }
585         }
586
587         b->exec_path = path;
588         b->exec_argv = argv;
589         return 0;
590
591 fail:
592         for (j = 0; j < n_argv; j++)
593                 free(argv[j]);
594
595         free(argv);
596         free(path);
597         return r;
598 }
599
600 static void bus_reset_parsed_address(sd_bus *b) {
601         assert(b);
602
603         zero(b->sockaddr);
604         b->sockaddr_size = 0;
605         strv_free(b->exec_argv);
606         free(b->exec_path);
607         b->exec_path = NULL;
608         b->exec_argv = NULL;
609         b->server_id = SD_ID128_NULL;
610 }
611
612 static int bus_parse_next_address(sd_bus *b) {
613         _cleanup_free_ char *guid = NULL;
614         const char *a;
615         int r;
616
617         assert(b);
618
619         if (!b->address)
620                 return 0;
621         if (b->address[b->address_index] == 0)
622                 return 0;
623
624         bus_reset_parsed_address(b);
625
626         a = b->address + b->address_index;
627
628         while (*a != 0) {
629
630                 if (*a == ';') {
631                         a++;
632                         continue;
633                 }
634
635                 if (startswith(a, "unix:")) {
636                         a += 5;
637
638                         r = parse_unix_address(b, &a, &guid);
639                         if (r < 0)
640                                 return r;
641                         break;
642
643                 } else if (startswith(a, "tcp:")) {
644
645                         a += 4;
646                         r = parse_tcp_address(b, &a, &guid);
647                         if (r < 0)
648                                 return r;
649
650                         break;
651
652                 } else if (startswith(a, "unixexec:")) {
653
654                         a += 9;
655                         r = parse_exec_address(b, &a, &guid);
656                         if (r < 0)
657                                 return r;
658
659                         break;
660
661                 }
662
663                 a = strchr(a, ';');
664                 if (!a)
665                         return 0;
666         }
667
668         if (guid) {
669                 r = sd_id128_from_string(guid, &b->server_id);
670                 if (r < 0)
671                         return r;
672         }
673
674         b->address_index = a - b->address;
675         return 1;
676 }
677
678 static int bus_start_address(sd_bus *b) {
679         int r;
680
681         assert(b);
682
683         for (;;) {
684                 sd_bus_close(b);
685
686                 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
687
688                         r = bus_socket_connect(b);
689                         if (r >= 0)
690                                 return r;
691
692                         b->last_connect_error = -r;
693
694                 } else if (b->exec_path) {
695
696                         r = bus_socket_exec(b);
697                         if (r >= 0)
698                                 return r;
699
700                         b->last_connect_error = -r;
701                 }
702
703                 r = bus_parse_next_address(b);
704                 if (r < 0)
705                         return r;
706                 if (r == 0)
707                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
708         }
709 }
710
711 int bus_next_address(sd_bus *b) {
712         assert(b);
713
714         bus_reset_parsed_address(b);
715         return bus_start_address(b);
716 }
717
718 static int bus_start_fd(sd_bus *b) {
719         int r;
720
721         assert(b);
722         assert(b->input_fd >= 0);
723         assert(b->output_fd >= 0);
724
725         r = fd_nonblock(b->input_fd, true);
726         if (r < 0)
727                 return r;
728
729         r = fd_cloexec(b->input_fd, true);
730         if (r < 0)
731                 return r;
732
733         if (b->input_fd != b->output_fd) {
734                 r = fd_nonblock(b->output_fd, true);
735                 if (r < 0)
736                         return r;
737
738                 r = fd_cloexec(b->output_fd, true);
739                 if (r < 0)
740                         return r;
741         }
742
743         return bus_socket_take_fd(b);
744 }
745
746 int sd_bus_start(sd_bus *bus) {
747         int r;
748
749         if (!bus)
750                 return -EINVAL;
751         if (bus->state != BUS_UNSET)
752                 return -EPERM;
753
754         bus->state = BUS_OPENING;
755
756         if (bus->is_server && bus->bus_client)
757                 return -EINVAL;
758
759         if (bus->input_fd >= 0)
760                 r = bus_start_fd(bus);
761         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path)
762                 r = bus_start_address(bus);
763         else
764                 return -EINVAL;
765
766         if (r < 0)
767                 return r;
768
769         return bus_send_hello(bus);
770 }
771
772 int sd_bus_open_system(sd_bus **ret) {
773         const char *e;
774         sd_bus *b;
775         int r;
776
777         if (!ret)
778                 return -EINVAL;
779
780         r = sd_bus_new(&b);
781         if (r < 0)
782                 return r;
783
784         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
785         if (e) {
786                 r = sd_bus_set_address(b, e);
787                 if (r < 0)
788                         goto fail;
789         } else {
790                 b->sockaddr.un.sun_family = AF_UNIX;
791                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
792                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
793         }
794
795         b->bus_client = true;
796
797         r = sd_bus_start(b);
798         if (r < 0)
799                 goto fail;
800
801         *ret = b;
802         return 0;
803
804 fail:
805         bus_free(b);
806         return r;
807 }
808
809 int sd_bus_open_user(sd_bus **ret) {
810         const char *e;
811         sd_bus *b;
812         size_t l;
813         int r;
814
815         if (!ret)
816                 return -EINVAL;
817
818         r = sd_bus_new(&b);
819         if (r < 0)
820                 return r;
821
822         e = getenv("DBUS_SESSION_BUS_ADDRESS");
823         if (e) {
824                 r = sd_bus_set_address(b, e);
825                 if (r < 0)
826                         goto fail;
827         } else {
828                 e = getenv("XDG_RUNTIME_DIR");
829                 if (!e) {
830                         r = -ENOENT;
831                         goto fail;
832                 }
833
834                 l = strlen(e);
835                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
836                         r = -E2BIG;
837                         goto fail;
838                 }
839
840                 b->sockaddr.un.sun_family = AF_UNIX;
841                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
842                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
843         }
844
845         b->bus_client = true;
846
847         r = sd_bus_start(b);
848         if (r < 0)
849                 goto fail;
850
851         *ret = b;
852         return 0;
853
854 fail:
855         bus_free(b);
856         return r;
857 }
858
859 void sd_bus_close(sd_bus *bus) {
860         if (!bus)
861                 return;
862
863         if (bus->input_fd >= 0)
864                 close_nointr_nofail(bus->input_fd);
865         if (bus->output_fd >= 0 && bus->output_fd != bus->input_fd)
866                 close_nointr_nofail(bus->output_fd);
867
868         bus->input_fd = bus->output_fd = -1;
869 }
870
871 sd_bus *sd_bus_ref(sd_bus *bus) {
872         if (!bus)
873                 return NULL;
874
875         assert(bus->n_ref > 0);
876
877         bus->n_ref++;
878         return bus;
879 }
880
881 sd_bus *sd_bus_unref(sd_bus *bus) {
882         if (!bus)
883                 return NULL;
884
885         assert(bus->n_ref > 0);
886         bus->n_ref--;
887
888         if (bus->n_ref <= 0)
889                 bus_free(bus);
890
891         return NULL;
892 }
893
894 int sd_bus_is_open(sd_bus *bus) {
895         if (!bus)
896                 return -EINVAL;
897
898         return bus->state != BUS_UNSET && bus->input_fd >= 0;
899 }
900
901 int sd_bus_can_send(sd_bus *bus, char type) {
902         int r;
903
904         if (!bus)
905                 return -EINVAL;
906         if (bus->output_fd < 0)
907                 return -ENOTCONN;
908
909         if (type == SD_BUS_TYPE_UNIX_FD) {
910                 if (!bus->negotiate_fds)
911                         return 0;
912
913                 r = bus_ensure_running(bus);
914                 if (r < 0)
915                         return r;
916
917                 return bus->can_fds;
918         }
919
920         return bus_type_is_valid(type);
921 }
922
923 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
924         int r;
925
926         if (!bus)
927                 return -EINVAL;
928         if (!server_id)
929                 return -EINVAL;
930
931         r = bus_ensure_running(bus);
932         if (r < 0)
933                 return r;
934
935         *server_id = bus->server_id;
936         return 0;
937 }
938
939 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
940         assert(m);
941
942         if (m->header->version > b->message_version)
943                 return -EPERM;
944
945         if (m->sealed)
946                 return 0;
947
948         return bus_message_seal(m, ++b->serial);
949 }
950
951 static int dispatch_wqueue(sd_bus *bus) {
952         int r, ret = 0;
953
954         assert(bus);
955         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
956
957         if (bus->output_fd < 0)
958                 return -ENOTCONN;
959
960         while (bus->wqueue_size > 0) {
961
962                 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
963                 if (r < 0) {
964                         sd_bus_close(bus);
965                         return r;
966                 } else if (r == 0)
967                         /* Didn't do anything this time */
968                         return ret;
969                 else if (bus->windex >= bus->wqueue[0]->size) {
970                         /* Fully written. Let's drop the entry from
971                          * the queue.
972                          *
973                          * This isn't particularly optimized, but
974                          * well, this is supposed to be our worst-case
975                          * buffer only, and the socket buffer is
976                          * supposed to be our primary buffer, and if
977                          * it got full, then all bets are off
978                          * anyway. */
979
980                         sd_bus_message_unref(bus->wqueue[0]);
981                         bus->wqueue_size --;
982                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
983                         bus->windex = 0;
984
985                         ret = 1;
986                 }
987         }
988
989         return ret;
990 }
991
992 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
993         sd_bus_message *z = NULL;
994         int r, ret = 0;
995
996         assert(bus);
997         assert(m);
998         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
999
1000         if (bus->input_fd < 0)
1001                 return -ENOTCONN;
1002
1003         if (bus->rqueue_size > 0) {
1004                 /* Dispatch a queued message */
1005
1006                 *m = bus->rqueue[0];
1007                 bus->rqueue_size --;
1008                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1009                 return 1;
1010         }
1011
1012         /* Try to read a new message */
1013         do {
1014                 r = bus_socket_read_message(bus, &z);
1015                 if (r < 0) {
1016                         sd_bus_close(bus);
1017                         return r;
1018                 }
1019                 if (r == 0)
1020                         return ret;
1021
1022                 r = 1;
1023         } while (!z);
1024
1025         *m = z;
1026         return 1;
1027 }
1028
1029 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1030         int r;
1031
1032         if (!bus)
1033                 return -EINVAL;
1034         if (bus->state == BUS_UNSET)
1035                 return -ENOTCONN;
1036         if (bus->output_fd < 0)
1037                 return -ENOTCONN;
1038         if (!m)
1039                 return -EINVAL;
1040
1041         if (m->n_fds > 0) {
1042                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1043                 if (r < 0)
1044                         return r;
1045                 if (r == 0)
1046                         return -ENOTSUP;
1047         }
1048
1049         /* If the serial number isn't kept, then we know that no reply
1050          * is expected */
1051         if (!serial && !m->sealed)
1052                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1053
1054         r = bus_seal_message(bus, m);
1055         if (r < 0)
1056                 return r;
1057
1058         /* If this is a reply and no reply was requested, then let's
1059          * suppress this, if we can */
1060         if (m->dont_send && !serial)
1061                 return 0;
1062
1063         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1064                 size_t idx = 0;
1065
1066                 r = bus_socket_write_message(bus, m, &idx);
1067                 if (r < 0) {
1068                         sd_bus_close(bus);
1069                         return r;
1070                 } else if (idx < m->size)  {
1071                         /* Wasn't fully written. So let's remember how
1072                          * much was written. Note that the first entry
1073                          * of the wqueue array is always allocated so
1074                          * that we always can remember how much was
1075                          * written. */
1076                         bus->wqueue[0] = sd_bus_message_ref(m);
1077                         bus->wqueue_size = 1;
1078                         bus->windex = idx;
1079                 }
1080         } else {
1081                 sd_bus_message **q;
1082
1083                 /* Just append it to the queue. */
1084
1085                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1086                         return -ENOBUFS;
1087
1088                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1089                 if (!q)
1090                         return -ENOMEM;
1091
1092                 bus->wqueue = q;
1093                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1094         }
1095
1096         if (serial)
1097                 *serial = BUS_MESSAGE_SERIAL(m);
1098
1099         return 0;
1100 }
1101
1102 static usec_t calc_elapse(uint64_t usec) {
1103         if (usec == (uint64_t) -1)
1104                 return 0;
1105
1106         if (usec == 0)
1107                 usec = BUS_DEFAULT_TIMEOUT;
1108
1109         return now(CLOCK_MONOTONIC) + usec;
1110 }
1111
1112 static int timeout_compare(const void *a, const void *b) {
1113         const struct reply_callback *x = a, *y = b;
1114
1115         if (x->timeout != 0 && y->timeout == 0)
1116                 return -1;
1117
1118         if (x->timeout == 0 && y->timeout != 0)
1119                 return 1;
1120
1121         if (x->timeout < y->timeout)
1122                 return -1;
1123
1124         if (x->timeout > y->timeout)
1125                 return 1;
1126
1127         return 0;
1128 }
1129
1130 int sd_bus_send_with_reply(
1131                 sd_bus *bus,
1132                 sd_bus_message *m,
1133                 sd_bus_message_handler_t callback,
1134                 void *userdata,
1135                 uint64_t usec,
1136                 uint64_t *serial) {
1137
1138         struct reply_callback *c;
1139         int r;
1140
1141         if (!bus)
1142                 return -EINVAL;
1143         if (bus->state == BUS_UNSET)
1144                 return -ENOTCONN;
1145         if (bus->output_fd < 0)
1146                 return -ENOTCONN;
1147         if (!m)
1148                 return -EINVAL;
1149         if (!callback)
1150                 return -EINVAL;
1151         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1152                 return -EINVAL;
1153         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1154                 return -EINVAL;
1155
1156         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1157         if (r < 0)
1158                 return r;
1159
1160         if (usec != (uint64_t) -1) {
1161                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1162                 if (r < 0)
1163                         return r;
1164         }
1165
1166         r = bus_seal_message(bus, m);
1167         if (r < 0)
1168                 return r;
1169
1170         c = new(struct reply_callback, 1);
1171         if (!c)
1172                 return -ENOMEM;
1173
1174         c->callback = callback;
1175         c->userdata = userdata;
1176         c->serial = BUS_MESSAGE_SERIAL(m);
1177         c->timeout = calc_elapse(usec);
1178
1179         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1180         if (r < 0) {
1181                 free(c);
1182                 return r;
1183         }
1184
1185         if (c->timeout != 0) {
1186                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1187                 if (r < 0) {
1188                         c->timeout = 0;
1189                         sd_bus_send_with_reply_cancel(bus, c->serial);
1190                         return r;
1191                 }
1192         }
1193
1194         r = sd_bus_send(bus, m, serial);
1195         if (r < 0) {
1196                 sd_bus_send_with_reply_cancel(bus, c->serial);
1197                 return r;
1198         }
1199
1200         return r;
1201 }
1202
1203 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1204         struct reply_callback *c;
1205
1206         if (!bus)
1207                 return -EINVAL;
1208         if (serial == 0)
1209                 return -EINVAL;
1210
1211         c = hashmap_remove(bus->reply_callbacks, &serial);
1212         if (!c)
1213                 return 0;
1214
1215         if (c->timeout != 0)
1216                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1217
1218         free(c);
1219         return 1;
1220 }
1221
1222 int bus_ensure_running(sd_bus *bus) {
1223         int r;
1224
1225         assert(bus);
1226
1227         if (bus->input_fd < 0)
1228                 return -ENOTCONN;
1229         if (bus->state == BUS_UNSET)
1230                 return -ENOTCONN;
1231
1232         if (bus->state == BUS_RUNNING)
1233                 return 1;
1234
1235         for (;;) {
1236                 r = sd_bus_process(bus, NULL);
1237                 if (r < 0)
1238                         return r;
1239                 if (bus->state == BUS_RUNNING)
1240                         return 1;
1241                 if (r > 0)
1242                         continue;
1243
1244                 r = sd_bus_wait(bus, (uint64_t) -1);
1245                 if (r < 0)
1246                         return r;
1247         }
1248 }
1249
1250 int sd_bus_send_with_reply_and_block(
1251                 sd_bus *bus,
1252                 sd_bus_message *m,
1253                 uint64_t usec,
1254                 sd_bus_error *error,
1255                 sd_bus_message **reply) {
1256
1257         int r;
1258         usec_t timeout;
1259         uint64_t serial;
1260         bool room = false;
1261
1262         if (!bus)
1263                 return -EINVAL;
1264         if (bus->output_fd < 0)
1265                 return -ENOTCONN;
1266         if (bus->state == BUS_UNSET)
1267                 return -ENOTCONN;
1268         if (!m)
1269                 return -EINVAL;
1270         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1271                 return -EINVAL;
1272         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1273                 return -EINVAL;
1274         if (bus_error_is_dirty(error))
1275                 return -EINVAL;
1276
1277         r = bus_ensure_running(bus);
1278         if (r < 0)
1279                 return r;
1280
1281         r = sd_bus_send(bus, m, &serial);
1282         if (r < 0)
1283                 return r;
1284
1285         timeout = calc_elapse(usec);
1286
1287         for (;;) {
1288                 usec_t left;
1289                 sd_bus_message *incoming = NULL;
1290
1291                 if (!room) {
1292                         sd_bus_message **q;
1293
1294                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1295                                 return -ENOBUFS;
1296
1297                         /* Make sure there's room for queuing this
1298                          * locally, before we read the message */
1299
1300                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1301                         if (!q)
1302                                 return -ENOMEM;
1303
1304                         bus->rqueue = q;
1305                         room = true;
1306                 }
1307
1308                 r = bus_socket_read_message(bus, &incoming);
1309                 if (r < 0)
1310                         return r;
1311                 if (incoming) {
1312
1313                         if (incoming->reply_serial == serial) {
1314                                 /* Found a match! */
1315
1316                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1317                                         *reply = incoming;
1318                                         return 0;
1319                                 }
1320
1321                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1322                                         int k;
1323
1324                                         r = sd_bus_error_copy(error, &incoming->error);
1325                                         if (r < 0) {
1326                                                 sd_bus_message_unref(incoming);
1327                                                 return r;
1328                                         }
1329
1330                                         k = bus_error_to_errno(&incoming->error);
1331                                         sd_bus_message_unref(incoming);
1332                                         return k;
1333                                 }
1334
1335                                 sd_bus_message_unref(incoming);
1336                                 return -EIO;
1337                         }
1338
1339                         /* There's already guaranteed to be room for
1340                          * this, so need to resize things here */
1341                         bus->rqueue[bus->rqueue_size ++] = incoming;
1342                         room = false;
1343
1344                         /* Try to read more, right-away */
1345                         continue;
1346                 }
1347                 if (r != 0)
1348                         continue;
1349
1350                 if (timeout > 0) {
1351                         usec_t n;
1352
1353                         n = now(CLOCK_MONOTONIC);
1354                         if (n >= timeout)
1355                                 return -ETIMEDOUT;
1356
1357                         left = timeout - n;
1358                 } else
1359                         left = (uint64_t) -1;
1360
1361                 r = bus_poll(bus, true, left);
1362                 if (r < 0)
1363                         return r;
1364
1365                 r = dispatch_wqueue(bus);
1366                 if (r < 0)
1367                         return r;
1368         }
1369 }
1370
1371 int sd_bus_get_fd(sd_bus *bus) {
1372         if (!bus)
1373                 return -EINVAL;
1374         if (bus->input_fd < 0)
1375                 return -ENOTCONN;
1376         if (bus->input_fd != bus->output_fd)
1377                 return -EPERM;
1378
1379         return bus->input_fd;
1380 }
1381
1382 int sd_bus_get_events(sd_bus *bus) {
1383         int flags = 0;
1384
1385         if (!bus)
1386                 return -EINVAL;
1387         if (bus->state == BUS_UNSET)
1388                 return -ENOTCONN;
1389         if (bus->input_fd < 0)
1390                 return -ENOTCONN;
1391
1392         if (bus->state == BUS_OPENING)
1393                 flags |= POLLOUT;
1394         else if (bus->state == BUS_AUTHENTICATING) {
1395
1396                 if (bus_socket_auth_needs_write(bus))
1397                         flags |= POLLOUT;
1398
1399                 flags |= POLLIN;
1400
1401         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1402                 if (bus->rqueue_size <= 0)
1403                         flags |= POLLIN;
1404                 if (bus->wqueue_size > 0)
1405                         flags |= POLLOUT;
1406         }
1407
1408         return flags;
1409 }
1410
1411 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1412         struct reply_callback *c;
1413
1414         if (!bus)
1415                 return -EINVAL;
1416         if (!timeout_usec)
1417                 return -EINVAL;
1418         if (bus->state == BUS_UNSET)
1419                 return -ENOTCONN;
1420         if (bus->input_fd < 0)
1421                 return -ENOTCONN;
1422
1423         if (bus->state == BUS_AUTHENTICATING) {
1424                 *timeout_usec = bus->auth_timeout;
1425                 return 1;
1426         }
1427
1428         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1429                 *timeout_usec = (uint64_t) -1;
1430                 return 0;
1431         }
1432
1433         c = prioq_peek(bus->reply_callbacks_prioq);
1434         if (!c) {
1435                 *timeout_usec = (uint64_t) -1;
1436                 return 0;
1437         }
1438
1439         *timeout_usec = c->timeout;
1440         return 1;
1441 }
1442
1443 static int process_timeout(sd_bus *bus) {
1444         struct reply_callback *c;
1445         usec_t n;
1446         int r;
1447
1448         assert(bus);
1449
1450         c = prioq_peek(bus->reply_callbacks_prioq);
1451         if (!c)
1452                 return 0;
1453
1454         n = now(CLOCK_MONOTONIC);
1455         if (c->timeout > n)
1456                 return 0;
1457
1458         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1459         hashmap_remove(bus->reply_callbacks, &c->serial);
1460
1461         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1462         free(c);
1463
1464         return r < 0 ? r : 1;
1465 }
1466
1467 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1468         assert(bus);
1469         assert(m);
1470
1471         if (bus->state != BUS_HELLO)
1472                 return 0;
1473
1474         /* Let's make sure the first message on the bus is the HELLO
1475          * reply. But note that we don't actually parse the message
1476          * here (we leave that to the usual handling), we just verify
1477          * we don't let any earlier msg through. */
1478
1479         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1480             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1481                 return -EIO;
1482
1483         if (m->reply_serial != bus->hello_serial)
1484                 return -EIO;
1485
1486         return 0;
1487 }
1488
1489 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1490         struct reply_callback *c;
1491         int r;
1492
1493         assert(bus);
1494         assert(m);
1495
1496         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1497             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1498                 return 0;
1499
1500         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1501         if (!c)
1502                 return 0;
1503
1504         if (c->timeout != 0)
1505                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1506
1507         r = c->callback(bus, 0, m, c->userdata);
1508         free(c);
1509
1510         return r;
1511 }
1512
1513 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1514         struct filter_callback *l;
1515         int r;
1516
1517         assert(bus);
1518         assert(m);
1519
1520         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1521                 r = l->callback(bus, 0, m, l->userdata);
1522                 if (r != 0)
1523                         return r;
1524         }
1525
1526         return 0;
1527 }
1528
1529 static int process_match(sd_bus *bus, sd_bus_message *m) {
1530         assert(bus);
1531         assert(m);
1532
1533         return bus_match_run(bus, &bus->match_callbacks, 0, m);
1534 }
1535
1536 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1537         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1538         int r;
1539
1540         assert(bus);
1541         assert(m);
1542
1543         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1544                 return 0;
1545
1546         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1547                 return 0;
1548
1549         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1550                 return 1;
1551
1552         if (streq_ptr(m->member, "Ping"))
1553                 r = sd_bus_message_new_method_return(bus, m, &reply);
1554         else if (streq_ptr(m->member, "GetMachineId")) {
1555                 sd_id128_t id;
1556                 char sid[33];
1557
1558                 r = sd_id128_get_machine(&id);
1559                 if (r < 0)
1560                         return r;
1561
1562                 r = sd_bus_message_new_method_return(bus, m, &reply);
1563                 if (r < 0)
1564                         return r;
1565
1566                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1567         } else {
1568                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1569
1570                 sd_bus_error_set(&error,
1571                                  "org.freedesktop.DBus.Error.UnknownMethod",
1572                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1573
1574                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1575         }
1576
1577         if (r < 0)
1578                 return r;
1579
1580         r = sd_bus_send(bus, reply, NULL);
1581         if (r < 0)
1582                 return r;
1583
1584         return 1;
1585 }
1586
1587 static int process_object(sd_bus *bus, sd_bus_message *m) {
1588         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1589         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1590         struct object_callback *c;
1591         char *p;
1592         int r;
1593         bool found = false;
1594
1595         assert(bus);
1596         assert(m);
1597
1598         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1599                 return 0;
1600
1601         if (hashmap_isempty(bus->object_callbacks))
1602                 return 0;
1603
1604         c = hashmap_get(bus->object_callbacks, m->path);
1605         if (c) {
1606                 r = c->callback(bus, 0, m, c->userdata);
1607                 if (r != 0)
1608                         return r;
1609
1610                 found = true;
1611         }
1612
1613         /* Look for fallback prefixes */
1614         p = strdupa(m->path);
1615         for (;;) {
1616                 char *e;
1617
1618                 e = strrchr(p, '/');
1619                 if (e == p || !e)
1620                         break;
1621
1622                 *e = 0;
1623
1624                 c = hashmap_get(bus->object_callbacks, p);
1625                 if (c && c->is_fallback) {
1626                         r = c->callback(bus, 0, m, c->userdata);
1627                         if (r != 0)
1628                                 return r;
1629
1630                         found = true;
1631                 }
1632         }
1633
1634         /* We found some handlers but none wanted to take this, then
1635          * return this -- with one exception, we can handle
1636          * introspection minimally ourselves */
1637         if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1638                 return 0;
1639
1640         sd_bus_error_set(&error,
1641                          "org.freedesktop.DBus.Error.UnknownMethod",
1642                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1643
1644         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1645         if (r < 0)
1646                 return r;
1647
1648         r = sd_bus_send(bus, reply, NULL);
1649         if (r < 0)
1650                 return r;
1651
1652         return 1;
1653 }
1654
1655 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1656         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1657         _cleanup_free_ char *introspection = NULL;
1658         _cleanup_set_free_free_ Set *s = NULL;
1659         _cleanup_fclose_ FILE *f = NULL;
1660         struct object_callback *c;
1661         Iterator i;
1662         size_t size = 0;
1663         char *node;
1664         int r;
1665
1666         assert(bus);
1667         assert(m);
1668
1669         if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1670                 return 0;
1671
1672         if (!m->path)
1673                 return 0;
1674
1675         s = set_new(string_hash_func, string_compare_func);
1676         if (!s)
1677                 return -ENOMEM;
1678
1679         HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1680                 const char *e;
1681                 char *a, *p;
1682
1683                 if (streq(c->path, "/"))
1684                         continue;
1685
1686                 if (streq(m->path, "/"))
1687                         e = c->path;
1688                 else {
1689                         e = startswith(c->path, m->path);
1690                         if (!e || *e != '/')
1691                                 continue;
1692                 }
1693
1694                 a = strdup(e+1);
1695                 if (!a)
1696                         return -ENOMEM;
1697
1698                 p = strchr(a, '/');
1699                 if (p)
1700                         *p = 0;
1701
1702                 r = set_put(s, a);
1703                 if (r < 0) {
1704                         free(a);
1705
1706                         if (r != -EEXIST)
1707                                 return r;
1708                 }
1709         }
1710
1711         f = open_memstream(&introspection, &size);
1712         if (!f)
1713                 return -ENOMEM;
1714
1715         fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1716         fputs("<node>\n", f);
1717         fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1718         fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1719
1720         while ((node = set_steal_first(s))) {
1721                 fprintf(f, " <node name=\"%s\"/>\n", node);
1722                 free(node);
1723         }
1724
1725         fputs("</node>\n", f);
1726
1727         fflush(f);
1728
1729         if (ferror(f))
1730                 return -ENOMEM;
1731
1732         r = sd_bus_message_new_method_return(bus, m, &reply);
1733         if (r < 0)
1734                 return r;
1735
1736         r = sd_bus_message_append(reply, "s", introspection);
1737         if (r < 0)
1738                 return r;
1739
1740         r = sd_bus_send(bus, reply, NULL);
1741         if (r < 0)
1742                 return r;
1743
1744         return 1;
1745 }
1746
1747 static int process_message(sd_bus *bus, sd_bus_message *m) {
1748         int r;
1749
1750         assert(bus);
1751         assert(m);
1752
1753         r = process_hello(bus, m);
1754         if (r != 0)
1755                 return r;
1756
1757         r = process_reply(bus, m);
1758         if (r != 0)
1759                 return r;
1760
1761         r = process_filter(bus, m);
1762         if (r != 0)
1763                 return r;
1764
1765         r = process_match(bus, m);
1766         if (r != 0)
1767                 return r;
1768
1769         r = process_builtin(bus, m);
1770         if (r != 0)
1771                 return r;
1772
1773         r = process_object(bus, m);
1774         if (r != 0)
1775                 return r;
1776
1777         return process_introspect(bus, m);
1778 }
1779
1780 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1781         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1782         int r;
1783
1784         assert(bus);
1785         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1786
1787         r = process_timeout(bus);
1788         if (r != 0)
1789                 goto null_message;
1790
1791         r = dispatch_wqueue(bus);
1792         if (r != 0)
1793                 goto null_message;
1794
1795         r = dispatch_rqueue(bus, &m);
1796         if (r < 0)
1797                 return r;
1798         if (!m)
1799                 goto null_message;
1800
1801         r = process_message(bus, m);
1802         if (r != 0)
1803                 goto null_message;
1804
1805         if (ret) {
1806                 *ret = m;
1807                 m = NULL;
1808                 return 1;
1809         }
1810
1811         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1812                 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1813                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1814
1815                 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1816
1817                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1818                 if (r < 0)
1819                         return r;
1820
1821                 r = sd_bus_send(bus, reply, NULL);
1822                 if (r < 0)
1823                         return r;
1824         }
1825
1826         return 1;
1827
1828 null_message:
1829         if (r >= 0 && ret)
1830                 *ret = NULL;
1831
1832         return r;
1833 }
1834
1835 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1836         int r;
1837
1838         /* Returns 0 when we didn't do anything. This should cause the
1839          * caller to invoke sd_bus_wait() before returning the next
1840          * time. Returns > 0 when we did something, which possibly
1841          * means *ret is filled in with an unprocessed message. */
1842
1843         if (!bus)
1844                 return -EINVAL;
1845         if (bus->input_fd < 0)
1846                 return -ENOTCONN;
1847
1848         switch (bus->state) {
1849
1850         case BUS_UNSET:
1851                 return -ENOTCONN;
1852
1853         case BUS_OPENING:
1854                 r = bus_socket_process_opening(bus);
1855                 if (r < 0)
1856                         return r;
1857                 if (ret)
1858                         *ret = NULL;
1859                 return r;
1860
1861         case BUS_AUTHENTICATING:
1862
1863                 r = bus_socket_process_authenticating(bus);
1864                 if (r < 0)
1865                         return r;
1866                 if (ret)
1867                         *ret = NULL;
1868                 return r;
1869
1870         case BUS_RUNNING:
1871         case BUS_HELLO:
1872
1873                 return process_running(bus, ret);
1874         }
1875
1876         assert_not_reached("Unknown state");
1877 }
1878
1879 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1880         struct pollfd p[2];
1881         int r, e, n;
1882         struct timespec ts;
1883         usec_t until, m;
1884
1885         assert(bus);
1886
1887         if (bus->input_fd < 0)
1888                 return -ENOTCONN;
1889
1890         e = sd_bus_get_events(bus);
1891         if (e < 0)
1892                 return e;
1893
1894         if (need_more)
1895                 e |= POLLIN;
1896
1897         r = sd_bus_get_timeout(bus, &until);
1898         if (r < 0)
1899                 return r;
1900         if (r == 0)
1901                 m = (uint64_t) -1;
1902         else {
1903                 usec_t nw;
1904                 nw = now(CLOCK_MONOTONIC);
1905                 m = until > nw ? until - nw : 0;
1906         }
1907
1908         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1909                 m = timeout_usec;
1910
1911         zero(p);
1912         p[0].fd = bus->input_fd;
1913
1914         if (bus->output_fd == bus->input_fd) {
1915                 p[0].events = e;
1916                 n = 1;
1917         } else {
1918                 p[0].events = e & POLLIN;
1919                 p[1].fd = bus->output_fd;
1920                 p[1].events = e & POLLOUT;
1921                 n = 2;
1922         }
1923
1924         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1925         if (r < 0)
1926                 return -errno;
1927
1928         return r > 0 ? 1 : 0;
1929 }
1930
1931 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1932
1933         if (!bus)
1934                 return -EINVAL;
1935         if (bus->state == BUS_UNSET)
1936                 return -ENOTCONN;
1937         if (bus->input_fd < 0)
1938                 return -ENOTCONN;
1939         if (bus->rqueue_size > 0)
1940                 return 0;
1941
1942         return bus_poll(bus, false, timeout_usec);
1943 }
1944
1945 int sd_bus_flush(sd_bus *bus) {
1946         int r;
1947
1948         if (!bus)
1949                 return -EINVAL;
1950         if (bus->state == BUS_UNSET)
1951                 return -ENOTCONN;
1952         if (bus->output_fd < 0)
1953                 return -ENOTCONN;
1954
1955         r = bus_ensure_running(bus);
1956         if (r < 0)
1957                 return r;
1958
1959         if (bus->wqueue_size <= 0)
1960                 return 0;
1961
1962         for (;;) {
1963                 r = dispatch_wqueue(bus);
1964                 if (r < 0)
1965                         return r;
1966
1967                 if (bus->wqueue_size <= 0)
1968                         return 0;
1969
1970                 r = bus_poll(bus, false, (uint64_t) -1);
1971                 if (r < 0)
1972                         return r;
1973         }
1974 }
1975
1976 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
1977         struct filter_callback *f;
1978
1979         if (!bus)
1980                 return -EINVAL;
1981         if (!callback)
1982                 return -EINVAL;
1983
1984         f = new(struct filter_callback, 1);
1985         if (!f)
1986                 return -ENOMEM;
1987         f->callback = callback;
1988         f->userdata = userdata;
1989
1990         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1991         return 0;
1992 }
1993
1994 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
1995         struct filter_callback *f;
1996
1997         if (!bus)
1998                 return -EINVAL;
1999         if (!callback)
2000                 return -EINVAL;
2001
2002         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2003                 if (f->callback == callback && f->userdata == userdata) {
2004                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2005                         free(f);
2006                         return 1;
2007                 }
2008         }
2009
2010         return 0;
2011 }
2012
2013 static int bus_add_object(
2014                 sd_bus *bus,
2015                 bool fallback,
2016                 const char *path,
2017                 sd_bus_message_handler_t callback,
2018                 void *userdata) {
2019
2020         struct object_callback *c;
2021         int r;
2022
2023         if (!bus)
2024                 return -EINVAL;
2025         if (!path)
2026                 return -EINVAL;
2027         if (!callback)
2028                 return -EINVAL;
2029
2030         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2031         if (r < 0)
2032                 return r;
2033
2034         c = new(struct object_callback, 1);
2035         if (!c)
2036                 return -ENOMEM;
2037
2038         c->path = strdup(path);
2039         if (!c->path) {
2040                 free(c);
2041                 return -ENOMEM;
2042         }
2043
2044         c->callback = callback;
2045         c->userdata = userdata;
2046         c->is_fallback = fallback;
2047
2048         r = hashmap_put(bus->object_callbacks, c->path, c);
2049         if (r < 0) {
2050                 free(c->path);
2051                 free(c);
2052                 return r;
2053         }
2054
2055         return 0;
2056 }
2057
2058 static int bus_remove_object(
2059                 sd_bus *bus,
2060                 bool fallback,
2061                 const char *path,
2062                 sd_bus_message_handler_t callback,
2063                 void *userdata) {
2064
2065         struct object_callback *c;
2066
2067         if (!bus)
2068                 return -EINVAL;
2069         if (!path)
2070                 return -EINVAL;
2071         if (!callback)
2072                 return -EINVAL;
2073
2074         c = hashmap_get(bus->object_callbacks, path);
2075         if (!c)
2076                 return 0;
2077
2078         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2079                 return 0;
2080
2081         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2082
2083         free(c->path);
2084         free(c);
2085
2086         return 1;
2087 }
2088
2089 int sd_bus_add_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2090         return bus_add_object(bus, false, path, callback, userdata);
2091 }
2092
2093 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_bus_message_handler_t callback, void *userdata) {
2094         return bus_remove_object(bus, false, path, callback, userdata);
2095 }
2096
2097 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2098         return bus_add_object(bus, true, prefix, callback, userdata);
2099 }
2100
2101 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_bus_message_handler_t callback, void *userdata) {
2102         return bus_remove_object(bus, true, prefix, callback, userdata);
2103 }
2104
2105 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2106         int r = 0;
2107
2108         if (!bus)
2109                 return -EINVAL;
2110         if (!match)
2111                 return -EINVAL;
2112
2113         if (bus->bus_client) {
2114                 r = bus_add_match_internal(bus, match);
2115                 if (r < 0)
2116                         return r;
2117         }
2118
2119         if (callback) {
2120                 r = bus_match_add(&bus->match_callbacks, match, callback, userdata, NULL);
2121                 if (r < 0) {
2122
2123                         if (bus->bus_client)
2124                                 bus_remove_match_internal(bus, match);
2125                 }
2126         }
2127
2128         return r;
2129 }
2130
2131 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2132         int r = 0, q = 0;
2133
2134         if (!bus)
2135                 return -EINVAL;
2136         if (!match)
2137                 return -EINVAL;
2138
2139         if (bus->bus_client)
2140                 r = bus_remove_match_internal(bus, match);
2141
2142         if (callback)
2143                 q = bus_match_remove(&bus->match_callbacks, match, callback, userdata);
2144
2145         if (r < 0)
2146                 return r;
2147         return q;
2148 }