chiark / gitweb /
0be5a29f367fbe4e53bd2ef6a849feb55a8a1169
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48 #include "bus-util.h"
49 #include "bus-container.h"
50
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
52
53 static void bus_close_fds(sd_bus *b) {
54         assert(b);
55
56         if (b->input_fd >= 0)
57                 close_nointr_nofail(b->input_fd);
58
59         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60                 close_nointr_nofail(b->output_fd);
61
62         b->input_fd = b->output_fd = -1;
63 }
64
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66         struct node_callback *c;
67         struct node_vtable *v;
68         struct node_enumerator *e;
69
70         assert(b);
71
72         if (!n)
73                 return;
74
75         while (n->child)
76                 bus_node_destroy(b, n->child);
77
78         while ((c = n->callbacks)) {
79                 LIST_REMOVE(callbacks, n->callbacks, c);
80                 free(c);
81         }
82
83         while ((v = n->vtables)) {
84                 LIST_REMOVE(vtables, n->vtables, v);
85                 free(v->interface);
86                 free(v);
87         }
88
89         while ((e = n->enumerators)) {
90                 LIST_REMOVE(enumerators, n->enumerators, e);
91                 free(e);
92         }
93
94         if (n->parent)
95                 LIST_REMOVE(siblings, n->parent->child, n);
96
97         assert_se(hashmap_remove(b->nodes, n->path) == n);
98         free(n->path);
99         free(n);
100 }
101
102 static void bus_free(sd_bus *b) {
103         struct filter_callback *f;
104         struct node *n;
105         unsigned i;
106
107         assert(b);
108
109         sd_bus_detach_event(b);
110
111         bus_close_fds(b);
112
113         if (b->kdbus_buffer)
114                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
115
116         free(b->rbuffer);
117         free(b->unique_name);
118         free(b->auth_buffer);
119         free(b->address);
120         free(b->kernel);
121         free(b->machine);
122
123         free(b->exec_path);
124         strv_free(b->exec_argv);
125
126         close_many(b->fds, b->n_fds);
127         free(b->fds);
128
129         for (i = 0; i < b->rqueue_size; i++)
130                 sd_bus_message_unref(b->rqueue[i]);
131         free(b->rqueue);
132
133         for (i = 0; i < b->wqueue_size; i++)
134                 sd_bus_message_unref(b->wqueue[i]);
135         free(b->wqueue);
136
137         hashmap_free_free(b->reply_callbacks);
138         prioq_free(b->reply_callbacks_prioq);
139
140         while ((f = b->filter_callbacks)) {
141                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
142                 free(f);
143         }
144
145         bus_match_free(&b->match_callbacks);
146
147         hashmap_free_free(b->vtable_methods);
148         hashmap_free_free(b->vtable_properties);
149
150         while ((n = hashmap_first(b->nodes)))
151                 bus_node_destroy(b, n);
152
153         hashmap_free(b->nodes);
154
155         bus_kernel_flush_memfd(b);
156
157         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
158
159         free(b);
160 }
161
162 _public_ int sd_bus_new(sd_bus **ret) {
163         sd_bus *r;
164
165         assert_return(ret, -EINVAL);
166
167         r = new0(sd_bus, 1);
168         if (!r)
169                 return -ENOMEM;
170
171         r->n_ref = REFCNT_INIT;
172         r->input_fd = r->output_fd = -1;
173         r->message_version = 1;
174         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
175         r->original_pid = getpid();
176
177         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
178
179         /* We guarantee that wqueue always has space for at least one
180          * entry */
181         r->wqueue = new(sd_bus_message*, 1);
182         if (!r->wqueue) {
183                 free(r);
184                 return -ENOMEM;
185         }
186
187         *ret = r;
188         return 0;
189 }
190
191 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
192         char *a;
193
194         assert_return(bus, -EINVAL);
195         assert_return(bus->state == BUS_UNSET, -EPERM);
196         assert_return(address, -EINVAL);
197         assert_return(!bus_pid_changed(bus), -ECHILD);
198
199         a = strdup(address);
200         if (!a)
201                 return -ENOMEM;
202
203         free(bus->address);
204         bus->address = a;
205
206         return 0;
207 }
208
209 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
210         assert_return(bus, -EINVAL);
211         assert_return(bus->state == BUS_UNSET, -EPERM);
212         assert_return(input_fd >= 0, -EINVAL);
213         assert_return(output_fd >= 0, -EINVAL);
214         assert_return(!bus_pid_changed(bus), -ECHILD);
215
216         bus->input_fd = input_fd;
217         bus->output_fd = output_fd;
218         return 0;
219 }
220
221 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
222         char *p, **a;
223
224         assert_return(bus, -EINVAL);
225         assert_return(bus->state == BUS_UNSET, -EPERM);
226         assert_return(path, -EINVAL);
227         assert_return(!strv_isempty(argv), -EINVAL);
228         assert_return(!bus_pid_changed(bus), -ECHILD);
229
230         p = strdup(path);
231         if (!p)
232                 return -ENOMEM;
233
234         a = strv_copy(argv);
235         if (!a) {
236                 free(p);
237                 return -ENOMEM;
238         }
239
240         free(bus->exec_path);
241         strv_free(bus->exec_argv);
242
243         bus->exec_path = p;
244         bus->exec_argv = a;
245
246         return 0;
247 }
248
249 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
250         assert_return(bus, -EINVAL);
251         assert_return(bus->state == BUS_UNSET, -EPERM);
252         assert_return(!bus_pid_changed(bus), -ECHILD);
253
254         bus->bus_client = !!b;
255         return 0;
256 }
257
258 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
259         assert_return(bus, -EINVAL);
260         assert_return(bus->state == BUS_UNSET, -EPERM);
261         assert_return(!bus_pid_changed(bus), -ECHILD);
262
263         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
264         return 0;
265 }
266
267 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
268         assert_return(bus, -EINVAL);
269         assert_return(bus->state == BUS_UNSET, -EPERM);
270         assert_return(!bus_pid_changed(bus), -ECHILD);
271
272         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_TIMESTAMP, b);
273         return 0;
274 }
275
276 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, int b) {
277         assert_return(bus, -EINVAL);
278         assert_return(bus->state == BUS_UNSET, -EPERM);
279         assert_return(!bus_pid_changed(bus), -ECHILD);
280
281         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CREDS, b);
282         return 0;
283 }
284
285 _public_ int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
286         assert_return(bus, -EINVAL);
287         assert_return(bus->state == BUS_UNSET, -EPERM);
288         assert_return(!bus_pid_changed(bus), -ECHILD);
289
290         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
291         return 0;
292 }
293
294 _public_ int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
295         assert_return(bus, -EINVAL);
296         assert_return(bus->state == BUS_UNSET, -EPERM);
297         assert_return(!bus_pid_changed(bus), -ECHILD);
298
299         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
300         return 0;
301 }
302
303 _public_ int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
304         assert_return(bus, -EINVAL);
305         assert_return(bus->state == BUS_UNSET, -EPERM);
306         assert_return(!bus_pid_changed(bus), -ECHILD);
307
308         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
309         return 0;
310 }
311
312 _public_ int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
313         assert_return(bus, -EINVAL);
314         assert_return(bus->state == BUS_UNSET, -EPERM);
315         assert_return(!bus_pid_changed(bus), -ECHILD);
316
317         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
318         return 0;
319 }
320
321 _public_ int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
322         assert_return(bus, -EINVAL);
323         assert_return(bus->state == BUS_UNSET, -EPERM);
324         assert_return(!bus_pid_changed(bus), -ECHILD);
325
326         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
327         return 0;
328 }
329
330 _public_ int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
331         assert_return(bus, -EINVAL);
332         assert_return(bus->state == BUS_UNSET, -EPERM);
333         assert_return(!bus_pid_changed(bus), -ECHILD);
334
335         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
336         return 0;
337 }
338
339 _public_ int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
340         assert_return(bus, -EINVAL);
341         assert_return(bus->state == BUS_UNSET, -EPERM);
342         assert_return(!bus_pid_changed(bus), -ECHILD);
343
344         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
345         return 0;
346 }
347
348 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
349         assert_return(bus, -EINVAL);
350         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
351         assert_return(bus->state == BUS_UNSET, -EPERM);
352         assert_return(!bus_pid_changed(bus), -ECHILD);
353
354         bus->is_server = !!b;
355         bus->server_id = server_id;
356         return 0;
357 }
358
359 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
360         assert_return(bus, -EINVAL);
361         assert_return(bus->state == BUS_UNSET, -EPERM);
362         assert_return(!bus_pid_changed(bus), -ECHILD);
363
364         bus->anonymous_auth = !!b;
365         return 0;
366 }
367
368 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
369         const char *s;
370         int r;
371
372         assert(bus);
373         assert(bus->state == BUS_HELLO);
374         assert(reply);
375
376         r = sd_bus_message_get_errno(reply);
377         if (r < 0)
378                 return r;
379         if (r > 0)
380                 return -r;
381
382         r = sd_bus_message_read(reply, "s", &s);
383         if (r < 0)
384                 return r;
385
386         if (!service_name_is_valid(s) || s[0] != ':')
387                 return -EBADMSG;
388
389         bus->unique_name = strdup(s);
390         if (!bus->unique_name)
391                 return -ENOMEM;
392
393         bus->state = BUS_RUNNING;
394
395         return 1;
396 }
397
398 static int bus_send_hello(sd_bus *bus) {
399         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
400         int r;
401
402         assert(bus);
403
404         if (!bus->bus_client || bus->is_kernel)
405                 return 0;
406
407         r = sd_bus_message_new_method_call(
408                         bus,
409                         "org.freedesktop.DBus",
410                         "/",
411                         "org.freedesktop.DBus",
412                         "Hello",
413                         &m);
414         if (r < 0)
415                 return r;
416
417         return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
418 }
419
420 int bus_start_running(sd_bus *bus) {
421         assert(bus);
422
423         if (bus->bus_client && !bus->is_kernel) {
424                 bus->state = BUS_HELLO;
425                 return 1;
426         }
427
428         bus->state = BUS_RUNNING;
429         return 1;
430 }
431
432 static int parse_address_key(const char **p, const char *key, char **value) {
433         size_t l, n = 0;
434         const char *a;
435         char *r = NULL;
436
437         assert(p);
438         assert(*p);
439         assert(value);
440
441         if (key) {
442                 l = strlen(key);
443                 if (strncmp(*p, key, l) != 0)
444                         return 0;
445
446                 if ((*p)[l] != '=')
447                         return 0;
448
449                 if (*value)
450                         return -EINVAL;
451
452                 a = *p + l + 1;
453         } else
454                 a = *p;
455
456         while (*a != ';' && *a != ',' && *a != 0) {
457                 char c, *t;
458
459                 if (*a == '%') {
460                         int x, y;
461
462                         x = unhexchar(a[1]);
463                         if (x < 0) {
464                                 free(r);
465                                 return x;
466                         }
467
468                         y = unhexchar(a[2]);
469                         if (y < 0) {
470                                 free(r);
471                                 return y;
472                         }
473
474                         c = (char) ((x << 4) | y);
475                         a += 3;
476                 } else {
477                         c = *a;
478                         a++;
479                 }
480
481                 t = realloc(r, n + 2);
482                 if (!t) {
483                         free(r);
484                         return -ENOMEM;
485                 }
486
487                 r = t;
488                 r[n++] = c;
489         }
490
491         if (!r) {
492                 r = strdup("");
493                 if (!r)
494                         return -ENOMEM;
495         } else
496                 r[n] = 0;
497
498         if (*a == ',')
499                 a++;
500
501         *p = a;
502
503         free(*value);
504         *value = r;
505
506         return 1;
507 }
508
509 static void skip_address_key(const char **p) {
510         assert(p);
511         assert(*p);
512
513         *p += strcspn(*p, ",");
514
515         if (**p == ',')
516                 (*p) ++;
517 }
518
519 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
520         _cleanup_free_ char *path = NULL, *abstract = NULL;
521         size_t l;
522         int r;
523
524         assert(b);
525         assert(p);
526         assert(*p);
527         assert(guid);
528
529         while (**p != 0 && **p != ';') {
530                 r = parse_address_key(p, "guid", guid);
531                 if (r < 0)
532                         return r;
533                 else if (r > 0)
534                         continue;
535
536                 r = parse_address_key(p, "path", &path);
537                 if (r < 0)
538                         return r;
539                 else if (r > 0)
540                         continue;
541
542                 r = parse_address_key(p, "abstract", &abstract);
543                 if (r < 0)
544                         return r;
545                 else if (r > 0)
546                         continue;
547
548                 skip_address_key(p);
549         }
550
551         if (!path && !abstract)
552                 return -EINVAL;
553
554         if (path && abstract)
555                 return -EINVAL;
556
557         if (path) {
558                 l = strlen(path);
559                 if (l > sizeof(b->sockaddr.un.sun_path))
560                         return -E2BIG;
561
562                 b->sockaddr.un.sun_family = AF_UNIX;
563                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
564                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
565         } else if (abstract) {
566                 l = strlen(abstract);
567                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
568                         return -E2BIG;
569
570                 b->sockaddr.un.sun_family = AF_UNIX;
571                 b->sockaddr.un.sun_path[0] = 0;
572                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
573                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
574         }
575
576         return 0;
577 }
578
579 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
580         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
581         int r;
582         struct addrinfo *result, hints = {
583                 .ai_socktype = SOCK_STREAM,
584                 .ai_flags = AI_ADDRCONFIG,
585         };
586
587         assert(b);
588         assert(p);
589         assert(*p);
590         assert(guid);
591
592         while (**p != 0 && **p != ';') {
593                 r = parse_address_key(p, "guid", guid);
594                 if (r < 0)
595                         return r;
596                 else if (r > 0)
597                         continue;
598
599                 r = parse_address_key(p, "host", &host);
600                 if (r < 0)
601                         return r;
602                 else if (r > 0)
603                         continue;
604
605                 r = parse_address_key(p, "port", &port);
606                 if (r < 0)
607                         return r;
608                 else if (r > 0)
609                         continue;
610
611                 r = parse_address_key(p, "family", &family);
612                 if (r < 0)
613                         return r;
614                 else if (r > 0)
615                         continue;
616
617                 skip_address_key(p);
618         }
619
620         if (!host || !port)
621                 return -EINVAL;
622
623         if (family) {
624                 if (streq(family, "ipv4"))
625                         hints.ai_family = AF_INET;
626                 else if (streq(family, "ipv6"))
627                         hints.ai_family = AF_INET6;
628                 else
629                         return -EINVAL;
630         }
631
632         r = getaddrinfo(host, port, &hints, &result);
633         if (r == EAI_SYSTEM)
634                 return -errno;
635         else if (r != 0)
636                 return -EADDRNOTAVAIL;
637
638         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
639         b->sockaddr_size = result->ai_addrlen;
640
641         freeaddrinfo(result);
642
643         return 0;
644 }
645
646 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
647         char *path = NULL;
648         unsigned n_argv = 0, j;
649         char **argv = NULL;
650         int r;
651
652         assert(b);
653         assert(p);
654         assert(*p);
655         assert(guid);
656
657         while (**p != 0 && **p != ';') {
658                 r = parse_address_key(p, "guid", guid);
659                 if (r < 0)
660                         goto fail;
661                 else if (r > 0)
662                         continue;
663
664                 r = parse_address_key(p, "path", &path);
665                 if (r < 0)
666                         goto fail;
667                 else if (r > 0)
668                         continue;
669
670                 if (startswith(*p, "argv")) {
671                         unsigned ul;
672
673                         errno = 0;
674                         ul = strtoul(*p + 4, (char**) p, 10);
675                         if (errno > 0 || **p != '=' || ul > 256) {
676                                 r = -EINVAL;
677                                 goto fail;
678                         }
679
680                         (*p) ++;
681
682                         if (ul >= n_argv) {
683                                 char **x;
684
685                                 x = realloc(argv, sizeof(char*) * (ul + 2));
686                                 if (!x) {
687                                         r = -ENOMEM;
688                                         goto fail;
689                                 }
690
691                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
692
693                                 argv = x;
694                                 n_argv = ul + 1;
695                         }
696
697                         r = parse_address_key(p, NULL, argv + ul);
698                         if (r < 0)
699                                 goto fail;
700
701                         continue;
702                 }
703
704                 skip_address_key(p);
705         }
706
707         if (!path) {
708                 r = -EINVAL;
709                 goto fail;
710         }
711
712         /* Make sure there are no holes in the array, with the
713          * exception of argv[0] */
714         for (j = 1; j < n_argv; j++)
715                 if (!argv[j]) {
716                         r = -EINVAL;
717                         goto fail;
718                 }
719
720         if (argv && argv[0] == NULL) {
721                 argv[0] = strdup(path);
722                 if (!argv[0]) {
723                         r = -ENOMEM;
724                         goto fail;
725                 }
726         }
727
728         b->exec_path = path;
729         b->exec_argv = argv;
730         return 0;
731
732 fail:
733         for (j = 0; j < n_argv; j++)
734                 free(argv[j]);
735
736         free(argv);
737         free(path);
738         return r;
739 }
740
741 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
742         _cleanup_free_ char *path = NULL;
743         int r;
744
745         assert(b);
746         assert(p);
747         assert(*p);
748         assert(guid);
749
750         while (**p != 0 && **p != ';') {
751                 r = parse_address_key(p, "guid", guid);
752                 if (r < 0)
753                         return r;
754                 else if (r > 0)
755                         continue;
756
757                 r = parse_address_key(p, "path", &path);
758                 if (r < 0)
759                         return r;
760                 else if (r > 0)
761                         continue;
762
763                 skip_address_key(p);
764         }
765
766         if (!path)
767                 return -EINVAL;
768
769         free(b->kernel);
770         b->kernel = path;
771         path = NULL;
772
773         return 0;
774 }
775
776 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
777         _cleanup_free_ char *machine = NULL;
778         int r;
779
780         assert(b);
781         assert(p);
782         assert(*p);
783         assert(guid);
784
785         while (**p != 0 && **p != ';') {
786                 r = parse_address_key(p, "guid", guid);
787                 if (r < 0)
788                         return r;
789                 else if (r > 0)
790                         continue;
791
792                 r = parse_address_key(p, "machine", &machine);
793                 if (r < 0)
794                         return r;
795                 else if (r > 0)
796                         continue;
797
798                 skip_address_key(p);
799         }
800
801         if (!machine)
802                 return -EINVAL;
803
804         free(b->machine);
805         b->machine = machine;
806         machine = NULL;
807
808         b->sockaddr.un.sun_family = AF_UNIX;
809         strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
810         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
811
812         return 0;
813 }
814
815 static void bus_reset_parsed_address(sd_bus *b) {
816         assert(b);
817
818         zero(b->sockaddr);
819         b->sockaddr_size = 0;
820         strv_free(b->exec_argv);
821         free(b->exec_path);
822         b->exec_path = NULL;
823         b->exec_argv = NULL;
824         b->server_id = SD_ID128_NULL;
825         free(b->kernel);
826         b->kernel = NULL;
827         free(b->machine);
828         b->machine = NULL;
829 }
830
831 static int bus_parse_next_address(sd_bus *b) {
832         _cleanup_free_ char *guid = NULL;
833         const char *a;
834         int r;
835
836         assert(b);
837
838         if (!b->address)
839                 return 0;
840         if (b->address[b->address_index] == 0)
841                 return 0;
842
843         bus_reset_parsed_address(b);
844
845         a = b->address + b->address_index;
846
847         while (*a != 0) {
848
849                 if (*a == ';') {
850                         a++;
851                         continue;
852                 }
853
854                 if (startswith(a, "unix:")) {
855                         a += 5;
856
857                         r = parse_unix_address(b, &a, &guid);
858                         if (r < 0)
859                                 return r;
860                         break;
861
862                 } else if (startswith(a, "tcp:")) {
863
864                         a += 4;
865                         r = parse_tcp_address(b, &a, &guid);
866                         if (r < 0)
867                                 return r;
868
869                         break;
870
871                 } else if (startswith(a, "unixexec:")) {
872
873                         a += 9;
874                         r = parse_exec_address(b, &a, &guid);
875                         if (r < 0)
876                                 return r;
877
878                         break;
879
880                 } else if (startswith(a, "kernel:")) {
881
882                         a += 7;
883                         r = parse_kernel_address(b, &a, &guid);
884                         if (r < 0)
885                                 return r;
886
887                         break;
888                 } else if (startswith(a, "x-container:")) {
889
890                         a += 12;
891                         r = parse_container_address(b, &a, &guid);
892                         if (r < 0)
893                                 return r;
894
895                         break;
896                 }
897
898                 a = strchr(a, ';');
899                 if (!a)
900                         return 0;
901         }
902
903         if (guid) {
904                 r = sd_id128_from_string(guid, &b->server_id);
905                 if (r < 0)
906                         return r;
907         }
908
909         b->address_index = a - b->address;
910         return 1;
911 }
912
913 static int bus_start_address(sd_bus *b) {
914         int r;
915
916         assert(b);
917
918         for (;;) {
919                 sd_bus_close(b);
920
921                 if (b->exec_path) {
922
923                         r = bus_socket_exec(b);
924                         if (r >= 0)
925                                 return r;
926
927                         b->last_connect_error = -r;
928                 } else if (b->kernel) {
929
930                         r = bus_kernel_connect(b);
931                         if (r >= 0)
932                                 return r;
933
934                         b->last_connect_error = -r;
935
936                 } else if (b->machine) {
937
938                         r = bus_container_connect(b);
939                         if (r >= 0)
940                                 return r;
941
942                         b->last_connect_error = -r;
943
944                 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
945
946                         r = bus_socket_connect(b);
947                         if (r >= 0)
948                                 return r;
949
950                         b->last_connect_error = -r;
951                 }
952
953                 r = bus_parse_next_address(b);
954                 if (r < 0)
955                         return r;
956                 if (r == 0)
957                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
958         }
959 }
960
961 int bus_next_address(sd_bus *b) {
962         assert(b);
963
964         bus_reset_parsed_address(b);
965         return bus_start_address(b);
966 }
967
968 static int bus_start_fd(sd_bus *b) {
969         struct stat st;
970         int r;
971
972         assert(b);
973         assert(b->input_fd >= 0);
974         assert(b->output_fd >= 0);
975
976         r = fd_nonblock(b->input_fd, true);
977         if (r < 0)
978                 return r;
979
980         r = fd_cloexec(b->input_fd, true);
981         if (r < 0)
982                 return r;
983
984         if (b->input_fd != b->output_fd) {
985                 r = fd_nonblock(b->output_fd, true);
986                 if (r < 0)
987                         return r;
988
989                 r = fd_cloexec(b->output_fd, true);
990                 if (r < 0)
991                         return r;
992         }
993
994         if (fstat(b->input_fd, &st) < 0)
995                 return -errno;
996
997         if (S_ISCHR(b->input_fd))
998                 return bus_kernel_take_fd(b);
999         else
1000                 return bus_socket_take_fd(b);
1001 }
1002
1003 _public_ int sd_bus_start(sd_bus *bus) {
1004         int r;
1005
1006         assert_return(bus, -EINVAL);
1007         assert_return(bus->state == BUS_UNSET, -EPERM);
1008         assert_return(!bus_pid_changed(bus), -ECHILD);
1009
1010         bus->state = BUS_OPENING;
1011
1012         if (bus->is_server && bus->bus_client)
1013                 return -EINVAL;
1014
1015         if (bus->input_fd >= 0)
1016                 r = bus_start_fd(bus);
1017         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1018                 r = bus_start_address(bus);
1019         else
1020                 return -EINVAL;
1021
1022         if (r < 0)
1023                 return r;
1024
1025         return bus_send_hello(bus);
1026 }
1027
1028 _public_ int sd_bus_open_system(sd_bus **ret) {
1029         const char *e;
1030         sd_bus *b;
1031         int r;
1032
1033         assert_return(ret, -EINVAL);
1034
1035         r = sd_bus_new(&b);
1036         if (r < 0)
1037                 return r;
1038
1039         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1040         if (e) {
1041                 r = sd_bus_set_address(b, e);
1042                 if (r < 0)
1043                         goto fail;
1044         } else {
1045                 b->sockaddr.un.sun_family = AF_UNIX;
1046                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1047                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1048         }
1049
1050         b->bus_client = true;
1051
1052         r = sd_bus_start(b);
1053         if (r < 0)
1054                 goto fail;
1055
1056         *ret = b;
1057         return 0;
1058
1059 fail:
1060         bus_free(b);
1061         return r;
1062 }
1063
1064 _public_ int sd_bus_open_user(sd_bus **ret) {
1065         const char *e;
1066         sd_bus *b;
1067         size_t l;
1068         int r;
1069
1070         assert_return(ret, -EINVAL);
1071
1072         r = sd_bus_new(&b);
1073         if (r < 0)
1074                 return r;
1075
1076         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1077         if (e) {
1078                 r = sd_bus_set_address(b, e);
1079                 if (r < 0)
1080                         goto fail;
1081         } else {
1082                 e = secure_getenv("XDG_RUNTIME_DIR");
1083                 if (!e) {
1084                         r = -ENOENT;
1085                         goto fail;
1086                 }
1087
1088                 l = strlen(e);
1089                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1090                         r = -E2BIG;
1091                         goto fail;
1092                 }
1093
1094                 b->sockaddr.un.sun_family = AF_UNIX;
1095                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1096                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1097         }
1098
1099         b->bus_client = true;
1100
1101         r = sd_bus_start(b);
1102         if (r < 0)
1103                 goto fail;
1104
1105         *ret = b;
1106         return 0;
1107
1108 fail:
1109         bus_free(b);
1110         return r;
1111 }
1112
1113 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1114         _cleanup_free_ char *e = NULL;
1115         char *p = NULL;
1116         sd_bus *bus;
1117         int r;
1118
1119         assert_return(host, -EINVAL);
1120         assert_return(ret, -EINVAL);
1121
1122         e = bus_address_escape(host);
1123         if (!e)
1124                 return -ENOMEM;
1125
1126         p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1127         if (!p)
1128                 return -ENOMEM;
1129
1130         r = sd_bus_new(&bus);
1131         if (r < 0) {
1132                 free(p);
1133                 return r;
1134         }
1135
1136         bus->address = p;
1137         bus->bus_client = true;
1138
1139         r = sd_bus_start(bus);
1140         if (r < 0) {
1141                 bus_free(bus);
1142                 return r;
1143         }
1144
1145         *ret = bus;
1146         return 0;
1147 }
1148
1149 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1150         _cleanup_free_ char *e = NULL;
1151         sd_bus *bus;
1152         char *p;
1153         int r;
1154
1155         assert_return(machine, -EINVAL);
1156         assert_return(ret, -EINVAL);
1157
1158         e = bus_address_escape(machine);
1159         if (!e)
1160                 return -ENOMEM;
1161
1162         p = strjoin("x-container:machine=", e, NULL);
1163         if (!p)
1164                 return -ENOMEM;
1165
1166         r = sd_bus_new(&bus);
1167         if (r < 0) {
1168                 free(p);
1169                 return r;
1170         }
1171
1172         bus->address = p;
1173         bus->bus_client = true;
1174
1175         r = sd_bus_start(bus);
1176         if (r < 0) {
1177                 bus_free(bus);
1178                 return r;
1179         }
1180
1181         *ret = bus;
1182         return 0;
1183 }
1184
1185 _public_ void sd_bus_close(sd_bus *bus) {
1186         if (!bus)
1187                 return;
1188         if (bus->state == BUS_CLOSED)
1189                 return;
1190         if (bus_pid_changed(bus))
1191                 return;
1192
1193         bus->state = BUS_CLOSED;
1194
1195         sd_bus_detach_event(bus);
1196
1197         if (!bus->is_kernel)
1198                 bus_close_fds(bus);
1199
1200         /* We'll leave the fd open in case this is a kernel bus, since
1201          * there might still be memblocks around that reference this
1202          * bus, and they might need to invoke the
1203          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1204          * freed. */
1205 }
1206
1207 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1208         assert_return(bus, NULL);
1209
1210         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1211
1212         return bus;
1213 }
1214
1215 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1216         assert_return(bus, NULL);
1217
1218         if (REFCNT_DEC(bus->n_ref) <= 0)
1219                 bus_free(bus);
1220
1221         return NULL;
1222 }
1223
1224 _public_ int sd_bus_is_open(sd_bus *bus) {
1225
1226         assert_return(bus, -EINVAL);
1227         assert_return(!bus_pid_changed(bus), -ECHILD);
1228
1229         return BUS_IS_OPEN(bus->state);
1230 }
1231
1232 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1233         int r;
1234
1235         assert_return(bus, -EINVAL);
1236         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1237         assert_return(!bus_pid_changed(bus), -ECHILD);
1238
1239         if (type == SD_BUS_TYPE_UNIX_FD) {
1240                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1241                         return 0;
1242
1243                 r = bus_ensure_running(bus);
1244                 if (r < 0)
1245                         return r;
1246
1247                 return bus->can_fds;
1248         }
1249
1250         return bus_type_is_valid(type);
1251 }
1252
1253 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1254         int r;
1255
1256         assert_return(bus, -EINVAL);
1257         assert_return(server_id, -EINVAL);
1258         assert_return(!bus_pid_changed(bus), -ECHILD);
1259
1260         r = bus_ensure_running(bus);
1261         if (r < 0)
1262                 return r;
1263
1264         *server_id = bus->server_id;
1265         return 0;
1266 }
1267
1268 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1269         assert(m);
1270
1271         if (m->header->version > b->message_version)
1272                 return -EPERM;
1273
1274         if (m->sealed) {
1275                 /* If we copy the same message to multiple
1276                  * destinations, avoid using the same serial
1277                  * numbers. */
1278                 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1279                 return 0;
1280         }
1281
1282         return bus_message_seal(m, ++b->serial);
1283 }
1284
1285 static int dispatch_wqueue(sd_bus *bus) {
1286         int r, ret = 0;
1287
1288         assert(bus);
1289         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1290
1291         while (bus->wqueue_size > 0) {
1292
1293                 if (bus->is_kernel)
1294                         r = bus_kernel_write_message(bus, bus->wqueue[0]);
1295                 else
1296                         r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1297
1298                 if (r < 0) {
1299                         sd_bus_close(bus);
1300                         return r;
1301                 } else if (r == 0)
1302                         /* Didn't do anything this time */
1303                         return ret;
1304                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1305                         /* Fully written. Let's drop the entry from
1306                          * the queue.
1307                          *
1308                          * This isn't particularly optimized, but
1309                          * well, this is supposed to be our worst-case
1310                          * buffer only, and the socket buffer is
1311                          * supposed to be our primary buffer, and if
1312                          * it got full, then all bets are off
1313                          * anyway. */
1314
1315                         sd_bus_message_unref(bus->wqueue[0]);
1316                         bus->wqueue_size --;
1317                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1318                         bus->windex = 0;
1319
1320                         ret = 1;
1321                 }
1322         }
1323
1324         return ret;
1325 }
1326
1327 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1328         sd_bus_message *z = NULL;
1329         int r, ret = 0;
1330
1331         assert(bus);
1332         assert(m);
1333         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1334
1335         if (bus->rqueue_size > 0) {
1336                 /* Dispatch a queued message */
1337
1338                 *m = bus->rqueue[0];
1339                 bus->rqueue_size --;
1340                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1341                 return 1;
1342         }
1343
1344         /* Try to read a new message */
1345         do {
1346                 if (bus->is_kernel)
1347                         r = bus_kernel_read_message(bus, &z);
1348                 else
1349                         r = bus_socket_read_message(bus, &z);
1350
1351                 if (r < 0) {
1352                         sd_bus_close(bus);
1353                         return r;
1354                 }
1355                 if (r == 0)
1356                         return ret;
1357
1358                 ret = 1;
1359         } while (!z);
1360
1361         *m = z;
1362         return ret;
1363 }
1364
1365 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1366         int r;
1367
1368         assert_return(bus, -EINVAL);
1369         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1370         assert_return(m, -EINVAL);
1371         assert_return(!bus_pid_changed(bus), -ECHILD);
1372
1373         if (m->n_fds > 0) {
1374                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1375                 if (r < 0)
1376                         return r;
1377                 if (r == 0)
1378                         return -ENOTSUP;
1379         }
1380
1381         /* If the serial number isn't kept, then we know that no reply
1382          * is expected */
1383         if (!serial && !m->sealed)
1384                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1385
1386         r = bus_seal_message(bus, m);
1387         if (r < 0)
1388                 return r;
1389
1390         /* If this is a reply and no reply was requested, then let's
1391          * suppress this, if we can */
1392         if (m->dont_send && !serial)
1393                 return 1;
1394
1395         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1396                 size_t idx = 0;
1397
1398                 if (bus->is_kernel)
1399                         r = bus_kernel_write_message(bus, m);
1400                 else
1401                         r = bus_socket_write_message(bus, m, &idx);
1402
1403                 if (r < 0) {
1404                         sd_bus_close(bus);
1405                         return r;
1406                 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1407                         /* Wasn't fully written. So let's remember how
1408                          * much was written. Note that the first entry
1409                          * of the wqueue array is always allocated so
1410                          * that we always can remember how much was
1411                          * written. */
1412                         bus->wqueue[0] = sd_bus_message_ref(m);
1413                         bus->wqueue_size = 1;
1414                         bus->windex = idx;
1415                 }
1416         } else {
1417                 sd_bus_message **q;
1418
1419                 /* Just append it to the queue. */
1420
1421                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1422                         return -ENOBUFS;
1423
1424                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1425                 if (!q)
1426                         return -ENOMEM;
1427
1428                 bus->wqueue = q;
1429                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1430         }
1431
1432         if (serial)
1433                 *serial = BUS_MESSAGE_SERIAL(m);
1434
1435         return 1;
1436 }
1437
1438 static usec_t calc_elapse(uint64_t usec) {
1439         if (usec == (uint64_t) -1)
1440                 return 0;
1441
1442         if (usec == 0)
1443                 usec = BUS_DEFAULT_TIMEOUT;
1444
1445         return now(CLOCK_MONOTONIC) + usec;
1446 }
1447
1448 static int timeout_compare(const void *a, const void *b) {
1449         const struct reply_callback *x = a, *y = b;
1450
1451         if (x->timeout != 0 && y->timeout == 0)
1452                 return -1;
1453
1454         if (x->timeout == 0 && y->timeout != 0)
1455                 return 1;
1456
1457         if (x->timeout < y->timeout)
1458                 return -1;
1459
1460         if (x->timeout > y->timeout)
1461                 return 1;
1462
1463         return 0;
1464 }
1465
1466 _public_ int sd_bus_call_async(
1467                 sd_bus *bus,
1468                 sd_bus_message *m,
1469                 sd_bus_message_handler_t callback,
1470                 void *userdata,
1471                 uint64_t usec,
1472                 uint64_t *serial) {
1473
1474         struct reply_callback *c;
1475         int r;
1476
1477         assert_return(bus, -EINVAL);
1478         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1479         assert_return(m, -EINVAL);
1480         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1481         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1482         assert_return(callback, -EINVAL);
1483         assert_return(!bus_pid_changed(bus), -ECHILD);
1484
1485         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1486         if (r < 0)
1487                 return r;
1488
1489         if (usec != (uint64_t) -1) {
1490                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1491                 if (r < 0)
1492                         return r;
1493         }
1494
1495         r = bus_seal_message(bus, m);
1496         if (r < 0)
1497                 return r;
1498
1499         c = new0(struct reply_callback, 1);
1500         if (!c)
1501                 return -ENOMEM;
1502
1503         c->callback = callback;
1504         c->userdata = userdata;
1505         c->serial = BUS_MESSAGE_SERIAL(m);
1506         c->timeout = calc_elapse(usec);
1507
1508         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1509         if (r < 0) {
1510                 free(c);
1511                 return r;
1512         }
1513
1514         if (c->timeout != 0) {
1515                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1516                 if (r < 0) {
1517                         c->timeout = 0;
1518                         sd_bus_call_async_cancel(bus, c->serial);
1519                         return r;
1520                 }
1521         }
1522
1523         r = sd_bus_send(bus, m, serial);
1524         if (r < 0) {
1525                 sd_bus_call_async_cancel(bus, c->serial);
1526                 return r;
1527         }
1528
1529         return r;
1530 }
1531
1532 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1533         struct reply_callback *c;
1534
1535         assert_return(bus, -EINVAL);
1536         assert_return(serial != 0, -EINVAL);
1537         assert_return(!bus_pid_changed(bus), -ECHILD);
1538
1539         c = hashmap_remove(bus->reply_callbacks, &serial);
1540         if (!c)
1541                 return 0;
1542
1543         if (c->timeout != 0)
1544                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1545
1546         free(c);
1547         return 1;
1548 }
1549
1550 int bus_ensure_running(sd_bus *bus) {
1551         int r;
1552
1553         assert(bus);
1554
1555         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1556                 return -ENOTCONN;
1557         if (bus->state == BUS_RUNNING)
1558                 return 1;
1559
1560         for (;;) {
1561                 r = sd_bus_process(bus, NULL);
1562                 if (r < 0)
1563                         return r;
1564                 if (bus->state == BUS_RUNNING)
1565                         return 1;
1566                 if (r > 0)
1567                         continue;
1568
1569                 r = sd_bus_wait(bus, (uint64_t) -1);
1570                 if (r < 0)
1571                         return r;
1572         }
1573 }
1574
1575 _public_ int sd_bus_call(
1576                 sd_bus *bus,
1577                 sd_bus_message *m,
1578                 uint64_t usec,
1579                 sd_bus_error *error,
1580                 sd_bus_message **reply) {
1581
1582         int r;
1583         usec_t timeout;
1584         uint64_t serial;
1585         bool room = false;
1586
1587         assert_return(bus, -EINVAL);
1588         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1589         assert_return(m, -EINVAL);
1590         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1591         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1592         assert_return(!bus_error_is_dirty(error), -EINVAL);
1593         assert_return(!bus_pid_changed(bus), -ECHILD);
1594
1595         r = bus_ensure_running(bus);
1596         if (r < 0)
1597                 return r;
1598
1599         r = sd_bus_send(bus, m, &serial);
1600         if (r < 0)
1601                 return r;
1602
1603         timeout = calc_elapse(usec);
1604
1605         for (;;) {
1606                 usec_t left;
1607                 sd_bus_message *incoming = NULL;
1608
1609                 if (!room) {
1610                         sd_bus_message **q;
1611
1612                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1613                                 return -ENOBUFS;
1614
1615                         /* Make sure there's room for queuing this
1616                          * locally, before we read the message */
1617
1618                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1619                         if (!q)
1620                                 return -ENOMEM;
1621
1622                         bus->rqueue = q;
1623                         room = true;
1624                 }
1625
1626                 if (bus->is_kernel)
1627                         r = bus_kernel_read_message(bus, &incoming);
1628                 else
1629                         r = bus_socket_read_message(bus, &incoming);
1630                 if (r < 0)
1631                         return r;
1632                 if (incoming) {
1633
1634                         if (incoming->reply_serial == serial) {
1635                                 /* Found a match! */
1636
1637                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1638
1639                                         if (reply)
1640                                                 *reply = incoming;
1641                                         else
1642                                                 sd_bus_message_unref(incoming);
1643
1644                                         return 1;
1645                                 }
1646
1647                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1648                                         int k;
1649
1650                                         r = sd_bus_error_copy(error, &incoming->error);
1651                                         if (r < 0) {
1652                                                 sd_bus_message_unref(incoming);
1653                                                 return r;
1654                                         }
1655
1656                                         k = sd_bus_error_get_errno(&incoming->error);
1657                                         sd_bus_message_unref(incoming);
1658                                         return -k;
1659                                 }
1660
1661                                 sd_bus_message_unref(incoming);
1662                                 return -EIO;
1663
1664                         } else if (incoming->header->serial == serial &&
1665                                    bus->unique_name &&
1666                                    incoming->sender &&
1667                                    streq(bus->unique_name, incoming->sender)) {
1668
1669                                 /* Our own message? Somebody is trying
1670                                  * to send its own client a message,
1671                                  * let's not dead-lock, let's fail
1672                                  * immediately. */
1673
1674                                 sd_bus_message_unref(incoming);
1675                                 return -ELOOP;
1676                         }
1677
1678                         /* There's already guaranteed to be room for
1679                          * this, so need to resize things here */
1680                         bus->rqueue[bus->rqueue_size ++] = incoming;
1681                         room = false;
1682
1683                         /* Try to read more, right-away */
1684                         continue;
1685                 }
1686                 if (r != 0)
1687                         continue;
1688
1689                 if (timeout > 0) {
1690                         usec_t n;
1691
1692                         n = now(CLOCK_MONOTONIC);
1693                         if (n >= timeout)
1694                                 return -ETIMEDOUT;
1695
1696                         left = timeout - n;
1697                 } else
1698                         left = (uint64_t) -1;
1699
1700                 r = bus_poll(bus, true, left);
1701                 if (r < 0)
1702                         return r;
1703
1704                 r = dispatch_wqueue(bus);
1705                 if (r < 0)
1706                         return r;
1707         }
1708 }
1709
1710 _public_ int sd_bus_get_fd(sd_bus *bus) {
1711
1712         assert_return(bus, -EINVAL);
1713         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1714         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1715         assert_return(!bus_pid_changed(bus), -ECHILD);
1716
1717         return bus->input_fd;
1718 }
1719
1720 _public_ int sd_bus_get_events(sd_bus *bus) {
1721         int flags = 0;
1722
1723         assert_return(bus, -EINVAL);
1724         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1725         assert_return(!bus_pid_changed(bus), -ECHILD);
1726
1727         if (bus->state == BUS_OPENING)
1728                 flags |= POLLOUT;
1729         else if (bus->state == BUS_AUTHENTICATING) {
1730
1731                 if (bus_socket_auth_needs_write(bus))
1732                         flags |= POLLOUT;
1733
1734                 flags |= POLLIN;
1735
1736         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1737                 if (bus->rqueue_size <= 0)
1738                         flags |= POLLIN;
1739                 if (bus->wqueue_size > 0)
1740                         flags |= POLLOUT;
1741         }
1742
1743         return flags;
1744 }
1745
1746 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1747         struct reply_callback *c;
1748
1749         assert_return(bus, -EINVAL);
1750         assert_return(timeout_usec, -EINVAL);
1751         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1752         assert_return(!bus_pid_changed(bus), -ECHILD);
1753
1754         if (bus->state == BUS_AUTHENTICATING) {
1755                 *timeout_usec = bus->auth_timeout;
1756                 return 1;
1757         }
1758
1759         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1760                 *timeout_usec = (uint64_t) -1;
1761                 return 0;
1762         }
1763
1764         if (bus->rqueue_size > 0) {
1765                 *timeout_usec = 0;
1766                 return 1;
1767         }
1768
1769         c = prioq_peek(bus->reply_callbacks_prioq);
1770         if (!c) {
1771                 *timeout_usec = (uint64_t) -1;
1772                 return 0;
1773         }
1774
1775         *timeout_usec = c->timeout;
1776         return 1;
1777 }
1778
1779 static int process_timeout(sd_bus *bus) {
1780         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1781         struct reply_callback *c;
1782         usec_t n;
1783         int r;
1784
1785         assert(bus);
1786
1787         c = prioq_peek(bus->reply_callbacks_prioq);
1788         if (!c)
1789                 return 0;
1790
1791         n = now(CLOCK_MONOTONIC);
1792         if (c->timeout > n)
1793                 return 0;
1794
1795         r = bus_message_new_synthetic_error(
1796                         bus,
1797                         c->serial,
1798                         &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1799                         &m);
1800         if (r < 0)
1801                 return r;
1802
1803         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1804         hashmap_remove(bus->reply_callbacks, &c->serial);
1805
1806         r = c->callback(bus, m, c->userdata);
1807         free(c);
1808
1809         return r < 0 ? r : 1;
1810 }
1811
1812 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1813         assert(bus);
1814         assert(m);
1815
1816         if (bus->state != BUS_HELLO)
1817                 return 0;
1818
1819         /* Let's make sure the first message on the bus is the HELLO
1820          * reply. But note that we don't actually parse the message
1821          * here (we leave that to the usual handling), we just verify
1822          * we don't let any earlier msg through. */
1823
1824         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1825             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1826                 return -EIO;
1827
1828         if (m->reply_serial != bus->hello_serial)
1829                 return -EIO;
1830
1831         return 0;
1832 }
1833
1834 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1835         struct reply_callback *c;
1836         int r;
1837
1838         assert(bus);
1839         assert(m);
1840
1841         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1842             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1843                 return 0;
1844
1845         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1846         if (!c)
1847                 return 0;
1848
1849         if (c->timeout != 0)
1850                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1851
1852         r = sd_bus_message_rewind(m, true);
1853         if (r < 0)
1854                 return r;
1855
1856         r = c->callback(bus, m, c->userdata);
1857         free(c);
1858
1859         return r;
1860 }
1861
1862 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1863         struct filter_callback *l;
1864         int r;
1865
1866         assert(bus);
1867         assert(m);
1868
1869         do {
1870                 bus->filter_callbacks_modified = false;
1871
1872                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1873
1874                         if (bus->filter_callbacks_modified)
1875                                 break;
1876
1877                         /* Don't run this more than once per iteration */
1878                         if (l->last_iteration == bus->iteration_counter)
1879                                 continue;
1880
1881                         l->last_iteration = bus->iteration_counter;
1882
1883                         r = sd_bus_message_rewind(m, true);
1884                         if (r < 0)
1885                                 return r;
1886
1887                         r = l->callback(bus, m, l->userdata);
1888                         if (r != 0)
1889                                 return r;
1890
1891                 }
1892
1893         } while (bus->filter_callbacks_modified);
1894
1895         return 0;
1896 }
1897
1898 static int process_match(sd_bus *bus, sd_bus_message *m) {
1899         int r;
1900
1901         assert(bus);
1902         assert(m);
1903
1904         do {
1905                 bus->match_callbacks_modified = false;
1906
1907                 r = bus_match_run(bus, &bus->match_callbacks, m);
1908                 if (r != 0)
1909                         return r;
1910
1911         } while (bus->match_callbacks_modified);
1912
1913         return 0;
1914 }
1915
1916 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1917         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1918         int r;
1919
1920         assert(bus);
1921         assert(m);
1922
1923         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1924                 return 0;
1925
1926         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1927                 return 0;
1928
1929         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1930                 return 1;
1931
1932         if (streq_ptr(m->member, "Ping"))
1933                 r = sd_bus_message_new_method_return(bus, m, &reply);
1934         else if (streq_ptr(m->member, "GetMachineId")) {
1935                 sd_id128_t id;
1936                 char sid[33];
1937
1938                 r = sd_id128_get_machine(&id);
1939                 if (r < 0)
1940                         return r;
1941
1942                 r = sd_bus_message_new_method_return(bus, m, &reply);
1943                 if (r < 0)
1944                         return r;
1945
1946                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1947         } else {
1948                 r = sd_bus_message_new_method_errorf(
1949                                 bus, m, &reply,
1950                                 SD_BUS_ERROR_UNKNOWN_METHOD,
1951                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1952         }
1953
1954         if (r < 0)
1955                 return r;
1956
1957         r = sd_bus_send(bus, reply, NULL);
1958         if (r < 0)
1959                 return r;
1960
1961         return 1;
1962 }
1963
1964 static int process_message(sd_bus *bus, sd_bus_message *m) {
1965         int r;
1966
1967         assert(bus);
1968         assert(m);
1969
1970         bus->current = m;
1971         bus->iteration_counter++;
1972
1973         log_debug("Got message sender=%s object=%s interface=%s member=%s",
1974                   strna(sd_bus_message_get_sender(m)),
1975                   strna(sd_bus_message_get_path(m)),
1976                   strna(sd_bus_message_get_interface(m)),
1977                   strna(sd_bus_message_get_member(m)));
1978
1979         r = process_hello(bus, m);
1980         if (r != 0)
1981                 goto finish;
1982
1983         r = process_reply(bus, m);
1984         if (r != 0)
1985                 goto finish;
1986
1987         r = process_filter(bus, m);
1988         if (r != 0)
1989                 goto finish;
1990
1991         r = process_match(bus, m);
1992         if (r != 0)
1993                 goto finish;
1994
1995         r = process_builtin(bus, m);
1996         if (r != 0)
1997                 goto finish;
1998
1999         r = bus_process_object(bus, m);
2000
2001 finish:
2002         bus->current = NULL;
2003         return r;
2004 }
2005
2006 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2007         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2008         int r;
2009
2010         assert(bus);
2011         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2012
2013         r = process_timeout(bus);
2014         if (r != 0)
2015                 goto null_message;
2016
2017         r = dispatch_wqueue(bus);
2018         if (r != 0)
2019                 goto null_message;
2020
2021         r = dispatch_rqueue(bus, &m);
2022         if (r < 0)
2023                 return r;
2024         if (!m)
2025                 goto null_message;
2026
2027         r = process_message(bus, m);
2028         if (r != 0)
2029                 goto null_message;
2030
2031         if (ret) {
2032                 r = sd_bus_message_rewind(m, true);
2033                 if (r < 0)
2034                         return r;
2035
2036                 *ret = m;
2037                 m = NULL;
2038                 return 1;
2039         }
2040
2041         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2042
2043                 r = sd_bus_reply_method_errorf(
2044                                 bus, m,
2045                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
2046                                 "Unknown object '%s'.", m->path);
2047                 if (r < 0)
2048                         return r;
2049         }
2050
2051         return 1;
2052
2053 null_message:
2054         if (r >= 0 && ret)
2055                 *ret = NULL;
2056
2057         return r;
2058 }
2059
2060 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2061         BUS_DONT_DESTROY(bus);
2062         int r;
2063
2064         /* Returns 0 when we didn't do anything. This should cause the
2065          * caller to invoke sd_bus_wait() before returning the next
2066          * time. Returns > 0 when we did something, which possibly
2067          * means *ret is filled in with an unprocessed message. */
2068
2069         assert_return(bus, -EINVAL);
2070         assert_return(!bus_pid_changed(bus), -ECHILD);
2071
2072         /* We don't allow recursively invoking sd_bus_process(). */
2073         assert_return(!bus->processing, -EBUSY);
2074
2075         switch (bus->state) {
2076
2077         case BUS_UNSET:
2078         case BUS_CLOSED:
2079                 return -ENOTCONN;
2080
2081         case BUS_OPENING:
2082                 r = bus_socket_process_opening(bus);
2083                 if (r < 0)
2084                         return r;
2085                 if (ret)
2086                         *ret = NULL;
2087                 return r;
2088
2089         case BUS_AUTHENTICATING:
2090
2091                 r = bus_socket_process_authenticating(bus);
2092                 if (r < 0)
2093                         return r;
2094                 if (ret)
2095                         *ret = NULL;
2096                 return r;
2097
2098         case BUS_RUNNING:
2099         case BUS_HELLO:
2100
2101                 bus->processing = true;
2102                 r = process_running(bus, ret);
2103                 bus->processing = false;
2104
2105                 return r;
2106         }
2107
2108         assert_not_reached("Unknown state");
2109 }
2110
2111 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2112         struct pollfd p[2] = {};
2113         int r, e, n;
2114         struct timespec ts;
2115         usec_t m = (usec_t) -1;
2116
2117         assert(bus);
2118         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2119
2120         e = sd_bus_get_events(bus);
2121         if (e < 0)
2122                 return e;
2123
2124         if (need_more)
2125                 /* The caller really needs some more data, he doesn't
2126                  * care about what's already read, or any timeouts
2127                  * except its own.*/
2128                 e |= POLLIN;
2129         else {
2130                 usec_t until;
2131                 /* The caller wants to process if there's something to
2132                  * process, but doesn't care otherwise */
2133
2134                 r = sd_bus_get_timeout(bus, &until);
2135                 if (r < 0)
2136                         return r;
2137                 if (r > 0) {
2138                         usec_t nw;
2139                         nw = now(CLOCK_MONOTONIC);
2140                         m = until > nw ? until - nw : 0;
2141                 }
2142         }
2143
2144         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2145                 m = timeout_usec;
2146
2147         p[0].fd = bus->input_fd;
2148         if (bus->output_fd == bus->input_fd) {
2149                 p[0].events = e;
2150                 n = 1;
2151         } else {
2152                 p[0].events = e & POLLIN;
2153                 p[1].fd = bus->output_fd;
2154                 p[1].events = e & POLLOUT;
2155                 n = 2;
2156         }
2157
2158         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2159         if (r < 0)
2160                 return -errno;
2161
2162         return r > 0 ? 1 : 0;
2163 }
2164
2165 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2166
2167         assert_return(bus, -EINVAL);
2168         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2169         assert_return(!bus_pid_changed(bus), -ECHILD);
2170
2171         if (bus->rqueue_size > 0)
2172                 return 0;
2173
2174         return bus_poll(bus, false, timeout_usec);
2175 }
2176
2177 _public_ int sd_bus_flush(sd_bus *bus) {
2178         int r;
2179
2180         assert_return(bus, -EINVAL);
2181         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2182         assert_return(!bus_pid_changed(bus), -ECHILD);
2183
2184         r = bus_ensure_running(bus);
2185         if (r < 0)
2186                 return r;
2187
2188         if (bus->wqueue_size <= 0)
2189                 return 0;
2190
2191         for (;;) {
2192                 r = dispatch_wqueue(bus);
2193                 if (r < 0)
2194                         return r;
2195
2196                 if (bus->wqueue_size <= 0)
2197                         return 0;
2198
2199                 r = bus_poll(bus, false, (uint64_t) -1);
2200                 if (r < 0)
2201                         return r;
2202         }
2203 }
2204
2205 _public_ int sd_bus_add_filter(sd_bus *bus,
2206                                sd_bus_message_handler_t callback,
2207                                void *userdata) {
2208
2209         struct filter_callback *f;
2210
2211         assert_return(bus, -EINVAL);
2212         assert_return(callback, -EINVAL);
2213         assert_return(!bus_pid_changed(bus), -ECHILD);
2214
2215         f = new0(struct filter_callback, 1);
2216         if (!f)
2217                 return -ENOMEM;
2218         f->callback = callback;
2219         f->userdata = userdata;
2220
2221         bus->filter_callbacks_modified = true;
2222         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2223         return 0;
2224 }
2225
2226 _public_ int sd_bus_remove_filter(sd_bus *bus,
2227                                   sd_bus_message_handler_t callback,
2228                                   void *userdata) {
2229
2230         struct filter_callback *f;
2231
2232         assert_return(bus, -EINVAL);
2233         assert_return(callback, -EINVAL);
2234         assert_return(!bus_pid_changed(bus), -ECHILD);
2235
2236         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2237                 if (f->callback == callback && f->userdata == userdata) {
2238                         bus->filter_callbacks_modified = true;
2239                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2240                         free(f);
2241                         return 1;
2242                 }
2243         }
2244
2245         return 0;
2246 }
2247
2248 _public_ int sd_bus_add_match(sd_bus *bus,
2249                               const char *match,
2250                               sd_bus_message_handler_t callback,
2251                               void *userdata) {
2252
2253         struct bus_match_component *components = NULL;
2254         unsigned n_components = 0;
2255         uint64_t cookie = 0;
2256         int r = 0;
2257
2258         assert_return(bus, -EINVAL);
2259         assert_return(match, -EINVAL);
2260         assert_return(!bus_pid_changed(bus), -ECHILD);
2261
2262         r = bus_match_parse(match, &components, &n_components);
2263         if (r < 0)
2264                 goto finish;
2265
2266         if (bus->bus_client) {
2267                 cookie = ++bus->match_cookie;
2268
2269                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2270                 if (r < 0)
2271                         goto finish;
2272         }
2273
2274         bus->match_callbacks_modified = true;
2275         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2276         if (r < 0) {
2277                 if (bus->bus_client)
2278                         bus_remove_match_internal(bus, match, cookie);
2279         }
2280
2281 finish:
2282         bus_match_parse_free(components, n_components);
2283         return r;
2284 }
2285
2286 _public_ int sd_bus_remove_match(sd_bus *bus,
2287                                  const char *match,
2288                                  sd_bus_message_handler_t callback,
2289                                  void *userdata) {
2290
2291         struct bus_match_component *components = NULL;
2292         unsigned n_components = 0;
2293         int r = 0, q = 0;
2294         uint64_t cookie = 0;
2295
2296         assert_return(bus, -EINVAL);
2297         assert_return(match, -EINVAL);
2298         assert_return(!bus_pid_changed(bus), -ECHILD);
2299
2300         r = bus_match_parse(match, &components, &n_components);
2301         if (r < 0)
2302                 return r;
2303
2304         bus->match_callbacks_modified = true;
2305         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2306
2307         if (bus->bus_client)
2308                 q = bus_remove_match_internal(bus, match, cookie);
2309
2310         bus_match_parse_free(components, n_components);
2311
2312         return r < 0 ? r : q;
2313 }
2314
2315 bool bus_pid_changed(sd_bus *bus) {
2316         assert(bus);
2317
2318         /* We don't support people creating a bus connection and
2319          * keeping it around over a fork(). Let's complain. */
2320
2321         return bus->original_pid != getpid();
2322 }
2323
2324 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2325         sd_bus *bus = userdata;
2326         int r;
2327
2328         assert(bus);
2329
2330         r = sd_bus_process(bus, NULL);
2331         if (r < 0)
2332                 return r;
2333
2334         return 1;
2335 }
2336
2337 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2338         sd_bus *bus = userdata;
2339         int r;
2340
2341         assert(bus);
2342
2343         r = sd_bus_process(bus, NULL);
2344         if (r < 0)
2345                 return r;
2346
2347         return 1;
2348 }
2349
2350 static int prepare_callback(sd_event_source *s, void *userdata) {
2351         sd_bus *bus = userdata;
2352         int r, e;
2353         usec_t until;
2354
2355         assert(s);
2356         assert(bus);
2357
2358         e = sd_bus_get_events(bus);
2359         if (e < 0)
2360                 return e;
2361
2362         if (bus->output_fd != bus->input_fd) {
2363
2364                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2365                 if (r < 0)
2366                         return r;
2367
2368                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2369                 if (r < 0)
2370                         return r;
2371         } else {
2372                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2373                 if (r < 0)
2374                         return r;
2375         }
2376
2377         r = sd_bus_get_timeout(bus, &until);
2378         if (r < 0)
2379                 return r;
2380         if (r > 0) {
2381                 int j;
2382
2383                 j = sd_event_source_set_time(bus->time_event_source, until);
2384                 if (j < 0)
2385                         return j;
2386         }
2387
2388         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2389         if (r < 0)
2390                 return r;
2391
2392         return 1;
2393 }
2394
2395 static int quit_callback(sd_event_source *event, void *userdata) {
2396         sd_bus *bus = userdata;
2397
2398         assert(event);
2399
2400         sd_bus_flush(bus);
2401
2402         return 1;
2403 }
2404
2405 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2406         int r;
2407
2408         assert_return(bus, -EINVAL);
2409         assert_return(!bus->event, -EBUSY);
2410
2411         assert(!bus->input_io_event_source);
2412         assert(!bus->output_io_event_source);
2413         assert(!bus->time_event_source);
2414
2415         if (event)
2416                 bus->event = sd_event_ref(event);
2417         else  {
2418                 r = sd_event_default(&bus->event);
2419                 if (r < 0)
2420                         return r;
2421         }
2422
2423         r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2424         if (r < 0)
2425                 goto fail;
2426
2427         r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2428         if (r < 0)
2429                 goto fail;
2430
2431         if (bus->output_fd != bus->input_fd) {
2432                 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2433                 if (r < 0)
2434                         goto fail;
2435
2436                 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2437                 if (r < 0)
2438                         goto fail;
2439         }
2440
2441         r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2442         if (r < 0)
2443                 goto fail;
2444
2445         r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2446         if (r < 0)
2447                 goto fail;
2448
2449         r = sd_event_source_set_priority(bus->time_event_source, priority);
2450         if (r < 0)
2451                 goto fail;
2452
2453         r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2454         if (r < 0)
2455                 goto fail;
2456
2457         return 0;
2458
2459 fail:
2460         sd_bus_detach_event(bus);
2461         return r;
2462 }
2463
2464 _public_ int sd_bus_detach_event(sd_bus *bus) {
2465         assert_return(bus, -EINVAL);
2466         assert_return(bus->event, -ENXIO);
2467
2468         if (bus->input_io_event_source)
2469                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2470
2471         if (bus->output_io_event_source)
2472                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2473
2474         if (bus->time_event_source)
2475                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2476
2477         if (bus->quit_event_source)
2478                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2479
2480         if (bus->event)
2481                 bus->event = sd_event_unref(bus->event);
2482
2483         return 0;
2484 }
2485
2486 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2487         assert_return(bus, NULL);
2488
2489         return bus->current;
2490 }
2491
2492 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2493         sd_bus *b = NULL;
2494         int r;
2495
2496         assert(bus_open);
2497         assert(default_bus);
2498
2499         if (!ret)
2500                 return !!*default_bus;
2501
2502         if (*default_bus) {
2503                 *ret = sd_bus_ref(*default_bus);
2504                 return 0;
2505         }
2506
2507         r = bus_open(&b);
2508         if (r < 0)
2509                 return r;
2510
2511         b->default_bus_ptr = default_bus;
2512         b->tid = gettid();
2513         *default_bus = b;
2514
2515         *ret = b;
2516         return 1;
2517 }
2518
2519 _public_ int sd_bus_default_system(sd_bus **ret) {
2520         static __thread sd_bus *default_system_bus = NULL;
2521
2522         return bus_default(sd_bus_open_system, &default_system_bus, ret);
2523 }
2524
2525 _public_ int sd_bus_default_user(sd_bus **ret) {
2526         static __thread sd_bus *default_user_bus = NULL;
2527
2528         return bus_default(sd_bus_open_user, &default_user_bus, ret);
2529 }
2530
2531 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2532         assert_return(b, -EINVAL);
2533         assert_return(tid, -EINVAL);
2534         assert_return(!bus_pid_changed(b), -ECHILD);
2535
2536         if (b->tid != 0) {
2537                 *tid = b->tid;
2538                 return 0;
2539         }
2540
2541         if (b->event)
2542                 return sd_event_get_tid(b->event, tid);
2543
2544         return -ENXIO;
2545 }