chiark / gitweb /
bus: TIMESTAMP is optional kdbus metadata now, NAMES are always added
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48 #include "bus-util.h"
49 #include "bus-container.h"
50
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
52
53 static void bus_close_fds(sd_bus *b) {
54         assert(b);
55
56         if (b->input_fd >= 0)
57                 close_nointr_nofail(b->input_fd);
58
59         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60                 close_nointr_nofail(b->output_fd);
61
62         b->input_fd = b->output_fd = -1;
63 }
64
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66         struct node_callback *c;
67         struct node_vtable *v;
68         struct node_enumerator *e;
69
70         assert(b);
71
72         if (!n)
73                 return;
74
75         while (n->child)
76                 bus_node_destroy(b, n->child);
77
78         while ((c = n->callbacks)) {
79                 LIST_REMOVE(callbacks, n->callbacks, c);
80                 free(c);
81         }
82
83         while ((v = n->vtables)) {
84                 LIST_REMOVE(vtables, n->vtables, v);
85                 free(v->interface);
86                 free(v);
87         }
88
89         while ((e = n->enumerators)) {
90                 LIST_REMOVE(enumerators, n->enumerators, e);
91                 free(e);
92         }
93
94         if (n->parent)
95                 LIST_REMOVE(siblings, n->parent->child, n);
96
97         assert_se(hashmap_remove(b->nodes, n->path) == n);
98         free(n->path);
99         free(n);
100 }
101
102 static void bus_free(sd_bus *b) {
103         struct filter_callback *f;
104         struct node *n;
105         unsigned i;
106
107         assert(b);
108
109         sd_bus_detach_event(b);
110
111         bus_close_fds(b);
112
113         if (b->kdbus_buffer)
114                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
115
116         free(b->rbuffer);
117         free(b->unique_name);
118         free(b->auth_buffer);
119         free(b->address);
120         free(b->kernel);
121         free(b->machine);
122
123         free(b->exec_path);
124         strv_free(b->exec_argv);
125
126         close_many(b->fds, b->n_fds);
127         free(b->fds);
128
129         for (i = 0; i < b->rqueue_size; i++)
130                 sd_bus_message_unref(b->rqueue[i]);
131         free(b->rqueue);
132
133         for (i = 0; i < b->wqueue_size; i++)
134                 sd_bus_message_unref(b->wqueue[i]);
135         free(b->wqueue);
136
137         hashmap_free_free(b->reply_callbacks);
138         prioq_free(b->reply_callbacks_prioq);
139
140         while ((f = b->filter_callbacks)) {
141                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
142                 free(f);
143         }
144
145         bus_match_free(&b->match_callbacks);
146
147         hashmap_free_free(b->vtable_methods);
148         hashmap_free_free(b->vtable_properties);
149
150         while ((n = hashmap_first(b->nodes)))
151                 bus_node_destroy(b, n);
152
153         hashmap_free(b->nodes);
154
155         bus_kernel_flush_memfd(b);
156
157         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
158
159         free(b);
160 }
161
162 _public_ int sd_bus_new(sd_bus **ret) {
163         sd_bus *r;
164
165         assert_return(ret, -EINVAL);
166
167         r = new0(sd_bus, 1);
168         if (!r)
169                 return -ENOMEM;
170
171         r->n_ref = REFCNT_INIT;
172         r->input_fd = r->output_fd = -1;
173         r->message_version = 1;
174         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
175         r->original_pid = getpid();
176
177         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
178
179         /* We guarantee that wqueue always has space for at least one
180          * entry */
181         r->wqueue = new(sd_bus_message*, 1);
182         if (!r->wqueue) {
183                 free(r);
184                 return -ENOMEM;
185         }
186
187         *ret = r;
188         return 0;
189 }
190
191 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
192         char *a;
193
194         assert_return(bus, -EINVAL);
195         assert_return(bus->state == BUS_UNSET, -EPERM);
196         assert_return(address, -EINVAL);
197         assert_return(!bus_pid_changed(bus), -ECHILD);
198
199         a = strdup(address);
200         if (!a)
201                 return -ENOMEM;
202
203         free(bus->address);
204         bus->address = a;
205
206         return 0;
207 }
208
209 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
210         assert_return(bus, -EINVAL);
211         assert_return(bus->state == BUS_UNSET, -EPERM);
212         assert_return(input_fd >= 0, -EINVAL);
213         assert_return(output_fd >= 0, -EINVAL);
214         assert_return(!bus_pid_changed(bus), -ECHILD);
215
216         bus->input_fd = input_fd;
217         bus->output_fd = output_fd;
218         return 0;
219 }
220
221 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
222         char *p, **a;
223
224         assert_return(bus, -EINVAL);
225         assert_return(bus->state == BUS_UNSET, -EPERM);
226         assert_return(path, -EINVAL);
227         assert_return(!strv_isempty(argv), -EINVAL);
228         assert_return(!bus_pid_changed(bus), -ECHILD);
229
230         p = strdup(path);
231         if (!p)
232                 return -ENOMEM;
233
234         a = strv_copy(argv);
235         if (!a) {
236                 free(p);
237                 return -ENOMEM;
238         }
239
240         free(bus->exec_path);
241         strv_free(bus->exec_argv);
242
243         bus->exec_path = p;
244         bus->exec_argv = a;
245
246         return 0;
247 }
248
249 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
250         assert_return(bus, -EINVAL);
251         assert_return(bus->state == BUS_UNSET, -EPERM);
252         assert_return(!bus_pid_changed(bus), -ECHILD);
253
254         bus->bus_client = !!b;
255         return 0;
256 }
257
258 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
259         assert_return(bus, -EINVAL);
260         assert_return(bus->state == BUS_UNSET, -EPERM);
261         assert_return(!bus_pid_changed(bus), -ECHILD);
262
263         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
264         return 0;
265 }
266
267 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
268         assert_return(bus, -EINVAL);
269         assert_return(bus->state == BUS_UNSET, -EPERM);
270         assert_return(!bus_pid_changed(bus), -ECHILD);
271
272         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_TIMESTAMP, b);
273         return 0;
274 }
275
276 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, int b) {
277         assert_return(bus, -EINVAL);
278         assert_return(bus->state == BUS_UNSET, -EPERM);
279         assert_return(!bus_pid_changed(bus), -ECHILD);
280
281         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CREDS, b);
282         return 0;
283 }
284
285 _public_ int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
286         assert_return(bus, -EINVAL);
287         assert_return(bus->state == BUS_UNSET, -EPERM);
288         assert_return(!bus_pid_changed(bus), -ECHILD);
289
290         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
291         return 0;
292 }
293
294 _public_ int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
295         assert_return(bus, -EINVAL);
296         assert_return(bus->state == BUS_UNSET, -EPERM);
297         assert_return(!bus_pid_changed(bus), -ECHILD);
298
299         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
300         return 0;
301 }
302
303 _public_ int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
304         assert_return(bus, -EINVAL);
305         assert_return(bus->state == BUS_UNSET, -EPERM);
306         assert_return(!bus_pid_changed(bus), -ECHILD);
307
308         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
309         return 0;
310 }
311
312 _public_ int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
313         assert_return(bus, -EINVAL);
314         assert_return(bus->state == BUS_UNSET, -EPERM);
315         assert_return(!bus_pid_changed(bus), -ECHILD);
316
317         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
318         return 0;
319 }
320
321 _public_ int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
322         assert_return(bus, -EINVAL);
323         assert_return(bus->state == BUS_UNSET, -EPERM);
324         assert_return(!bus_pid_changed(bus), -ECHILD);
325
326         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
327         return 0;
328 }
329
330 _public_ int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
331         assert_return(bus, -EINVAL);
332         assert_return(bus->state == BUS_UNSET, -EPERM);
333         assert_return(!bus_pid_changed(bus), -ECHILD);
334
335         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
336         return 0;
337 }
338
339 _public_ int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
340         assert_return(bus, -EINVAL);
341         assert_return(bus->state == BUS_UNSET, -EPERM);
342         assert_return(!bus_pid_changed(bus), -ECHILD);
343
344         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
345         return 0;
346 }
347
348 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
349         assert_return(bus, -EINVAL);
350         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
351         assert_return(bus->state == BUS_UNSET, -EPERM);
352         assert_return(!bus_pid_changed(bus), -ECHILD);
353
354         bus->is_server = !!b;
355         bus->server_id = server_id;
356         return 0;
357 }
358
359 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
360         assert_return(bus, -EINVAL);
361         assert_return(bus->state == BUS_UNSET, -EPERM);
362         assert_return(!bus_pid_changed(bus), -ECHILD);
363
364         bus->anonymous_auth = !!b;
365         return 0;
366 }
367
368 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
369         const char *s;
370         int r;
371
372         assert(bus);
373         assert(bus->state == BUS_HELLO);
374         assert(reply);
375
376         r = sd_bus_message_get_errno(reply);
377         if (r < 0)
378                 return r;
379         if (r > 0)
380                 return -r;
381
382         r = sd_bus_message_read(reply, "s", &s);
383         if (r < 0)
384                 return r;
385
386         if (!service_name_is_valid(s) || s[0] != ':')
387                 return -EBADMSG;
388
389         bus->unique_name = strdup(s);
390         if (!bus->unique_name)
391                 return -ENOMEM;
392
393         bus->state = BUS_RUNNING;
394
395         return 1;
396 }
397
398 static int bus_send_hello(sd_bus *bus) {
399         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
400         int r;
401
402         assert(bus);
403
404         if (!bus->bus_client || bus->is_kernel)
405                 return 0;
406
407         r = sd_bus_message_new_method_call(
408                         bus,
409                         "org.freedesktop.DBus",
410                         "/",
411                         "org.freedesktop.DBus",
412                         "Hello",
413                         &m);
414         if (r < 0)
415                 return r;
416
417         return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
418 }
419
420 int bus_start_running(sd_bus *bus) {
421         assert(bus);
422
423         if (bus->bus_client && !bus->is_kernel) {
424                 bus->state = BUS_HELLO;
425                 return 1;
426         }
427
428         bus->state = BUS_RUNNING;
429         return 1;
430 }
431
432 static int parse_address_key(const char **p, const char *key, char **value) {
433         size_t l, n = 0;
434         const char *a;
435         char *r = NULL;
436
437         assert(p);
438         assert(*p);
439         assert(value);
440
441         if (key) {
442                 l = strlen(key);
443                 if (strncmp(*p, key, l) != 0)
444                         return 0;
445
446                 if ((*p)[l] != '=')
447                         return 0;
448
449                 if (*value)
450                         return -EINVAL;
451
452                 a = *p + l + 1;
453         } else
454                 a = *p;
455
456         while (*a != ';' && *a != ',' && *a != 0) {
457                 char c, *t;
458
459                 if (*a == '%') {
460                         int x, y;
461
462                         x = unhexchar(a[1]);
463                         if (x < 0) {
464                                 free(r);
465                                 return x;
466                         }
467
468                         y = unhexchar(a[2]);
469                         if (y < 0) {
470                                 free(r);
471                                 return y;
472                         }
473
474                         c = (char) ((x << 4) | y);
475                         a += 3;
476                 } else {
477                         c = *a;
478                         a++;
479                 }
480
481                 t = realloc(r, n + 2);
482                 if (!t) {
483                         free(r);
484                         return -ENOMEM;
485                 }
486
487                 r = t;
488                 r[n++] = c;
489         }
490
491         if (!r) {
492                 r = strdup("");
493                 if (!r)
494                         return -ENOMEM;
495         } else
496                 r[n] = 0;
497
498         if (*a == ',')
499                 a++;
500
501         *p = a;
502
503         free(*value);
504         *value = r;
505
506         return 1;
507 }
508
509 static void skip_address_key(const char **p) {
510         assert(p);
511         assert(*p);
512
513         *p += strcspn(*p, ",");
514
515         if (**p == ',')
516                 (*p) ++;
517 }
518
519 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
520         _cleanup_free_ char *path = NULL, *abstract = NULL;
521         size_t l;
522         int r;
523
524         assert(b);
525         assert(p);
526         assert(*p);
527         assert(guid);
528
529         while (**p != 0 && **p != ';') {
530                 r = parse_address_key(p, "guid", guid);
531                 if (r < 0)
532                         return r;
533                 else if (r > 0)
534                         continue;
535
536                 r = parse_address_key(p, "path", &path);
537                 if (r < 0)
538                         return r;
539                 else if (r > 0)
540                         continue;
541
542                 r = parse_address_key(p, "abstract", &abstract);
543                 if (r < 0)
544                         return r;
545                 else if (r > 0)
546                         continue;
547
548                 skip_address_key(p);
549         }
550
551         if (!path && !abstract)
552                 return -EINVAL;
553
554         if (path && abstract)
555                 return -EINVAL;
556
557         if (path) {
558                 l = strlen(path);
559                 if (l > sizeof(b->sockaddr.un.sun_path))
560                         return -E2BIG;
561
562                 b->sockaddr.un.sun_family = AF_UNIX;
563                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
564                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
565         } else if (abstract) {
566                 l = strlen(abstract);
567                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
568                         return -E2BIG;
569
570                 b->sockaddr.un.sun_family = AF_UNIX;
571                 b->sockaddr.un.sun_path[0] = 0;
572                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
573                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
574         }
575
576         return 0;
577 }
578
579 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
580         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
581         int r;
582         struct addrinfo *result, hints = {
583                 .ai_socktype = SOCK_STREAM,
584                 .ai_flags = AI_ADDRCONFIG,
585         };
586
587         assert(b);
588         assert(p);
589         assert(*p);
590         assert(guid);
591
592         while (**p != 0 && **p != ';') {
593                 r = parse_address_key(p, "guid", guid);
594                 if (r < 0)
595                         return r;
596                 else if (r > 0)
597                         continue;
598
599                 r = parse_address_key(p, "host", &host);
600                 if (r < 0)
601                         return r;
602                 else if (r > 0)
603                         continue;
604
605                 r = parse_address_key(p, "port", &port);
606                 if (r < 0)
607                         return r;
608                 else if (r > 0)
609                         continue;
610
611                 r = parse_address_key(p, "family", &family);
612                 if (r < 0)
613                         return r;
614                 else if (r > 0)
615                         continue;
616
617                 skip_address_key(p);
618         }
619
620         if (!host || !port)
621                 return -EINVAL;
622
623         if (family) {
624                 if (streq(family, "ipv4"))
625                         hints.ai_family = AF_INET;
626                 else if (streq(family, "ipv6"))
627                         hints.ai_family = AF_INET6;
628                 else
629                         return -EINVAL;
630         }
631
632         r = getaddrinfo(host, port, &hints, &result);
633         if (r == EAI_SYSTEM)
634                 return -errno;
635         else if (r != 0)
636                 return -EADDRNOTAVAIL;
637
638         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
639         b->sockaddr_size = result->ai_addrlen;
640
641         freeaddrinfo(result);
642
643         return 0;
644 }
645
646 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
647         char *path = NULL;
648         unsigned n_argv = 0, j;
649         char **argv = NULL;
650         int r;
651
652         assert(b);
653         assert(p);
654         assert(*p);
655         assert(guid);
656
657         while (**p != 0 && **p != ';') {
658                 r = parse_address_key(p, "guid", guid);
659                 if (r < 0)
660                         goto fail;
661                 else if (r > 0)
662                         continue;
663
664                 r = parse_address_key(p, "path", &path);
665                 if (r < 0)
666                         goto fail;
667                 else if (r > 0)
668                         continue;
669
670                 if (startswith(*p, "argv")) {
671                         unsigned ul;
672
673                         errno = 0;
674                         ul = strtoul(*p + 4, (char**) p, 10);
675                         if (errno > 0 || **p != '=' || ul > 256) {
676                                 r = -EINVAL;
677                                 goto fail;
678                         }
679
680                         (*p) ++;
681
682                         if (ul >= n_argv) {
683                                 char **x;
684
685                                 x = realloc(argv, sizeof(char*) * (ul + 2));
686                                 if (!x) {
687                                         r = -ENOMEM;
688                                         goto fail;
689                                 }
690
691                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
692
693                                 argv = x;
694                                 n_argv = ul + 1;
695                         }
696
697                         r = parse_address_key(p, NULL, argv + ul);
698                         if (r < 0)
699                                 goto fail;
700
701                         continue;
702                 }
703
704                 skip_address_key(p);
705         }
706
707         if (!path) {
708                 r = -EINVAL;
709                 goto fail;
710         }
711
712         /* Make sure there are no holes in the array, with the
713          * exception of argv[0] */
714         for (j = 1; j < n_argv; j++)
715                 if (!argv[j]) {
716                         r = -EINVAL;
717                         goto fail;
718                 }
719
720         if (argv && argv[0] == NULL) {
721                 argv[0] = strdup(path);
722                 if (!argv[0]) {
723                         r = -ENOMEM;
724                         goto fail;
725                 }
726         }
727
728         b->exec_path = path;
729         b->exec_argv = argv;
730         return 0;
731
732 fail:
733         for (j = 0; j < n_argv; j++)
734                 free(argv[j]);
735
736         free(argv);
737         free(path);
738         return r;
739 }
740
741 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
742         _cleanup_free_ char *path = NULL;
743         int r;
744
745         assert(b);
746         assert(p);
747         assert(*p);
748         assert(guid);
749
750         while (**p != 0 && **p != ';') {
751                 r = parse_address_key(p, "guid", guid);
752                 if (r < 0)
753                         return r;
754                 else if (r > 0)
755                         continue;
756
757                 r = parse_address_key(p, "path", &path);
758                 if (r < 0)
759                         return r;
760                 else if (r > 0)
761                         continue;
762
763                 skip_address_key(p);
764         }
765
766         if (!path)
767                 return -EINVAL;
768
769         free(b->kernel);
770         b->kernel = path;
771         path = NULL;
772
773         return 0;
774 }
775
776 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
777         _cleanup_free_ char *machine = NULL;
778         int r;
779
780         assert(b);
781         assert(p);
782         assert(*p);
783         assert(guid);
784
785         while (**p != 0 && **p != ';') {
786                 r = parse_address_key(p, "guid", guid);
787                 if (r < 0)
788                         return r;
789                 else if (r > 0)
790                         continue;
791
792                 r = parse_address_key(p, "machine", &machine);
793                 if (r < 0)
794                         return r;
795                 else if (r > 0)
796                         continue;
797
798                 skip_address_key(p);
799         }
800
801         if (!machine)
802                 return -EINVAL;
803
804         free(b->machine);
805         b->machine = machine;
806         machine = NULL;
807
808         b->sockaddr.un.sun_family = AF_UNIX;
809         strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
810         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
811
812         return 0;
813 }
814
815 static void bus_reset_parsed_address(sd_bus *b) {
816         assert(b);
817
818         zero(b->sockaddr);
819         b->sockaddr_size = 0;
820         strv_free(b->exec_argv);
821         free(b->exec_path);
822         b->exec_path = NULL;
823         b->exec_argv = NULL;
824         b->server_id = SD_ID128_NULL;
825         free(b->kernel);
826         b->kernel = NULL;
827         free(b->machine);
828         b->machine = NULL;
829 }
830
831 static int bus_parse_next_address(sd_bus *b) {
832         _cleanup_free_ char *guid = NULL;
833         const char *a;
834         int r;
835
836         assert(b);
837
838         if (!b->address)
839                 return 0;
840         if (b->address[b->address_index] == 0)
841                 return 0;
842
843         bus_reset_parsed_address(b);
844
845         a = b->address + b->address_index;
846
847         while (*a != 0) {
848
849                 if (*a == ';') {
850                         a++;
851                         continue;
852                 }
853
854                 if (startswith(a, "unix:")) {
855                         a += 5;
856
857                         r = parse_unix_address(b, &a, &guid);
858                         if (r < 0)
859                                 return r;
860                         break;
861
862                 } else if (startswith(a, "tcp:")) {
863
864                         a += 4;
865                         r = parse_tcp_address(b, &a, &guid);
866                         if (r < 0)
867                                 return r;
868
869                         break;
870
871                 } else if (startswith(a, "unixexec:")) {
872
873                         a += 9;
874                         r = parse_exec_address(b, &a, &guid);
875                         if (r < 0)
876                                 return r;
877
878                         break;
879
880                 } else if (startswith(a, "kernel:")) {
881
882                         a += 7;
883                         r = parse_kernel_address(b, &a, &guid);
884                         if (r < 0)
885                                 return r;
886
887                         break;
888                 } else if (startswith(a, "x-container:")) {
889
890                         a += 12;
891                         r = parse_container_address(b, &a, &guid);
892                         if (r < 0)
893                                 return r;
894
895                         break;
896                 }
897
898                 a = strchr(a, ';');
899                 if (!a)
900                         return 0;
901         }
902
903         if (guid) {
904                 r = sd_id128_from_string(guid, &b->server_id);
905                 if (r < 0)
906                         return r;
907         }
908
909         b->address_index = a - b->address;
910         return 1;
911 }
912
913 static int bus_start_address(sd_bus *b) {
914         int r;
915
916         assert(b);
917
918         for (;;) {
919                 sd_bus_close(b);
920
921                 if (b->exec_path) {
922
923                         r = bus_socket_exec(b);
924                         if (r >= 0)
925                                 return r;
926
927                         b->last_connect_error = -r;
928                 } else if (b->kernel) {
929
930                         r = bus_kernel_connect(b);
931                         if (r >= 0)
932                                 return r;
933
934                         b->last_connect_error = -r;
935
936                 } else if (b->machine) {
937
938                         r = bus_container_connect(b);
939                         if (r >= 0)
940                                 return r;
941
942                         b->last_connect_error = -r;
943
944                 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
945
946                         r = bus_socket_connect(b);
947                         if (r >= 0)
948                                 return r;
949
950                         b->last_connect_error = -r;
951                 }
952
953                 r = bus_parse_next_address(b);
954                 if (r < 0)
955                         return r;
956                 if (r == 0)
957                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
958         }
959 }
960
961 int bus_next_address(sd_bus *b) {
962         assert(b);
963
964         bus_reset_parsed_address(b);
965         return bus_start_address(b);
966 }
967
968 static int bus_start_fd(sd_bus *b) {
969         struct stat st;
970         int r;
971
972         assert(b);
973         assert(b->input_fd >= 0);
974         assert(b->output_fd >= 0);
975
976         r = fd_nonblock(b->input_fd, true);
977         if (r < 0)
978                 return r;
979
980         r = fd_cloexec(b->input_fd, true);
981         if (r < 0)
982                 return r;
983
984         if (b->input_fd != b->output_fd) {
985                 r = fd_nonblock(b->output_fd, true);
986                 if (r < 0)
987                         return r;
988
989                 r = fd_cloexec(b->output_fd, true);
990                 if (r < 0)
991                         return r;
992         }
993
994         if (fstat(b->input_fd, &st) < 0)
995                 return -errno;
996
997         if (S_ISCHR(b->input_fd))
998                 return bus_kernel_take_fd(b);
999         else
1000                 return bus_socket_take_fd(b);
1001 }
1002
1003 _public_ int sd_bus_start(sd_bus *bus) {
1004         int r;
1005
1006         assert_return(bus, -EINVAL);
1007         assert_return(bus->state == BUS_UNSET, -EPERM);
1008         assert_return(!bus_pid_changed(bus), -ECHILD);
1009
1010         bus->state = BUS_OPENING;
1011
1012         if (bus->is_server && bus->bus_client)
1013                 return -EINVAL;
1014
1015         if (bus->input_fd >= 0)
1016                 r = bus_start_fd(bus);
1017         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1018                 r = bus_start_address(bus);
1019         else
1020                 return -EINVAL;
1021
1022         if (r < 0)
1023                 return r;
1024
1025         return bus_send_hello(bus);
1026 }
1027
1028 _public_ int sd_bus_open_system(sd_bus **ret) {
1029         const char *e;
1030         sd_bus *b;
1031         int r;
1032
1033         assert_return(ret, -EINVAL);
1034
1035         r = sd_bus_new(&b);
1036         if (r < 0)
1037                 return r;
1038
1039         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1040         if (e) {
1041                 r = sd_bus_set_address(b, e);
1042                 if (r < 0)
1043                         goto fail;
1044         } else {
1045                 b->sockaddr.un.sun_family = AF_UNIX;
1046                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1047                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1048         }
1049
1050         b->bus_client = true;
1051
1052         r = sd_bus_start(b);
1053         if (r < 0)
1054                 goto fail;
1055
1056         *ret = b;
1057         return 0;
1058
1059 fail:
1060         bus_free(b);
1061         return r;
1062 }
1063
1064 _public_ int sd_bus_open_user(sd_bus **ret) {
1065         const char *e;
1066         sd_bus *b;
1067         size_t l;
1068         int r;
1069
1070         assert_return(ret, -EINVAL);
1071
1072         r = sd_bus_new(&b);
1073         if (r < 0)
1074                 return r;
1075
1076         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1077         if (e) {
1078                 r = sd_bus_set_address(b, e);
1079                 if (r < 0)
1080                         goto fail;
1081         } else {
1082                 e = secure_getenv("XDG_RUNTIME_DIR");
1083                 if (!e) {
1084                         r = -ENOENT;
1085                         goto fail;
1086                 }
1087
1088                 l = strlen(e);
1089                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1090                         r = -E2BIG;
1091                         goto fail;
1092                 }
1093
1094                 b->sockaddr.un.sun_family = AF_UNIX;
1095                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1096                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1097         }
1098
1099         b->bus_client = true;
1100
1101         r = sd_bus_start(b);
1102         if (r < 0)
1103                 goto fail;
1104
1105         *ret = b;
1106         return 0;
1107
1108 fail:
1109         bus_free(b);
1110         return r;
1111 }
1112
1113 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1114         _cleanup_free_ char *e = NULL;
1115         char *p = NULL;
1116         sd_bus *bus;
1117         int r;
1118
1119         assert_return(host, -EINVAL);
1120         assert_return(ret, -EINVAL);
1121
1122         e = bus_address_escape(host);
1123         if (!e)
1124                 return -ENOMEM;
1125
1126         p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1127         if (!p)
1128                 return -ENOMEM;
1129
1130         r = sd_bus_new(&bus);
1131         if (r < 0) {
1132                 free(p);
1133                 return r;
1134         }
1135
1136         bus->address = p;
1137         bus->bus_client = true;
1138
1139         r = sd_bus_start(bus);
1140         if (r < 0) {
1141                 bus_free(bus);
1142                 return r;
1143         }
1144
1145         *ret = bus;
1146         return 0;
1147 }
1148
1149 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1150         _cleanup_free_ char *e = NULL;
1151         sd_bus *bus;
1152         char *p;
1153         int r;
1154
1155         assert_return(machine, -EINVAL);
1156         assert_return(ret, -EINVAL);
1157
1158         e = bus_address_escape(machine);
1159         if (!e)
1160                 return -ENOMEM;
1161
1162         p = strjoin("x-container:machine=", e, NULL);
1163         if (!p)
1164                 return -ENOMEM;
1165
1166         r = sd_bus_new(&bus);
1167         if (r < 0) {
1168                 free(p);
1169                 return r;
1170         }
1171
1172         bus->address = p;
1173         bus->bus_client = true;
1174
1175         r = sd_bus_start(bus);
1176         if (r < 0) {
1177                 bus_free(bus);
1178                 return r;
1179         }
1180
1181         *ret = bus;
1182         return 0;
1183 }
1184
1185 _public_ void sd_bus_close(sd_bus *bus) {
1186         if (!bus)
1187                 return;
1188         if (bus->state == BUS_CLOSED)
1189                 return;
1190         if (bus_pid_changed(bus))
1191                 return;
1192
1193         bus->state = BUS_CLOSED;
1194
1195         sd_bus_detach_event(bus);
1196
1197         if (!bus->is_kernel)
1198                 bus_close_fds(bus);
1199
1200         /* We'll leave the fd open in case this is a kernel bus, since
1201          * there might still be memblocks around that reference this
1202          * bus, and they might need to invoke the
1203          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1204          * freed. */
1205 }
1206
1207 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1208         assert_return(bus, NULL);
1209
1210         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1211
1212         return bus;
1213 }
1214
1215 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1216         assert_return(bus, NULL);
1217
1218         if (REFCNT_DEC(bus->n_ref) <= 0)
1219                 bus_free(bus);
1220
1221         return NULL;
1222 }
1223
1224 _public_ int sd_bus_is_open(sd_bus *bus) {
1225
1226         assert_return(bus, -EINVAL);
1227         assert_return(!bus_pid_changed(bus), -ECHILD);
1228
1229         return BUS_IS_OPEN(bus->state);
1230 }
1231
1232 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1233         int r;
1234
1235         assert_return(bus, -EINVAL);
1236         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1237         assert_return(!bus_pid_changed(bus), -ECHILD);
1238
1239         if (type == SD_BUS_TYPE_UNIX_FD) {
1240                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1241                         return 0;
1242
1243                 r = bus_ensure_running(bus);
1244                 if (r < 0)
1245                         return r;
1246
1247                 return bus->can_fds;
1248         }
1249
1250         return bus_type_is_valid(type);
1251 }
1252
1253 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1254         int r;
1255
1256         assert_return(bus, -EINVAL);
1257         assert_return(server_id, -EINVAL);
1258         assert_return(!bus_pid_changed(bus), -ECHILD);
1259
1260         r = bus_ensure_running(bus);
1261         if (r < 0)
1262                 return r;
1263
1264         *server_id = bus->server_id;
1265         return 0;
1266 }
1267
1268 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1269         assert(m);
1270
1271         if (m->header->version > b->message_version)
1272                 return -EPERM;
1273
1274         if (m->sealed)
1275                 return 0;
1276
1277         return bus_message_seal(m, ++b->serial);
1278 }
1279
1280 static int dispatch_wqueue(sd_bus *bus) {
1281         int r, ret = 0;
1282
1283         assert(bus);
1284         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1285
1286         while (bus->wqueue_size > 0) {
1287
1288                 if (bus->is_kernel)
1289                         r = bus_kernel_write_message(bus, bus->wqueue[0]);
1290                 else
1291                         r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1292
1293                 if (r < 0) {
1294                         sd_bus_close(bus);
1295                         return r;
1296                 } else if (r == 0)
1297                         /* Didn't do anything this time */
1298                         return ret;
1299                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1300                         /* Fully written. Let's drop the entry from
1301                          * the queue.
1302                          *
1303                          * This isn't particularly optimized, but
1304                          * well, this is supposed to be our worst-case
1305                          * buffer only, and the socket buffer is
1306                          * supposed to be our primary buffer, and if
1307                          * it got full, then all bets are off
1308                          * anyway. */
1309
1310                         sd_bus_message_unref(bus->wqueue[0]);
1311                         bus->wqueue_size --;
1312                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1313                         bus->windex = 0;
1314
1315                         ret = 1;
1316                 }
1317         }
1318
1319         return ret;
1320 }
1321
1322 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1323         sd_bus_message *z = NULL;
1324         int r, ret = 0;
1325
1326         assert(bus);
1327         assert(m);
1328         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1329
1330         if (bus->rqueue_size > 0) {
1331                 /* Dispatch a queued message */
1332
1333                 *m = bus->rqueue[0];
1334                 bus->rqueue_size --;
1335                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1336                 return 1;
1337         }
1338
1339         /* Try to read a new message */
1340         do {
1341                 if (bus->is_kernel)
1342                         r = bus_kernel_read_message(bus, &z);
1343                 else
1344                         r = bus_socket_read_message(bus, &z);
1345
1346                 if (r < 0) {
1347                         sd_bus_close(bus);
1348                         return r;
1349                 }
1350                 if (r == 0)
1351                         return ret;
1352
1353                 ret = 1;
1354         } while (!z);
1355
1356         *m = z;
1357         return ret;
1358 }
1359
1360 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1361         int r;
1362
1363         assert_return(bus, -EINVAL);
1364         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1365         assert_return(m, -EINVAL);
1366         assert_return(!bus_pid_changed(bus), -ECHILD);
1367
1368         if (m->n_fds > 0) {
1369                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1370                 if (r < 0)
1371                         return r;
1372                 if (r == 0)
1373                         return -ENOTSUP;
1374         }
1375
1376         /* If the serial number isn't kept, then we know that no reply
1377          * is expected */
1378         if (!serial && !m->sealed)
1379                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1380
1381         r = bus_seal_message(bus, m);
1382         if (r < 0)
1383                 return r;
1384
1385         /* If this is a reply and no reply was requested, then let's
1386          * suppress this, if we can */
1387         if (m->dont_send && !serial)
1388                 return 1;
1389
1390         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1391                 size_t idx = 0;
1392
1393                 if (bus->is_kernel)
1394                         r = bus_kernel_write_message(bus, m);
1395                 else
1396                         r = bus_socket_write_message(bus, m, &idx);
1397
1398                 if (r < 0) {
1399                         sd_bus_close(bus);
1400                         return r;
1401                 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1402                         /* Wasn't fully written. So let's remember how
1403                          * much was written. Note that the first entry
1404                          * of the wqueue array is always allocated so
1405                          * that we always can remember how much was
1406                          * written. */
1407                         bus->wqueue[0] = sd_bus_message_ref(m);
1408                         bus->wqueue_size = 1;
1409                         bus->windex = idx;
1410                 }
1411         } else {
1412                 sd_bus_message **q;
1413
1414                 /* Just append it to the queue. */
1415
1416                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1417                         return -ENOBUFS;
1418
1419                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1420                 if (!q)
1421                         return -ENOMEM;
1422
1423                 bus->wqueue = q;
1424                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1425         }
1426
1427         if (serial)
1428                 *serial = BUS_MESSAGE_SERIAL(m);
1429
1430         return 1;
1431 }
1432
1433 static usec_t calc_elapse(uint64_t usec) {
1434         if (usec == (uint64_t) -1)
1435                 return 0;
1436
1437         if (usec == 0)
1438                 usec = BUS_DEFAULT_TIMEOUT;
1439
1440         return now(CLOCK_MONOTONIC) + usec;
1441 }
1442
1443 static int timeout_compare(const void *a, const void *b) {
1444         const struct reply_callback *x = a, *y = b;
1445
1446         if (x->timeout != 0 && y->timeout == 0)
1447                 return -1;
1448
1449         if (x->timeout == 0 && y->timeout != 0)
1450                 return 1;
1451
1452         if (x->timeout < y->timeout)
1453                 return -1;
1454
1455         if (x->timeout > y->timeout)
1456                 return 1;
1457
1458         return 0;
1459 }
1460
1461 _public_ int sd_bus_call_async(
1462                 sd_bus *bus,
1463                 sd_bus_message *m,
1464                 sd_bus_message_handler_t callback,
1465                 void *userdata,
1466                 uint64_t usec,
1467                 uint64_t *serial) {
1468
1469         struct reply_callback *c;
1470         int r;
1471
1472         assert_return(bus, -EINVAL);
1473         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1474         assert_return(m, -EINVAL);
1475         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1476         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1477         assert_return(callback, -EINVAL);
1478         assert_return(!bus_pid_changed(bus), -ECHILD);
1479
1480         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1481         if (r < 0)
1482                 return r;
1483
1484         if (usec != (uint64_t) -1) {
1485                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1486                 if (r < 0)
1487                         return r;
1488         }
1489
1490         r = bus_seal_message(bus, m);
1491         if (r < 0)
1492                 return r;
1493
1494         c = new0(struct reply_callback, 1);
1495         if (!c)
1496                 return -ENOMEM;
1497
1498         c->callback = callback;
1499         c->userdata = userdata;
1500         c->serial = BUS_MESSAGE_SERIAL(m);
1501         c->timeout = calc_elapse(usec);
1502
1503         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1504         if (r < 0) {
1505                 free(c);
1506                 return r;
1507         }
1508
1509         if (c->timeout != 0) {
1510                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1511                 if (r < 0) {
1512                         c->timeout = 0;
1513                         sd_bus_call_async_cancel(bus, c->serial);
1514                         return r;
1515                 }
1516         }
1517
1518         r = sd_bus_send(bus, m, serial);
1519         if (r < 0) {
1520                 sd_bus_call_async_cancel(bus, c->serial);
1521                 return r;
1522         }
1523
1524         return r;
1525 }
1526
1527 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1528         struct reply_callback *c;
1529
1530         assert_return(bus, -EINVAL);
1531         assert_return(serial != 0, -EINVAL);
1532         assert_return(!bus_pid_changed(bus), -ECHILD);
1533
1534         c = hashmap_remove(bus->reply_callbacks, &serial);
1535         if (!c)
1536                 return 0;
1537
1538         if (c->timeout != 0)
1539                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1540
1541         free(c);
1542         return 1;
1543 }
1544
1545 int bus_ensure_running(sd_bus *bus) {
1546         int r;
1547
1548         assert(bus);
1549
1550         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1551                 return -ENOTCONN;
1552         if (bus->state == BUS_RUNNING)
1553                 return 1;
1554
1555         for (;;) {
1556                 r = sd_bus_process(bus, NULL);
1557                 if (r < 0)
1558                         return r;
1559                 if (bus->state == BUS_RUNNING)
1560                         return 1;
1561                 if (r > 0)
1562                         continue;
1563
1564                 r = sd_bus_wait(bus, (uint64_t) -1);
1565                 if (r < 0)
1566                         return r;
1567         }
1568 }
1569
1570 _public_ int sd_bus_call(
1571                 sd_bus *bus,
1572                 sd_bus_message *m,
1573                 uint64_t usec,
1574                 sd_bus_error *error,
1575                 sd_bus_message **reply) {
1576
1577         int r;
1578         usec_t timeout;
1579         uint64_t serial;
1580         bool room = false;
1581
1582         assert_return(bus, -EINVAL);
1583         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1584         assert_return(m, -EINVAL);
1585         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1586         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1587         assert_return(!bus_error_is_dirty(error), -EINVAL);
1588         assert_return(!bus_pid_changed(bus), -ECHILD);
1589
1590         r = bus_ensure_running(bus);
1591         if (r < 0)
1592                 return r;
1593
1594         r = sd_bus_send(bus, m, &serial);
1595         if (r < 0)
1596                 return r;
1597
1598         timeout = calc_elapse(usec);
1599
1600         for (;;) {
1601                 usec_t left;
1602                 sd_bus_message *incoming = NULL;
1603
1604                 if (!room) {
1605                         sd_bus_message **q;
1606
1607                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1608                                 return -ENOBUFS;
1609
1610                         /* Make sure there's room for queuing this
1611                          * locally, before we read the message */
1612
1613                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1614                         if (!q)
1615                                 return -ENOMEM;
1616
1617                         bus->rqueue = q;
1618                         room = true;
1619                 }
1620
1621                 if (bus->is_kernel)
1622                         r = bus_kernel_read_message(bus, &incoming);
1623                 else
1624                         r = bus_socket_read_message(bus, &incoming);
1625                 if (r < 0)
1626                         return r;
1627                 if (incoming) {
1628
1629                         if (incoming->reply_serial == serial) {
1630                                 /* Found a match! */
1631
1632                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1633
1634                                         if (reply)
1635                                                 *reply = incoming;
1636                                         else
1637                                                 sd_bus_message_unref(incoming);
1638
1639                                         return 1;
1640                                 }
1641
1642                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1643                                         int k;
1644
1645                                         r = sd_bus_error_copy(error, &incoming->error);
1646                                         if (r < 0) {
1647                                                 sd_bus_message_unref(incoming);
1648                                                 return r;
1649                                         }
1650
1651                                         k = sd_bus_error_get_errno(&incoming->error);
1652                                         sd_bus_message_unref(incoming);
1653                                         return -k;
1654                                 }
1655
1656                                 sd_bus_message_unref(incoming);
1657                                 return -EIO;
1658
1659                         } else if (incoming->header->serial == serial &&
1660                                    bus->unique_name &&
1661                                    incoming->sender &&
1662                                    streq(bus->unique_name, incoming->sender)) {
1663
1664                                 /* Our own message? Somebody is trying
1665                                  * to send its own client a message,
1666                                  * let's not dead-lock, let's fail
1667                                  * immediately. */
1668
1669                                 sd_bus_message_unref(incoming);
1670                                 return -ELOOP;
1671                         }
1672
1673                         /* There's already guaranteed to be room for
1674                          * this, so need to resize things here */
1675                         bus->rqueue[bus->rqueue_size ++] = incoming;
1676                         room = false;
1677
1678                         /* Try to read more, right-away */
1679                         continue;
1680                 }
1681                 if (r != 0)
1682                         continue;
1683
1684                 if (timeout > 0) {
1685                         usec_t n;
1686
1687                         n = now(CLOCK_MONOTONIC);
1688                         if (n >= timeout)
1689                                 return -ETIMEDOUT;
1690
1691                         left = timeout - n;
1692                 } else
1693                         left = (uint64_t) -1;
1694
1695                 r = bus_poll(bus, true, left);
1696                 if (r < 0)
1697                         return r;
1698
1699                 r = dispatch_wqueue(bus);
1700                 if (r < 0)
1701                         return r;
1702         }
1703 }
1704
1705 _public_ int sd_bus_get_fd(sd_bus *bus) {
1706
1707         assert_return(bus, -EINVAL);
1708         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1709         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1710         assert_return(!bus_pid_changed(bus), -ECHILD);
1711
1712         return bus->input_fd;
1713 }
1714
1715 _public_ int sd_bus_get_events(sd_bus *bus) {
1716         int flags = 0;
1717
1718         assert_return(bus, -EINVAL);
1719         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1720         assert_return(!bus_pid_changed(bus), -ECHILD);
1721
1722         if (bus->state == BUS_OPENING)
1723                 flags |= POLLOUT;
1724         else if (bus->state == BUS_AUTHENTICATING) {
1725
1726                 if (bus_socket_auth_needs_write(bus))
1727                         flags |= POLLOUT;
1728
1729                 flags |= POLLIN;
1730
1731         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1732                 if (bus->rqueue_size <= 0)
1733                         flags |= POLLIN;
1734                 if (bus->wqueue_size > 0)
1735                         flags |= POLLOUT;
1736         }
1737
1738         return flags;
1739 }
1740
1741 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1742         struct reply_callback *c;
1743
1744         assert_return(bus, -EINVAL);
1745         assert_return(timeout_usec, -EINVAL);
1746         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1747         assert_return(!bus_pid_changed(bus), -ECHILD);
1748
1749         if (bus->state == BUS_AUTHENTICATING) {
1750                 *timeout_usec = bus->auth_timeout;
1751                 return 1;
1752         }
1753
1754         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1755                 *timeout_usec = (uint64_t) -1;
1756                 return 0;
1757         }
1758
1759         if (bus->rqueue_size > 0) {
1760                 *timeout_usec = 0;
1761                 return 1;
1762         }
1763
1764         c = prioq_peek(bus->reply_callbacks_prioq);
1765         if (!c) {
1766                 *timeout_usec = (uint64_t) -1;
1767                 return 0;
1768         }
1769
1770         *timeout_usec = c->timeout;
1771         return 1;
1772 }
1773
1774 static int process_timeout(sd_bus *bus) {
1775         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1776         struct reply_callback *c;
1777         usec_t n;
1778         int r;
1779
1780         assert(bus);
1781
1782         c = prioq_peek(bus->reply_callbacks_prioq);
1783         if (!c)
1784                 return 0;
1785
1786         n = now(CLOCK_MONOTONIC);
1787         if (c->timeout > n)
1788                 return 0;
1789
1790         r = bus_message_new_synthetic_error(
1791                         bus,
1792                         c->serial,
1793                         &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1794                         &m);
1795         if (r < 0)
1796                 return r;
1797
1798         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1799         hashmap_remove(bus->reply_callbacks, &c->serial);
1800
1801         r = c->callback(bus, m, c->userdata);
1802         free(c);
1803
1804         return r < 0 ? r : 1;
1805 }
1806
1807 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1808         assert(bus);
1809         assert(m);
1810
1811         if (bus->state != BUS_HELLO)
1812                 return 0;
1813
1814         /* Let's make sure the first message on the bus is the HELLO
1815          * reply. But note that we don't actually parse the message
1816          * here (we leave that to the usual handling), we just verify
1817          * we don't let any earlier msg through. */
1818
1819         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1820             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1821                 return -EIO;
1822
1823         if (m->reply_serial != bus->hello_serial)
1824                 return -EIO;
1825
1826         return 0;
1827 }
1828
1829 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1830         struct reply_callback *c;
1831         int r;
1832
1833         assert(bus);
1834         assert(m);
1835
1836         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1837             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1838                 return 0;
1839
1840         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1841         if (!c)
1842                 return 0;
1843
1844         if (c->timeout != 0)
1845                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1846
1847         r = sd_bus_message_rewind(m, true);
1848         if (r < 0)
1849                 return r;
1850
1851         r = c->callback(bus, m, c->userdata);
1852         free(c);
1853
1854         return r;
1855 }
1856
1857 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1858         struct filter_callback *l;
1859         int r;
1860
1861         assert(bus);
1862         assert(m);
1863
1864         do {
1865                 bus->filter_callbacks_modified = false;
1866
1867                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1868
1869                         if (bus->filter_callbacks_modified)
1870                                 break;
1871
1872                         /* Don't run this more than once per iteration */
1873                         if (l->last_iteration == bus->iteration_counter)
1874                                 continue;
1875
1876                         l->last_iteration = bus->iteration_counter;
1877
1878                         r = sd_bus_message_rewind(m, true);
1879                         if (r < 0)
1880                                 return r;
1881
1882                         r = l->callback(bus, m, l->userdata);
1883                         if (r != 0)
1884                                 return r;
1885
1886                 }
1887
1888         } while (bus->filter_callbacks_modified);
1889
1890         return 0;
1891 }
1892
1893 static int process_match(sd_bus *bus, sd_bus_message *m) {
1894         int r;
1895
1896         assert(bus);
1897         assert(m);
1898
1899         do {
1900                 bus->match_callbacks_modified = false;
1901
1902                 r = bus_match_run(bus, &bus->match_callbacks, m);
1903                 if (r != 0)
1904                         return r;
1905
1906         } while (bus->match_callbacks_modified);
1907
1908         return 0;
1909 }
1910
1911 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1912         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1913         int r;
1914
1915         assert(bus);
1916         assert(m);
1917
1918         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1919                 return 0;
1920
1921         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1922                 return 0;
1923
1924         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1925                 return 1;
1926
1927         if (streq_ptr(m->member, "Ping"))
1928                 r = sd_bus_message_new_method_return(bus, m, &reply);
1929         else if (streq_ptr(m->member, "GetMachineId")) {
1930                 sd_id128_t id;
1931                 char sid[33];
1932
1933                 r = sd_id128_get_machine(&id);
1934                 if (r < 0)
1935                         return r;
1936
1937                 r = sd_bus_message_new_method_return(bus, m, &reply);
1938                 if (r < 0)
1939                         return r;
1940
1941                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1942         } else {
1943                 r = sd_bus_message_new_method_errorf(
1944                                 bus, m, &reply,
1945                                 SD_BUS_ERROR_UNKNOWN_METHOD,
1946                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1947         }
1948
1949         if (r < 0)
1950                 return r;
1951
1952         r = sd_bus_send(bus, reply, NULL);
1953         if (r < 0)
1954                 return r;
1955
1956         return 1;
1957 }
1958
1959 static int process_message(sd_bus *bus, sd_bus_message *m) {
1960         int r;
1961
1962         assert(bus);
1963         assert(m);
1964
1965         bus->current = m;
1966         bus->iteration_counter++;
1967
1968         log_debug("Got message sender=%s object=%s interface=%s member=%s",
1969                   strna(sd_bus_message_get_sender(m)),
1970                   strna(sd_bus_message_get_path(m)),
1971                   strna(sd_bus_message_get_interface(m)),
1972                   strna(sd_bus_message_get_member(m)));
1973
1974         r = process_hello(bus, m);
1975         if (r != 0)
1976                 goto finish;
1977
1978         r = process_reply(bus, m);
1979         if (r != 0)
1980                 goto finish;
1981
1982         r = process_filter(bus, m);
1983         if (r != 0)
1984                 goto finish;
1985
1986         r = process_match(bus, m);
1987         if (r != 0)
1988                 goto finish;
1989
1990         r = process_builtin(bus, m);
1991         if (r != 0)
1992                 goto finish;
1993
1994         r = bus_process_object(bus, m);
1995
1996 finish:
1997         bus->current = NULL;
1998         return r;
1999 }
2000
2001 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2002         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2003         int r;
2004
2005         assert(bus);
2006         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2007
2008         r = process_timeout(bus);
2009         if (r != 0)
2010                 goto null_message;
2011
2012         r = dispatch_wqueue(bus);
2013         if (r != 0)
2014                 goto null_message;
2015
2016         r = dispatch_rqueue(bus, &m);
2017         if (r < 0)
2018                 return r;
2019         if (!m)
2020                 goto null_message;
2021
2022         r = process_message(bus, m);
2023         if (r != 0)
2024                 goto null_message;
2025
2026         if (ret) {
2027                 r = sd_bus_message_rewind(m, true);
2028                 if (r < 0)
2029                         return r;
2030
2031                 *ret = m;
2032                 m = NULL;
2033                 return 1;
2034         }
2035
2036         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2037
2038                 r = sd_bus_reply_method_errorf(
2039                                 bus, m,
2040                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
2041                                 "Unknown object '%s'.", m->path);
2042                 if (r < 0)
2043                         return r;
2044         }
2045
2046         return 1;
2047
2048 null_message:
2049         if (r >= 0 && ret)
2050                 *ret = NULL;
2051
2052         return r;
2053 }
2054
2055 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2056         BUS_DONT_DESTROY(bus);
2057         int r;
2058
2059         /* Returns 0 when we didn't do anything. This should cause the
2060          * caller to invoke sd_bus_wait() before returning the next
2061          * time. Returns > 0 when we did something, which possibly
2062          * means *ret is filled in with an unprocessed message. */
2063
2064         assert_return(bus, -EINVAL);
2065         assert_return(!bus_pid_changed(bus), -ECHILD);
2066
2067         /* We don't allow recursively invoking sd_bus_process(). */
2068         assert_return(!bus->processing, -EBUSY);
2069
2070         switch (bus->state) {
2071
2072         case BUS_UNSET:
2073         case BUS_CLOSED:
2074                 return -ENOTCONN;
2075
2076         case BUS_OPENING:
2077                 r = bus_socket_process_opening(bus);
2078                 if (r < 0)
2079                         return r;
2080                 if (ret)
2081                         *ret = NULL;
2082                 return r;
2083
2084         case BUS_AUTHENTICATING:
2085
2086                 r = bus_socket_process_authenticating(bus);
2087                 if (r < 0)
2088                         return r;
2089                 if (ret)
2090                         *ret = NULL;
2091                 return r;
2092
2093         case BUS_RUNNING:
2094         case BUS_HELLO:
2095
2096                 bus->processing = true;
2097                 r = process_running(bus, ret);
2098                 bus->processing = false;
2099
2100                 return r;
2101         }
2102
2103         assert_not_reached("Unknown state");
2104 }
2105
2106 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2107         struct pollfd p[2] = {};
2108         int r, e, n;
2109         struct timespec ts;
2110         usec_t m = (usec_t) -1;
2111
2112         assert(bus);
2113         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2114
2115         e = sd_bus_get_events(bus);
2116         if (e < 0)
2117                 return e;
2118
2119         if (need_more)
2120                 /* The caller really needs some more data, he doesn't
2121                  * care about what's already read, or any timeouts
2122                  * except its own.*/
2123                 e |= POLLIN;
2124         else {
2125                 usec_t until;
2126                 /* The caller wants to process if there's something to
2127                  * process, but doesn't care otherwise */
2128
2129                 r = sd_bus_get_timeout(bus, &until);
2130                 if (r < 0)
2131                         return r;
2132                 if (r > 0) {
2133                         usec_t nw;
2134                         nw = now(CLOCK_MONOTONIC);
2135                         m = until > nw ? until - nw : 0;
2136                 }
2137         }
2138
2139         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2140                 m = timeout_usec;
2141
2142         p[0].fd = bus->input_fd;
2143         if (bus->output_fd == bus->input_fd) {
2144                 p[0].events = e;
2145                 n = 1;
2146         } else {
2147                 p[0].events = e & POLLIN;
2148                 p[1].fd = bus->output_fd;
2149                 p[1].events = e & POLLOUT;
2150                 n = 2;
2151         }
2152
2153         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2154         if (r < 0)
2155                 return -errno;
2156
2157         return r > 0 ? 1 : 0;
2158 }
2159
2160 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2161
2162         assert_return(bus, -EINVAL);
2163         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2164         assert_return(!bus_pid_changed(bus), -ECHILD);
2165
2166         if (bus->rqueue_size > 0)
2167                 return 0;
2168
2169         return bus_poll(bus, false, timeout_usec);
2170 }
2171
2172 _public_ int sd_bus_flush(sd_bus *bus) {
2173         int r;
2174
2175         assert_return(bus, -EINVAL);
2176         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2177         assert_return(!bus_pid_changed(bus), -ECHILD);
2178
2179         r = bus_ensure_running(bus);
2180         if (r < 0)
2181                 return r;
2182
2183         if (bus->wqueue_size <= 0)
2184                 return 0;
2185
2186         for (;;) {
2187                 r = dispatch_wqueue(bus);
2188                 if (r < 0)
2189                         return r;
2190
2191                 if (bus->wqueue_size <= 0)
2192                         return 0;
2193
2194                 r = bus_poll(bus, false, (uint64_t) -1);
2195                 if (r < 0)
2196                         return r;
2197         }
2198 }
2199
2200 _public_ int sd_bus_add_filter(sd_bus *bus,
2201                                sd_bus_message_handler_t callback,
2202                                void *userdata) {
2203
2204         struct filter_callback *f;
2205
2206         assert_return(bus, -EINVAL);
2207         assert_return(callback, -EINVAL);
2208         assert_return(!bus_pid_changed(bus), -ECHILD);
2209
2210         f = new0(struct filter_callback, 1);
2211         if (!f)
2212                 return -ENOMEM;
2213         f->callback = callback;
2214         f->userdata = userdata;
2215
2216         bus->filter_callbacks_modified = true;
2217         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2218         return 0;
2219 }
2220
2221 _public_ int sd_bus_remove_filter(sd_bus *bus,
2222                                   sd_bus_message_handler_t callback,
2223                                   void *userdata) {
2224
2225         struct filter_callback *f;
2226
2227         assert_return(bus, -EINVAL);
2228         assert_return(callback, -EINVAL);
2229         assert_return(!bus_pid_changed(bus), -ECHILD);
2230
2231         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2232                 if (f->callback == callback && f->userdata == userdata) {
2233                         bus->filter_callbacks_modified = true;
2234                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2235                         free(f);
2236                         return 1;
2237                 }
2238         }
2239
2240         return 0;
2241 }
2242
2243 _public_ int sd_bus_add_match(sd_bus *bus,
2244                               const char *match,
2245                               sd_bus_message_handler_t callback,
2246                               void *userdata) {
2247
2248         struct bus_match_component *components = NULL;
2249         unsigned n_components = 0;
2250         uint64_t cookie = 0;
2251         int r = 0;
2252
2253         assert_return(bus, -EINVAL);
2254         assert_return(match, -EINVAL);
2255         assert_return(!bus_pid_changed(bus), -ECHILD);
2256
2257         r = bus_match_parse(match, &components, &n_components);
2258         if (r < 0)
2259                 goto finish;
2260
2261         if (bus->bus_client) {
2262                 cookie = ++bus->match_cookie;
2263
2264                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2265                 if (r < 0)
2266                         goto finish;
2267         }
2268
2269         bus->match_callbacks_modified = true;
2270         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2271         if (r < 0) {
2272                 if (bus->bus_client)
2273                         bus_remove_match_internal(bus, match, cookie);
2274         }
2275
2276 finish:
2277         bus_match_parse_free(components, n_components);
2278         return r;
2279 }
2280
2281 _public_ int sd_bus_remove_match(sd_bus *bus,
2282                                  const char *match,
2283                                  sd_bus_message_handler_t callback,
2284                                  void *userdata) {
2285
2286         struct bus_match_component *components = NULL;
2287         unsigned n_components = 0;
2288         int r = 0, q = 0;
2289         uint64_t cookie = 0;
2290
2291         assert_return(bus, -EINVAL);
2292         assert_return(match, -EINVAL);
2293         assert_return(!bus_pid_changed(bus), -ECHILD);
2294
2295         r = bus_match_parse(match, &components, &n_components);
2296         if (r < 0)
2297                 return r;
2298
2299         bus->match_callbacks_modified = true;
2300         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2301
2302         if (bus->bus_client)
2303                 q = bus_remove_match_internal(bus, match, cookie);
2304
2305         bus_match_parse_free(components, n_components);
2306
2307         return r < 0 ? r : q;
2308 }
2309
2310 bool bus_pid_changed(sd_bus *bus) {
2311         assert(bus);
2312
2313         /* We don't support people creating a bus connection and
2314          * keeping it around over a fork(). Let's complain. */
2315
2316         return bus->original_pid != getpid();
2317 }
2318
2319 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2320         sd_bus *bus = userdata;
2321         int r;
2322
2323         assert(bus);
2324
2325         r = sd_bus_process(bus, NULL);
2326         if (r < 0)
2327                 return r;
2328
2329         return 1;
2330 }
2331
2332 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2333         sd_bus *bus = userdata;
2334         int r;
2335
2336         assert(bus);
2337
2338         r = sd_bus_process(bus, NULL);
2339         if (r < 0)
2340                 return r;
2341
2342         return 1;
2343 }
2344
2345 static int prepare_callback(sd_event_source *s, void *userdata) {
2346         sd_bus *bus = userdata;
2347         int r, e;
2348         usec_t until;
2349
2350         assert(s);
2351         assert(bus);
2352
2353         e = sd_bus_get_events(bus);
2354         if (e < 0)
2355                 return e;
2356
2357         if (bus->output_fd != bus->input_fd) {
2358
2359                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2360                 if (r < 0)
2361                         return r;
2362
2363                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2364                 if (r < 0)
2365                         return r;
2366         } else {
2367                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2368                 if (r < 0)
2369                         return r;
2370         }
2371
2372         r = sd_bus_get_timeout(bus, &until);
2373         if (r < 0)
2374                 return r;
2375         if (r > 0) {
2376                 int j;
2377
2378                 j = sd_event_source_set_time(bus->time_event_source, until);
2379                 if (j < 0)
2380                         return j;
2381         }
2382
2383         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2384         if (r < 0)
2385                 return r;
2386
2387         return 1;
2388 }
2389
2390 static int quit_callback(sd_event_source *event, void *userdata) {
2391         sd_bus *bus = userdata;
2392
2393         assert(event);
2394
2395         sd_bus_flush(bus);
2396
2397         return 1;
2398 }
2399
2400 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2401         int r;
2402
2403         assert_return(bus, -EINVAL);
2404         assert_return(!bus->event, -EBUSY);
2405
2406         assert(!bus->input_io_event_source);
2407         assert(!bus->output_io_event_source);
2408         assert(!bus->time_event_source);
2409
2410         if (event)
2411                 bus->event = sd_event_ref(event);
2412         else  {
2413                 r = sd_event_default(&bus->event);
2414                 if (r < 0)
2415                         return r;
2416         }
2417
2418         r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2419         if (r < 0)
2420                 goto fail;
2421
2422         r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2423         if (r < 0)
2424                 goto fail;
2425
2426         if (bus->output_fd != bus->input_fd) {
2427                 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2428                 if (r < 0)
2429                         goto fail;
2430
2431                 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2432                 if (r < 0)
2433                         goto fail;
2434         }
2435
2436         r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2437         if (r < 0)
2438                 goto fail;
2439
2440         r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2441         if (r < 0)
2442                 goto fail;
2443
2444         r = sd_event_source_set_priority(bus->time_event_source, priority);
2445         if (r < 0)
2446                 goto fail;
2447
2448         r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2449         if (r < 0)
2450                 goto fail;
2451
2452         return 0;
2453
2454 fail:
2455         sd_bus_detach_event(bus);
2456         return r;
2457 }
2458
2459 _public_ int sd_bus_detach_event(sd_bus *bus) {
2460         assert_return(bus, -EINVAL);
2461         assert_return(bus->event, -ENXIO);
2462
2463         if (bus->input_io_event_source)
2464                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2465
2466         if (bus->output_io_event_source)
2467                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2468
2469         if (bus->time_event_source)
2470                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2471
2472         if (bus->quit_event_source)
2473                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2474
2475         if (bus->event)
2476                 bus->event = sd_event_unref(bus->event);
2477
2478         return 0;
2479 }
2480
2481 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2482         assert_return(bus, NULL);
2483
2484         return bus->current;
2485 }
2486
2487 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2488         sd_bus *b = NULL;
2489         int r;
2490
2491         assert(bus_open);
2492         assert(default_bus);
2493
2494         if (!ret)
2495                 return !!*default_bus;
2496
2497         if (*default_bus) {
2498                 *ret = sd_bus_ref(*default_bus);
2499                 return 0;
2500         }
2501
2502         r = bus_open(&b);
2503         if (r < 0)
2504                 return r;
2505
2506         b->default_bus_ptr = default_bus;
2507         b->tid = gettid();
2508         *default_bus = b;
2509
2510         *ret = b;
2511         return 1;
2512 }
2513
2514 _public_ int sd_bus_default_system(sd_bus **ret) {
2515         static __thread sd_bus *default_system_bus = NULL;
2516
2517         return bus_default(sd_bus_open_system, &default_system_bus, ret);
2518 }
2519
2520 _public_ int sd_bus_default_user(sd_bus **ret) {
2521         static __thread sd_bus *default_user_bus = NULL;
2522
2523         return bus_default(sd_bus_open_user, &default_user_bus, ret);
2524 }
2525
2526 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2527         assert_return(b, -EINVAL);
2528         assert_return(tid, -EINVAL);
2529         assert_return(!bus_pid_changed(b), -ECHILD);
2530
2531         if (b->tid != 0) {
2532                 *tid = b->tid;
2533                 return 0;
2534         }
2535
2536         if (b->event)
2537                 return sd_event_get_tid(b->event, tid);
2538
2539         return -ENXIO;
2540 }