chiark / gitweb /
bus: add sd_bus_get_current() bus call to determine message that is currently being...
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48 #include "bus-util.h"
49 #include "bus-container.h"
50
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
52
53 static void bus_close_fds(sd_bus *b) {
54         assert(b);
55
56         if (b->input_fd >= 0)
57                 close_nointr_nofail(b->input_fd);
58
59         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60                 close_nointr_nofail(b->output_fd);
61
62         b->input_fd = b->output_fd = -1;
63 }
64
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66         struct node_callback *c;
67         struct node_vtable *v;
68         struct node_enumerator *e;
69
70         assert(b);
71
72         if (!n)
73                 return;
74
75         while (n->child)
76                 bus_node_destroy(b, n->child);
77
78         while ((c = n->callbacks)) {
79                 LIST_REMOVE(callbacks, n->callbacks, c);
80                 free(c);
81         }
82
83         while ((v = n->vtables)) {
84                 LIST_REMOVE(vtables, n->vtables, v);
85                 free(v->interface);
86                 free(v);
87         }
88
89         while ((e = n->enumerators)) {
90                 LIST_REMOVE(enumerators, n->enumerators, e);
91                 free(e);
92         }
93
94         if (n->parent)
95                 LIST_REMOVE(siblings, n->parent->child, n);
96
97         assert_se(hashmap_remove(b->nodes, n->path) == n);
98         free(n->path);
99         free(n);
100 }
101
102 static void bus_free(sd_bus *b) {
103         struct filter_callback *f;
104         struct node *n;
105         unsigned i;
106
107         assert(b);
108
109         sd_bus_detach_event(b);
110
111         bus_close_fds(b);
112
113         if (b->kdbus_buffer)
114                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
115
116         free(b->rbuffer);
117         free(b->unique_name);
118         free(b->auth_buffer);
119         free(b->address);
120         free(b->kernel);
121         free(b->machine);
122
123         free(b->exec_path);
124         strv_free(b->exec_argv);
125
126         close_many(b->fds, b->n_fds);
127         free(b->fds);
128
129         for (i = 0; i < b->rqueue_size; i++)
130                 sd_bus_message_unref(b->rqueue[i]);
131         free(b->rqueue);
132
133         for (i = 0; i < b->wqueue_size; i++)
134                 sd_bus_message_unref(b->wqueue[i]);
135         free(b->wqueue);
136
137         hashmap_free_free(b->reply_callbacks);
138         prioq_free(b->reply_callbacks_prioq);
139
140         while ((f = b->filter_callbacks)) {
141                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
142                 free(f);
143         }
144
145         bus_match_free(&b->match_callbacks);
146
147         hashmap_free_free(b->vtable_methods);
148         hashmap_free_free(b->vtable_properties);
149
150         while ((n = hashmap_first(b->nodes)))
151                 bus_node_destroy(b, n);
152
153         hashmap_free(b->nodes);
154
155         bus_kernel_flush_memfd(b);
156
157         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
158
159         free(b);
160 }
161
162 int sd_bus_new(sd_bus **ret) {
163         sd_bus *r;
164
165         assert_return(ret, -EINVAL);
166
167         r = new0(sd_bus, 1);
168         if (!r)
169                 return -ENOMEM;
170
171         r->n_ref = REFCNT_INIT;
172         r->input_fd = r->output_fd = -1;
173         r->message_version = 1;
174         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
175         r->original_pid = getpid();
176
177         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
178
179         /* We guarantee that wqueue always has space for at least one
180          * entry */
181         r->wqueue = new(sd_bus_message*, 1);
182         if (!r->wqueue) {
183                 free(r);
184                 return -ENOMEM;
185         }
186
187         *ret = r;
188         return 0;
189 }
190
191 int sd_bus_set_address(sd_bus *bus, const char *address) {
192         char *a;
193
194         assert_return(bus, -EINVAL);
195         assert_return(bus->state == BUS_UNSET, -EPERM);
196         assert_return(address, -EINVAL);
197         assert_return(!bus_pid_changed(bus), -ECHILD);
198
199         a = strdup(address);
200         if (!a)
201                 return -ENOMEM;
202
203         free(bus->address);
204         bus->address = a;
205
206         return 0;
207 }
208
209 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
210         assert_return(bus, -EINVAL);
211         assert_return(bus->state == BUS_UNSET, -EPERM);
212         assert_return(input_fd >= 0, -EINVAL);
213         assert_return(output_fd >= 0, -EINVAL);
214         assert_return(!bus_pid_changed(bus), -ECHILD);
215
216         bus->input_fd = input_fd;
217         bus->output_fd = output_fd;
218         return 0;
219 }
220
221 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
222         char *p, **a;
223
224         assert_return(bus, -EINVAL);
225         assert_return(bus->state == BUS_UNSET, -EPERM);
226         assert_return(path, -EINVAL);
227         assert_return(!strv_isempty(argv), -EINVAL);
228         assert_return(!bus_pid_changed(bus), -ECHILD);
229
230         p = strdup(path);
231         if (!p)
232                 return -ENOMEM;
233
234         a = strv_copy(argv);
235         if (!a) {
236                 free(p);
237                 return -ENOMEM;
238         }
239
240         free(bus->exec_path);
241         strv_free(bus->exec_argv);
242
243         bus->exec_path = p;
244         bus->exec_argv = a;
245
246         return 0;
247 }
248
249 int sd_bus_set_bus_client(sd_bus *bus, int b) {
250         assert_return(bus, -EINVAL);
251         assert_return(bus->state == BUS_UNSET, -EPERM);
252         assert_return(!bus_pid_changed(bus), -ECHILD);
253
254         bus->bus_client = !!b;
255         return 0;
256 }
257
258 int sd_bus_negotiate_fds(sd_bus *bus, int b) {
259         assert_return(bus, -EINVAL);
260         assert_return(bus->state == BUS_UNSET, -EPERM);
261         assert_return(!bus_pid_changed(bus), -ECHILD);
262
263         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
264         return 0;
265 }
266
267 int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
268         assert_return(bus, -EINVAL);
269         assert_return(bus->state == BUS_UNSET, -EPERM);
270         assert_return(!bus_pid_changed(bus), -ECHILD);
271
272         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
273         return 0;
274 }
275
276 int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
277         assert_return(bus, -EINVAL);
278         assert_return(bus->state == BUS_UNSET, -EPERM);
279         assert_return(!bus_pid_changed(bus), -ECHILD);
280
281         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
282         return 0;
283 }
284
285 int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
286         assert_return(bus, -EINVAL);
287         assert_return(bus->state == BUS_UNSET, -EPERM);
288         assert_return(!bus_pid_changed(bus), -ECHILD);
289
290         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
291         return 0;
292 }
293
294 int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
295         assert_return(bus, -EINVAL);
296         assert_return(bus->state == BUS_UNSET, -EPERM);
297         assert_return(!bus_pid_changed(bus), -ECHILD);
298
299         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
300         return 0;
301 }
302
303 int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
304         assert_return(bus, -EINVAL);
305         assert_return(bus->state == BUS_UNSET, -EPERM);
306         assert_return(!bus_pid_changed(bus), -ECHILD);
307
308         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
309         return 0;
310 }
311
312 int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
313         assert_return(bus, -EINVAL);
314         assert_return(bus->state == BUS_UNSET, -EPERM);
315         assert_return(!bus_pid_changed(bus), -ECHILD);
316
317         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
318         return 0;
319 }
320
321 int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
322         assert_return(bus, -EINVAL);
323         assert_return(bus->state == BUS_UNSET, -EPERM);
324         assert_return(!bus_pid_changed(bus), -ECHILD);
325
326         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
327         return 0;
328 }
329
330 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
331         assert_return(bus, -EINVAL);
332         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
333         assert_return(bus->state == BUS_UNSET, -EPERM);
334         assert_return(!bus_pid_changed(bus), -ECHILD);
335
336         bus->is_server = !!b;
337         bus->server_id = server_id;
338         return 0;
339 }
340
341 int sd_bus_set_anonymous(sd_bus *bus, int b) {
342         assert_return(bus, -EINVAL);
343         assert_return(bus->state == BUS_UNSET, -EPERM);
344         assert_return(!bus_pid_changed(bus), -ECHILD);
345
346         bus->anonymous_auth = !!b;
347         return 0;
348 }
349
350 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
351         const char *s;
352         int r;
353
354         assert(bus);
355         assert(bus->state == BUS_HELLO);
356         assert(reply);
357
358         r = sd_bus_message_get_errno(reply);
359         if (r < 0)
360                 return r;
361         if (r > 0)
362                 return -r;
363
364         r = sd_bus_message_read(reply, "s", &s);
365         if (r < 0)
366                 return r;
367
368         if (!service_name_is_valid(s) || s[0] != ':')
369                 return -EBADMSG;
370
371         bus->unique_name = strdup(s);
372         if (!bus->unique_name)
373                 return -ENOMEM;
374
375         bus->state = BUS_RUNNING;
376
377         return 1;
378 }
379
380 static int bus_send_hello(sd_bus *bus) {
381         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
382         int r;
383
384         assert(bus);
385
386         if (!bus->bus_client || bus->is_kernel)
387                 return 0;
388
389         r = sd_bus_message_new_method_call(
390                         bus,
391                         "org.freedesktop.DBus",
392                         "/",
393                         "org.freedesktop.DBus",
394                         "Hello",
395                         &m);
396         if (r < 0)
397                 return r;
398
399         return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
400 }
401
402 int bus_start_running(sd_bus *bus) {
403         assert(bus);
404
405         if (bus->bus_client && !bus->is_kernel) {
406                 bus->state = BUS_HELLO;
407                 return 1;
408         }
409
410         bus->state = BUS_RUNNING;
411         return 1;
412 }
413
414 static int parse_address_key(const char **p, const char *key, char **value) {
415         size_t l, n = 0;
416         const char *a;
417         char *r = NULL;
418
419         assert(p);
420         assert(*p);
421         assert(value);
422
423         if (key) {
424                 l = strlen(key);
425                 if (strncmp(*p, key, l) != 0)
426                         return 0;
427
428                 if ((*p)[l] != '=')
429                         return 0;
430
431                 if (*value)
432                         return -EINVAL;
433
434                 a = *p + l + 1;
435         } else
436                 a = *p;
437
438         while (*a != ';' && *a != ',' && *a != 0) {
439                 char c, *t;
440
441                 if (*a == '%') {
442                         int x, y;
443
444                         x = unhexchar(a[1]);
445                         if (x < 0) {
446                                 free(r);
447                                 return x;
448                         }
449
450                         y = unhexchar(a[2]);
451                         if (y < 0) {
452                                 free(r);
453                                 return y;
454                         }
455
456                         c = (char) ((x << 4) | y);
457                         a += 3;
458                 } else {
459                         c = *a;
460                         a++;
461                 }
462
463                 t = realloc(r, n + 2);
464                 if (!t) {
465                         free(r);
466                         return -ENOMEM;
467                 }
468
469                 r = t;
470                 r[n++] = c;
471         }
472
473         if (!r) {
474                 r = strdup("");
475                 if (!r)
476                         return -ENOMEM;
477         } else
478                 r[n] = 0;
479
480         if (*a == ',')
481                 a++;
482
483         *p = a;
484
485         free(*value);
486         *value = r;
487
488         return 1;
489 }
490
491 static void skip_address_key(const char **p) {
492         assert(p);
493         assert(*p);
494
495         *p += strcspn(*p, ",");
496
497         if (**p == ',')
498                 (*p) ++;
499 }
500
501 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
502         _cleanup_free_ char *path = NULL, *abstract = NULL;
503         size_t l;
504         int r;
505
506         assert(b);
507         assert(p);
508         assert(*p);
509         assert(guid);
510
511         while (**p != 0 && **p != ';') {
512                 r = parse_address_key(p, "guid", guid);
513                 if (r < 0)
514                         return r;
515                 else if (r > 0)
516                         continue;
517
518                 r = parse_address_key(p, "path", &path);
519                 if (r < 0)
520                         return r;
521                 else if (r > 0)
522                         continue;
523
524                 r = parse_address_key(p, "abstract", &abstract);
525                 if (r < 0)
526                         return r;
527                 else if (r > 0)
528                         continue;
529
530                 skip_address_key(p);
531         }
532
533         if (!path && !abstract)
534                 return -EINVAL;
535
536         if (path && abstract)
537                 return -EINVAL;
538
539         if (path) {
540                 l = strlen(path);
541                 if (l > sizeof(b->sockaddr.un.sun_path))
542                         return -E2BIG;
543
544                 b->sockaddr.un.sun_family = AF_UNIX;
545                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
546                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
547         } else if (abstract) {
548                 l = strlen(abstract);
549                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
550                         return -E2BIG;
551
552                 b->sockaddr.un.sun_family = AF_UNIX;
553                 b->sockaddr.un.sun_path[0] = 0;
554                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
555                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
556         }
557
558         return 0;
559 }
560
561 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
562         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
563         int r;
564         struct addrinfo *result, hints = {
565                 .ai_socktype = SOCK_STREAM,
566                 .ai_flags = AI_ADDRCONFIG,
567         };
568
569         assert(b);
570         assert(p);
571         assert(*p);
572         assert(guid);
573
574         while (**p != 0 && **p != ';') {
575                 r = parse_address_key(p, "guid", guid);
576                 if (r < 0)
577                         return r;
578                 else if (r > 0)
579                         continue;
580
581                 r = parse_address_key(p, "host", &host);
582                 if (r < 0)
583                         return r;
584                 else if (r > 0)
585                         continue;
586
587                 r = parse_address_key(p, "port", &port);
588                 if (r < 0)
589                         return r;
590                 else if (r > 0)
591                         continue;
592
593                 r = parse_address_key(p, "family", &family);
594                 if (r < 0)
595                         return r;
596                 else if (r > 0)
597                         continue;
598
599                 skip_address_key(p);
600         }
601
602         if (!host || !port)
603                 return -EINVAL;
604
605         if (family) {
606                 if (streq(family, "ipv4"))
607                         hints.ai_family = AF_INET;
608                 else if (streq(family, "ipv6"))
609                         hints.ai_family = AF_INET6;
610                 else
611                         return -EINVAL;
612         }
613
614         r = getaddrinfo(host, port, &hints, &result);
615         if (r == EAI_SYSTEM)
616                 return -errno;
617         else if (r != 0)
618                 return -EADDRNOTAVAIL;
619
620         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
621         b->sockaddr_size = result->ai_addrlen;
622
623         freeaddrinfo(result);
624
625         return 0;
626 }
627
628 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
629         char *path = NULL;
630         unsigned n_argv = 0, j;
631         char **argv = NULL;
632         int r;
633
634         assert(b);
635         assert(p);
636         assert(*p);
637         assert(guid);
638
639         while (**p != 0 && **p != ';') {
640                 r = parse_address_key(p, "guid", guid);
641                 if (r < 0)
642                         goto fail;
643                 else if (r > 0)
644                         continue;
645
646                 r = parse_address_key(p, "path", &path);
647                 if (r < 0)
648                         goto fail;
649                 else if (r > 0)
650                         continue;
651
652                 if (startswith(*p, "argv")) {
653                         unsigned ul;
654
655                         errno = 0;
656                         ul = strtoul(*p + 4, (char**) p, 10);
657                         if (errno > 0 || **p != '=' || ul > 256) {
658                                 r = -EINVAL;
659                                 goto fail;
660                         }
661
662                         (*p) ++;
663
664                         if (ul >= n_argv) {
665                                 char **x;
666
667                                 x = realloc(argv, sizeof(char*) * (ul + 2));
668                                 if (!x) {
669                                         r = -ENOMEM;
670                                         goto fail;
671                                 }
672
673                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
674
675                                 argv = x;
676                                 n_argv = ul + 1;
677                         }
678
679                         r = parse_address_key(p, NULL, argv + ul);
680                         if (r < 0)
681                                 goto fail;
682
683                         continue;
684                 }
685
686                 skip_address_key(p);
687         }
688
689         if (!path) {
690                 r = -EINVAL;
691                 goto fail;
692         }
693
694         /* Make sure there are no holes in the array, with the
695          * exception of argv[0] */
696         for (j = 1; j < n_argv; j++)
697                 if (!argv[j]) {
698                         r = -EINVAL;
699                         goto fail;
700                 }
701
702         if (argv && argv[0] == NULL) {
703                 argv[0] = strdup(path);
704                 if (!argv[0]) {
705                         r = -ENOMEM;
706                         goto fail;
707                 }
708         }
709
710         b->exec_path = path;
711         b->exec_argv = argv;
712         return 0;
713
714 fail:
715         for (j = 0; j < n_argv; j++)
716                 free(argv[j]);
717
718         free(argv);
719         free(path);
720         return r;
721 }
722
723 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
724         _cleanup_free_ char *path = NULL;
725         int r;
726
727         assert(b);
728         assert(p);
729         assert(*p);
730         assert(guid);
731
732         while (**p != 0 && **p != ';') {
733                 r = parse_address_key(p, "guid", guid);
734                 if (r < 0)
735                         return r;
736                 else if (r > 0)
737                         continue;
738
739                 r = parse_address_key(p, "path", &path);
740                 if (r < 0)
741                         return r;
742                 else if (r > 0)
743                         continue;
744
745                 skip_address_key(p);
746         }
747
748         if (!path)
749                 return -EINVAL;
750
751         free(b->kernel);
752         b->kernel = path;
753         path = NULL;
754
755         return 0;
756 }
757
758 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
759         _cleanup_free_ char *machine = NULL;
760         int r;
761
762         assert(b);
763         assert(p);
764         assert(*p);
765         assert(guid);
766
767         while (**p != 0 && **p != ';') {
768                 r = parse_address_key(p, "guid", guid);
769                 if (r < 0)
770                         return r;
771                 else if (r > 0)
772                         continue;
773
774                 r = parse_address_key(p, "machine", &machine);
775                 if (r < 0)
776                         return r;
777                 else if (r > 0)
778                         continue;
779
780                 skip_address_key(p);
781         }
782
783         if (!machine)
784                 return -EINVAL;
785
786         free(b->machine);
787         b->machine = machine;
788         machine = NULL;
789
790         b->sockaddr.un.sun_family = AF_UNIX;
791         strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
792         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
793
794         return 0;
795 }
796
797 static void bus_reset_parsed_address(sd_bus *b) {
798         assert(b);
799
800         zero(b->sockaddr);
801         b->sockaddr_size = 0;
802         strv_free(b->exec_argv);
803         free(b->exec_path);
804         b->exec_path = NULL;
805         b->exec_argv = NULL;
806         b->server_id = SD_ID128_NULL;
807         free(b->kernel);
808         b->kernel = NULL;
809         free(b->machine);
810         b->machine = NULL;
811 }
812
813 static int bus_parse_next_address(sd_bus *b) {
814         _cleanup_free_ char *guid = NULL;
815         const char *a;
816         int r;
817
818         assert(b);
819
820         if (!b->address)
821                 return 0;
822         if (b->address[b->address_index] == 0)
823                 return 0;
824
825         bus_reset_parsed_address(b);
826
827         a = b->address + b->address_index;
828
829         while (*a != 0) {
830
831                 if (*a == ';') {
832                         a++;
833                         continue;
834                 }
835
836                 if (startswith(a, "unix:")) {
837                         a += 5;
838
839                         r = parse_unix_address(b, &a, &guid);
840                         if (r < 0)
841                                 return r;
842                         break;
843
844                 } else if (startswith(a, "tcp:")) {
845
846                         a += 4;
847                         r = parse_tcp_address(b, &a, &guid);
848                         if (r < 0)
849                                 return r;
850
851                         break;
852
853                 } else if (startswith(a, "unixexec:")) {
854
855                         a += 9;
856                         r = parse_exec_address(b, &a, &guid);
857                         if (r < 0)
858                                 return r;
859
860                         break;
861
862                 } else if (startswith(a, "kernel:")) {
863
864                         a += 7;
865                         r = parse_kernel_address(b, &a, &guid);
866                         if (r < 0)
867                                 return r;
868
869                         break;
870                 } else if (startswith(a, "x-container:")) {
871
872                         a += 12;
873                         r = parse_container_address(b, &a, &guid);
874                         if (r < 0)
875                                 return r;
876
877                         break;
878                 }
879
880                 a = strchr(a, ';');
881                 if (!a)
882                         return 0;
883         }
884
885         if (guid) {
886                 r = sd_id128_from_string(guid, &b->server_id);
887                 if (r < 0)
888                         return r;
889         }
890
891         b->address_index = a - b->address;
892         return 1;
893 }
894
895 static int bus_start_address(sd_bus *b) {
896         int r;
897
898         assert(b);
899
900         for (;;) {
901                 sd_bus_close(b);
902
903                 if (b->exec_path) {
904
905                         r = bus_socket_exec(b);
906                         if (r >= 0)
907                                 return r;
908
909                         b->last_connect_error = -r;
910                 } else if (b->kernel) {
911
912                         r = bus_kernel_connect(b);
913                         if (r >= 0)
914                                 return r;
915
916                         b->last_connect_error = -r;
917
918                 } else if (b->machine) {
919
920                         r = bus_container_connect(b);
921                         if (r >= 0)
922                                 return r;
923
924                         b->last_connect_error = -r;
925
926                 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
927
928                         r = bus_socket_connect(b);
929                         if (r >= 0)
930                                 return r;
931
932                         b->last_connect_error = -r;
933                 }
934
935                 r = bus_parse_next_address(b);
936                 if (r < 0)
937                         return r;
938                 if (r == 0)
939                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
940         }
941 }
942
943 int bus_next_address(sd_bus *b) {
944         assert(b);
945
946         bus_reset_parsed_address(b);
947         return bus_start_address(b);
948 }
949
950 static int bus_start_fd(sd_bus *b) {
951         struct stat st;
952         int r;
953
954         assert(b);
955         assert(b->input_fd >= 0);
956         assert(b->output_fd >= 0);
957
958         r = fd_nonblock(b->input_fd, true);
959         if (r < 0)
960                 return r;
961
962         r = fd_cloexec(b->input_fd, true);
963         if (r < 0)
964                 return r;
965
966         if (b->input_fd != b->output_fd) {
967                 r = fd_nonblock(b->output_fd, true);
968                 if (r < 0)
969                         return r;
970
971                 r = fd_cloexec(b->output_fd, true);
972                 if (r < 0)
973                         return r;
974         }
975
976         if (fstat(b->input_fd, &st) < 0)
977                 return -errno;
978
979         if (S_ISCHR(b->input_fd))
980                 return bus_kernel_take_fd(b);
981         else
982                 return bus_socket_take_fd(b);
983 }
984
985 int sd_bus_start(sd_bus *bus) {
986         int r;
987
988         assert_return(bus, -EINVAL);
989         assert_return(bus->state == BUS_UNSET, -EPERM);
990         assert_return(!bus_pid_changed(bus), -ECHILD);
991
992         bus->state = BUS_OPENING;
993
994         if (bus->is_server && bus->bus_client)
995                 return -EINVAL;
996
997         if (bus->input_fd >= 0)
998                 r = bus_start_fd(bus);
999         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1000                 r = bus_start_address(bus);
1001         else
1002                 return -EINVAL;
1003
1004         if (r < 0)
1005                 return r;
1006
1007         return bus_send_hello(bus);
1008 }
1009
1010 int sd_bus_open_system(sd_bus **ret) {
1011         const char *e;
1012         sd_bus *b;
1013         int r;
1014
1015         assert_return(ret, -EINVAL);
1016
1017         r = sd_bus_new(&b);
1018         if (r < 0)
1019                 return r;
1020
1021         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1022         if (e) {
1023                 r = sd_bus_set_address(b, e);
1024                 if (r < 0)
1025                         goto fail;
1026         } else {
1027                 b->sockaddr.un.sun_family = AF_UNIX;
1028                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1029                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1030         }
1031
1032         b->bus_client = true;
1033
1034         r = sd_bus_start(b);
1035         if (r < 0)
1036                 goto fail;
1037
1038         *ret = b;
1039         return 0;
1040
1041 fail:
1042         bus_free(b);
1043         return r;
1044 }
1045
1046 int sd_bus_open_user(sd_bus **ret) {
1047         const char *e;
1048         sd_bus *b;
1049         size_t l;
1050         int r;
1051
1052         assert_return(ret, -EINVAL);
1053
1054         r = sd_bus_new(&b);
1055         if (r < 0)
1056                 return r;
1057
1058         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1059         if (e) {
1060                 r = sd_bus_set_address(b, e);
1061                 if (r < 0)
1062                         goto fail;
1063         } else {
1064                 e = secure_getenv("XDG_RUNTIME_DIR");
1065                 if (!e) {
1066                         r = -ENOENT;
1067                         goto fail;
1068                 }
1069
1070                 l = strlen(e);
1071                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1072                         r = -E2BIG;
1073                         goto fail;
1074                 }
1075
1076                 b->sockaddr.un.sun_family = AF_UNIX;
1077                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1078                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1079         }
1080
1081         b->bus_client = true;
1082
1083         r = sd_bus_start(b);
1084         if (r < 0)
1085                 goto fail;
1086
1087         *ret = b;
1088         return 0;
1089
1090 fail:
1091         bus_free(b);
1092         return r;
1093 }
1094
1095 int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1096         _cleanup_free_ char *e = NULL;
1097         char *p = NULL;
1098         sd_bus *bus;
1099         int r;
1100
1101         assert_return(host, -EINVAL);
1102         assert_return(ret, -EINVAL);
1103
1104         e = bus_address_escape(host);
1105         if (!e)
1106                 return -ENOMEM;
1107
1108         p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1109         if (!p)
1110                 return -ENOMEM;
1111
1112         r = sd_bus_new(&bus);
1113         if (r < 0) {
1114                 free(p);
1115                 return r;
1116         }
1117
1118         bus->address = p;
1119         bus->bus_client = true;
1120
1121         r = sd_bus_start(bus);
1122         if (r < 0) {
1123                 bus_free(bus);
1124                 return r;
1125         }
1126
1127         *ret = bus;
1128         return 0;
1129 }
1130
1131 int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1132         _cleanup_free_ char *e = NULL;
1133         sd_bus *bus;
1134         char *p;
1135         int r;
1136
1137         assert_return(machine, -EINVAL);
1138         assert_return(ret, -EINVAL);
1139
1140         e = bus_address_escape(machine);
1141         if (!e)
1142                 return -ENOMEM;
1143
1144         p = strjoin("x-container:machine=", e, NULL);
1145         if (!p)
1146                 return -ENOMEM;
1147
1148         r = sd_bus_new(&bus);
1149         if (r < 0) {
1150                 free(p);
1151                 return r;
1152         }
1153
1154         bus->address = p;
1155         bus->bus_client = true;
1156
1157         r = sd_bus_start(bus);
1158         if (r < 0) {
1159                 bus_free(bus);
1160                 return r;
1161         }
1162
1163         *ret = bus;
1164         return 0;
1165 }
1166
1167 void sd_bus_close(sd_bus *bus) {
1168         if (!bus)
1169                 return;
1170         if (bus->state == BUS_CLOSED)
1171                 return;
1172         if (bus_pid_changed(bus))
1173                 return;
1174
1175         bus->state = BUS_CLOSED;
1176
1177         sd_bus_detach_event(bus);
1178
1179         if (!bus->is_kernel)
1180                 bus_close_fds(bus);
1181
1182         /* We'll leave the fd open in case this is a kernel bus, since
1183          * there might still be memblocks around that reference this
1184          * bus, and they might need to invoke the
1185          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1186          * freed. */
1187 }
1188
1189 sd_bus *sd_bus_ref(sd_bus *bus) {
1190         assert_return(bus, NULL);
1191
1192         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1193
1194         return bus;
1195 }
1196
1197 sd_bus *sd_bus_unref(sd_bus *bus) {
1198         assert_return(bus, NULL);
1199
1200         if (REFCNT_DEC(bus->n_ref) <= 0)
1201                 bus_free(bus);
1202
1203         return NULL;
1204 }
1205
1206 int sd_bus_is_open(sd_bus *bus) {
1207
1208         assert_return(bus, -EINVAL);
1209         assert_return(!bus_pid_changed(bus), -ECHILD);
1210
1211         return BUS_IS_OPEN(bus->state);
1212 }
1213
1214 int sd_bus_can_send(sd_bus *bus, char type) {
1215         int r;
1216
1217         assert_return(bus, -EINVAL);
1218         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1219         assert_return(!bus_pid_changed(bus), -ECHILD);
1220
1221         if (type == SD_BUS_TYPE_UNIX_FD) {
1222                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1223                         return 0;
1224
1225                 r = bus_ensure_running(bus);
1226                 if (r < 0)
1227                         return r;
1228
1229                 return bus->can_fds;
1230         }
1231
1232         return bus_type_is_valid(type);
1233 }
1234
1235 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1236         int r;
1237
1238         assert_return(bus, -EINVAL);
1239         assert_return(server_id, -EINVAL);
1240         assert_return(!bus_pid_changed(bus), -ECHILD);
1241
1242         r = bus_ensure_running(bus);
1243         if (r < 0)
1244                 return r;
1245
1246         *server_id = bus->server_id;
1247         return 0;
1248 }
1249
1250 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1251         assert(m);
1252
1253         if (m->header->version > b->message_version)
1254                 return -EPERM;
1255
1256         if (m->sealed)
1257                 return 0;
1258
1259         return bus_message_seal(m, ++b->serial);
1260 }
1261
1262 static int dispatch_wqueue(sd_bus *bus) {
1263         int r, ret = 0;
1264
1265         assert(bus);
1266         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1267
1268         while (bus->wqueue_size > 0) {
1269
1270                 if (bus->is_kernel)
1271                         r = bus_kernel_write_message(bus, bus->wqueue[0]);
1272                 else
1273                         r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1274
1275                 if (r < 0) {
1276                         sd_bus_close(bus);
1277                         return r;
1278                 } else if (r == 0)
1279                         /* Didn't do anything this time */
1280                         return ret;
1281                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1282                         /* Fully written. Let's drop the entry from
1283                          * the queue.
1284                          *
1285                          * This isn't particularly optimized, but
1286                          * well, this is supposed to be our worst-case
1287                          * buffer only, and the socket buffer is
1288                          * supposed to be our primary buffer, and if
1289                          * it got full, then all bets are off
1290                          * anyway. */
1291
1292                         sd_bus_message_unref(bus->wqueue[0]);
1293                         bus->wqueue_size --;
1294                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1295                         bus->windex = 0;
1296
1297                         ret = 1;
1298                 }
1299         }
1300
1301         return ret;
1302 }
1303
1304 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1305         sd_bus_message *z = NULL;
1306         int r, ret = 0;
1307
1308         assert(bus);
1309         assert(m);
1310         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1311
1312         if (bus->rqueue_size > 0) {
1313                 /* Dispatch a queued message */
1314
1315                 *m = bus->rqueue[0];
1316                 bus->rqueue_size --;
1317                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1318                 return 1;
1319         }
1320
1321         /* Try to read a new message */
1322         do {
1323                 if (bus->is_kernel)
1324                         r = bus_kernel_read_message(bus, &z);
1325                 else
1326                         r = bus_socket_read_message(bus, &z);
1327
1328                 if (r < 0) {
1329                         sd_bus_close(bus);
1330                         return r;
1331                 }
1332                 if (r == 0)
1333                         return ret;
1334
1335                 ret = 1;
1336         } while (!z);
1337
1338         *m = z;
1339         return ret;
1340 }
1341
1342 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1343         int r;
1344
1345         assert_return(bus, -EINVAL);
1346         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1347         assert_return(m, -EINVAL);
1348         assert_return(!bus_pid_changed(bus), -ECHILD);
1349
1350         if (m->n_fds > 0) {
1351                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1352                 if (r < 0)
1353                         return r;
1354                 if (r == 0)
1355                         return -ENOTSUP;
1356         }
1357
1358         /* If the serial number isn't kept, then we know that no reply
1359          * is expected */
1360         if (!serial && !m->sealed)
1361                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1362
1363         r = bus_seal_message(bus, m);
1364         if (r < 0)
1365                 return r;
1366
1367         /* If this is a reply and no reply was requested, then let's
1368          * suppress this, if we can */
1369         if (m->dont_send && !serial)
1370                 return 1;
1371
1372         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1373                 size_t idx = 0;
1374
1375                 if (bus->is_kernel)
1376                         r = bus_kernel_write_message(bus, m);
1377                 else
1378                         r = bus_socket_write_message(bus, m, &idx);
1379
1380                 if (r < 0) {
1381                         sd_bus_close(bus);
1382                         return r;
1383                 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1384                         /* Wasn't fully written. So let's remember how
1385                          * much was written. Note that the first entry
1386                          * of the wqueue array is always allocated so
1387                          * that we always can remember how much was
1388                          * written. */
1389                         bus->wqueue[0] = sd_bus_message_ref(m);
1390                         bus->wqueue_size = 1;
1391                         bus->windex = idx;
1392                 }
1393         } else {
1394                 sd_bus_message **q;
1395
1396                 /* Just append it to the queue. */
1397
1398                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1399                         return -ENOBUFS;
1400
1401                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1402                 if (!q)
1403                         return -ENOMEM;
1404
1405                 bus->wqueue = q;
1406                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1407         }
1408
1409         if (serial)
1410                 *serial = BUS_MESSAGE_SERIAL(m);
1411
1412         return 1;
1413 }
1414
1415 static usec_t calc_elapse(uint64_t usec) {
1416         if (usec == (uint64_t) -1)
1417                 return 0;
1418
1419         if (usec == 0)
1420                 usec = BUS_DEFAULT_TIMEOUT;
1421
1422         return now(CLOCK_MONOTONIC) + usec;
1423 }
1424
1425 static int timeout_compare(const void *a, const void *b) {
1426         const struct reply_callback *x = a, *y = b;
1427
1428         if (x->timeout != 0 && y->timeout == 0)
1429                 return -1;
1430
1431         if (x->timeout == 0 && y->timeout != 0)
1432                 return 1;
1433
1434         if (x->timeout < y->timeout)
1435                 return -1;
1436
1437         if (x->timeout > y->timeout)
1438                 return 1;
1439
1440         return 0;
1441 }
1442
1443 int sd_bus_send_with_reply(
1444                 sd_bus *bus,
1445                 sd_bus_message *m,
1446                 sd_bus_message_handler_t callback,
1447                 void *userdata,
1448                 uint64_t usec,
1449                 uint64_t *serial) {
1450
1451         struct reply_callback *c;
1452         int r;
1453
1454         assert_return(bus, -EINVAL);
1455         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1456         assert_return(m, -EINVAL);
1457         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1458         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1459         assert_return(callback, -EINVAL);
1460         assert_return(!bus_pid_changed(bus), -ECHILD);
1461
1462         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1463         if (r < 0)
1464                 return r;
1465
1466         if (usec != (uint64_t) -1) {
1467                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1468                 if (r < 0)
1469                         return r;
1470         }
1471
1472         r = bus_seal_message(bus, m);
1473         if (r < 0)
1474                 return r;
1475
1476         c = new0(struct reply_callback, 1);
1477         if (!c)
1478                 return -ENOMEM;
1479
1480         c->callback = callback;
1481         c->userdata = userdata;
1482         c->serial = BUS_MESSAGE_SERIAL(m);
1483         c->timeout = calc_elapse(usec);
1484
1485         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1486         if (r < 0) {
1487                 free(c);
1488                 return r;
1489         }
1490
1491         if (c->timeout != 0) {
1492                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1493                 if (r < 0) {
1494                         c->timeout = 0;
1495                         sd_bus_send_with_reply_cancel(bus, c->serial);
1496                         return r;
1497                 }
1498         }
1499
1500         r = sd_bus_send(bus, m, serial);
1501         if (r < 0) {
1502                 sd_bus_send_with_reply_cancel(bus, c->serial);
1503                 return r;
1504         }
1505
1506         return r;
1507 }
1508
1509 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1510         struct reply_callback *c;
1511
1512         assert_return(bus, -EINVAL);
1513         assert_return(serial != 0, -EINVAL);
1514         assert_return(!bus_pid_changed(bus), -ECHILD);
1515
1516         c = hashmap_remove(bus->reply_callbacks, &serial);
1517         if (!c)
1518                 return 0;
1519
1520         if (c->timeout != 0)
1521                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1522
1523         free(c);
1524         return 1;
1525 }
1526
1527 int bus_ensure_running(sd_bus *bus) {
1528         int r;
1529
1530         assert(bus);
1531
1532         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1533                 return -ENOTCONN;
1534         if (bus->state == BUS_RUNNING)
1535                 return 1;
1536
1537         for (;;) {
1538                 r = sd_bus_process(bus, NULL);
1539                 if (r < 0)
1540                         return r;
1541                 if (bus->state == BUS_RUNNING)
1542                         return 1;
1543                 if (r > 0)
1544                         continue;
1545
1546                 r = sd_bus_wait(bus, (uint64_t) -1);
1547                 if (r < 0)
1548                         return r;
1549         }
1550 }
1551
1552 int sd_bus_send_with_reply_and_block(
1553                 sd_bus *bus,
1554                 sd_bus_message *m,
1555                 uint64_t usec,
1556                 sd_bus_error *error,
1557                 sd_bus_message **reply) {
1558
1559         int r;
1560         usec_t timeout;
1561         uint64_t serial;
1562         bool room = false;
1563
1564         assert_return(bus, -EINVAL);
1565         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1566         assert_return(m, -EINVAL);
1567         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1568         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1569         assert_return(!bus_error_is_dirty(error), -EINVAL);
1570         assert_return(!bus_pid_changed(bus), -ECHILD);
1571
1572         r = bus_ensure_running(bus);
1573         if (r < 0)
1574                 return r;
1575
1576         r = sd_bus_send(bus, m, &serial);
1577         if (r < 0)
1578                 return r;
1579
1580         timeout = calc_elapse(usec);
1581
1582         for (;;) {
1583                 usec_t left;
1584                 sd_bus_message *incoming = NULL;
1585
1586                 if (!room) {
1587                         sd_bus_message **q;
1588
1589                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1590                                 return -ENOBUFS;
1591
1592                         /* Make sure there's room for queuing this
1593                          * locally, before we read the message */
1594
1595                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1596                         if (!q)
1597                                 return -ENOMEM;
1598
1599                         bus->rqueue = q;
1600                         room = true;
1601                 }
1602
1603                 if (bus->is_kernel)
1604                         r = bus_kernel_read_message(bus, &incoming);
1605                 else
1606                         r = bus_socket_read_message(bus, &incoming);
1607                 if (r < 0)
1608                         return r;
1609                 if (incoming) {
1610
1611                         if (incoming->reply_serial == serial) {
1612                                 /* Found a match! */
1613
1614                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1615
1616                                         if (reply)
1617                                                 *reply = incoming;
1618                                         else
1619                                                 sd_bus_message_unref(incoming);
1620
1621                                         return 1;
1622                                 }
1623
1624                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1625                                         int k;
1626
1627                                         r = sd_bus_error_copy(error, &incoming->error);
1628                                         if (r < 0) {
1629                                                 sd_bus_message_unref(incoming);
1630                                                 return r;
1631                                         }
1632
1633                                         k = sd_bus_error_get_errno(&incoming->error);
1634                                         sd_bus_message_unref(incoming);
1635                                         return -k;
1636                                 }
1637
1638                                 sd_bus_message_unref(incoming);
1639                                 return -EIO;
1640                         }
1641
1642                         /* There's already guaranteed to be room for
1643                          * this, so need to resize things here */
1644                         bus->rqueue[bus->rqueue_size ++] = incoming;
1645                         room = false;
1646
1647                         /* Try to read more, right-away */
1648                         continue;
1649                 }
1650                 if (r != 0)
1651                         continue;
1652
1653                 if (timeout > 0) {
1654                         usec_t n;
1655
1656                         n = now(CLOCK_MONOTONIC);
1657                         if (n >= timeout)
1658                                 return -ETIMEDOUT;
1659
1660                         left = timeout - n;
1661                 } else
1662                         left = (uint64_t) -1;
1663
1664                 r = bus_poll(bus, true, left);
1665                 if (r < 0)
1666                         return r;
1667
1668                 r = dispatch_wqueue(bus);
1669                 if (r < 0)
1670                         return r;
1671         }
1672 }
1673
1674 int sd_bus_get_fd(sd_bus *bus) {
1675
1676         assert_return(bus, -EINVAL);
1677         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1678         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1679         assert_return(!bus_pid_changed(bus), -ECHILD);
1680
1681         return bus->input_fd;
1682 }
1683
1684 int sd_bus_get_events(sd_bus *bus) {
1685         int flags = 0;
1686
1687         assert_return(bus, -EINVAL);
1688         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1689         assert_return(!bus_pid_changed(bus), -ECHILD);
1690
1691         if (bus->state == BUS_OPENING)
1692                 flags |= POLLOUT;
1693         else if (bus->state == BUS_AUTHENTICATING) {
1694
1695                 if (bus_socket_auth_needs_write(bus))
1696                         flags |= POLLOUT;
1697
1698                 flags |= POLLIN;
1699
1700         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1701                 if (bus->rqueue_size <= 0)
1702                         flags |= POLLIN;
1703                 if (bus->wqueue_size > 0)
1704                         flags |= POLLOUT;
1705         }
1706
1707         return flags;
1708 }
1709
1710 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1711         struct reply_callback *c;
1712
1713         assert_return(bus, -EINVAL);
1714         assert_return(timeout_usec, -EINVAL);
1715         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1716         assert_return(!bus_pid_changed(bus), -ECHILD);
1717
1718         if (bus->state == BUS_AUTHENTICATING) {
1719                 *timeout_usec = bus->auth_timeout;
1720                 return 1;
1721         }
1722
1723         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1724                 *timeout_usec = (uint64_t) -1;
1725                 return 0;
1726         }
1727
1728         if (bus->rqueue_size > 0) {
1729                 *timeout_usec = 0;
1730                 return 1;
1731         }
1732
1733         c = prioq_peek(bus->reply_callbacks_prioq);
1734         if (!c) {
1735                 *timeout_usec = (uint64_t) -1;
1736                 return 0;
1737         }
1738
1739         *timeout_usec = c->timeout;
1740         return 1;
1741 }
1742
1743 static int process_timeout(sd_bus *bus) {
1744         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1745         struct reply_callback *c;
1746         usec_t n;
1747         int r;
1748
1749         assert(bus);
1750
1751         c = prioq_peek(bus->reply_callbacks_prioq);
1752         if (!c)
1753                 return 0;
1754
1755         n = now(CLOCK_MONOTONIC);
1756         if (c->timeout > n)
1757                 return 0;
1758
1759         r = bus_message_new_synthetic_error(
1760                         bus,
1761                         c->serial,
1762                         &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1763                         &m);
1764         if (r < 0)
1765                 return r;
1766
1767         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1768         hashmap_remove(bus->reply_callbacks, &c->serial);
1769
1770         r = c->callback(bus, m, c->userdata);
1771         free(c);
1772
1773         return r < 0 ? r : 1;
1774 }
1775
1776 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1777         assert(bus);
1778         assert(m);
1779
1780         if (bus->state != BUS_HELLO)
1781                 return 0;
1782
1783         /* Let's make sure the first message on the bus is the HELLO
1784          * reply. But note that we don't actually parse the message
1785          * here (we leave that to the usual handling), we just verify
1786          * we don't let any earlier msg through. */
1787
1788         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1789             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1790                 return -EIO;
1791
1792         if (m->reply_serial != bus->hello_serial)
1793                 return -EIO;
1794
1795         return 0;
1796 }
1797
1798 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1799         struct reply_callback *c;
1800         int r;
1801
1802         assert(bus);
1803         assert(m);
1804
1805         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1806             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1807                 return 0;
1808
1809         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1810         if (!c)
1811                 return 0;
1812
1813         if (c->timeout != 0)
1814                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1815
1816         r = sd_bus_message_rewind(m, true);
1817         if (r < 0)
1818                 return r;
1819
1820         r = c->callback(bus, m, c->userdata);
1821         free(c);
1822
1823         return r;
1824 }
1825
1826 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1827         struct filter_callback *l;
1828         int r;
1829
1830         assert(bus);
1831         assert(m);
1832
1833         do {
1834                 bus->filter_callbacks_modified = false;
1835
1836                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1837
1838                         if (bus->filter_callbacks_modified)
1839                                 break;
1840
1841                         /* Don't run this more than once per iteration */
1842                         if (l->last_iteration == bus->iteration_counter)
1843                                 continue;
1844
1845                         l->last_iteration = bus->iteration_counter;
1846
1847                         r = sd_bus_message_rewind(m, true);
1848                         if (r < 0)
1849                                 return r;
1850
1851                         r = l->callback(bus, m, l->userdata);
1852                         if (r != 0)
1853                                 return r;
1854
1855                 }
1856
1857         } while (bus->filter_callbacks_modified);
1858
1859         return 0;
1860 }
1861
1862 static int process_match(sd_bus *bus, sd_bus_message *m) {
1863         int r;
1864
1865         assert(bus);
1866         assert(m);
1867
1868         do {
1869                 bus->match_callbacks_modified = false;
1870
1871                 r = bus_match_run(bus, &bus->match_callbacks, m);
1872                 if (r != 0)
1873                         return r;
1874
1875         } while (bus->match_callbacks_modified);
1876
1877         return 0;
1878 }
1879
1880 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1881         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1882         int r;
1883
1884         assert(bus);
1885         assert(m);
1886
1887         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1888                 return 0;
1889
1890         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1891                 return 0;
1892
1893         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1894                 return 1;
1895
1896         if (streq_ptr(m->member, "Ping"))
1897                 r = sd_bus_message_new_method_return(bus, m, &reply);
1898         else if (streq_ptr(m->member, "GetMachineId")) {
1899                 sd_id128_t id;
1900                 char sid[33];
1901
1902                 r = sd_id128_get_machine(&id);
1903                 if (r < 0)
1904                         return r;
1905
1906                 r = sd_bus_message_new_method_return(bus, m, &reply);
1907                 if (r < 0)
1908                         return r;
1909
1910                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1911         } else {
1912                 r = sd_bus_message_new_method_errorf(
1913                                 bus, m, &reply,
1914                                 SD_BUS_ERROR_UNKNOWN_METHOD,
1915                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1916         }
1917
1918         if (r < 0)
1919                 return r;
1920
1921         r = sd_bus_send(bus, reply, NULL);
1922         if (r < 0)
1923                 return r;
1924
1925         return 1;
1926 }
1927
1928 static int process_message(sd_bus *bus, sd_bus_message *m) {
1929         int r;
1930
1931         assert(bus);
1932         assert(m);
1933
1934         bus->current = m;
1935         bus->iteration_counter++;
1936
1937         log_debug("Got message sender=%s object=%s interface=%s member=%s",
1938                   strna(sd_bus_message_get_sender(m)),
1939                   strna(sd_bus_message_get_path(m)),
1940                   strna(sd_bus_message_get_interface(m)),
1941                   strna(sd_bus_message_get_member(m)));
1942
1943         r = process_hello(bus, m);
1944         if (r != 0)
1945                 goto finish;
1946
1947         r = process_reply(bus, m);
1948         if (r != 0)
1949                 goto finish;
1950
1951         r = process_filter(bus, m);
1952         if (r != 0)
1953                 goto finish;
1954
1955         r = process_match(bus, m);
1956         if (r != 0)
1957                 goto finish;
1958
1959         r = process_builtin(bus, m);
1960         if (r != 0)
1961                 goto finish;
1962
1963         r = bus_process_object(bus, m);
1964
1965 finish:
1966         bus->current = NULL;
1967         return r;
1968 }
1969
1970 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1971         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1972         int r;
1973
1974         assert(bus);
1975         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1976
1977         r = process_timeout(bus);
1978         if (r != 0)
1979                 goto null_message;
1980
1981         r = dispatch_wqueue(bus);
1982         if (r != 0)
1983                 goto null_message;
1984
1985         r = dispatch_rqueue(bus, &m);
1986         if (r < 0)
1987                 return r;
1988         if (!m)
1989                 goto null_message;
1990
1991         r = process_message(bus, m);
1992         if (r != 0)
1993                 goto null_message;
1994
1995         if (ret) {
1996                 r = sd_bus_message_rewind(m, true);
1997                 if (r < 0)
1998                         return r;
1999
2000                 *ret = m;
2001                 m = NULL;
2002                 return 1;
2003         }
2004
2005         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2006
2007                 r = sd_bus_reply_method_errorf(
2008                                 bus, m,
2009                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
2010                                 "Unknown object '%s'.", m->path);
2011                 if (r < 0)
2012                         return r;
2013         }
2014
2015         return 1;
2016
2017 null_message:
2018         if (r >= 0 && ret)
2019                 *ret = NULL;
2020
2021         return r;
2022 }
2023
2024 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2025         BUS_DONT_DESTROY(bus);
2026         int r;
2027
2028         /* Returns 0 when we didn't do anything. This should cause the
2029          * caller to invoke sd_bus_wait() before returning the next
2030          * time. Returns > 0 when we did something, which possibly
2031          * means *ret is filled in with an unprocessed message. */
2032
2033         assert_return(bus, -EINVAL);
2034         assert_return(!bus_pid_changed(bus), -ECHILD);
2035
2036         /* We don't allow recursively invoking sd_bus_process(). */
2037         assert_return(!bus->processing, -EBUSY);
2038
2039         switch (bus->state) {
2040
2041         case BUS_UNSET:
2042         case BUS_CLOSED:
2043                 return -ENOTCONN;
2044
2045         case BUS_OPENING:
2046                 r = bus_socket_process_opening(bus);
2047                 if (r < 0)
2048                         return r;
2049                 if (ret)
2050                         *ret = NULL;
2051                 return r;
2052
2053         case BUS_AUTHENTICATING:
2054
2055                 r = bus_socket_process_authenticating(bus);
2056                 if (r < 0)
2057                         return r;
2058                 if (ret)
2059                         *ret = NULL;
2060                 return r;
2061
2062         case BUS_RUNNING:
2063         case BUS_HELLO:
2064
2065                 bus->processing = true;
2066                 r = process_running(bus, ret);
2067                 bus->processing = false;
2068
2069                 return r;
2070         }
2071
2072         assert_not_reached("Unknown state");
2073 }
2074
2075 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2076         struct pollfd p[2] = {};
2077         int r, e, n;
2078         struct timespec ts;
2079         usec_t m = (usec_t) -1;
2080
2081         assert(bus);
2082         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2083
2084         e = sd_bus_get_events(bus);
2085         if (e < 0)
2086                 return e;
2087
2088         if (need_more)
2089                 /* The caller really needs some more data, he doesn't
2090                  * care about what's already read, or any timeouts
2091                  * except its own.*/
2092                 e |= POLLIN;
2093         else {
2094                 usec_t until;
2095                 /* The caller wants to process if there's something to
2096                  * process, but doesn't care otherwise */
2097
2098                 r = sd_bus_get_timeout(bus, &until);
2099                 if (r < 0)
2100                         return r;
2101                 if (r > 0) {
2102                         usec_t nw;
2103                         nw = now(CLOCK_MONOTONIC);
2104                         m = until > nw ? until - nw : 0;
2105                 }
2106         }
2107
2108         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2109                 m = timeout_usec;
2110
2111         p[0].fd = bus->input_fd;
2112         if (bus->output_fd == bus->input_fd) {
2113                 p[0].events = e;
2114                 n = 1;
2115         } else {
2116                 p[0].events = e & POLLIN;
2117                 p[1].fd = bus->output_fd;
2118                 p[1].events = e & POLLOUT;
2119                 n = 2;
2120         }
2121
2122         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2123         if (r < 0)
2124                 return -errno;
2125
2126         return r > 0 ? 1 : 0;
2127 }
2128
2129 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2130
2131         assert_return(bus, -EINVAL);
2132         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2133         assert_return(!bus_pid_changed(bus), -ECHILD);
2134
2135         if (bus->rqueue_size > 0)
2136                 return 0;
2137
2138         return bus_poll(bus, false, timeout_usec);
2139 }
2140
2141 int sd_bus_flush(sd_bus *bus) {
2142         int r;
2143
2144         assert_return(bus, -EINVAL);
2145         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2146         assert_return(!bus_pid_changed(bus), -ECHILD);
2147
2148         r = bus_ensure_running(bus);
2149         if (r < 0)
2150                 return r;
2151
2152         if (bus->wqueue_size <= 0)
2153                 return 0;
2154
2155         for (;;) {
2156                 r = dispatch_wqueue(bus);
2157                 if (r < 0)
2158                         return r;
2159
2160                 if (bus->wqueue_size <= 0)
2161                         return 0;
2162
2163                 r = bus_poll(bus, false, (uint64_t) -1);
2164                 if (r < 0)
2165                         return r;
2166         }
2167 }
2168
2169 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2170         struct filter_callback *f;
2171
2172         assert_return(bus, -EINVAL);
2173         assert_return(callback, -EINVAL);
2174         assert_return(!bus_pid_changed(bus), -ECHILD);
2175
2176         f = new0(struct filter_callback, 1);
2177         if (!f)
2178                 return -ENOMEM;
2179         f->callback = callback;
2180         f->userdata = userdata;
2181
2182         bus->filter_callbacks_modified = true;
2183         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2184         return 0;
2185 }
2186
2187 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2188         struct filter_callback *f;
2189
2190         assert_return(bus, -EINVAL);
2191         assert_return(callback, -EINVAL);
2192         assert_return(!bus_pid_changed(bus), -ECHILD);
2193
2194         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2195                 if (f->callback == callback && f->userdata == userdata) {
2196                         bus->filter_callbacks_modified = true;
2197                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2198                         free(f);
2199                         return 1;
2200                 }
2201         }
2202
2203         return 0;
2204 }
2205
2206 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2207         struct bus_match_component *components = NULL;
2208         unsigned n_components = 0;
2209         uint64_t cookie = 0;
2210         int r = 0;
2211
2212         assert_return(bus, -EINVAL);
2213         assert_return(match, -EINVAL);
2214         assert_return(!bus_pid_changed(bus), -ECHILD);
2215
2216         r = bus_match_parse(match, &components, &n_components);
2217         if (r < 0)
2218                 goto finish;
2219
2220         if (bus->bus_client) {
2221                 cookie = ++bus->match_cookie;
2222
2223                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2224                 if (r < 0)
2225                         goto finish;
2226         }
2227
2228         bus->match_callbacks_modified = true;
2229         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2230         if (r < 0) {
2231                 if (bus->bus_client)
2232                         bus_remove_match_internal(bus, match, cookie);
2233         }
2234
2235 finish:
2236         bus_match_parse_free(components, n_components);
2237         return r;
2238 }
2239
2240 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2241         struct bus_match_component *components = NULL;
2242         unsigned n_components = 0;
2243         int r = 0, q = 0;
2244         uint64_t cookie = 0;
2245
2246         assert_return(bus, -EINVAL);
2247         assert_return(match, -EINVAL);
2248         assert_return(!bus_pid_changed(bus), -ECHILD);
2249
2250         r = bus_match_parse(match, &components, &n_components);
2251         if (r < 0)
2252                 return r;
2253
2254         bus->match_callbacks_modified = true;
2255         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2256
2257         if (bus->bus_client)
2258                 q = bus_remove_match_internal(bus, match, cookie);
2259
2260         bus_match_parse_free(components, n_components);
2261
2262         return r < 0 ? r : q;
2263 }
2264
2265 bool bus_pid_changed(sd_bus *bus) {
2266         assert(bus);
2267
2268         /* We don't support people creating a bus connection and
2269          * keeping it around over a fork(). Let's complain. */
2270
2271         return bus->original_pid != getpid();
2272 }
2273
2274 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2275         void *bus = userdata;
2276         int r;
2277
2278         assert(bus);
2279
2280         r = sd_bus_process(bus, NULL);
2281         if (r < 0)
2282                 return r;
2283
2284         return 1;
2285 }
2286
2287 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2288         void *bus = userdata;
2289         int r;
2290
2291         assert(bus);
2292
2293         r = sd_bus_process(bus, NULL);
2294         if (r < 0)
2295                 return r;
2296
2297         return 1;
2298 }
2299
2300 static int prepare_callback(sd_event_source *s, void *userdata) {
2301         sd_bus *bus = userdata;
2302         int r, e;
2303         usec_t until;
2304
2305         assert(s);
2306         assert(bus);
2307
2308         e = sd_bus_get_events(bus);
2309         if (e < 0)
2310                 return e;
2311
2312         if (bus->output_fd != bus->input_fd) {
2313
2314                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2315                 if (r < 0)
2316                         return r;
2317
2318                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2319                 if (r < 0)
2320                         return r;
2321         } else {
2322                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2323                 if (r < 0)
2324                         return r;
2325         }
2326
2327         r = sd_bus_get_timeout(bus, &until);
2328         if (r < 0)
2329                 return r;
2330         if (r > 0) {
2331                 int j;
2332
2333                 j = sd_event_source_set_time(bus->time_event_source, until);
2334                 if (j < 0)
2335                         return j;
2336         }
2337
2338         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2339         if (r < 0)
2340                 return r;
2341
2342         return 1;
2343 }
2344
2345 static int quit_callback(sd_event_source *event, void *userdata) {
2346         sd_bus *bus = userdata;
2347
2348         assert(event);
2349
2350         sd_bus_flush(bus);
2351
2352         return 1;
2353 }
2354
2355 int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2356         int r;
2357
2358         assert_return(bus, -EINVAL);
2359         assert_return(event, -EINVAL);
2360         assert_return(!bus->event, -EBUSY);
2361
2362         assert(!bus->input_io_event_source);
2363         assert(!bus->output_io_event_source);
2364         assert(!bus->time_event_source);
2365
2366         bus->event = sd_event_ref(event);
2367
2368         r = sd_event_add_io(event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2369         if (r < 0)
2370                 goto fail;
2371
2372         r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2373         if (r < 0)
2374                 goto fail;
2375
2376         if (bus->output_fd != bus->input_fd) {
2377                 r = sd_event_add_io(event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2378                 if (r < 0)
2379                         goto fail;
2380
2381                 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2382                 if (r < 0)
2383                         goto fail;
2384         }
2385
2386         r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2387         if (r < 0)
2388                 goto fail;
2389
2390         r = sd_event_add_monotonic(event, 0, 0, time_callback, bus, &bus->time_event_source);
2391         if (r < 0)
2392                 goto fail;
2393
2394         r = sd_event_source_set_priority(bus->time_event_source, priority);
2395         if (r < 0)
2396                 goto fail;
2397
2398         r = sd_event_add_quit(event, quit_callback, bus, &bus->quit_event_source);
2399         if (r < 0)
2400                 goto fail;
2401
2402         return 0;
2403
2404 fail:
2405         sd_bus_detach_event(bus);
2406         return r;
2407 }
2408
2409 int sd_bus_detach_event(sd_bus *bus) {
2410         assert_return(bus, -EINVAL);
2411         assert_return(bus->event, -ENXIO);
2412
2413         if (bus->input_io_event_source)
2414                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2415
2416         if (bus->output_io_event_source)
2417                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2418
2419         if (bus->time_event_source)
2420                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2421
2422         if (bus->quit_event_source)
2423                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2424
2425         if (bus->event)
2426                 bus->event = sd_event_unref(bus->event);
2427
2428         return 0;
2429 }
2430
2431 sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2432         assert_return(bus, NULL);
2433
2434         return bus->current;
2435 }