chiark / gitweb /
bus: typo
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48 #include "bus-util.h"
49 #include "bus-container.h"
50
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
52
53 static void bus_close_fds(sd_bus *b) {
54         assert(b);
55
56         if (b->input_fd >= 0)
57                 close_nointr_nofail(b->input_fd);
58
59         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60                 close_nointr_nofail(b->output_fd);
61
62         b->input_fd = b->output_fd = -1;
63 }
64
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66         struct node_callback *c;
67         struct node_vtable *v;
68         struct node_enumerator *e;
69
70         assert(b);
71
72         if (!n)
73                 return;
74
75         while (n->child)
76                 bus_node_destroy(b, n->child);
77
78         while ((c = n->callbacks)) {
79                 LIST_REMOVE(callbacks, n->callbacks, c);
80                 free(c);
81         }
82
83         while ((v = n->vtables)) {
84                 LIST_REMOVE(vtables, n->vtables, v);
85                 free(v->interface);
86                 free(v);
87         }
88
89         while ((e = n->enumerators)) {
90                 LIST_REMOVE(enumerators, n->enumerators, e);
91                 free(e);
92         }
93
94         if (n->parent)
95                 LIST_REMOVE(siblings, n->parent->child, n);
96
97         assert_se(hashmap_remove(b->nodes, n->path) == n);
98         free(n->path);
99         free(n);
100 }
101
102 static void bus_free(sd_bus *b) {
103         struct filter_callback *f;
104         struct node *n;
105         unsigned i;
106
107         assert(b);
108
109         sd_bus_detach_event(b);
110
111         bus_close_fds(b);
112
113         if (b->kdbus_buffer)
114                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
115
116         free(b->rbuffer);
117         free(b->unique_name);
118         free(b->auth_buffer);
119         free(b->address);
120         free(b->kernel);
121         free(b->machine);
122
123         free(b->exec_path);
124         strv_free(b->exec_argv);
125
126         close_many(b->fds, b->n_fds);
127         free(b->fds);
128
129         for (i = 0; i < b->rqueue_size; i++)
130                 sd_bus_message_unref(b->rqueue[i]);
131         free(b->rqueue);
132
133         for (i = 0; i < b->wqueue_size; i++)
134                 sd_bus_message_unref(b->wqueue[i]);
135         free(b->wqueue);
136
137         hashmap_free_free(b->reply_callbacks);
138         prioq_free(b->reply_callbacks_prioq);
139
140         while ((f = b->filter_callbacks)) {
141                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
142                 free(f);
143         }
144
145         bus_match_free(&b->match_callbacks);
146
147         hashmap_free_free(b->vtable_methods);
148         hashmap_free_free(b->vtable_properties);
149
150         while ((n = hashmap_first(b->nodes)))
151                 bus_node_destroy(b, n);
152
153         hashmap_free(b->nodes);
154
155         bus_kernel_flush_memfd(b);
156
157         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
158
159         free(b);
160 }
161
162 _public_ int sd_bus_new(sd_bus **ret) {
163         sd_bus *r;
164
165         assert_return(ret, -EINVAL);
166
167         r = new0(sd_bus, 1);
168         if (!r)
169                 return -ENOMEM;
170
171         r->n_ref = REFCNT_INIT;
172         r->input_fd = r->output_fd = -1;
173         r->message_version = 1;
174         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
175         r->original_pid = getpid();
176
177         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
178
179         /* We guarantee that wqueue always has space for at least one
180          * entry */
181         r->wqueue = new(sd_bus_message*, 1);
182         if (!r->wqueue) {
183                 free(r);
184                 return -ENOMEM;
185         }
186
187         *ret = r;
188         return 0;
189 }
190
191 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
192         char *a;
193
194         assert_return(bus, -EINVAL);
195         assert_return(bus->state == BUS_UNSET, -EPERM);
196         assert_return(address, -EINVAL);
197         assert_return(!bus_pid_changed(bus), -ECHILD);
198
199         a = strdup(address);
200         if (!a)
201                 return -ENOMEM;
202
203         free(bus->address);
204         bus->address = a;
205
206         return 0;
207 }
208
209 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
210         assert_return(bus, -EINVAL);
211         assert_return(bus->state == BUS_UNSET, -EPERM);
212         assert_return(input_fd >= 0, -EINVAL);
213         assert_return(output_fd >= 0, -EINVAL);
214         assert_return(!bus_pid_changed(bus), -ECHILD);
215
216         bus->input_fd = input_fd;
217         bus->output_fd = output_fd;
218         return 0;
219 }
220
221 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
222         char *p, **a;
223
224         assert_return(bus, -EINVAL);
225         assert_return(bus->state == BUS_UNSET, -EPERM);
226         assert_return(path, -EINVAL);
227         assert_return(!strv_isempty(argv), -EINVAL);
228         assert_return(!bus_pid_changed(bus), -ECHILD);
229
230         p = strdup(path);
231         if (!p)
232                 return -ENOMEM;
233
234         a = strv_copy(argv);
235         if (!a) {
236                 free(p);
237                 return -ENOMEM;
238         }
239
240         free(bus->exec_path);
241         strv_free(bus->exec_argv);
242
243         bus->exec_path = p;
244         bus->exec_argv = a;
245
246         return 0;
247 }
248
249 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
250         assert_return(bus, -EINVAL);
251         assert_return(bus->state == BUS_UNSET, -EPERM);
252         assert_return(!bus_pid_changed(bus), -ECHILD);
253
254         bus->bus_client = !!b;
255         return 0;
256 }
257
258 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
259         assert_return(bus, -EINVAL);
260         assert_return(bus->state == BUS_UNSET, -EPERM);
261         assert_return(!bus_pid_changed(bus), -ECHILD);
262
263         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
264         return 0;
265 }
266
267 _public_ int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
268         assert_return(bus, -EINVAL);
269         assert_return(bus->state == BUS_UNSET, -EPERM);
270         assert_return(!bus_pid_changed(bus), -ECHILD);
271
272         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
273         return 0;
274 }
275
276 _public_ int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
277         assert_return(bus, -EINVAL);
278         assert_return(bus->state == BUS_UNSET, -EPERM);
279         assert_return(!bus_pid_changed(bus), -ECHILD);
280
281         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
282         return 0;
283 }
284
285 _public_ int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
286         assert_return(bus, -EINVAL);
287         assert_return(bus->state == BUS_UNSET, -EPERM);
288         assert_return(!bus_pid_changed(bus), -ECHILD);
289
290         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
291         return 0;
292 }
293
294 _public_ int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
295         assert_return(bus, -EINVAL);
296         assert_return(bus->state == BUS_UNSET, -EPERM);
297         assert_return(!bus_pid_changed(bus), -ECHILD);
298
299         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
300         return 0;
301 }
302
303 _public_ int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
304         assert_return(bus, -EINVAL);
305         assert_return(bus->state == BUS_UNSET, -EPERM);
306         assert_return(!bus_pid_changed(bus), -ECHILD);
307
308         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
309         return 0;
310 }
311
312 _public_ int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
313         assert_return(bus, -EINVAL);
314         assert_return(bus->state == BUS_UNSET, -EPERM);
315         assert_return(!bus_pid_changed(bus), -ECHILD);
316
317         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
318         return 0;
319 }
320
321 _public_ int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
322         assert_return(bus, -EINVAL);
323         assert_return(bus->state == BUS_UNSET, -EPERM);
324         assert_return(!bus_pid_changed(bus), -ECHILD);
325
326         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
327         return 0;
328 }
329
330 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
331         assert_return(bus, -EINVAL);
332         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
333         assert_return(bus->state == BUS_UNSET, -EPERM);
334         assert_return(!bus_pid_changed(bus), -ECHILD);
335
336         bus->is_server = !!b;
337         bus->server_id = server_id;
338         return 0;
339 }
340
341 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
342         assert_return(bus, -EINVAL);
343         assert_return(bus->state == BUS_UNSET, -EPERM);
344         assert_return(!bus_pid_changed(bus), -ECHILD);
345
346         bus->anonymous_auth = !!b;
347         return 0;
348 }
349
350 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
351         const char *s;
352         int r;
353
354         assert(bus);
355         assert(bus->state == BUS_HELLO);
356         assert(reply);
357
358         r = sd_bus_message_get_errno(reply);
359         if (r < 0)
360                 return r;
361         if (r > 0)
362                 return -r;
363
364         r = sd_bus_message_read(reply, "s", &s);
365         if (r < 0)
366                 return r;
367
368         if (!service_name_is_valid(s) || s[0] != ':')
369                 return -EBADMSG;
370
371         bus->unique_name = strdup(s);
372         if (!bus->unique_name)
373                 return -ENOMEM;
374
375         bus->state = BUS_RUNNING;
376
377         return 1;
378 }
379
380 static int bus_send_hello(sd_bus *bus) {
381         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
382         int r;
383
384         assert(bus);
385
386         if (!bus->bus_client || bus->is_kernel)
387                 return 0;
388
389         r = sd_bus_message_new_method_call(
390                         bus,
391                         "org.freedesktop.DBus",
392                         "/",
393                         "org.freedesktop.DBus",
394                         "Hello",
395                         &m);
396         if (r < 0)
397                 return r;
398
399         return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
400 }
401
402 int bus_start_running(sd_bus *bus) {
403         assert(bus);
404
405         if (bus->bus_client && !bus->is_kernel) {
406                 bus->state = BUS_HELLO;
407                 return 1;
408         }
409
410         bus->state = BUS_RUNNING;
411         return 1;
412 }
413
414 static int parse_address_key(const char **p, const char *key, char **value) {
415         size_t l, n = 0;
416         const char *a;
417         char *r = NULL;
418
419         assert(p);
420         assert(*p);
421         assert(value);
422
423         if (key) {
424                 l = strlen(key);
425                 if (strncmp(*p, key, l) != 0)
426                         return 0;
427
428                 if ((*p)[l] != '=')
429                         return 0;
430
431                 if (*value)
432                         return -EINVAL;
433
434                 a = *p + l + 1;
435         } else
436                 a = *p;
437
438         while (*a != ';' && *a != ',' && *a != 0) {
439                 char c, *t;
440
441                 if (*a == '%') {
442                         int x, y;
443
444                         x = unhexchar(a[1]);
445                         if (x < 0) {
446                                 free(r);
447                                 return x;
448                         }
449
450                         y = unhexchar(a[2]);
451                         if (y < 0) {
452                                 free(r);
453                                 return y;
454                         }
455
456                         c = (char) ((x << 4) | y);
457                         a += 3;
458                 } else {
459                         c = *a;
460                         a++;
461                 }
462
463                 t = realloc(r, n + 2);
464                 if (!t) {
465                         free(r);
466                         return -ENOMEM;
467                 }
468
469                 r = t;
470                 r[n++] = c;
471         }
472
473         if (!r) {
474                 r = strdup("");
475                 if (!r)
476                         return -ENOMEM;
477         } else
478                 r[n] = 0;
479
480         if (*a == ',')
481                 a++;
482
483         *p = a;
484
485         free(*value);
486         *value = r;
487
488         return 1;
489 }
490
491 static void skip_address_key(const char **p) {
492         assert(p);
493         assert(*p);
494
495         *p += strcspn(*p, ",");
496
497         if (**p == ',')
498                 (*p) ++;
499 }
500
501 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
502         _cleanup_free_ char *path = NULL, *abstract = NULL;
503         size_t l;
504         int r;
505
506         assert(b);
507         assert(p);
508         assert(*p);
509         assert(guid);
510
511         while (**p != 0 && **p != ';') {
512                 r = parse_address_key(p, "guid", guid);
513                 if (r < 0)
514                         return r;
515                 else if (r > 0)
516                         continue;
517
518                 r = parse_address_key(p, "path", &path);
519                 if (r < 0)
520                         return r;
521                 else if (r > 0)
522                         continue;
523
524                 r = parse_address_key(p, "abstract", &abstract);
525                 if (r < 0)
526                         return r;
527                 else if (r > 0)
528                         continue;
529
530                 skip_address_key(p);
531         }
532
533         if (!path && !abstract)
534                 return -EINVAL;
535
536         if (path && abstract)
537                 return -EINVAL;
538
539         if (path) {
540                 l = strlen(path);
541                 if (l > sizeof(b->sockaddr.un.sun_path))
542                         return -E2BIG;
543
544                 b->sockaddr.un.sun_family = AF_UNIX;
545                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
546                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
547         } else if (abstract) {
548                 l = strlen(abstract);
549                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
550                         return -E2BIG;
551
552                 b->sockaddr.un.sun_family = AF_UNIX;
553                 b->sockaddr.un.sun_path[0] = 0;
554                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
555                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
556         }
557
558         return 0;
559 }
560
561 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
562         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
563         int r;
564         struct addrinfo *result, hints = {
565                 .ai_socktype = SOCK_STREAM,
566                 .ai_flags = AI_ADDRCONFIG,
567         };
568
569         assert(b);
570         assert(p);
571         assert(*p);
572         assert(guid);
573
574         while (**p != 0 && **p != ';') {
575                 r = parse_address_key(p, "guid", guid);
576                 if (r < 0)
577                         return r;
578                 else if (r > 0)
579                         continue;
580
581                 r = parse_address_key(p, "host", &host);
582                 if (r < 0)
583                         return r;
584                 else if (r > 0)
585                         continue;
586
587                 r = parse_address_key(p, "port", &port);
588                 if (r < 0)
589                         return r;
590                 else if (r > 0)
591                         continue;
592
593                 r = parse_address_key(p, "family", &family);
594                 if (r < 0)
595                         return r;
596                 else if (r > 0)
597                         continue;
598
599                 skip_address_key(p);
600         }
601
602         if (!host || !port)
603                 return -EINVAL;
604
605         if (family) {
606                 if (streq(family, "ipv4"))
607                         hints.ai_family = AF_INET;
608                 else if (streq(family, "ipv6"))
609                         hints.ai_family = AF_INET6;
610                 else
611                         return -EINVAL;
612         }
613
614         r = getaddrinfo(host, port, &hints, &result);
615         if (r == EAI_SYSTEM)
616                 return -errno;
617         else if (r != 0)
618                 return -EADDRNOTAVAIL;
619
620         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
621         b->sockaddr_size = result->ai_addrlen;
622
623         freeaddrinfo(result);
624
625         return 0;
626 }
627
628 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
629         char *path = NULL;
630         unsigned n_argv = 0, j;
631         char **argv = NULL;
632         int r;
633
634         assert(b);
635         assert(p);
636         assert(*p);
637         assert(guid);
638
639         while (**p != 0 && **p != ';') {
640                 r = parse_address_key(p, "guid", guid);
641                 if (r < 0)
642                         goto fail;
643                 else if (r > 0)
644                         continue;
645
646                 r = parse_address_key(p, "path", &path);
647                 if (r < 0)
648                         goto fail;
649                 else if (r > 0)
650                         continue;
651
652                 if (startswith(*p, "argv")) {
653                         unsigned ul;
654
655                         errno = 0;
656                         ul = strtoul(*p + 4, (char**) p, 10);
657                         if (errno > 0 || **p != '=' || ul > 256) {
658                                 r = -EINVAL;
659                                 goto fail;
660                         }
661
662                         (*p) ++;
663
664                         if (ul >= n_argv) {
665                                 char **x;
666
667                                 x = realloc(argv, sizeof(char*) * (ul + 2));
668                                 if (!x) {
669                                         r = -ENOMEM;
670                                         goto fail;
671                                 }
672
673                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
674
675                                 argv = x;
676                                 n_argv = ul + 1;
677                         }
678
679                         r = parse_address_key(p, NULL, argv + ul);
680                         if (r < 0)
681                                 goto fail;
682
683                         continue;
684                 }
685
686                 skip_address_key(p);
687         }
688
689         if (!path) {
690                 r = -EINVAL;
691                 goto fail;
692         }
693
694         /* Make sure there are no holes in the array, with the
695          * exception of argv[0] */
696         for (j = 1; j < n_argv; j++)
697                 if (!argv[j]) {
698                         r = -EINVAL;
699                         goto fail;
700                 }
701
702         if (argv && argv[0] == NULL) {
703                 argv[0] = strdup(path);
704                 if (!argv[0]) {
705                         r = -ENOMEM;
706                         goto fail;
707                 }
708         }
709
710         b->exec_path = path;
711         b->exec_argv = argv;
712         return 0;
713
714 fail:
715         for (j = 0; j < n_argv; j++)
716                 free(argv[j]);
717
718         free(argv);
719         free(path);
720         return r;
721 }
722
723 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
724         _cleanup_free_ char *path = NULL;
725         int r;
726
727         assert(b);
728         assert(p);
729         assert(*p);
730         assert(guid);
731
732         while (**p != 0 && **p != ';') {
733                 r = parse_address_key(p, "guid", guid);
734                 if (r < 0)
735                         return r;
736                 else if (r > 0)
737                         continue;
738
739                 r = parse_address_key(p, "path", &path);
740                 if (r < 0)
741                         return r;
742                 else if (r > 0)
743                         continue;
744
745                 skip_address_key(p);
746         }
747
748         if (!path)
749                 return -EINVAL;
750
751         free(b->kernel);
752         b->kernel = path;
753         path = NULL;
754
755         return 0;
756 }
757
758 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
759         _cleanup_free_ char *machine = NULL;
760         int r;
761
762         assert(b);
763         assert(p);
764         assert(*p);
765         assert(guid);
766
767         while (**p != 0 && **p != ';') {
768                 r = parse_address_key(p, "guid", guid);
769                 if (r < 0)
770                         return r;
771                 else if (r > 0)
772                         continue;
773
774                 r = parse_address_key(p, "machine", &machine);
775                 if (r < 0)
776                         return r;
777                 else if (r > 0)
778                         continue;
779
780                 skip_address_key(p);
781         }
782
783         if (!machine)
784                 return -EINVAL;
785
786         free(b->machine);
787         b->machine = machine;
788         machine = NULL;
789
790         b->sockaddr.un.sun_family = AF_UNIX;
791         strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
792         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
793
794         return 0;
795 }
796
797 static void bus_reset_parsed_address(sd_bus *b) {
798         assert(b);
799
800         zero(b->sockaddr);
801         b->sockaddr_size = 0;
802         strv_free(b->exec_argv);
803         free(b->exec_path);
804         b->exec_path = NULL;
805         b->exec_argv = NULL;
806         b->server_id = SD_ID128_NULL;
807         free(b->kernel);
808         b->kernel = NULL;
809         free(b->machine);
810         b->machine = NULL;
811 }
812
813 static int bus_parse_next_address(sd_bus *b) {
814         _cleanup_free_ char *guid = NULL;
815         const char *a;
816         int r;
817
818         assert(b);
819
820         if (!b->address)
821                 return 0;
822         if (b->address[b->address_index] == 0)
823                 return 0;
824
825         bus_reset_parsed_address(b);
826
827         a = b->address + b->address_index;
828
829         while (*a != 0) {
830
831                 if (*a == ';') {
832                         a++;
833                         continue;
834                 }
835
836                 if (startswith(a, "unix:")) {
837                         a += 5;
838
839                         r = parse_unix_address(b, &a, &guid);
840                         if (r < 0)
841                                 return r;
842                         break;
843
844                 } else if (startswith(a, "tcp:")) {
845
846                         a += 4;
847                         r = parse_tcp_address(b, &a, &guid);
848                         if (r < 0)
849                                 return r;
850
851                         break;
852
853                 } else if (startswith(a, "unixexec:")) {
854
855                         a += 9;
856                         r = parse_exec_address(b, &a, &guid);
857                         if (r < 0)
858                                 return r;
859
860                         break;
861
862                 } else if (startswith(a, "kernel:")) {
863
864                         a += 7;
865                         r = parse_kernel_address(b, &a, &guid);
866                         if (r < 0)
867                                 return r;
868
869                         break;
870                 } else if (startswith(a, "x-container:")) {
871
872                         a += 12;
873                         r = parse_container_address(b, &a, &guid);
874                         if (r < 0)
875                                 return r;
876
877                         break;
878                 }
879
880                 a = strchr(a, ';');
881                 if (!a)
882                         return 0;
883         }
884
885         if (guid) {
886                 r = sd_id128_from_string(guid, &b->server_id);
887                 if (r < 0)
888                         return r;
889         }
890
891         b->address_index = a - b->address;
892         return 1;
893 }
894
895 static int bus_start_address(sd_bus *b) {
896         int r;
897
898         assert(b);
899
900         for (;;) {
901                 sd_bus_close(b);
902
903                 if (b->exec_path) {
904
905                         r = bus_socket_exec(b);
906                         if (r >= 0)
907                                 return r;
908
909                         b->last_connect_error = -r;
910                 } else if (b->kernel) {
911
912                         r = bus_kernel_connect(b);
913                         if (r >= 0)
914                                 return r;
915
916                         b->last_connect_error = -r;
917
918                 } else if (b->machine) {
919
920                         r = bus_container_connect(b);
921                         if (r >= 0)
922                                 return r;
923
924                         b->last_connect_error = -r;
925
926                 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
927
928                         r = bus_socket_connect(b);
929                         if (r >= 0)
930                                 return r;
931
932                         b->last_connect_error = -r;
933                 }
934
935                 r = bus_parse_next_address(b);
936                 if (r < 0)
937                         return r;
938                 if (r == 0)
939                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
940         }
941 }
942
943 int bus_next_address(sd_bus *b) {
944         assert(b);
945
946         bus_reset_parsed_address(b);
947         return bus_start_address(b);
948 }
949
950 static int bus_start_fd(sd_bus *b) {
951         struct stat st;
952         int r;
953
954         assert(b);
955         assert(b->input_fd >= 0);
956         assert(b->output_fd >= 0);
957
958         r = fd_nonblock(b->input_fd, true);
959         if (r < 0)
960                 return r;
961
962         r = fd_cloexec(b->input_fd, true);
963         if (r < 0)
964                 return r;
965
966         if (b->input_fd != b->output_fd) {
967                 r = fd_nonblock(b->output_fd, true);
968                 if (r < 0)
969                         return r;
970
971                 r = fd_cloexec(b->output_fd, true);
972                 if (r < 0)
973                         return r;
974         }
975
976         if (fstat(b->input_fd, &st) < 0)
977                 return -errno;
978
979         if (S_ISCHR(b->input_fd))
980                 return bus_kernel_take_fd(b);
981         else
982                 return bus_socket_take_fd(b);
983 }
984
985 _public_ int sd_bus_start(sd_bus *bus) {
986         int r;
987
988         assert_return(bus, -EINVAL);
989         assert_return(bus->state == BUS_UNSET, -EPERM);
990         assert_return(!bus_pid_changed(bus), -ECHILD);
991
992         bus->state = BUS_OPENING;
993
994         if (bus->is_server && bus->bus_client)
995                 return -EINVAL;
996
997         if (bus->input_fd >= 0)
998                 r = bus_start_fd(bus);
999         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1000                 r = bus_start_address(bus);
1001         else
1002                 return -EINVAL;
1003
1004         if (r < 0)
1005                 return r;
1006
1007         return bus_send_hello(bus);
1008 }
1009
1010 _public_ int sd_bus_open_system(sd_bus **ret) {
1011         const char *e;
1012         sd_bus *b;
1013         int r;
1014
1015         assert_return(ret, -EINVAL);
1016
1017         r = sd_bus_new(&b);
1018         if (r < 0)
1019                 return r;
1020
1021         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1022         if (e) {
1023                 r = sd_bus_set_address(b, e);
1024                 if (r < 0)
1025                         goto fail;
1026         } else {
1027                 b->sockaddr.un.sun_family = AF_UNIX;
1028                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1029                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1030         }
1031
1032         b->bus_client = true;
1033
1034         r = sd_bus_start(b);
1035         if (r < 0)
1036                 goto fail;
1037
1038         *ret = b;
1039         return 0;
1040
1041 fail:
1042         bus_free(b);
1043         return r;
1044 }
1045
1046 _public_ int sd_bus_open_user(sd_bus **ret) {
1047         const char *e;
1048         sd_bus *b;
1049         size_t l;
1050         int r;
1051
1052         assert_return(ret, -EINVAL);
1053
1054         r = sd_bus_new(&b);
1055         if (r < 0)
1056                 return r;
1057
1058         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1059         if (e) {
1060                 r = sd_bus_set_address(b, e);
1061                 if (r < 0)
1062                         goto fail;
1063         } else {
1064                 e = secure_getenv("XDG_RUNTIME_DIR");
1065                 if (!e) {
1066                         r = -ENOENT;
1067                         goto fail;
1068                 }
1069
1070                 l = strlen(e);
1071                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1072                         r = -E2BIG;
1073                         goto fail;
1074                 }
1075
1076                 b->sockaddr.un.sun_family = AF_UNIX;
1077                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1078                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1079         }
1080
1081         b->bus_client = true;
1082
1083         r = sd_bus_start(b);
1084         if (r < 0)
1085                 goto fail;
1086
1087         *ret = b;
1088         return 0;
1089
1090 fail:
1091         bus_free(b);
1092         return r;
1093 }
1094
1095 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1096         _cleanup_free_ char *e = NULL;
1097         char *p = NULL;
1098         sd_bus *bus;
1099         int r;
1100
1101         assert_return(host, -EINVAL);
1102         assert_return(ret, -EINVAL);
1103
1104         e = bus_address_escape(host);
1105         if (!e)
1106                 return -ENOMEM;
1107
1108         p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1109         if (!p)
1110                 return -ENOMEM;
1111
1112         r = sd_bus_new(&bus);
1113         if (r < 0) {
1114                 free(p);
1115                 return r;
1116         }
1117
1118         bus->address = p;
1119         bus->bus_client = true;
1120
1121         r = sd_bus_start(bus);
1122         if (r < 0) {
1123                 bus_free(bus);
1124                 return r;
1125         }
1126
1127         *ret = bus;
1128         return 0;
1129 }
1130
1131 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1132         _cleanup_free_ char *e = NULL;
1133         sd_bus *bus;
1134         char *p;
1135         int r;
1136
1137         assert_return(machine, -EINVAL);
1138         assert_return(ret, -EINVAL);
1139
1140         e = bus_address_escape(machine);
1141         if (!e)
1142                 return -ENOMEM;
1143
1144         p = strjoin("x-container:machine=", e, NULL);
1145         if (!p)
1146                 return -ENOMEM;
1147
1148         r = sd_bus_new(&bus);
1149         if (r < 0) {
1150                 free(p);
1151                 return r;
1152         }
1153
1154         bus->address = p;
1155         bus->bus_client = true;
1156
1157         r = sd_bus_start(bus);
1158         if (r < 0) {
1159                 bus_free(bus);
1160                 return r;
1161         }
1162
1163         *ret = bus;
1164         return 0;
1165 }
1166
1167 _public_ void sd_bus_close(sd_bus *bus) {
1168         if (!bus)
1169                 return;
1170         if (bus->state == BUS_CLOSED)
1171                 return;
1172         if (bus_pid_changed(bus))
1173                 return;
1174
1175         bus->state = BUS_CLOSED;
1176
1177         sd_bus_detach_event(bus);
1178
1179         if (!bus->is_kernel)
1180                 bus_close_fds(bus);
1181
1182         /* We'll leave the fd open in case this is a kernel bus, since
1183          * there might still be memblocks around that reference this
1184          * bus, and they might need to invoke the
1185          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1186          * freed. */
1187 }
1188
1189 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1190         assert_return(bus, NULL);
1191
1192         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1193
1194         return bus;
1195 }
1196
1197 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1198         assert_return(bus, NULL);
1199
1200         if (REFCNT_DEC(bus->n_ref) <= 0)
1201                 bus_free(bus);
1202
1203         return NULL;
1204 }
1205
1206 _public_ int sd_bus_is_open(sd_bus *bus) {
1207
1208         assert_return(bus, -EINVAL);
1209         assert_return(!bus_pid_changed(bus), -ECHILD);
1210
1211         return BUS_IS_OPEN(bus->state);
1212 }
1213
1214 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1215         int r;
1216
1217         assert_return(bus, -EINVAL);
1218         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1219         assert_return(!bus_pid_changed(bus), -ECHILD);
1220
1221         if (type == SD_BUS_TYPE_UNIX_FD) {
1222                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1223                         return 0;
1224
1225                 r = bus_ensure_running(bus);
1226                 if (r < 0)
1227                         return r;
1228
1229                 return bus->can_fds;
1230         }
1231
1232         return bus_type_is_valid(type);
1233 }
1234
1235 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1236         int r;
1237
1238         assert_return(bus, -EINVAL);
1239         assert_return(server_id, -EINVAL);
1240         assert_return(!bus_pid_changed(bus), -ECHILD);
1241
1242         r = bus_ensure_running(bus);
1243         if (r < 0)
1244                 return r;
1245
1246         *server_id = bus->server_id;
1247         return 0;
1248 }
1249
1250 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1251         assert(m);
1252
1253         if (m->header->version > b->message_version)
1254                 return -EPERM;
1255
1256         if (m->sealed)
1257                 return 0;
1258
1259         return bus_message_seal(m, ++b->serial);
1260 }
1261
1262 static int dispatch_wqueue(sd_bus *bus) {
1263         int r, ret = 0;
1264
1265         assert(bus);
1266         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1267
1268         while (bus->wqueue_size > 0) {
1269
1270                 if (bus->is_kernel)
1271                         r = bus_kernel_write_message(bus, bus->wqueue[0]);
1272                 else
1273                         r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1274
1275                 if (r < 0) {
1276                         sd_bus_close(bus);
1277                         return r;
1278                 } else if (r == 0)
1279                         /* Didn't do anything this time */
1280                         return ret;
1281                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1282                         /* Fully written. Let's drop the entry from
1283                          * the queue.
1284                          *
1285                          * This isn't particularly optimized, but
1286                          * well, this is supposed to be our worst-case
1287                          * buffer only, and the socket buffer is
1288                          * supposed to be our primary buffer, and if
1289                          * it got full, then all bets are off
1290                          * anyway. */
1291
1292                         sd_bus_message_unref(bus->wqueue[0]);
1293                         bus->wqueue_size --;
1294                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1295                         bus->windex = 0;
1296
1297                         ret = 1;
1298                 }
1299         }
1300
1301         return ret;
1302 }
1303
1304 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1305         sd_bus_message *z = NULL;
1306         int r, ret = 0;
1307
1308         assert(bus);
1309         assert(m);
1310         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1311
1312         if (bus->rqueue_size > 0) {
1313                 /* Dispatch a queued message */
1314
1315                 *m = bus->rqueue[0];
1316                 bus->rqueue_size --;
1317                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1318                 return 1;
1319         }
1320
1321         /* Try to read a new message */
1322         do {
1323                 if (bus->is_kernel)
1324                         r = bus_kernel_read_message(bus, &z);
1325                 else
1326                         r = bus_socket_read_message(bus, &z);
1327
1328                 if (r < 0) {
1329                         sd_bus_close(bus);
1330                         return r;
1331                 }
1332                 if (r == 0)
1333                         return ret;
1334
1335                 ret = 1;
1336         } while (!z);
1337
1338         *m = z;
1339         return ret;
1340 }
1341
1342 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1343         int r;
1344
1345         assert_return(bus, -EINVAL);
1346         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1347         assert_return(m, -EINVAL);
1348         assert_return(!bus_pid_changed(bus), -ECHILD);
1349
1350         if (m->n_fds > 0) {
1351                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1352                 if (r < 0)
1353                         return r;
1354                 if (r == 0)
1355                         return -ENOTSUP;
1356         }
1357
1358         /* If the serial number isn't kept, then we know that no reply
1359          * is expected */
1360         if (!serial && !m->sealed)
1361                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1362
1363         r = bus_seal_message(bus, m);
1364         if (r < 0)
1365                 return r;
1366
1367         /* If this is a reply and no reply was requested, then let's
1368          * suppress this, if we can */
1369         if (m->dont_send && !serial)
1370                 return 1;
1371
1372         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1373                 size_t idx = 0;
1374
1375                 if (bus->is_kernel)
1376                         r = bus_kernel_write_message(bus, m);
1377                 else
1378                         r = bus_socket_write_message(bus, m, &idx);
1379
1380                 if (r < 0) {
1381                         sd_bus_close(bus);
1382                         return r;
1383                 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1384                         /* Wasn't fully written. So let's remember how
1385                          * much was written. Note that the first entry
1386                          * of the wqueue array is always allocated so
1387                          * that we always can remember how much was
1388                          * written. */
1389                         bus->wqueue[0] = sd_bus_message_ref(m);
1390                         bus->wqueue_size = 1;
1391                         bus->windex = idx;
1392                 }
1393         } else {
1394                 sd_bus_message **q;
1395
1396                 /* Just append it to the queue. */
1397
1398                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1399                         return -ENOBUFS;
1400
1401                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1402                 if (!q)
1403                         return -ENOMEM;
1404
1405                 bus->wqueue = q;
1406                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1407         }
1408
1409         if (serial)
1410                 *serial = BUS_MESSAGE_SERIAL(m);
1411
1412         return 1;
1413 }
1414
1415 static usec_t calc_elapse(uint64_t usec) {
1416         if (usec == (uint64_t) -1)
1417                 return 0;
1418
1419         if (usec == 0)
1420                 usec = BUS_DEFAULT_TIMEOUT;
1421
1422         return now(CLOCK_MONOTONIC) + usec;
1423 }
1424
1425 static int timeout_compare(const void *a, const void *b) {
1426         const struct reply_callback *x = a, *y = b;
1427
1428         if (x->timeout != 0 && y->timeout == 0)
1429                 return -1;
1430
1431         if (x->timeout == 0 && y->timeout != 0)
1432                 return 1;
1433
1434         if (x->timeout < y->timeout)
1435                 return -1;
1436
1437         if (x->timeout > y->timeout)
1438                 return 1;
1439
1440         return 0;
1441 }
1442
1443 _public_ int sd_bus_call_async(
1444                 sd_bus *bus,
1445                 sd_bus_message *m,
1446                 sd_bus_message_handler_t callback,
1447                 void *userdata,
1448                 uint64_t usec,
1449                 uint64_t *serial) {
1450
1451         struct reply_callback *c;
1452         int r;
1453
1454         assert_return(bus, -EINVAL);
1455         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1456         assert_return(m, -EINVAL);
1457         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1458         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1459         assert_return(callback, -EINVAL);
1460         assert_return(!bus_pid_changed(bus), -ECHILD);
1461
1462         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1463         if (r < 0)
1464                 return r;
1465
1466         if (usec != (uint64_t) -1) {
1467                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1468                 if (r < 0)
1469                         return r;
1470         }
1471
1472         r = bus_seal_message(bus, m);
1473         if (r < 0)
1474                 return r;
1475
1476         c = new0(struct reply_callback, 1);
1477         if (!c)
1478                 return -ENOMEM;
1479
1480         c->callback = callback;
1481         c->userdata = userdata;
1482         c->serial = BUS_MESSAGE_SERIAL(m);
1483         c->timeout = calc_elapse(usec);
1484
1485         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1486         if (r < 0) {
1487                 free(c);
1488                 return r;
1489         }
1490
1491         if (c->timeout != 0) {
1492                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1493                 if (r < 0) {
1494                         c->timeout = 0;
1495                         sd_bus_call_async_cancel(bus, c->serial);
1496                         return r;
1497                 }
1498         }
1499
1500         r = sd_bus_send(bus, m, serial);
1501         if (r < 0) {
1502                 sd_bus_call_async_cancel(bus, c->serial);
1503                 return r;
1504         }
1505
1506         return r;
1507 }
1508
1509 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1510         struct reply_callback *c;
1511
1512         assert_return(bus, -EINVAL);
1513         assert_return(serial != 0, -EINVAL);
1514         assert_return(!bus_pid_changed(bus), -ECHILD);
1515
1516         c = hashmap_remove(bus->reply_callbacks, &serial);
1517         if (!c)
1518                 return 0;
1519
1520         if (c->timeout != 0)
1521                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1522
1523         free(c);
1524         return 1;
1525 }
1526
1527 int bus_ensure_running(sd_bus *bus) {
1528         int r;
1529
1530         assert(bus);
1531
1532         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1533                 return -ENOTCONN;
1534         if (bus->state == BUS_RUNNING)
1535                 return 1;
1536
1537         for (;;) {
1538                 r = sd_bus_process(bus, NULL);
1539                 if (r < 0)
1540                         return r;
1541                 if (bus->state == BUS_RUNNING)
1542                         return 1;
1543                 if (r > 0)
1544                         continue;
1545
1546                 r = sd_bus_wait(bus, (uint64_t) -1);
1547                 if (r < 0)
1548                         return r;
1549         }
1550 }
1551
1552 _public_ int sd_bus_call(
1553                 sd_bus *bus,
1554                 sd_bus_message *m,
1555                 uint64_t usec,
1556                 sd_bus_error *error,
1557                 sd_bus_message **reply) {
1558
1559         int r;
1560         usec_t timeout;
1561         uint64_t serial;
1562         bool room = false;
1563
1564         assert_return(bus, -EINVAL);
1565         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1566         assert_return(m, -EINVAL);
1567         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1568         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1569         assert_return(!bus_error_is_dirty(error), -EINVAL);
1570         assert_return(!bus_pid_changed(bus), -ECHILD);
1571
1572         r = bus_ensure_running(bus);
1573         if (r < 0)
1574                 return r;
1575
1576         r = sd_bus_send(bus, m, &serial);
1577         if (r < 0)
1578                 return r;
1579
1580         timeout = calc_elapse(usec);
1581
1582         for (;;) {
1583                 usec_t left;
1584                 sd_bus_message *incoming = NULL;
1585
1586                 if (!room) {
1587                         sd_bus_message **q;
1588
1589                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1590                                 return -ENOBUFS;
1591
1592                         /* Make sure there's room for queuing this
1593                          * locally, before we read the message */
1594
1595                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1596                         if (!q)
1597                                 return -ENOMEM;
1598
1599                         bus->rqueue = q;
1600                         room = true;
1601                 }
1602
1603                 if (bus->is_kernel)
1604                         r = bus_kernel_read_message(bus, &incoming);
1605                 else
1606                         r = bus_socket_read_message(bus, &incoming);
1607                 if (r < 0)
1608                         return r;
1609                 if (incoming) {
1610
1611                         if (incoming->reply_serial == serial) {
1612                                 /* Found a match! */
1613
1614                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1615
1616                                         if (reply)
1617                                                 *reply = incoming;
1618                                         else
1619                                                 sd_bus_message_unref(incoming);
1620
1621                                         return 1;
1622                                 }
1623
1624                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1625                                         int k;
1626
1627                                         r = sd_bus_error_copy(error, &incoming->error);
1628                                         if (r < 0) {
1629                                                 sd_bus_message_unref(incoming);
1630                                                 return r;
1631                                         }
1632
1633                                         k = sd_bus_error_get_errno(&incoming->error);
1634                                         sd_bus_message_unref(incoming);
1635                                         return -k;
1636                                 }
1637
1638                                 sd_bus_message_unref(incoming);
1639                                 return -EIO;
1640
1641                         } else if (incoming->header->serial == serial &&
1642                                    bus->unique_name &&
1643                                    incoming->sender &&
1644                                    streq(bus->unique_name, incoming->sender)) {
1645
1646                                 /* Our own message? Somebody is trying
1647                                  * to send its own client a message,
1648                                  * let's not dead-lock, let's fail
1649                                  * immediately. */
1650
1651                                 sd_bus_message_unref(incoming);
1652                                 return -ELOOP;
1653                         }
1654
1655                         /* There's already guaranteed to be room for
1656                          * this, so need to resize things here */
1657                         bus->rqueue[bus->rqueue_size ++] = incoming;
1658                         room = false;
1659
1660                         /* Try to read more, right-away */
1661                         continue;
1662                 }
1663                 if (r != 0)
1664                         continue;
1665
1666                 if (timeout > 0) {
1667                         usec_t n;
1668
1669                         n = now(CLOCK_MONOTONIC);
1670                         if (n >= timeout)
1671                                 return -ETIMEDOUT;
1672
1673                         left = timeout - n;
1674                 } else
1675                         left = (uint64_t) -1;
1676
1677                 r = bus_poll(bus, true, left);
1678                 if (r < 0)
1679                         return r;
1680
1681                 r = dispatch_wqueue(bus);
1682                 if (r < 0)
1683                         return r;
1684         }
1685 }
1686
1687 _public_ int sd_bus_get_fd(sd_bus *bus) {
1688
1689         assert_return(bus, -EINVAL);
1690         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1691         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1692         assert_return(!bus_pid_changed(bus), -ECHILD);
1693
1694         return bus->input_fd;
1695 }
1696
1697 _public_ int sd_bus_get_events(sd_bus *bus) {
1698         int flags = 0;
1699
1700         assert_return(bus, -EINVAL);
1701         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1702         assert_return(!bus_pid_changed(bus), -ECHILD);
1703
1704         if (bus->state == BUS_OPENING)
1705                 flags |= POLLOUT;
1706         else if (bus->state == BUS_AUTHENTICATING) {
1707
1708                 if (bus_socket_auth_needs_write(bus))
1709                         flags |= POLLOUT;
1710
1711                 flags |= POLLIN;
1712
1713         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1714                 if (bus->rqueue_size <= 0)
1715                         flags |= POLLIN;
1716                 if (bus->wqueue_size > 0)
1717                         flags |= POLLOUT;
1718         }
1719
1720         return flags;
1721 }
1722
1723 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1724         struct reply_callback *c;
1725
1726         assert_return(bus, -EINVAL);
1727         assert_return(timeout_usec, -EINVAL);
1728         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1729         assert_return(!bus_pid_changed(bus), -ECHILD);
1730
1731         if (bus->state == BUS_AUTHENTICATING) {
1732                 *timeout_usec = bus->auth_timeout;
1733                 return 1;
1734         }
1735
1736         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1737                 *timeout_usec = (uint64_t) -1;
1738                 return 0;
1739         }
1740
1741         if (bus->rqueue_size > 0) {
1742                 *timeout_usec = 0;
1743                 return 1;
1744         }
1745
1746         c = prioq_peek(bus->reply_callbacks_prioq);
1747         if (!c) {
1748                 *timeout_usec = (uint64_t) -1;
1749                 return 0;
1750         }
1751
1752         *timeout_usec = c->timeout;
1753         return 1;
1754 }
1755
1756 static int process_timeout(sd_bus *bus) {
1757         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1758         struct reply_callback *c;
1759         usec_t n;
1760         int r;
1761
1762         assert(bus);
1763
1764         c = prioq_peek(bus->reply_callbacks_prioq);
1765         if (!c)
1766                 return 0;
1767
1768         n = now(CLOCK_MONOTONIC);
1769         if (c->timeout > n)
1770                 return 0;
1771
1772         r = bus_message_new_synthetic_error(
1773                         bus,
1774                         c->serial,
1775                         &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1776                         &m);
1777         if (r < 0)
1778                 return r;
1779
1780         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1781         hashmap_remove(bus->reply_callbacks, &c->serial);
1782
1783         r = c->callback(bus, m, c->userdata);
1784         free(c);
1785
1786         return r < 0 ? r : 1;
1787 }
1788
1789 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1790         assert(bus);
1791         assert(m);
1792
1793         if (bus->state != BUS_HELLO)
1794                 return 0;
1795
1796         /* Let's make sure the first message on the bus is the HELLO
1797          * reply. But note that we don't actually parse the message
1798          * here (we leave that to the usual handling), we just verify
1799          * we don't let any earlier msg through. */
1800
1801         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1802             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1803                 return -EIO;
1804
1805         if (m->reply_serial != bus->hello_serial)
1806                 return -EIO;
1807
1808         return 0;
1809 }
1810
1811 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1812         struct reply_callback *c;
1813         int r;
1814
1815         assert(bus);
1816         assert(m);
1817
1818         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1819             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1820                 return 0;
1821
1822         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1823         if (!c)
1824                 return 0;
1825
1826         if (c->timeout != 0)
1827                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1828
1829         r = sd_bus_message_rewind(m, true);
1830         if (r < 0)
1831                 return r;
1832
1833         r = c->callback(bus, m, c->userdata);
1834         free(c);
1835
1836         return r;
1837 }
1838
1839 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1840         struct filter_callback *l;
1841         int r;
1842
1843         assert(bus);
1844         assert(m);
1845
1846         do {
1847                 bus->filter_callbacks_modified = false;
1848
1849                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1850
1851                         if (bus->filter_callbacks_modified)
1852                                 break;
1853
1854                         /* Don't run this more than once per iteration */
1855                         if (l->last_iteration == bus->iteration_counter)
1856                                 continue;
1857
1858                         l->last_iteration = bus->iteration_counter;
1859
1860                         r = sd_bus_message_rewind(m, true);
1861                         if (r < 0)
1862                                 return r;
1863
1864                         r = l->callback(bus, m, l->userdata);
1865                         if (r != 0)
1866                                 return r;
1867
1868                 }
1869
1870         } while (bus->filter_callbacks_modified);
1871
1872         return 0;
1873 }
1874
1875 static int process_match(sd_bus *bus, sd_bus_message *m) {
1876         int r;
1877
1878         assert(bus);
1879         assert(m);
1880
1881         do {
1882                 bus->match_callbacks_modified = false;
1883
1884                 r = bus_match_run(bus, &bus->match_callbacks, m);
1885                 if (r != 0)
1886                         return r;
1887
1888         } while (bus->match_callbacks_modified);
1889
1890         return 0;
1891 }
1892
1893 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1894         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1895         int r;
1896
1897         assert(bus);
1898         assert(m);
1899
1900         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1901                 return 0;
1902
1903         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1904                 return 0;
1905
1906         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1907                 return 1;
1908
1909         if (streq_ptr(m->member, "Ping"))
1910                 r = sd_bus_message_new_method_return(bus, m, &reply);
1911         else if (streq_ptr(m->member, "GetMachineId")) {
1912                 sd_id128_t id;
1913                 char sid[33];
1914
1915                 r = sd_id128_get_machine(&id);
1916                 if (r < 0)
1917                         return r;
1918
1919                 r = sd_bus_message_new_method_return(bus, m, &reply);
1920                 if (r < 0)
1921                         return r;
1922
1923                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1924         } else {
1925                 r = sd_bus_message_new_method_errorf(
1926                                 bus, m, &reply,
1927                                 SD_BUS_ERROR_UNKNOWN_METHOD,
1928                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1929         }
1930
1931         if (r < 0)
1932                 return r;
1933
1934         r = sd_bus_send(bus, reply, NULL);
1935         if (r < 0)
1936                 return r;
1937
1938         return 1;
1939 }
1940
1941 static int process_message(sd_bus *bus, sd_bus_message *m) {
1942         int r;
1943
1944         assert(bus);
1945         assert(m);
1946
1947         bus->current = m;
1948         bus->iteration_counter++;
1949
1950         log_debug("Got message sender=%s object=%s interface=%s member=%s",
1951                   strna(sd_bus_message_get_sender(m)),
1952                   strna(sd_bus_message_get_path(m)),
1953                   strna(sd_bus_message_get_interface(m)),
1954                   strna(sd_bus_message_get_member(m)));
1955
1956         r = process_hello(bus, m);
1957         if (r != 0)
1958                 goto finish;
1959
1960         r = process_reply(bus, m);
1961         if (r != 0)
1962                 goto finish;
1963
1964         r = process_filter(bus, m);
1965         if (r != 0)
1966                 goto finish;
1967
1968         r = process_match(bus, m);
1969         if (r != 0)
1970                 goto finish;
1971
1972         r = process_builtin(bus, m);
1973         if (r != 0)
1974                 goto finish;
1975
1976         r = bus_process_object(bus, m);
1977
1978 finish:
1979         bus->current = NULL;
1980         return r;
1981 }
1982
1983 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1984         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1985         int r;
1986
1987         assert(bus);
1988         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1989
1990         r = process_timeout(bus);
1991         if (r != 0)
1992                 goto null_message;
1993
1994         r = dispatch_wqueue(bus);
1995         if (r != 0)
1996                 goto null_message;
1997
1998         r = dispatch_rqueue(bus, &m);
1999         if (r < 0)
2000                 return r;
2001         if (!m)
2002                 goto null_message;
2003
2004         r = process_message(bus, m);
2005         if (r != 0)
2006                 goto null_message;
2007
2008         if (ret) {
2009                 r = sd_bus_message_rewind(m, true);
2010                 if (r < 0)
2011                         return r;
2012
2013                 *ret = m;
2014                 m = NULL;
2015                 return 1;
2016         }
2017
2018         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2019
2020                 r = sd_bus_reply_method_errorf(
2021                                 bus, m,
2022                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
2023                                 "Unknown object '%s'.", m->path);
2024                 if (r < 0)
2025                         return r;
2026         }
2027
2028         return 1;
2029
2030 null_message:
2031         if (r >= 0 && ret)
2032                 *ret = NULL;
2033
2034         return r;
2035 }
2036
2037 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2038         BUS_DONT_DESTROY(bus);
2039         int r;
2040
2041         /* Returns 0 when we didn't do anything. This should cause the
2042          * caller to invoke sd_bus_wait() before returning the next
2043          * time. Returns > 0 when we did something, which possibly
2044          * means *ret is filled in with an unprocessed message. */
2045
2046         assert_return(bus, -EINVAL);
2047         assert_return(!bus_pid_changed(bus), -ECHILD);
2048
2049         /* We don't allow recursively invoking sd_bus_process(). */
2050         assert_return(!bus->processing, -EBUSY);
2051
2052         switch (bus->state) {
2053
2054         case BUS_UNSET:
2055         case BUS_CLOSED:
2056                 return -ENOTCONN;
2057
2058         case BUS_OPENING:
2059                 r = bus_socket_process_opening(bus);
2060                 if (r < 0)
2061                         return r;
2062                 if (ret)
2063                         *ret = NULL;
2064                 return r;
2065
2066         case BUS_AUTHENTICATING:
2067
2068                 r = bus_socket_process_authenticating(bus);
2069                 if (r < 0)
2070                         return r;
2071                 if (ret)
2072                         *ret = NULL;
2073                 return r;
2074
2075         case BUS_RUNNING:
2076         case BUS_HELLO:
2077
2078                 bus->processing = true;
2079                 r = process_running(bus, ret);
2080                 bus->processing = false;
2081
2082                 return r;
2083         }
2084
2085         assert_not_reached("Unknown state");
2086 }
2087
2088 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2089         struct pollfd p[2] = {};
2090         int r, e, n;
2091         struct timespec ts;
2092         usec_t m = (usec_t) -1;
2093
2094         assert(bus);
2095         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2096
2097         e = sd_bus_get_events(bus);
2098         if (e < 0)
2099                 return e;
2100
2101         if (need_more)
2102                 /* The caller really needs some more data, he doesn't
2103                  * care about what's already read, or any timeouts
2104                  * except its own.*/
2105                 e |= POLLIN;
2106         else {
2107                 usec_t until;
2108                 /* The caller wants to process if there's something to
2109                  * process, but doesn't care otherwise */
2110
2111                 r = sd_bus_get_timeout(bus, &until);
2112                 if (r < 0)
2113                         return r;
2114                 if (r > 0) {
2115                         usec_t nw;
2116                         nw = now(CLOCK_MONOTONIC);
2117                         m = until > nw ? until - nw : 0;
2118                 }
2119         }
2120
2121         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2122                 m = timeout_usec;
2123
2124         p[0].fd = bus->input_fd;
2125         if (bus->output_fd == bus->input_fd) {
2126                 p[0].events = e;
2127                 n = 1;
2128         } else {
2129                 p[0].events = e & POLLIN;
2130                 p[1].fd = bus->output_fd;
2131                 p[1].events = e & POLLOUT;
2132                 n = 2;
2133         }
2134
2135         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2136         if (r < 0)
2137                 return -errno;
2138
2139         return r > 0 ? 1 : 0;
2140 }
2141
2142 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2143
2144         assert_return(bus, -EINVAL);
2145         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2146         assert_return(!bus_pid_changed(bus), -ECHILD);
2147
2148         if (bus->rqueue_size > 0)
2149                 return 0;
2150
2151         return bus_poll(bus, false, timeout_usec);
2152 }
2153
2154 _public_ int sd_bus_flush(sd_bus *bus) {
2155         int r;
2156
2157         assert_return(bus, -EINVAL);
2158         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2159         assert_return(!bus_pid_changed(bus), -ECHILD);
2160
2161         r = bus_ensure_running(bus);
2162         if (r < 0)
2163                 return r;
2164
2165         if (bus->wqueue_size <= 0)
2166                 return 0;
2167
2168         for (;;) {
2169                 r = dispatch_wqueue(bus);
2170                 if (r < 0)
2171                         return r;
2172
2173                 if (bus->wqueue_size <= 0)
2174                         return 0;
2175
2176                 r = bus_poll(bus, false, (uint64_t) -1);
2177                 if (r < 0)
2178                         return r;
2179         }
2180 }
2181
2182 _public_ int sd_bus_add_filter(sd_bus *bus,
2183                                sd_bus_message_handler_t callback,
2184                                void *userdata) {
2185
2186         struct filter_callback *f;
2187
2188         assert_return(bus, -EINVAL);
2189         assert_return(callback, -EINVAL);
2190         assert_return(!bus_pid_changed(bus), -ECHILD);
2191
2192         f = new0(struct filter_callback, 1);
2193         if (!f)
2194                 return -ENOMEM;
2195         f->callback = callback;
2196         f->userdata = userdata;
2197
2198         bus->filter_callbacks_modified = true;
2199         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2200         return 0;
2201 }
2202
2203 _public_ int sd_bus_remove_filter(sd_bus *bus,
2204                                   sd_bus_message_handler_t callback,
2205                                   void *userdata) {
2206
2207         struct filter_callback *f;
2208
2209         assert_return(bus, -EINVAL);
2210         assert_return(callback, -EINVAL);
2211         assert_return(!bus_pid_changed(bus), -ECHILD);
2212
2213         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2214                 if (f->callback == callback && f->userdata == userdata) {
2215                         bus->filter_callbacks_modified = true;
2216                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2217                         free(f);
2218                         return 1;
2219                 }
2220         }
2221
2222         return 0;
2223 }
2224
2225 _public_ int sd_bus_add_match(sd_bus *bus,
2226                               const char *match,
2227                               sd_bus_message_handler_t callback,
2228                               void *userdata) {
2229
2230         struct bus_match_component *components = NULL;
2231         unsigned n_components = 0;
2232         uint64_t cookie = 0;
2233         int r = 0;
2234
2235         assert_return(bus, -EINVAL);
2236         assert_return(match, -EINVAL);
2237         assert_return(!bus_pid_changed(bus), -ECHILD);
2238
2239         r = bus_match_parse(match, &components, &n_components);
2240         if (r < 0)
2241                 goto finish;
2242
2243         if (bus->bus_client) {
2244                 cookie = ++bus->match_cookie;
2245
2246                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2247                 if (r < 0)
2248                         goto finish;
2249         }
2250
2251         bus->match_callbacks_modified = true;
2252         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2253         if (r < 0) {
2254                 if (bus->bus_client)
2255                         bus_remove_match_internal(bus, match, cookie);
2256         }
2257
2258 finish:
2259         bus_match_parse_free(components, n_components);
2260         return r;
2261 }
2262
2263 _public_ int sd_bus_remove_match(sd_bus *bus,
2264                                  const char *match,
2265                                  sd_bus_message_handler_t callback,
2266                                  void *userdata) {
2267
2268         struct bus_match_component *components = NULL;
2269         unsigned n_components = 0;
2270         int r = 0, q = 0;
2271         uint64_t cookie = 0;
2272
2273         assert_return(bus, -EINVAL);
2274         assert_return(match, -EINVAL);
2275         assert_return(!bus_pid_changed(bus), -ECHILD);
2276
2277         r = bus_match_parse(match, &components, &n_components);
2278         if (r < 0)
2279                 return r;
2280
2281         bus->match_callbacks_modified = true;
2282         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2283
2284         if (bus->bus_client)
2285                 q = bus_remove_match_internal(bus, match, cookie);
2286
2287         bus_match_parse_free(components, n_components);
2288
2289         return r < 0 ? r : q;
2290 }
2291
2292 bool bus_pid_changed(sd_bus *bus) {
2293         assert(bus);
2294
2295         /* We don't support people creating a bus connection and
2296          * keeping it around over a fork(). Let's complain. */
2297
2298         return bus->original_pid != getpid();
2299 }
2300
2301 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2302         sd_bus *bus = userdata;
2303         int r;
2304
2305         assert(bus);
2306
2307         r = sd_bus_process(bus, NULL);
2308         if (r < 0)
2309                 return r;
2310
2311         return 1;
2312 }
2313
2314 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2315         sd_bus *bus = userdata;
2316         int r;
2317
2318         assert(bus);
2319
2320         r = sd_bus_process(bus, NULL);
2321         if (r < 0)
2322                 return r;
2323
2324         return 1;
2325 }
2326
2327 static int prepare_callback(sd_event_source *s, void *userdata) {
2328         sd_bus *bus = userdata;
2329         int r, e;
2330         usec_t until;
2331
2332         assert(s);
2333         assert(bus);
2334
2335         e = sd_bus_get_events(bus);
2336         if (e < 0)
2337                 return e;
2338
2339         if (bus->output_fd != bus->input_fd) {
2340
2341                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2342                 if (r < 0)
2343                         return r;
2344
2345                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2346                 if (r < 0)
2347                         return r;
2348         } else {
2349                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2350                 if (r < 0)
2351                         return r;
2352         }
2353
2354         r = sd_bus_get_timeout(bus, &until);
2355         if (r < 0)
2356                 return r;
2357         if (r > 0) {
2358                 int j;
2359
2360                 j = sd_event_source_set_time(bus->time_event_source, until);
2361                 if (j < 0)
2362                         return j;
2363         }
2364
2365         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2366         if (r < 0)
2367                 return r;
2368
2369         return 1;
2370 }
2371
2372 static int quit_callback(sd_event_source *event, void *userdata) {
2373         sd_bus *bus = userdata;
2374
2375         assert(event);
2376
2377         sd_bus_flush(bus);
2378
2379         return 1;
2380 }
2381
2382 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2383         int r;
2384
2385         assert_return(bus, -EINVAL);
2386         assert_return(!bus->event, -EBUSY);
2387
2388         assert(!bus->input_io_event_source);
2389         assert(!bus->output_io_event_source);
2390         assert(!bus->time_event_source);
2391
2392         if (event)
2393                 bus->event = sd_event_ref(event);
2394         else  {
2395                 r = sd_event_default(&bus->event);
2396                 if (r < 0)
2397                         return r;
2398         }
2399
2400         r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2401         if (r < 0)
2402                 goto fail;
2403
2404         r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2405         if (r < 0)
2406                 goto fail;
2407
2408         if (bus->output_fd != bus->input_fd) {
2409                 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2410                 if (r < 0)
2411                         goto fail;
2412
2413                 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2414                 if (r < 0)
2415                         goto fail;
2416         }
2417
2418         r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2419         if (r < 0)
2420                 goto fail;
2421
2422         r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2423         if (r < 0)
2424                 goto fail;
2425
2426         r = sd_event_source_set_priority(bus->time_event_source, priority);
2427         if (r < 0)
2428                 goto fail;
2429
2430         r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2431         if (r < 0)
2432                 goto fail;
2433
2434         return 0;
2435
2436 fail:
2437         sd_bus_detach_event(bus);
2438         return r;
2439 }
2440
2441 _public_ int sd_bus_detach_event(sd_bus *bus) {
2442         assert_return(bus, -EINVAL);
2443         assert_return(bus->event, -ENXIO);
2444
2445         if (bus->input_io_event_source)
2446                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2447
2448         if (bus->output_io_event_source)
2449                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2450
2451         if (bus->time_event_source)
2452                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2453
2454         if (bus->quit_event_source)
2455                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2456
2457         if (bus->event)
2458                 bus->event = sd_event_unref(bus->event);
2459
2460         return 0;
2461 }
2462
2463 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2464         assert_return(bus, NULL);
2465
2466         return bus->current;
2467 }
2468
2469 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2470         sd_bus *b = NULL;
2471         int r;
2472
2473         assert(bus_open);
2474         assert(default_bus);
2475
2476         if (!ret)
2477                 return !!*default_bus;
2478
2479         if (*default_bus) {
2480                 *ret = sd_bus_ref(*default_bus);
2481                 return 0;
2482         }
2483
2484         r = bus_open(&b);
2485         if (r < 0)
2486                 return r;
2487
2488         b->default_bus_ptr = default_bus;
2489         b->tid = gettid();
2490         *default_bus = b;
2491
2492         *ret = b;
2493         return 1;
2494 }
2495
2496 _public_ int sd_bus_default_system(sd_bus **ret) {
2497         static __thread sd_bus *default_system_bus = NULL;
2498
2499         return bus_default(sd_bus_open_system, &default_system_bus, ret);
2500 }
2501
2502 _public_ int sd_bus_default_user(sd_bus **ret) {
2503         static __thread sd_bus *default_user_bus = NULL;
2504
2505         return bus_default(sd_bus_open_user, &default_user_bus, ret);
2506 }
2507
2508 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2509         assert_return(b, -EINVAL);
2510         assert_return(tid, -EINVAL);
2511         assert_return(!bus_pid_changed(b), -ECHILD);
2512
2513         if (b->tid != 0) {
2514                 *tid = b->tid;
2515                 return 0;
2516         }
2517
2518         if (b->event)
2519                 return sd_event_get_tid(b->event, tid);
2520
2521         return -ENXIO;
2522 }