chiark / gitweb /
bus: introduce concept of a default bus for each thread and make use of it everywhere
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48 #include "bus-util.h"
49 #include "bus-container.h"
50
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
52
53 static void bus_close_fds(sd_bus *b) {
54         assert(b);
55
56         if (b->input_fd >= 0)
57                 close_nointr_nofail(b->input_fd);
58
59         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60                 close_nointr_nofail(b->output_fd);
61
62         b->input_fd = b->output_fd = -1;
63 }
64
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66         struct node_callback *c;
67         struct node_vtable *v;
68         struct node_enumerator *e;
69
70         assert(b);
71
72         if (!n)
73                 return;
74
75         while (n->child)
76                 bus_node_destroy(b, n->child);
77
78         while ((c = n->callbacks)) {
79                 LIST_REMOVE(callbacks, n->callbacks, c);
80                 free(c);
81         }
82
83         while ((v = n->vtables)) {
84                 LIST_REMOVE(vtables, n->vtables, v);
85                 free(v->interface);
86                 free(v);
87         }
88
89         while ((e = n->enumerators)) {
90                 LIST_REMOVE(enumerators, n->enumerators, e);
91                 free(e);
92         }
93
94         if (n->parent)
95                 LIST_REMOVE(siblings, n->parent->child, n);
96
97         assert_se(hashmap_remove(b->nodes, n->path) == n);
98         free(n->path);
99         free(n);
100 }
101
102 static void bus_free(sd_bus *b) {
103         struct filter_callback *f;
104         struct node *n;
105         unsigned i;
106
107         assert(b);
108
109         sd_bus_detach_event(b);
110
111         bus_close_fds(b);
112
113         if (b->kdbus_buffer)
114                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
115
116         free(b->rbuffer);
117         free(b->unique_name);
118         free(b->auth_buffer);
119         free(b->address);
120         free(b->kernel);
121         free(b->machine);
122
123         free(b->exec_path);
124         strv_free(b->exec_argv);
125
126         close_many(b->fds, b->n_fds);
127         free(b->fds);
128
129         for (i = 0; i < b->rqueue_size; i++)
130                 sd_bus_message_unref(b->rqueue[i]);
131         free(b->rqueue);
132
133         for (i = 0; i < b->wqueue_size; i++)
134                 sd_bus_message_unref(b->wqueue[i]);
135         free(b->wqueue);
136
137         hashmap_free_free(b->reply_callbacks);
138         prioq_free(b->reply_callbacks_prioq);
139
140         while ((f = b->filter_callbacks)) {
141                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
142                 free(f);
143         }
144
145         bus_match_free(&b->match_callbacks);
146
147         hashmap_free_free(b->vtable_methods);
148         hashmap_free_free(b->vtable_properties);
149
150         while ((n = hashmap_first(b->nodes)))
151                 bus_node_destroy(b, n);
152
153         hashmap_free(b->nodes);
154
155         bus_kernel_flush_memfd(b);
156
157         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
158
159         free(b);
160 }
161
162 _public_ int sd_bus_new(sd_bus **ret) {
163         sd_bus *r;
164
165         assert_return(ret, -EINVAL);
166
167         r = new0(sd_bus, 1);
168         if (!r)
169                 return -ENOMEM;
170
171         r->n_ref = REFCNT_INIT;
172         r->input_fd = r->output_fd = -1;
173         r->message_version = 1;
174         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
175         r->original_pid = getpid();
176
177         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
178
179         /* We guarantee that wqueue always has space for at least one
180          * entry */
181         r->wqueue = new(sd_bus_message*, 1);
182         if (!r->wqueue) {
183                 free(r);
184                 return -ENOMEM;
185         }
186
187         *ret = r;
188         return 0;
189 }
190
191 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
192         char *a;
193
194         assert_return(bus, -EINVAL);
195         assert_return(bus->state == BUS_UNSET, -EPERM);
196         assert_return(address, -EINVAL);
197         assert_return(!bus_pid_changed(bus), -ECHILD);
198
199         a = strdup(address);
200         if (!a)
201                 return -ENOMEM;
202
203         free(bus->address);
204         bus->address = a;
205
206         return 0;
207 }
208
209 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
210         assert_return(bus, -EINVAL);
211         assert_return(bus->state == BUS_UNSET, -EPERM);
212         assert_return(input_fd >= 0, -EINVAL);
213         assert_return(output_fd >= 0, -EINVAL);
214         assert_return(!bus_pid_changed(bus), -ECHILD);
215
216         bus->input_fd = input_fd;
217         bus->output_fd = output_fd;
218         return 0;
219 }
220
221 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
222         char *p, **a;
223
224         assert_return(bus, -EINVAL);
225         assert_return(bus->state == BUS_UNSET, -EPERM);
226         assert_return(path, -EINVAL);
227         assert_return(!strv_isempty(argv), -EINVAL);
228         assert_return(!bus_pid_changed(bus), -ECHILD);
229
230         p = strdup(path);
231         if (!p)
232                 return -ENOMEM;
233
234         a = strv_copy(argv);
235         if (!a) {
236                 free(p);
237                 return -ENOMEM;
238         }
239
240         free(bus->exec_path);
241         strv_free(bus->exec_argv);
242
243         bus->exec_path = p;
244         bus->exec_argv = a;
245
246         return 0;
247 }
248
249 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
250         assert_return(bus, -EINVAL);
251         assert_return(bus->state == BUS_UNSET, -EPERM);
252         assert_return(!bus_pid_changed(bus), -ECHILD);
253
254         bus->bus_client = !!b;
255         return 0;
256 }
257
258 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
259         assert_return(bus, -EINVAL);
260         assert_return(bus->state == BUS_UNSET, -EPERM);
261         assert_return(!bus_pid_changed(bus), -ECHILD);
262
263         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
264         return 0;
265 }
266
267 _public_ int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
268         assert_return(bus, -EINVAL);
269         assert_return(bus->state == BUS_UNSET, -EPERM);
270         assert_return(!bus_pid_changed(bus), -ECHILD);
271
272         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
273         return 0;
274 }
275
276 _public_ int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
277         assert_return(bus, -EINVAL);
278         assert_return(bus->state == BUS_UNSET, -EPERM);
279         assert_return(!bus_pid_changed(bus), -ECHILD);
280
281         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
282         return 0;
283 }
284
285 _public_ int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
286         assert_return(bus, -EINVAL);
287         assert_return(bus->state == BUS_UNSET, -EPERM);
288         assert_return(!bus_pid_changed(bus), -ECHILD);
289
290         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
291         return 0;
292 }
293
294 _public_ int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
295         assert_return(bus, -EINVAL);
296         assert_return(bus->state == BUS_UNSET, -EPERM);
297         assert_return(!bus_pid_changed(bus), -ECHILD);
298
299         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
300         return 0;
301 }
302
303 _public_ int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
304         assert_return(bus, -EINVAL);
305         assert_return(bus->state == BUS_UNSET, -EPERM);
306         assert_return(!bus_pid_changed(bus), -ECHILD);
307
308         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
309         return 0;
310 }
311
312 _public_ int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
313         assert_return(bus, -EINVAL);
314         assert_return(bus->state == BUS_UNSET, -EPERM);
315         assert_return(!bus_pid_changed(bus), -ECHILD);
316
317         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
318         return 0;
319 }
320
321 _public_ int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
322         assert_return(bus, -EINVAL);
323         assert_return(bus->state == BUS_UNSET, -EPERM);
324         assert_return(!bus_pid_changed(bus), -ECHILD);
325
326         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
327         return 0;
328 }
329
330 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
331         assert_return(bus, -EINVAL);
332         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
333         assert_return(bus->state == BUS_UNSET, -EPERM);
334         assert_return(!bus_pid_changed(bus), -ECHILD);
335
336         bus->is_server = !!b;
337         bus->server_id = server_id;
338         return 0;
339 }
340
341 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
342         assert_return(bus, -EINVAL);
343         assert_return(bus->state == BUS_UNSET, -EPERM);
344         assert_return(!bus_pid_changed(bus), -ECHILD);
345
346         bus->anonymous_auth = !!b;
347         return 0;
348 }
349
350 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
351         const char *s;
352         int r;
353
354         assert(bus);
355         assert(bus->state == BUS_HELLO);
356         assert(reply);
357
358         r = sd_bus_message_get_errno(reply);
359         if (r < 0)
360                 return r;
361         if (r > 0)
362                 return -r;
363
364         r = sd_bus_message_read(reply, "s", &s);
365         if (r < 0)
366                 return r;
367
368         if (!service_name_is_valid(s) || s[0] != ':')
369                 return -EBADMSG;
370
371         bus->unique_name = strdup(s);
372         if (!bus->unique_name)
373                 return -ENOMEM;
374
375         bus->state = BUS_RUNNING;
376
377         return 1;
378 }
379
380 static int bus_send_hello(sd_bus *bus) {
381         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
382         int r;
383
384         assert(bus);
385
386         if (!bus->bus_client || bus->is_kernel)
387                 return 0;
388
389         r = sd_bus_message_new_method_call(
390                         bus,
391                         "org.freedesktop.DBus",
392                         "/",
393                         "org.freedesktop.DBus",
394                         "Hello",
395                         &m);
396         if (r < 0)
397                 return r;
398
399         return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
400 }
401
402 int bus_start_running(sd_bus *bus) {
403         assert(bus);
404
405         if (bus->bus_client && !bus->is_kernel) {
406                 bus->state = BUS_HELLO;
407                 return 1;
408         }
409
410         bus->state = BUS_RUNNING;
411         return 1;
412 }
413
414 static int parse_address_key(const char **p, const char *key, char **value) {
415         size_t l, n = 0;
416         const char *a;
417         char *r = NULL;
418
419         assert(p);
420         assert(*p);
421         assert(value);
422
423         if (key) {
424                 l = strlen(key);
425                 if (strncmp(*p, key, l) != 0)
426                         return 0;
427
428                 if ((*p)[l] != '=')
429                         return 0;
430
431                 if (*value)
432                         return -EINVAL;
433
434                 a = *p + l + 1;
435         } else
436                 a = *p;
437
438         while (*a != ';' && *a != ',' && *a != 0) {
439                 char c, *t;
440
441                 if (*a == '%') {
442                         int x, y;
443
444                         x = unhexchar(a[1]);
445                         if (x < 0) {
446                                 free(r);
447                                 return x;
448                         }
449
450                         y = unhexchar(a[2]);
451                         if (y < 0) {
452                                 free(r);
453                                 return y;
454                         }
455
456                         c = (char) ((x << 4) | y);
457                         a += 3;
458                 } else {
459                         c = *a;
460                         a++;
461                 }
462
463                 t = realloc(r, n + 2);
464                 if (!t) {
465                         free(r);
466                         return -ENOMEM;
467                 }
468
469                 r = t;
470                 r[n++] = c;
471         }
472
473         if (!r) {
474                 r = strdup("");
475                 if (!r)
476                         return -ENOMEM;
477         } else
478                 r[n] = 0;
479
480         if (*a == ',')
481                 a++;
482
483         *p = a;
484
485         free(*value);
486         *value = r;
487
488         return 1;
489 }
490
491 static void skip_address_key(const char **p) {
492         assert(p);
493         assert(*p);
494
495         *p += strcspn(*p, ",");
496
497         if (**p == ',')
498                 (*p) ++;
499 }
500
501 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
502         _cleanup_free_ char *path = NULL, *abstract = NULL;
503         size_t l;
504         int r;
505
506         assert(b);
507         assert(p);
508         assert(*p);
509         assert(guid);
510
511         while (**p != 0 && **p != ';') {
512                 r = parse_address_key(p, "guid", guid);
513                 if (r < 0)
514                         return r;
515                 else if (r > 0)
516                         continue;
517
518                 r = parse_address_key(p, "path", &path);
519                 if (r < 0)
520                         return r;
521                 else if (r > 0)
522                         continue;
523
524                 r = parse_address_key(p, "abstract", &abstract);
525                 if (r < 0)
526                         return r;
527                 else if (r > 0)
528                         continue;
529
530                 skip_address_key(p);
531         }
532
533         if (!path && !abstract)
534                 return -EINVAL;
535
536         if (path && abstract)
537                 return -EINVAL;
538
539         if (path) {
540                 l = strlen(path);
541                 if (l > sizeof(b->sockaddr.un.sun_path))
542                         return -E2BIG;
543
544                 b->sockaddr.un.sun_family = AF_UNIX;
545                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
546                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
547         } else if (abstract) {
548                 l = strlen(abstract);
549                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
550                         return -E2BIG;
551
552                 b->sockaddr.un.sun_family = AF_UNIX;
553                 b->sockaddr.un.sun_path[0] = 0;
554                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
555                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
556         }
557
558         return 0;
559 }
560
561 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
562         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
563         int r;
564         struct addrinfo *result, hints = {
565                 .ai_socktype = SOCK_STREAM,
566                 .ai_flags = AI_ADDRCONFIG,
567         };
568
569         assert(b);
570         assert(p);
571         assert(*p);
572         assert(guid);
573
574         while (**p != 0 && **p != ';') {
575                 r = parse_address_key(p, "guid", guid);
576                 if (r < 0)
577                         return r;
578                 else if (r > 0)
579                         continue;
580
581                 r = parse_address_key(p, "host", &host);
582                 if (r < 0)
583                         return r;
584                 else if (r > 0)
585                         continue;
586
587                 r = parse_address_key(p, "port", &port);
588                 if (r < 0)
589                         return r;
590                 else if (r > 0)
591                         continue;
592
593                 r = parse_address_key(p, "family", &family);
594                 if (r < 0)
595                         return r;
596                 else if (r > 0)
597                         continue;
598
599                 skip_address_key(p);
600         }
601
602         if (!host || !port)
603                 return -EINVAL;
604
605         if (family) {
606                 if (streq(family, "ipv4"))
607                         hints.ai_family = AF_INET;
608                 else if (streq(family, "ipv6"))
609                         hints.ai_family = AF_INET6;
610                 else
611                         return -EINVAL;
612         }
613
614         r = getaddrinfo(host, port, &hints, &result);
615         if (r == EAI_SYSTEM)
616                 return -errno;
617         else if (r != 0)
618                 return -EADDRNOTAVAIL;
619
620         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
621         b->sockaddr_size = result->ai_addrlen;
622
623         freeaddrinfo(result);
624
625         return 0;
626 }
627
628 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
629         char *path = NULL;
630         unsigned n_argv = 0, j;
631         char **argv = NULL;
632         int r;
633
634         assert(b);
635         assert(p);
636         assert(*p);
637         assert(guid);
638
639         while (**p != 0 && **p != ';') {
640                 r = parse_address_key(p, "guid", guid);
641                 if (r < 0)
642                         goto fail;
643                 else if (r > 0)
644                         continue;
645
646                 r = parse_address_key(p, "path", &path);
647                 if (r < 0)
648                         goto fail;
649                 else if (r > 0)
650                         continue;
651
652                 if (startswith(*p, "argv")) {
653                         unsigned ul;
654
655                         errno = 0;
656                         ul = strtoul(*p + 4, (char**) p, 10);
657                         if (errno > 0 || **p != '=' || ul > 256) {
658                                 r = -EINVAL;
659                                 goto fail;
660                         }
661
662                         (*p) ++;
663
664                         if (ul >= n_argv) {
665                                 char **x;
666
667                                 x = realloc(argv, sizeof(char*) * (ul + 2));
668                                 if (!x) {
669                                         r = -ENOMEM;
670                                         goto fail;
671                                 }
672
673                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
674
675                                 argv = x;
676                                 n_argv = ul + 1;
677                         }
678
679                         r = parse_address_key(p, NULL, argv + ul);
680                         if (r < 0)
681                                 goto fail;
682
683                         continue;
684                 }
685
686                 skip_address_key(p);
687         }
688
689         if (!path) {
690                 r = -EINVAL;
691                 goto fail;
692         }
693
694         /* Make sure there are no holes in the array, with the
695          * exception of argv[0] */
696         for (j = 1; j < n_argv; j++)
697                 if (!argv[j]) {
698                         r = -EINVAL;
699                         goto fail;
700                 }
701
702         if (argv && argv[0] == NULL) {
703                 argv[0] = strdup(path);
704                 if (!argv[0]) {
705                         r = -ENOMEM;
706                         goto fail;
707                 }
708         }
709
710         b->exec_path = path;
711         b->exec_argv = argv;
712         return 0;
713
714 fail:
715         for (j = 0; j < n_argv; j++)
716                 free(argv[j]);
717
718         free(argv);
719         free(path);
720         return r;
721 }
722
723 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
724         _cleanup_free_ char *path = NULL;
725         int r;
726
727         assert(b);
728         assert(p);
729         assert(*p);
730         assert(guid);
731
732         while (**p != 0 && **p != ';') {
733                 r = parse_address_key(p, "guid", guid);
734                 if (r < 0)
735                         return r;
736                 else if (r > 0)
737                         continue;
738
739                 r = parse_address_key(p, "path", &path);
740                 if (r < 0)
741                         return r;
742                 else if (r > 0)
743                         continue;
744
745                 skip_address_key(p);
746         }
747
748         if (!path)
749                 return -EINVAL;
750
751         free(b->kernel);
752         b->kernel = path;
753         path = NULL;
754
755         return 0;
756 }
757
758 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
759         _cleanup_free_ char *machine = NULL;
760         int r;
761
762         assert(b);
763         assert(p);
764         assert(*p);
765         assert(guid);
766
767         while (**p != 0 && **p != ';') {
768                 r = parse_address_key(p, "guid", guid);
769                 if (r < 0)
770                         return r;
771                 else if (r > 0)
772                         continue;
773
774                 r = parse_address_key(p, "machine", &machine);
775                 if (r < 0)
776                         return r;
777                 else if (r > 0)
778                         continue;
779
780                 skip_address_key(p);
781         }
782
783         if (!machine)
784                 return -EINVAL;
785
786         free(b->machine);
787         b->machine = machine;
788         machine = NULL;
789
790         b->sockaddr.un.sun_family = AF_UNIX;
791         strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
792         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
793
794         return 0;
795 }
796
797 static void bus_reset_parsed_address(sd_bus *b) {
798         assert(b);
799
800         zero(b->sockaddr);
801         b->sockaddr_size = 0;
802         strv_free(b->exec_argv);
803         free(b->exec_path);
804         b->exec_path = NULL;
805         b->exec_argv = NULL;
806         b->server_id = SD_ID128_NULL;
807         free(b->kernel);
808         b->kernel = NULL;
809         free(b->machine);
810         b->machine = NULL;
811 }
812
813 static int bus_parse_next_address(sd_bus *b) {
814         _cleanup_free_ char *guid = NULL;
815         const char *a;
816         int r;
817
818         assert(b);
819
820         if (!b->address)
821                 return 0;
822         if (b->address[b->address_index] == 0)
823                 return 0;
824
825         bus_reset_parsed_address(b);
826
827         a = b->address + b->address_index;
828
829         while (*a != 0) {
830
831                 if (*a == ';') {
832                         a++;
833                         continue;
834                 }
835
836                 if (startswith(a, "unix:")) {
837                         a += 5;
838
839                         r = parse_unix_address(b, &a, &guid);
840                         if (r < 0)
841                                 return r;
842                         break;
843
844                 } else if (startswith(a, "tcp:")) {
845
846                         a += 4;
847                         r = parse_tcp_address(b, &a, &guid);
848                         if (r < 0)
849                                 return r;
850
851                         break;
852
853                 } else if (startswith(a, "unixexec:")) {
854
855                         a += 9;
856                         r = parse_exec_address(b, &a, &guid);
857                         if (r < 0)
858                                 return r;
859
860                         break;
861
862                 } else if (startswith(a, "kernel:")) {
863
864                         a += 7;
865                         r = parse_kernel_address(b, &a, &guid);
866                         if (r < 0)
867                                 return r;
868
869                         break;
870                 } else if (startswith(a, "x-container:")) {
871
872                         a += 12;
873                         r = parse_container_address(b, &a, &guid);
874                         if (r < 0)
875                                 return r;
876
877                         break;
878                 }
879
880                 a = strchr(a, ';');
881                 if (!a)
882                         return 0;
883         }
884
885         if (guid) {
886                 r = sd_id128_from_string(guid, &b->server_id);
887                 if (r < 0)
888                         return r;
889         }
890
891         b->address_index = a - b->address;
892         return 1;
893 }
894
895 static int bus_start_address(sd_bus *b) {
896         int r;
897
898         assert(b);
899
900         for (;;) {
901                 sd_bus_close(b);
902
903                 if (b->exec_path) {
904
905                         r = bus_socket_exec(b);
906                         if (r >= 0)
907                                 return r;
908
909                         b->last_connect_error = -r;
910                 } else if (b->kernel) {
911
912                         r = bus_kernel_connect(b);
913                         if (r >= 0)
914                                 return r;
915
916                         b->last_connect_error = -r;
917
918                 } else if (b->machine) {
919
920                         r = bus_container_connect(b);
921                         if (r >= 0)
922                                 return r;
923
924                         b->last_connect_error = -r;
925
926                 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
927
928                         r = bus_socket_connect(b);
929                         if (r >= 0)
930                                 return r;
931
932                         b->last_connect_error = -r;
933                 }
934
935                 r = bus_parse_next_address(b);
936                 if (r < 0)
937                         return r;
938                 if (r == 0)
939                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
940         }
941 }
942
943 int bus_next_address(sd_bus *b) {
944         assert(b);
945
946         bus_reset_parsed_address(b);
947         return bus_start_address(b);
948 }
949
950 static int bus_start_fd(sd_bus *b) {
951         struct stat st;
952         int r;
953
954         assert(b);
955         assert(b->input_fd >= 0);
956         assert(b->output_fd >= 0);
957
958         r = fd_nonblock(b->input_fd, true);
959         if (r < 0)
960                 return r;
961
962         r = fd_cloexec(b->input_fd, true);
963         if (r < 0)
964                 return r;
965
966         if (b->input_fd != b->output_fd) {
967                 r = fd_nonblock(b->output_fd, true);
968                 if (r < 0)
969                         return r;
970
971                 r = fd_cloexec(b->output_fd, true);
972                 if (r < 0)
973                         return r;
974         }
975
976         if (fstat(b->input_fd, &st) < 0)
977                 return -errno;
978
979         if (S_ISCHR(b->input_fd))
980                 return bus_kernel_take_fd(b);
981         else
982                 return bus_socket_take_fd(b);
983 }
984
985 _public_ int sd_bus_start(sd_bus *bus) {
986         int r;
987
988         assert_return(bus, -EINVAL);
989         assert_return(bus->state == BUS_UNSET, -EPERM);
990         assert_return(!bus_pid_changed(bus), -ECHILD);
991
992         bus->state = BUS_OPENING;
993
994         if (bus->is_server && bus->bus_client)
995                 return -EINVAL;
996
997         if (bus->input_fd >= 0)
998                 r = bus_start_fd(bus);
999         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1000                 r = bus_start_address(bus);
1001         else
1002                 return -EINVAL;
1003
1004         if (r < 0)
1005                 return r;
1006
1007         return bus_send_hello(bus);
1008 }
1009
1010 _public_ int sd_bus_open_system(sd_bus **ret) {
1011         const char *e;
1012         sd_bus *b;
1013         int r;
1014
1015         assert_return(ret, -EINVAL);
1016
1017         r = sd_bus_new(&b);
1018         if (r < 0)
1019                 return r;
1020
1021         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1022         if (e) {
1023                 r = sd_bus_set_address(b, e);
1024                 if (r < 0)
1025                         goto fail;
1026         } else {
1027                 b->sockaddr.un.sun_family = AF_UNIX;
1028                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1029                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1030         }
1031
1032         b->bus_client = true;
1033
1034         r = sd_bus_start(b);
1035         if (r < 0)
1036                 goto fail;
1037
1038         *ret = b;
1039         return 0;
1040
1041 fail:
1042         bus_free(b);
1043         return r;
1044 }
1045
1046 _public_ int sd_bus_open_user(sd_bus **ret) {
1047         const char *e;
1048         sd_bus *b;
1049         size_t l;
1050         int r;
1051
1052         assert_return(ret, -EINVAL);
1053
1054         r = sd_bus_new(&b);
1055         if (r < 0)
1056                 return r;
1057
1058         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1059         if (e) {
1060                 r = sd_bus_set_address(b, e);
1061                 if (r < 0)
1062                         goto fail;
1063         } else {
1064                 e = secure_getenv("XDG_RUNTIME_DIR");
1065                 if (!e) {
1066                         r = -ENOENT;
1067                         goto fail;
1068                 }
1069
1070                 l = strlen(e);
1071                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1072                         r = -E2BIG;
1073                         goto fail;
1074                 }
1075
1076                 b->sockaddr.un.sun_family = AF_UNIX;
1077                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1078                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1079         }
1080
1081         b->bus_client = true;
1082
1083         r = sd_bus_start(b);
1084         if (r < 0)
1085                 goto fail;
1086
1087         *ret = b;
1088         return 0;
1089
1090 fail:
1091         bus_free(b);
1092         return r;
1093 }
1094
1095 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1096         _cleanup_free_ char *e = NULL;
1097         char *p = NULL;
1098         sd_bus *bus;
1099         int r;
1100
1101         assert_return(host, -EINVAL);
1102         assert_return(ret, -EINVAL);
1103
1104         e = bus_address_escape(host);
1105         if (!e)
1106                 return -ENOMEM;
1107
1108         p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1109         if (!p)
1110                 return -ENOMEM;
1111
1112         r = sd_bus_new(&bus);
1113         if (r < 0) {
1114                 free(p);
1115                 return r;
1116         }
1117
1118         bus->address = p;
1119         bus->bus_client = true;
1120
1121         r = sd_bus_start(bus);
1122         if (r < 0) {
1123                 bus_free(bus);
1124                 return r;
1125         }
1126
1127         *ret = bus;
1128         return 0;
1129 }
1130
1131 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1132         _cleanup_free_ char *e = NULL;
1133         sd_bus *bus;
1134         char *p;
1135         int r;
1136
1137         assert_return(machine, -EINVAL);
1138         assert_return(ret, -EINVAL);
1139
1140         e = bus_address_escape(machine);
1141         if (!e)
1142                 return -ENOMEM;
1143
1144         p = strjoin("x-container:machine=", e, NULL);
1145         if (!p)
1146                 return -ENOMEM;
1147
1148         r = sd_bus_new(&bus);
1149         if (r < 0) {
1150                 free(p);
1151                 return r;
1152         }
1153
1154         bus->address = p;
1155         bus->bus_client = true;
1156
1157         r = sd_bus_start(bus);
1158         if (r < 0) {
1159                 bus_free(bus);
1160                 return r;
1161         }
1162
1163         *ret = bus;
1164         return 0;
1165 }
1166
1167 _public_ void sd_bus_close(sd_bus *bus) {
1168         if (!bus)
1169                 return;
1170         if (bus->state == BUS_CLOSED)
1171                 return;
1172         if (bus_pid_changed(bus))
1173                 return;
1174
1175         bus->state = BUS_CLOSED;
1176
1177         sd_bus_detach_event(bus);
1178
1179         if (!bus->is_kernel)
1180                 bus_close_fds(bus);
1181
1182         /* We'll leave the fd open in case this is a kernel bus, since
1183          * there might still be memblocks around that reference this
1184          * bus, and they might need to invoke the
1185          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1186          * freed. */
1187 }
1188
1189 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1190         assert_return(bus, NULL);
1191
1192         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1193
1194         return bus;
1195 }
1196
1197 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1198         assert_return(bus, NULL);
1199
1200         if (REFCNT_DEC(bus->n_ref) <= 0)
1201                 bus_free(bus);
1202
1203         return NULL;
1204 }
1205
1206 _public_ int sd_bus_is_open(sd_bus *bus) {
1207
1208         assert_return(bus, -EINVAL);
1209         assert_return(!bus_pid_changed(bus), -ECHILD);
1210
1211         return BUS_IS_OPEN(bus->state);
1212 }
1213
1214 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1215         int r;
1216
1217         assert_return(bus, -EINVAL);
1218         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1219         assert_return(!bus_pid_changed(bus), -ECHILD);
1220
1221         if (type == SD_BUS_TYPE_UNIX_FD) {
1222                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1223                         return 0;
1224
1225                 r = bus_ensure_running(bus);
1226                 if (r < 0)
1227                         return r;
1228
1229                 return bus->can_fds;
1230         }
1231
1232         return bus_type_is_valid(type);
1233 }
1234
1235 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1236         int r;
1237
1238         assert_return(bus, -EINVAL);
1239         assert_return(server_id, -EINVAL);
1240         assert_return(!bus_pid_changed(bus), -ECHILD);
1241
1242         r = bus_ensure_running(bus);
1243         if (r < 0)
1244                 return r;
1245
1246         *server_id = bus->server_id;
1247         return 0;
1248 }
1249
1250 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1251         assert(m);
1252
1253         if (m->header->version > b->message_version)
1254                 return -EPERM;
1255
1256         if (m->sealed)
1257                 return 0;
1258
1259         return bus_message_seal(m, ++b->serial);
1260 }
1261
1262 static int dispatch_wqueue(sd_bus *bus) {
1263         int r, ret = 0;
1264
1265         assert(bus);
1266         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1267
1268         while (bus->wqueue_size > 0) {
1269
1270                 if (bus->is_kernel)
1271                         r = bus_kernel_write_message(bus, bus->wqueue[0]);
1272                 else
1273                         r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1274
1275                 if (r < 0) {
1276                         sd_bus_close(bus);
1277                         return r;
1278                 } else if (r == 0)
1279                         /* Didn't do anything this time */
1280                         return ret;
1281                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1282                         /* Fully written. Let's drop the entry from
1283                          * the queue.
1284                          *
1285                          * This isn't particularly optimized, but
1286                          * well, this is supposed to be our worst-case
1287                          * buffer only, and the socket buffer is
1288                          * supposed to be our primary buffer, and if
1289                          * it got full, then all bets are off
1290                          * anyway. */
1291
1292                         sd_bus_message_unref(bus->wqueue[0]);
1293                         bus->wqueue_size --;
1294                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1295                         bus->windex = 0;
1296
1297                         ret = 1;
1298                 }
1299         }
1300
1301         return ret;
1302 }
1303
1304 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1305         sd_bus_message *z = NULL;
1306         int r, ret = 0;
1307
1308         assert(bus);
1309         assert(m);
1310         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1311
1312         if (bus->rqueue_size > 0) {
1313                 /* Dispatch a queued message */
1314
1315                 *m = bus->rqueue[0];
1316                 bus->rqueue_size --;
1317                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1318                 return 1;
1319         }
1320
1321         /* Try to read a new message */
1322         do {
1323                 if (bus->is_kernel)
1324                         r = bus_kernel_read_message(bus, &z);
1325                 else
1326                         r = bus_socket_read_message(bus, &z);
1327
1328                 if (r < 0) {
1329                         sd_bus_close(bus);
1330                         return r;
1331                 }
1332                 if (r == 0)
1333                         return ret;
1334
1335                 ret = 1;
1336         } while (!z);
1337
1338         *m = z;
1339         return ret;
1340 }
1341
1342 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1343         int r;
1344
1345         assert_return(bus, -EINVAL);
1346         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1347         assert_return(m, -EINVAL);
1348         assert_return(!bus_pid_changed(bus), -ECHILD);
1349
1350         if (m->n_fds > 0) {
1351                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1352                 if (r < 0)
1353                         return r;
1354                 if (r == 0)
1355                         return -ENOTSUP;
1356         }
1357
1358         /* If the serial number isn't kept, then we know that no reply
1359          * is expected */
1360         if (!serial && !m->sealed)
1361                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1362
1363         r = bus_seal_message(bus, m);
1364         if (r < 0)
1365                 return r;
1366
1367         /* If this is a reply and no reply was requested, then let's
1368          * suppress this, if we can */
1369         if (m->dont_send && !serial)
1370                 return 1;
1371
1372         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1373                 size_t idx = 0;
1374
1375                 if (bus->is_kernel)
1376                         r = bus_kernel_write_message(bus, m);
1377                 else
1378                         r = bus_socket_write_message(bus, m, &idx);
1379
1380                 if (r < 0) {
1381                         sd_bus_close(bus);
1382                         return r;
1383                 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1384                         /* Wasn't fully written. So let's remember how
1385                          * much was written. Note that the first entry
1386                          * of the wqueue array is always allocated so
1387                          * that we always can remember how much was
1388                          * written. */
1389                         bus->wqueue[0] = sd_bus_message_ref(m);
1390                         bus->wqueue_size = 1;
1391                         bus->windex = idx;
1392                 }
1393         } else {
1394                 sd_bus_message **q;
1395
1396                 /* Just append it to the queue. */
1397
1398                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1399                         return -ENOBUFS;
1400
1401                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1402                 if (!q)
1403                         return -ENOMEM;
1404
1405                 bus->wqueue = q;
1406                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1407         }
1408
1409         if (serial)
1410                 *serial = BUS_MESSAGE_SERIAL(m);
1411
1412         return 1;
1413 }
1414
1415 static usec_t calc_elapse(uint64_t usec) {
1416         if (usec == (uint64_t) -1)
1417                 return 0;
1418
1419         if (usec == 0)
1420                 usec = BUS_DEFAULT_TIMEOUT;
1421
1422         return now(CLOCK_MONOTONIC) + usec;
1423 }
1424
1425 static int timeout_compare(const void *a, const void *b) {
1426         const struct reply_callback *x = a, *y = b;
1427
1428         if (x->timeout != 0 && y->timeout == 0)
1429                 return -1;
1430
1431         if (x->timeout == 0 && y->timeout != 0)
1432                 return 1;
1433
1434         if (x->timeout < y->timeout)
1435                 return -1;
1436
1437         if (x->timeout > y->timeout)
1438                 return 1;
1439
1440         return 0;
1441 }
1442
1443 _public_ int sd_bus_call_async(
1444                 sd_bus *bus,
1445                 sd_bus_message *m,
1446                 sd_bus_message_handler_t callback,
1447                 void *userdata,
1448                 uint64_t usec,
1449                 uint64_t *serial) {
1450
1451         struct reply_callback *c;
1452         int r;
1453
1454         assert_return(bus, -EINVAL);
1455         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1456         assert_return(m, -EINVAL);
1457         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1458         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1459         assert_return(callback, -EINVAL);
1460         assert_return(!bus_pid_changed(bus), -ECHILD);
1461
1462         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1463         if (r < 0)
1464                 return r;
1465
1466         if (usec != (uint64_t) -1) {
1467                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1468                 if (r < 0)
1469                         return r;
1470         }
1471
1472         r = bus_seal_message(bus, m);
1473         if (r < 0)
1474                 return r;
1475
1476         c = new0(struct reply_callback, 1);
1477         if (!c)
1478                 return -ENOMEM;
1479
1480         c->callback = callback;
1481         c->userdata = userdata;
1482         c->serial = BUS_MESSAGE_SERIAL(m);
1483         c->timeout = calc_elapse(usec);
1484
1485         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1486         if (r < 0) {
1487                 free(c);
1488                 return r;
1489         }
1490
1491         if (c->timeout != 0) {
1492                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1493                 if (r < 0) {
1494                         c->timeout = 0;
1495                         sd_bus_call_async_cancel(bus, c->serial);
1496                         return r;
1497                 }
1498         }
1499
1500         r = sd_bus_send(bus, m, serial);
1501         if (r < 0) {
1502                 sd_bus_call_async_cancel(bus, c->serial);
1503                 return r;
1504         }
1505
1506         return r;
1507 }
1508
1509 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1510         struct reply_callback *c;
1511
1512         assert_return(bus, -EINVAL);
1513         assert_return(serial != 0, -EINVAL);
1514         assert_return(!bus_pid_changed(bus), -ECHILD);
1515
1516         c = hashmap_remove(bus->reply_callbacks, &serial);
1517         if (!c)
1518                 return 0;
1519
1520         if (c->timeout != 0)
1521                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1522
1523         free(c);
1524         return 1;
1525 }
1526
1527 int bus_ensure_running(sd_bus *bus) {
1528         int r;
1529
1530         assert(bus);
1531
1532         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1533                 return -ENOTCONN;
1534         if (bus->state == BUS_RUNNING)
1535                 return 1;
1536
1537         for (;;) {
1538                 r = sd_bus_process(bus, NULL);
1539                 if (r < 0)
1540                         return r;
1541                 if (bus->state == BUS_RUNNING)
1542                         return 1;
1543                 if (r > 0)
1544                         continue;
1545
1546                 r = sd_bus_wait(bus, (uint64_t) -1);
1547                 if (r < 0)
1548                         return r;
1549         }
1550 }
1551
1552 _public_ int sd_bus_call(
1553                 sd_bus *bus,
1554                 sd_bus_message *m,
1555                 uint64_t usec,
1556                 sd_bus_error *error,
1557                 sd_bus_message **reply) {
1558
1559         int r;
1560         usec_t timeout;
1561         uint64_t serial;
1562         bool room = false;
1563
1564         assert_return(bus, -EINVAL);
1565         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1566         assert_return(m, -EINVAL);
1567         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1568         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1569         assert_return(!bus_error_is_dirty(error), -EINVAL);
1570         assert_return(!bus_pid_changed(bus), -ECHILD);
1571
1572         r = bus_ensure_running(bus);
1573         if (r < 0)
1574                 return r;
1575
1576         r = sd_bus_send(bus, m, &serial);
1577         if (r < 0)
1578                 return r;
1579
1580         timeout = calc_elapse(usec);
1581
1582         for (;;) {
1583                 usec_t left;
1584                 sd_bus_message *incoming = NULL;
1585
1586                 if (!room) {
1587                         sd_bus_message **q;
1588
1589                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1590                                 return -ENOBUFS;
1591
1592                         /* Make sure there's room for queuing this
1593                          * locally, before we read the message */
1594
1595                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1596                         if (!q)
1597                                 return -ENOMEM;
1598
1599                         bus->rqueue = q;
1600                         room = true;
1601                 }
1602
1603                 if (bus->is_kernel)
1604                         r = bus_kernel_read_message(bus, &incoming);
1605                 else
1606                         r = bus_socket_read_message(bus, &incoming);
1607                 if (r < 0)
1608                         return r;
1609                 if (incoming) {
1610
1611                         if (incoming->reply_serial == serial) {
1612                                 /* Found a match! */
1613
1614                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1615
1616                                         if (reply)
1617                                                 *reply = incoming;
1618                                         else
1619                                                 sd_bus_message_unref(incoming);
1620
1621                                         return 1;
1622                                 }
1623
1624                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1625                                         int k;
1626
1627                                         r = sd_bus_error_copy(error, &incoming->error);
1628                                         if (r < 0) {
1629                                                 sd_bus_message_unref(incoming);
1630                                                 return r;
1631                                         }
1632
1633                                         k = sd_bus_error_get_errno(&incoming->error);
1634                                         sd_bus_message_unref(incoming);
1635                                         return -k;
1636                                 }
1637
1638                                 sd_bus_message_unref(incoming);
1639                                 return -EIO;
1640                         }
1641
1642                         /* There's already guaranteed to be room for
1643                          * this, so need to resize things here */
1644                         bus->rqueue[bus->rqueue_size ++] = incoming;
1645                         room = false;
1646
1647                         /* Try to read more, right-away */
1648                         continue;
1649                 }
1650                 if (r != 0)
1651                         continue;
1652
1653                 if (timeout > 0) {
1654                         usec_t n;
1655
1656                         n = now(CLOCK_MONOTONIC);
1657                         if (n >= timeout)
1658                                 return -ETIMEDOUT;
1659
1660                         left = timeout - n;
1661                 } else
1662                         left = (uint64_t) -1;
1663
1664                 r = bus_poll(bus, true, left);
1665                 if (r < 0)
1666                         return r;
1667
1668                 r = dispatch_wqueue(bus);
1669                 if (r < 0)
1670                         return r;
1671         }
1672 }
1673
1674 _public_ int sd_bus_get_fd(sd_bus *bus) {
1675
1676         assert_return(bus, -EINVAL);
1677         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1678         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1679         assert_return(!bus_pid_changed(bus), -ECHILD);
1680
1681         return bus->input_fd;
1682 }
1683
1684 _public_ int sd_bus_get_events(sd_bus *bus) {
1685         int flags = 0;
1686
1687         assert_return(bus, -EINVAL);
1688         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1689         assert_return(!bus_pid_changed(bus), -ECHILD);
1690
1691         if (bus->state == BUS_OPENING)
1692                 flags |= POLLOUT;
1693         else if (bus->state == BUS_AUTHENTICATING) {
1694
1695                 if (bus_socket_auth_needs_write(bus))
1696                         flags |= POLLOUT;
1697
1698                 flags |= POLLIN;
1699
1700         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1701                 if (bus->rqueue_size <= 0)
1702                         flags |= POLLIN;
1703                 if (bus->wqueue_size > 0)
1704                         flags |= POLLOUT;
1705         }
1706
1707         return flags;
1708 }
1709
1710 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1711         struct reply_callback *c;
1712
1713         assert_return(bus, -EINVAL);
1714         assert_return(timeout_usec, -EINVAL);
1715         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1716         assert_return(!bus_pid_changed(bus), -ECHILD);
1717
1718         if (bus->state == BUS_AUTHENTICATING) {
1719                 *timeout_usec = bus->auth_timeout;
1720                 return 1;
1721         }
1722
1723         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1724                 *timeout_usec = (uint64_t) -1;
1725                 return 0;
1726         }
1727
1728         if (bus->rqueue_size > 0) {
1729                 *timeout_usec = 0;
1730                 return 1;
1731         }
1732
1733         c = prioq_peek(bus->reply_callbacks_prioq);
1734         if (!c) {
1735                 *timeout_usec = (uint64_t) -1;
1736                 return 0;
1737         }
1738
1739         *timeout_usec = c->timeout;
1740         return 1;
1741 }
1742
1743 static int process_timeout(sd_bus *bus) {
1744         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1745         struct reply_callback *c;
1746         usec_t n;
1747         int r;
1748
1749         assert(bus);
1750
1751         c = prioq_peek(bus->reply_callbacks_prioq);
1752         if (!c)
1753                 return 0;
1754
1755         n = now(CLOCK_MONOTONIC);
1756         if (c->timeout > n)
1757                 return 0;
1758
1759         r = bus_message_new_synthetic_error(
1760                         bus,
1761                         c->serial,
1762                         &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1763                         &m);
1764         if (r < 0)
1765                 return r;
1766
1767         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1768         hashmap_remove(bus->reply_callbacks, &c->serial);
1769
1770         r = c->callback(bus, m, c->userdata);
1771         free(c);
1772
1773         return r < 0 ? r : 1;
1774 }
1775
1776 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1777         assert(bus);
1778         assert(m);
1779
1780         if (bus->state != BUS_HELLO)
1781                 return 0;
1782
1783         /* Let's make sure the first message on the bus is the HELLO
1784          * reply. But note that we don't actually parse the message
1785          * here (we leave that to the usual handling), we just verify
1786          * we don't let any earlier msg through. */
1787
1788         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1789             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1790                 return -EIO;
1791
1792         if (m->reply_serial != bus->hello_serial)
1793                 return -EIO;
1794
1795         return 0;
1796 }
1797
1798 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1799         struct reply_callback *c;
1800         int r;
1801
1802         assert(bus);
1803         assert(m);
1804
1805         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1806             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1807                 return 0;
1808
1809         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1810         if (!c)
1811                 return 0;
1812
1813         if (c->timeout != 0)
1814                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1815
1816         r = sd_bus_message_rewind(m, true);
1817         if (r < 0)
1818                 return r;
1819
1820         r = c->callback(bus, m, c->userdata);
1821         free(c);
1822
1823         return r;
1824 }
1825
1826 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1827         struct filter_callback *l;
1828         int r;
1829
1830         assert(bus);
1831         assert(m);
1832
1833         do {
1834                 bus->filter_callbacks_modified = false;
1835
1836                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1837
1838                         if (bus->filter_callbacks_modified)
1839                                 break;
1840
1841                         /* Don't run this more than once per iteration */
1842                         if (l->last_iteration == bus->iteration_counter)
1843                                 continue;
1844
1845                         l->last_iteration = bus->iteration_counter;
1846
1847                         r = sd_bus_message_rewind(m, true);
1848                         if (r < 0)
1849                                 return r;
1850
1851                         r = l->callback(bus, m, l->userdata);
1852                         if (r != 0)
1853                                 return r;
1854
1855                 }
1856
1857         } while (bus->filter_callbacks_modified);
1858
1859         return 0;
1860 }
1861
1862 static int process_match(sd_bus *bus, sd_bus_message *m) {
1863         int r;
1864
1865         assert(bus);
1866         assert(m);
1867
1868         do {
1869                 bus->match_callbacks_modified = false;
1870
1871                 r = bus_match_run(bus, &bus->match_callbacks, m);
1872                 if (r != 0)
1873                         return r;
1874
1875         } while (bus->match_callbacks_modified);
1876
1877         return 0;
1878 }
1879
1880 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1881         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1882         int r;
1883
1884         assert(bus);
1885         assert(m);
1886
1887         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1888                 return 0;
1889
1890         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1891                 return 0;
1892
1893         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1894                 return 1;
1895
1896         if (streq_ptr(m->member, "Ping"))
1897                 r = sd_bus_message_new_method_return(bus, m, &reply);
1898         else if (streq_ptr(m->member, "GetMachineId")) {
1899                 sd_id128_t id;
1900                 char sid[33];
1901
1902                 r = sd_id128_get_machine(&id);
1903                 if (r < 0)
1904                         return r;
1905
1906                 r = sd_bus_message_new_method_return(bus, m, &reply);
1907                 if (r < 0)
1908                         return r;
1909
1910                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1911         } else {
1912                 r = sd_bus_message_new_method_errorf(
1913                                 bus, m, &reply,
1914                                 SD_BUS_ERROR_UNKNOWN_METHOD,
1915                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1916         }
1917
1918         if (r < 0)
1919                 return r;
1920
1921         r = sd_bus_send(bus, reply, NULL);
1922         if (r < 0)
1923                 return r;
1924
1925         return 1;
1926 }
1927
1928 static int process_message(sd_bus *bus, sd_bus_message *m) {
1929         int r;
1930
1931         assert(bus);
1932         assert(m);
1933
1934         bus->current = m;
1935         bus->iteration_counter++;
1936
1937         log_debug("Got message sender=%s object=%s interface=%s member=%s",
1938                   strna(sd_bus_message_get_sender(m)),
1939                   strna(sd_bus_message_get_path(m)),
1940                   strna(sd_bus_message_get_interface(m)),
1941                   strna(sd_bus_message_get_member(m)));
1942
1943         r = process_hello(bus, m);
1944         if (r != 0)
1945                 goto finish;
1946
1947         r = process_reply(bus, m);
1948         if (r != 0)
1949                 goto finish;
1950
1951         r = process_filter(bus, m);
1952         if (r != 0)
1953                 goto finish;
1954
1955         r = process_match(bus, m);
1956         if (r != 0)
1957                 goto finish;
1958
1959         r = process_builtin(bus, m);
1960         if (r != 0)
1961                 goto finish;
1962
1963         r = bus_process_object(bus, m);
1964
1965 finish:
1966         bus->current = NULL;
1967         return r;
1968 }
1969
1970 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1971         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1972         int r;
1973
1974         assert(bus);
1975         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1976
1977         r = process_timeout(bus);
1978         if (r != 0)
1979                 goto null_message;
1980
1981         r = dispatch_wqueue(bus);
1982         if (r != 0)
1983                 goto null_message;
1984
1985         r = dispatch_rqueue(bus, &m);
1986         if (r < 0)
1987                 return r;
1988         if (!m)
1989                 goto null_message;
1990
1991         r = process_message(bus, m);
1992         if (r != 0)
1993                 goto null_message;
1994
1995         if (ret) {
1996                 r = sd_bus_message_rewind(m, true);
1997                 if (r < 0)
1998                         return r;
1999
2000                 *ret = m;
2001                 m = NULL;
2002                 return 1;
2003         }
2004
2005         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2006
2007                 r = sd_bus_reply_method_errorf(
2008                                 bus, m,
2009                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
2010                                 "Unknown object '%s'.", m->path);
2011                 if (r < 0)
2012                         return r;
2013         }
2014
2015         return 1;
2016
2017 null_message:
2018         if (r >= 0 && ret)
2019                 *ret = NULL;
2020
2021         return r;
2022 }
2023
2024 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2025         BUS_DONT_DESTROY(bus);
2026         int r;
2027
2028         /* Returns 0 when we didn't do anything. This should cause the
2029          * caller to invoke sd_bus_wait() before returning the next
2030          * time. Returns > 0 when we did something, which possibly
2031          * means *ret is filled in with an unprocessed message. */
2032
2033         assert_return(bus, -EINVAL);
2034         assert_return(!bus_pid_changed(bus), -ECHILD);
2035
2036         /* We don't allow recursively invoking sd_bus_process(). */
2037         assert_return(!bus->processing, -EBUSY);
2038
2039         switch (bus->state) {
2040
2041         case BUS_UNSET:
2042         case BUS_CLOSED:
2043                 return -ENOTCONN;
2044
2045         case BUS_OPENING:
2046                 r = bus_socket_process_opening(bus);
2047                 if (r < 0)
2048                         return r;
2049                 if (ret)
2050                         *ret = NULL;
2051                 return r;
2052
2053         case BUS_AUTHENTICATING:
2054
2055                 r = bus_socket_process_authenticating(bus);
2056                 if (r < 0)
2057                         return r;
2058                 if (ret)
2059                         *ret = NULL;
2060                 return r;
2061
2062         case BUS_RUNNING:
2063         case BUS_HELLO:
2064
2065                 bus->processing = true;
2066                 r = process_running(bus, ret);
2067                 bus->processing = false;
2068
2069                 return r;
2070         }
2071
2072         assert_not_reached("Unknown state");
2073 }
2074
2075 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2076         struct pollfd p[2] = {};
2077         int r, e, n;
2078         struct timespec ts;
2079         usec_t m = (usec_t) -1;
2080
2081         assert(bus);
2082         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2083
2084         e = sd_bus_get_events(bus);
2085         if (e < 0)
2086                 return e;
2087
2088         if (need_more)
2089                 /* The caller really needs some more data, he doesn't
2090                  * care about what's already read, or any timeouts
2091                  * except its own.*/
2092                 e |= POLLIN;
2093         else {
2094                 usec_t until;
2095                 /* The caller wants to process if there's something to
2096                  * process, but doesn't care otherwise */
2097
2098                 r = sd_bus_get_timeout(bus, &until);
2099                 if (r < 0)
2100                         return r;
2101                 if (r > 0) {
2102                         usec_t nw;
2103                         nw = now(CLOCK_MONOTONIC);
2104                         m = until > nw ? until - nw : 0;
2105                 }
2106         }
2107
2108         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2109                 m = timeout_usec;
2110
2111         p[0].fd = bus->input_fd;
2112         if (bus->output_fd == bus->input_fd) {
2113                 p[0].events = e;
2114                 n = 1;
2115         } else {
2116                 p[0].events = e & POLLIN;
2117                 p[1].fd = bus->output_fd;
2118                 p[1].events = e & POLLOUT;
2119                 n = 2;
2120         }
2121
2122         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2123         if (r < 0)
2124                 return -errno;
2125
2126         return r > 0 ? 1 : 0;
2127 }
2128
2129 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2130
2131         assert_return(bus, -EINVAL);
2132         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2133         assert_return(!bus_pid_changed(bus), -ECHILD);
2134
2135         if (bus->rqueue_size > 0)
2136                 return 0;
2137
2138         return bus_poll(bus, false, timeout_usec);
2139 }
2140
2141 _public_ int sd_bus_flush(sd_bus *bus) {
2142         int r;
2143
2144         assert_return(bus, -EINVAL);
2145         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2146         assert_return(!bus_pid_changed(bus), -ECHILD);
2147
2148         r = bus_ensure_running(bus);
2149         if (r < 0)
2150                 return r;
2151
2152         if (bus->wqueue_size <= 0)
2153                 return 0;
2154
2155         for (;;) {
2156                 r = dispatch_wqueue(bus);
2157                 if (r < 0)
2158                         return r;
2159
2160                 if (bus->wqueue_size <= 0)
2161                         return 0;
2162
2163                 r = bus_poll(bus, false, (uint64_t) -1);
2164                 if (r < 0)
2165                         return r;
2166         }
2167 }
2168
2169 _public_ int sd_bus_add_filter(sd_bus *bus,
2170                                sd_bus_message_handler_t callback,
2171                                void *userdata) {
2172
2173         struct filter_callback *f;
2174
2175         assert_return(bus, -EINVAL);
2176         assert_return(callback, -EINVAL);
2177         assert_return(!bus_pid_changed(bus), -ECHILD);
2178
2179         f = new0(struct filter_callback, 1);
2180         if (!f)
2181                 return -ENOMEM;
2182         f->callback = callback;
2183         f->userdata = userdata;
2184
2185         bus->filter_callbacks_modified = true;
2186         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2187         return 0;
2188 }
2189
2190 _public_ int sd_bus_remove_filter(sd_bus *bus,
2191                                   sd_bus_message_handler_t callback,
2192                                   void *userdata) {
2193
2194         struct filter_callback *f;
2195
2196         assert_return(bus, -EINVAL);
2197         assert_return(callback, -EINVAL);
2198         assert_return(!bus_pid_changed(bus), -ECHILD);
2199
2200         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2201                 if (f->callback == callback && f->userdata == userdata) {
2202                         bus->filter_callbacks_modified = true;
2203                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2204                         free(f);
2205                         return 1;
2206                 }
2207         }
2208
2209         return 0;
2210 }
2211
2212 _public_ int sd_bus_add_match(sd_bus *bus,
2213                               const char *match,
2214                               sd_bus_message_handler_t callback,
2215                               void *userdata) {
2216
2217         struct bus_match_component *components = NULL;
2218         unsigned n_components = 0;
2219         uint64_t cookie = 0;
2220         int r = 0;
2221
2222         assert_return(bus, -EINVAL);
2223         assert_return(match, -EINVAL);
2224         assert_return(!bus_pid_changed(bus), -ECHILD);
2225
2226         r = bus_match_parse(match, &components, &n_components);
2227         if (r < 0)
2228                 goto finish;
2229
2230         if (bus->bus_client) {
2231                 cookie = ++bus->match_cookie;
2232
2233                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2234                 if (r < 0)
2235                         goto finish;
2236         }
2237
2238         bus->match_callbacks_modified = true;
2239         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2240         if (r < 0) {
2241                 if (bus->bus_client)
2242                         bus_remove_match_internal(bus, match, cookie);
2243         }
2244
2245 finish:
2246         bus_match_parse_free(components, n_components);
2247         return r;
2248 }
2249
2250 _public_ int sd_bus_remove_match(sd_bus *bus,
2251                                  const char *match,
2252                                  sd_bus_message_handler_t callback,
2253                                  void *userdata) {
2254
2255         struct bus_match_component *components = NULL;
2256         unsigned n_components = 0;
2257         int r = 0, q = 0;
2258         uint64_t cookie = 0;
2259
2260         assert_return(bus, -EINVAL);
2261         assert_return(match, -EINVAL);
2262         assert_return(!bus_pid_changed(bus), -ECHILD);
2263
2264         r = bus_match_parse(match, &components, &n_components);
2265         if (r < 0)
2266                 return r;
2267
2268         bus->match_callbacks_modified = true;
2269         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2270
2271         if (bus->bus_client)
2272                 q = bus_remove_match_internal(bus, match, cookie);
2273
2274         bus_match_parse_free(components, n_components);
2275
2276         return r < 0 ? r : q;
2277 }
2278
2279 bool bus_pid_changed(sd_bus *bus) {
2280         assert(bus);
2281
2282         /* We don't support people creating a bus connection and
2283          * keeping it around over a fork(). Let's complain. */
2284
2285         return bus->original_pid != getpid();
2286 }
2287
2288 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2289         void *bus = userdata;
2290         int r;
2291
2292         assert(bus);
2293
2294         r = sd_bus_process(bus, NULL);
2295         if (r < 0)
2296                 return r;
2297
2298         return 1;
2299 }
2300
2301 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2302         void *bus = userdata;
2303         int r;
2304
2305         assert(bus);
2306
2307         r = sd_bus_process(bus, NULL);
2308         if (r < 0)
2309                 return r;
2310
2311         return 1;
2312 }
2313
2314 static int prepare_callback(sd_event_source *s, void *userdata) {
2315         sd_bus *bus = userdata;
2316         int r, e;
2317         usec_t until;
2318
2319         assert(s);
2320         assert(bus);
2321
2322         e = sd_bus_get_events(bus);
2323         if (e < 0)
2324                 return e;
2325
2326         if (bus->output_fd != bus->input_fd) {
2327
2328                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2329                 if (r < 0)
2330                         return r;
2331
2332                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2333                 if (r < 0)
2334                         return r;
2335         } else {
2336                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2337                 if (r < 0)
2338                         return r;
2339         }
2340
2341         r = sd_bus_get_timeout(bus, &until);
2342         if (r < 0)
2343                 return r;
2344         if (r > 0) {
2345                 int j;
2346
2347                 j = sd_event_source_set_time(bus->time_event_source, until);
2348                 if (j < 0)
2349                         return j;
2350         }
2351
2352         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2353         if (r < 0)
2354                 return r;
2355
2356         return 1;
2357 }
2358
2359 static int quit_callback(sd_event_source *event, void *userdata) {
2360         sd_bus *bus = userdata;
2361
2362         assert(event);
2363
2364         sd_bus_flush(bus);
2365
2366         return 1;
2367 }
2368
2369 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2370         int r;
2371
2372         assert_return(bus, -EINVAL);
2373         assert_return(!bus->event, -EBUSY);
2374
2375         assert(!bus->input_io_event_source);
2376         assert(!bus->output_io_event_source);
2377         assert(!bus->time_event_source);
2378
2379         if (event)
2380                 bus->event = sd_event_ref(event);
2381         else  {
2382                 r = sd_event_default(&bus->event);
2383                 if (r < 0)
2384                         return r;
2385         }
2386
2387         r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2388         if (r < 0)
2389                 goto fail;
2390
2391         r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2392         if (r < 0)
2393                 goto fail;
2394
2395         if (bus->output_fd != bus->input_fd) {
2396                 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2397                 if (r < 0)
2398                         goto fail;
2399
2400                 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2401                 if (r < 0)
2402                         goto fail;
2403         }
2404
2405         r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2406         if (r < 0)
2407                 goto fail;
2408
2409         r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2410         if (r < 0)
2411                 goto fail;
2412
2413         r = sd_event_source_set_priority(bus->time_event_source, priority);
2414         if (r < 0)
2415                 goto fail;
2416
2417         r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2418         if (r < 0)
2419                 goto fail;
2420
2421         return 0;
2422
2423 fail:
2424         sd_bus_detach_event(bus);
2425         return r;
2426 }
2427
2428 _public_ int sd_bus_detach_event(sd_bus *bus) {
2429         assert_return(bus, -EINVAL);
2430         assert_return(bus->event, -ENXIO);
2431
2432         if (bus->input_io_event_source)
2433                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2434
2435         if (bus->output_io_event_source)
2436                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2437
2438         if (bus->time_event_source)
2439                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2440
2441         if (bus->quit_event_source)
2442                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2443
2444         if (bus->event)
2445                 bus->event = sd_event_unref(bus->event);
2446
2447         return 0;
2448 }
2449
2450 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2451         assert_return(bus, NULL);
2452
2453         return bus->current;
2454 }
2455
2456 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2457         sd_bus *b = NULL;
2458         int r;
2459
2460         assert(bus_open);
2461         assert(default_bus);
2462
2463         if (!ret)
2464                 return !!*default_bus;
2465
2466         if (*default_bus) {
2467                 *ret = sd_bus_ref(*default_bus);
2468                 return 0;
2469         }
2470
2471         r = bus_open(&b);
2472         if (r < 0)
2473                 return r;
2474
2475         b->default_bus_ptr = default_bus;
2476         b->tid = gettid();
2477         *default_bus = b;
2478
2479         *ret = b;
2480         return 1;
2481 }
2482
2483 _public_ int sd_bus_default_system(sd_bus **ret) {
2484         static __thread sd_bus *default_system_bus = NULL;
2485
2486         return bus_default(sd_bus_open_system, &default_system_bus, ret);
2487 }
2488
2489 _public_ int sd_bus_default_user(sd_bus **ret) {
2490         static __thread sd_bus *default_user_bus = NULL;
2491
2492         return bus_default(sd_bus_open_user, &default_user_bus, ret);
2493 }
2494
2495 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2496         assert_return(b, -EINVAL);
2497         assert_return(tid, -EINVAL);
2498         assert_return(!bus_pid_changed(b), -ECHILD);
2499
2500         if (b->tid != 0) {
2501                 *tid = b->tid;
2502                 return 0;
2503         }
2504
2505         if (b->event)
2506                 return sd_event_get_tid(b->event, tid);
2507
2508         return -ENXIO;
2509 }