chiark / gitweb /
hashmap: be a bit more conservative with pre-allocating hash tables and items
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48 #include "bus-util.h"
49 #include "bus-container.h"
50
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
52
53 static void bus_close_fds(sd_bus *b) {
54         assert(b);
55
56         if (b->input_fd >= 0)
57                 close_nointr_nofail(b->input_fd);
58
59         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60                 close_nointr_nofail(b->output_fd);
61
62         b->input_fd = b->output_fd = -1;
63 }
64
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66         struct node_callback *c;
67         struct node_vtable *v;
68         struct node_enumerator *e;
69
70         assert(b);
71
72         if (!n)
73                 return;
74
75         while (n->child)
76                 bus_node_destroy(b, n->child);
77
78         while ((c = n->callbacks)) {
79                 LIST_REMOVE(callbacks, n->callbacks, c);
80                 free(c);
81         }
82
83         while ((v = n->vtables)) {
84                 LIST_REMOVE(vtables, n->vtables, v);
85                 free(v->interface);
86                 free(v);
87         }
88
89         while ((e = n->enumerators)) {
90                 LIST_REMOVE(enumerators, n->enumerators, e);
91                 free(e);
92         }
93
94         if (n->parent)
95                 LIST_REMOVE(siblings, n->parent->child, n);
96
97         assert_se(hashmap_remove(b->nodes, n->path) == n);
98         free(n->path);
99         free(n);
100 }
101
102 static void bus_free(sd_bus *b) {
103         struct filter_callback *f;
104         struct node *n;
105         unsigned i;
106
107         assert(b);
108
109         sd_bus_detach_event(b);
110
111         bus_close_fds(b);
112
113         if (b->kdbus_buffer)
114                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
115
116         free(b->rbuffer);
117         free(b->unique_name);
118         free(b->auth_buffer);
119         free(b->address);
120         free(b->kernel);
121         free(b->machine);
122
123         free(b->exec_path);
124         strv_free(b->exec_argv);
125
126         close_many(b->fds, b->n_fds);
127         free(b->fds);
128
129         for (i = 0; i < b->rqueue_size; i++)
130                 sd_bus_message_unref(b->rqueue[i]);
131         free(b->rqueue);
132
133         for (i = 0; i < b->wqueue_size; i++)
134                 sd_bus_message_unref(b->wqueue[i]);
135         free(b->wqueue);
136
137         hashmap_free_free(b->reply_callbacks);
138         prioq_free(b->reply_callbacks_prioq);
139
140         while ((f = b->filter_callbacks)) {
141                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
142                 free(f);
143         }
144
145         bus_match_free(&b->match_callbacks);
146
147         hashmap_free_free(b->vtable_methods);
148         hashmap_free_free(b->vtable_properties);
149
150         while ((n = hashmap_first(b->nodes)))
151                 bus_node_destroy(b, n);
152
153         hashmap_free(b->nodes);
154
155         bus_kernel_flush_memfd(b);
156
157         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
158
159         free(b);
160 }
161
162 _public_ int sd_bus_new(sd_bus **ret) {
163         sd_bus *r;
164
165         assert_return(ret, -EINVAL);
166
167         r = new0(sd_bus, 1);
168         if (!r)
169                 return -ENOMEM;
170
171         r->n_ref = REFCNT_INIT;
172         r->input_fd = r->output_fd = -1;
173         r->message_version = 1;
174         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
175         r->original_pid = getpid();
176
177         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
178
179         /* We guarantee that wqueue always has space for at least one
180          * entry */
181         r->wqueue = new(sd_bus_message*, 1);
182         if (!r->wqueue) {
183                 free(r);
184                 return -ENOMEM;
185         }
186
187         *ret = r;
188         return 0;
189 }
190
191 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
192         char *a;
193
194         assert_return(bus, -EINVAL);
195         assert_return(bus->state == BUS_UNSET, -EPERM);
196         assert_return(address, -EINVAL);
197         assert_return(!bus_pid_changed(bus), -ECHILD);
198
199         a = strdup(address);
200         if (!a)
201                 return -ENOMEM;
202
203         free(bus->address);
204         bus->address = a;
205
206         return 0;
207 }
208
209 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
210         assert_return(bus, -EINVAL);
211         assert_return(bus->state == BUS_UNSET, -EPERM);
212         assert_return(input_fd >= 0, -EINVAL);
213         assert_return(output_fd >= 0, -EINVAL);
214         assert_return(!bus_pid_changed(bus), -ECHILD);
215
216         bus->input_fd = input_fd;
217         bus->output_fd = output_fd;
218         return 0;
219 }
220
221 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
222         char *p, **a;
223
224         assert_return(bus, -EINVAL);
225         assert_return(bus->state == BUS_UNSET, -EPERM);
226         assert_return(path, -EINVAL);
227         assert_return(!strv_isempty(argv), -EINVAL);
228         assert_return(!bus_pid_changed(bus), -ECHILD);
229
230         p = strdup(path);
231         if (!p)
232                 return -ENOMEM;
233
234         a = strv_copy(argv);
235         if (!a) {
236                 free(p);
237                 return -ENOMEM;
238         }
239
240         free(bus->exec_path);
241         strv_free(bus->exec_argv);
242
243         bus->exec_path = p;
244         bus->exec_argv = a;
245
246         return 0;
247 }
248
249 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
250         assert_return(bus, -EINVAL);
251         assert_return(bus->state == BUS_UNSET, -EPERM);
252         assert_return(!bus_pid_changed(bus), -ECHILD);
253
254         bus->bus_client = !!b;
255         return 0;
256 }
257
258 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
259         assert_return(bus, -EINVAL);
260         assert_return(bus->state == BUS_UNSET, -EPERM);
261         assert_return(!bus_pid_changed(bus), -ECHILD);
262
263         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
264         return 0;
265 }
266
267 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
268         assert_return(bus, -EINVAL);
269         assert_return(bus->state == BUS_UNSET, -EPERM);
270         assert_return(!bus_pid_changed(bus), -ECHILD);
271
272         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_TIMESTAMP, b);
273         return 0;
274 }
275
276 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, int b) {
277         assert_return(bus, -EINVAL);
278         assert_return(bus->state == BUS_UNSET, -EPERM);
279         assert_return(!bus_pid_changed(bus), -ECHILD);
280
281         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CREDS, b);
282         return 0;
283 }
284
285 _public_ int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
286         assert_return(bus, -EINVAL);
287         assert_return(bus->state == BUS_UNSET, -EPERM);
288         assert_return(!bus_pid_changed(bus), -ECHILD);
289
290         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
291         return 0;
292 }
293
294 _public_ int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
295         assert_return(bus, -EINVAL);
296         assert_return(bus->state == BUS_UNSET, -EPERM);
297         assert_return(!bus_pid_changed(bus), -ECHILD);
298
299         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
300         return 0;
301 }
302
303 _public_ int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
304         assert_return(bus, -EINVAL);
305         assert_return(bus->state == BUS_UNSET, -EPERM);
306         assert_return(!bus_pid_changed(bus), -ECHILD);
307
308         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
309         return 0;
310 }
311
312 _public_ int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
313         assert_return(bus, -EINVAL);
314         assert_return(bus->state == BUS_UNSET, -EPERM);
315         assert_return(!bus_pid_changed(bus), -ECHILD);
316
317         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
318         return 0;
319 }
320
321 _public_ int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
322         assert_return(bus, -EINVAL);
323         assert_return(bus->state == BUS_UNSET, -EPERM);
324         assert_return(!bus_pid_changed(bus), -ECHILD);
325
326         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
327         return 0;
328 }
329
330 _public_ int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
331         assert_return(bus, -EINVAL);
332         assert_return(bus->state == BUS_UNSET, -EPERM);
333         assert_return(!bus_pid_changed(bus), -ECHILD);
334
335         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
336         return 0;
337 }
338
339 _public_ int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
340         assert_return(bus, -EINVAL);
341         assert_return(bus->state == BUS_UNSET, -EPERM);
342         assert_return(!bus_pid_changed(bus), -ECHILD);
343
344         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
345         return 0;
346 }
347
348 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
349         assert_return(bus, -EINVAL);
350         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
351         assert_return(bus->state == BUS_UNSET, -EPERM);
352         assert_return(!bus_pid_changed(bus), -ECHILD);
353
354         bus->is_server = !!b;
355         bus->server_id = server_id;
356         return 0;
357 }
358
359 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
360         assert_return(bus, -EINVAL);
361         assert_return(bus->state == BUS_UNSET, -EPERM);
362         assert_return(!bus_pid_changed(bus), -ECHILD);
363
364         bus->anonymous_auth = !!b;
365         return 0;
366 }
367
368 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
369         const char *s;
370         int r;
371
372         assert(bus);
373         assert(bus->state == BUS_HELLO);
374         assert(reply);
375
376         r = sd_bus_message_get_errno(reply);
377         if (r < 0)
378                 return r;
379         if (r > 0)
380                 return -r;
381
382         r = sd_bus_message_read(reply, "s", &s);
383         if (r < 0)
384                 return r;
385
386         if (!service_name_is_valid(s) || s[0] != ':')
387                 return -EBADMSG;
388
389         bus->unique_name = strdup(s);
390         if (!bus->unique_name)
391                 return -ENOMEM;
392
393         bus->state = BUS_RUNNING;
394
395         return 1;
396 }
397
398 static int bus_send_hello(sd_bus *bus) {
399         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
400         int r;
401
402         assert(bus);
403
404         if (!bus->bus_client || bus->is_kernel)
405                 return 0;
406
407         r = sd_bus_message_new_method_call(
408                         bus,
409                         "org.freedesktop.DBus",
410                         "/",
411                         "org.freedesktop.DBus",
412                         "Hello",
413                         &m);
414         if (r < 0)
415                 return r;
416
417         return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
418 }
419
420 int bus_start_running(sd_bus *bus) {
421         assert(bus);
422
423         if (bus->bus_client && !bus->is_kernel) {
424                 bus->state = BUS_HELLO;
425                 return 1;
426         }
427
428         bus->state = BUS_RUNNING;
429         return 1;
430 }
431
432 static int parse_address_key(const char **p, const char *key, char **value) {
433         size_t l, n = 0;
434         const char *a;
435         char *r = NULL;
436
437         assert(p);
438         assert(*p);
439         assert(value);
440
441         if (key) {
442                 l = strlen(key);
443                 if (strncmp(*p, key, l) != 0)
444                         return 0;
445
446                 if ((*p)[l] != '=')
447                         return 0;
448
449                 if (*value)
450                         return -EINVAL;
451
452                 a = *p + l + 1;
453         } else
454                 a = *p;
455
456         while (*a != ';' && *a != ',' && *a != 0) {
457                 char c, *t;
458
459                 if (*a == '%') {
460                         int x, y;
461
462                         x = unhexchar(a[1]);
463                         if (x < 0) {
464                                 free(r);
465                                 return x;
466                         }
467
468                         y = unhexchar(a[2]);
469                         if (y < 0) {
470                                 free(r);
471                                 return y;
472                         }
473
474                         c = (char) ((x << 4) | y);
475                         a += 3;
476                 } else {
477                         c = *a;
478                         a++;
479                 }
480
481                 t = realloc(r, n + 2);
482                 if (!t) {
483                         free(r);
484                         return -ENOMEM;
485                 }
486
487                 r = t;
488                 r[n++] = c;
489         }
490
491         if (!r) {
492                 r = strdup("");
493                 if (!r)
494                         return -ENOMEM;
495         } else
496                 r[n] = 0;
497
498         if (*a == ',')
499                 a++;
500
501         *p = a;
502
503         free(*value);
504         *value = r;
505
506         return 1;
507 }
508
509 static void skip_address_key(const char **p) {
510         assert(p);
511         assert(*p);
512
513         *p += strcspn(*p, ",");
514
515         if (**p == ',')
516                 (*p) ++;
517 }
518
519 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
520         _cleanup_free_ char *path = NULL, *abstract = NULL;
521         size_t l;
522         int r;
523
524         assert(b);
525         assert(p);
526         assert(*p);
527         assert(guid);
528
529         while (**p != 0 && **p != ';') {
530                 r = parse_address_key(p, "guid", guid);
531                 if (r < 0)
532                         return r;
533                 else if (r > 0)
534                         continue;
535
536                 r = parse_address_key(p, "path", &path);
537                 if (r < 0)
538                         return r;
539                 else if (r > 0)
540                         continue;
541
542                 r = parse_address_key(p, "abstract", &abstract);
543                 if (r < 0)
544                         return r;
545                 else if (r > 0)
546                         continue;
547
548                 skip_address_key(p);
549         }
550
551         if (!path && !abstract)
552                 return -EINVAL;
553
554         if (path && abstract)
555                 return -EINVAL;
556
557         if (path) {
558                 l = strlen(path);
559                 if (l > sizeof(b->sockaddr.un.sun_path))
560                         return -E2BIG;
561
562                 b->sockaddr.un.sun_family = AF_UNIX;
563                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
564                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
565         } else if (abstract) {
566                 l = strlen(abstract);
567                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
568                         return -E2BIG;
569
570                 b->sockaddr.un.sun_family = AF_UNIX;
571                 b->sockaddr.un.sun_path[0] = 0;
572                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
573                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
574         }
575
576         return 0;
577 }
578
579 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
580         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
581         int r;
582         struct addrinfo *result, hints = {
583                 .ai_socktype = SOCK_STREAM,
584                 .ai_flags = AI_ADDRCONFIG,
585         };
586
587         assert(b);
588         assert(p);
589         assert(*p);
590         assert(guid);
591
592         while (**p != 0 && **p != ';') {
593                 r = parse_address_key(p, "guid", guid);
594                 if (r < 0)
595                         return r;
596                 else if (r > 0)
597                         continue;
598
599                 r = parse_address_key(p, "host", &host);
600                 if (r < 0)
601                         return r;
602                 else if (r > 0)
603                         continue;
604
605                 r = parse_address_key(p, "port", &port);
606                 if (r < 0)
607                         return r;
608                 else if (r > 0)
609                         continue;
610
611                 r = parse_address_key(p, "family", &family);
612                 if (r < 0)
613                         return r;
614                 else if (r > 0)
615                         continue;
616
617                 skip_address_key(p);
618         }
619
620         if (!host || !port)
621                 return -EINVAL;
622
623         if (family) {
624                 if (streq(family, "ipv4"))
625                         hints.ai_family = AF_INET;
626                 else if (streq(family, "ipv6"))
627                         hints.ai_family = AF_INET6;
628                 else
629                         return -EINVAL;
630         }
631
632         r = getaddrinfo(host, port, &hints, &result);
633         if (r == EAI_SYSTEM)
634                 return -errno;
635         else if (r != 0)
636                 return -EADDRNOTAVAIL;
637
638         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
639         b->sockaddr_size = result->ai_addrlen;
640
641         freeaddrinfo(result);
642
643         return 0;
644 }
645
646 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
647         char *path = NULL;
648         unsigned n_argv = 0, j;
649         char **argv = NULL;
650         int r;
651
652         assert(b);
653         assert(p);
654         assert(*p);
655         assert(guid);
656
657         while (**p != 0 && **p != ';') {
658                 r = parse_address_key(p, "guid", guid);
659                 if (r < 0)
660                         goto fail;
661                 else if (r > 0)
662                         continue;
663
664                 r = parse_address_key(p, "path", &path);
665                 if (r < 0)
666                         goto fail;
667                 else if (r > 0)
668                         continue;
669
670                 if (startswith(*p, "argv")) {
671                         unsigned ul;
672
673                         errno = 0;
674                         ul = strtoul(*p + 4, (char**) p, 10);
675                         if (errno > 0 || **p != '=' || ul > 256) {
676                                 r = -EINVAL;
677                                 goto fail;
678                         }
679
680                         (*p) ++;
681
682                         if (ul >= n_argv) {
683                                 char **x;
684
685                                 x = realloc(argv, sizeof(char*) * (ul + 2));
686                                 if (!x) {
687                                         r = -ENOMEM;
688                                         goto fail;
689                                 }
690
691                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
692
693                                 argv = x;
694                                 n_argv = ul + 1;
695                         }
696
697                         r = parse_address_key(p, NULL, argv + ul);
698                         if (r < 0)
699                                 goto fail;
700
701                         continue;
702                 }
703
704                 skip_address_key(p);
705         }
706
707         if (!path) {
708                 r = -EINVAL;
709                 goto fail;
710         }
711
712         /* Make sure there are no holes in the array, with the
713          * exception of argv[0] */
714         for (j = 1; j < n_argv; j++)
715                 if (!argv[j]) {
716                         r = -EINVAL;
717                         goto fail;
718                 }
719
720         if (argv && argv[0] == NULL) {
721                 argv[0] = strdup(path);
722                 if (!argv[0]) {
723                         r = -ENOMEM;
724                         goto fail;
725                 }
726         }
727
728         b->exec_path = path;
729         b->exec_argv = argv;
730         return 0;
731
732 fail:
733         for (j = 0; j < n_argv; j++)
734                 free(argv[j]);
735
736         free(argv);
737         free(path);
738         return r;
739 }
740
741 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
742         _cleanup_free_ char *path = NULL;
743         int r;
744
745         assert(b);
746         assert(p);
747         assert(*p);
748         assert(guid);
749
750         while (**p != 0 && **p != ';') {
751                 r = parse_address_key(p, "guid", guid);
752                 if (r < 0)
753                         return r;
754                 else if (r > 0)
755                         continue;
756
757                 r = parse_address_key(p, "path", &path);
758                 if (r < 0)
759                         return r;
760                 else if (r > 0)
761                         continue;
762
763                 skip_address_key(p);
764         }
765
766         if (!path)
767                 return -EINVAL;
768
769         free(b->kernel);
770         b->kernel = path;
771         path = NULL;
772
773         return 0;
774 }
775
776 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
777         _cleanup_free_ char *machine = NULL;
778         int r;
779
780         assert(b);
781         assert(p);
782         assert(*p);
783         assert(guid);
784
785         while (**p != 0 && **p != ';') {
786                 r = parse_address_key(p, "guid", guid);
787                 if (r < 0)
788                         return r;
789                 else if (r > 0)
790                         continue;
791
792                 r = parse_address_key(p, "machine", &machine);
793                 if (r < 0)
794                         return r;
795                 else if (r > 0)
796                         continue;
797
798                 skip_address_key(p);
799         }
800
801         if (!machine)
802                 return -EINVAL;
803
804         free(b->machine);
805         b->machine = machine;
806         machine = NULL;
807
808         b->sockaddr.un.sun_family = AF_UNIX;
809         strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
810         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
811
812         return 0;
813 }
814
815 static void bus_reset_parsed_address(sd_bus *b) {
816         assert(b);
817
818         zero(b->sockaddr);
819         b->sockaddr_size = 0;
820         strv_free(b->exec_argv);
821         free(b->exec_path);
822         b->exec_path = NULL;
823         b->exec_argv = NULL;
824         b->server_id = SD_ID128_NULL;
825         free(b->kernel);
826         b->kernel = NULL;
827         free(b->machine);
828         b->machine = NULL;
829 }
830
831 static int bus_parse_next_address(sd_bus *b) {
832         _cleanup_free_ char *guid = NULL;
833         const char *a;
834         int r;
835
836         assert(b);
837
838         if (!b->address)
839                 return 0;
840         if (b->address[b->address_index] == 0)
841                 return 0;
842
843         bus_reset_parsed_address(b);
844
845         a = b->address + b->address_index;
846
847         while (*a != 0) {
848
849                 if (*a == ';') {
850                         a++;
851                         continue;
852                 }
853
854                 if (startswith(a, "unix:")) {
855                         a += 5;
856
857                         r = parse_unix_address(b, &a, &guid);
858                         if (r < 0)
859                                 return r;
860                         break;
861
862                 } else if (startswith(a, "tcp:")) {
863
864                         a += 4;
865                         r = parse_tcp_address(b, &a, &guid);
866                         if (r < 0)
867                                 return r;
868
869                         break;
870
871                 } else if (startswith(a, "unixexec:")) {
872
873                         a += 9;
874                         r = parse_exec_address(b, &a, &guid);
875                         if (r < 0)
876                                 return r;
877
878                         break;
879
880                 } else if (startswith(a, "kernel:")) {
881
882                         a += 7;
883                         r = parse_kernel_address(b, &a, &guid);
884                         if (r < 0)
885                                 return r;
886
887                         break;
888                 } else if (startswith(a, "x-container:")) {
889
890                         a += 12;
891                         r = parse_container_address(b, &a, &guid);
892                         if (r < 0)
893                                 return r;
894
895                         break;
896                 }
897
898                 a = strchr(a, ';');
899                 if (!a)
900                         return 0;
901         }
902
903         if (guid) {
904                 r = sd_id128_from_string(guid, &b->server_id);
905                 if (r < 0)
906                         return r;
907         }
908
909         b->address_index = a - b->address;
910         return 1;
911 }
912
913 static int bus_start_address(sd_bus *b) {
914         int r;
915
916         assert(b);
917
918         for (;;) {
919                 sd_bus_close(b);
920
921                 if (b->exec_path) {
922
923                         r = bus_socket_exec(b);
924                         if (r >= 0)
925                                 return r;
926
927                         b->last_connect_error = -r;
928                 } else if (b->kernel) {
929
930                         r = bus_kernel_connect(b);
931                         if (r >= 0)
932                                 return r;
933
934                         b->last_connect_error = -r;
935
936                 } else if (b->machine) {
937
938                         r = bus_container_connect(b);
939                         if (r >= 0)
940                                 return r;
941
942                         b->last_connect_error = -r;
943
944                 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
945
946                         r = bus_socket_connect(b);
947                         if (r >= 0)
948                                 return r;
949
950                         b->last_connect_error = -r;
951                 }
952
953                 r = bus_parse_next_address(b);
954                 if (r < 0)
955                         return r;
956                 if (r == 0)
957                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
958         }
959 }
960
961 int bus_next_address(sd_bus *b) {
962         assert(b);
963
964         bus_reset_parsed_address(b);
965         return bus_start_address(b);
966 }
967
968 static int bus_start_fd(sd_bus *b) {
969         struct stat st;
970         int r;
971
972         assert(b);
973         assert(b->input_fd >= 0);
974         assert(b->output_fd >= 0);
975
976         r = fd_nonblock(b->input_fd, true);
977         if (r < 0)
978                 return r;
979
980         r = fd_cloexec(b->input_fd, true);
981         if (r < 0)
982                 return r;
983
984         if (b->input_fd != b->output_fd) {
985                 r = fd_nonblock(b->output_fd, true);
986                 if (r < 0)
987                         return r;
988
989                 r = fd_cloexec(b->output_fd, true);
990                 if (r < 0)
991                         return r;
992         }
993
994         if (fstat(b->input_fd, &st) < 0)
995                 return -errno;
996
997         if (S_ISCHR(b->input_fd))
998                 return bus_kernel_take_fd(b);
999         else
1000                 return bus_socket_take_fd(b);
1001 }
1002
1003 _public_ int sd_bus_start(sd_bus *bus) {
1004         int r;
1005
1006         assert_return(bus, -EINVAL);
1007         assert_return(bus->state == BUS_UNSET, -EPERM);
1008         assert_return(!bus_pid_changed(bus), -ECHILD);
1009
1010         bus->state = BUS_OPENING;
1011
1012         if (bus->is_server && bus->bus_client)
1013                 return -EINVAL;
1014
1015         if (bus->input_fd >= 0)
1016                 r = bus_start_fd(bus);
1017         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1018                 r = bus_start_address(bus);
1019         else
1020                 return -EINVAL;
1021
1022         if (r < 0)
1023                 return r;
1024
1025         return bus_send_hello(bus);
1026 }
1027
1028 _public_ int sd_bus_open_system(sd_bus **ret) {
1029         const char *e;
1030         sd_bus *b;
1031         int r;
1032
1033         assert_return(ret, -EINVAL);
1034
1035         r = sd_bus_new(&b);
1036         if (r < 0)
1037                 return r;
1038
1039         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1040         if (e) {
1041                 r = sd_bus_set_address(b, e);
1042                 if (r < 0)
1043                         goto fail;
1044         } else {
1045                 b->sockaddr.un.sun_family = AF_UNIX;
1046                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1047                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1048         }
1049
1050         b->bus_client = true;
1051
1052         r = sd_bus_start(b);
1053         if (r < 0)
1054                 goto fail;
1055
1056         *ret = b;
1057         return 0;
1058
1059 fail:
1060         bus_free(b);
1061         return r;
1062 }
1063
1064 _public_ int sd_bus_open_user(sd_bus **ret) {
1065         const char *e;
1066         sd_bus *b;
1067         size_t l;
1068         int r;
1069
1070         assert_return(ret, -EINVAL);
1071
1072         r = sd_bus_new(&b);
1073         if (r < 0)
1074                 return r;
1075
1076         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1077         if (e) {
1078                 r = sd_bus_set_address(b, e);
1079                 if (r < 0)
1080                         goto fail;
1081         } else {
1082                 e = secure_getenv("XDG_RUNTIME_DIR");
1083                 if (!e) {
1084                         r = -ENOENT;
1085                         goto fail;
1086                 }
1087
1088                 l = strlen(e);
1089                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1090                         r = -E2BIG;
1091                         goto fail;
1092                 }
1093
1094                 b->sockaddr.un.sun_family = AF_UNIX;
1095                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1096                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1097         }
1098
1099         b->bus_client = true;
1100
1101         r = sd_bus_start(b);
1102         if (r < 0)
1103                 goto fail;
1104
1105         *ret = b;
1106         return 0;
1107
1108 fail:
1109         bus_free(b);
1110         return r;
1111 }
1112
1113 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1114         _cleanup_free_ char *e = NULL;
1115         char *p = NULL;
1116         sd_bus *bus;
1117         int r;
1118
1119         assert_return(host, -EINVAL);
1120         assert_return(ret, -EINVAL);
1121
1122         e = bus_address_escape(host);
1123         if (!e)
1124                 return -ENOMEM;
1125
1126         p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1127         if (!p)
1128                 return -ENOMEM;
1129
1130         r = sd_bus_new(&bus);
1131         if (r < 0) {
1132                 free(p);
1133                 return r;
1134         }
1135
1136         bus->address = p;
1137         bus->bus_client = true;
1138
1139         r = sd_bus_start(bus);
1140         if (r < 0) {
1141                 bus_free(bus);
1142                 return r;
1143         }
1144
1145         *ret = bus;
1146         return 0;
1147 }
1148
1149 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1150         _cleanup_free_ char *e = NULL;
1151         sd_bus *bus;
1152         char *p;
1153         int r;
1154
1155         assert_return(machine, -EINVAL);
1156         assert_return(ret, -EINVAL);
1157
1158         e = bus_address_escape(machine);
1159         if (!e)
1160                 return -ENOMEM;
1161
1162         p = strjoin("x-container:machine=", e, NULL);
1163         if (!p)
1164                 return -ENOMEM;
1165
1166         r = sd_bus_new(&bus);
1167         if (r < 0) {
1168                 free(p);
1169                 return r;
1170         }
1171
1172         bus->address = p;
1173         bus->bus_client = true;
1174
1175         r = sd_bus_start(bus);
1176         if (r < 0) {
1177                 bus_free(bus);
1178                 return r;
1179         }
1180
1181         *ret = bus;
1182         return 0;
1183 }
1184
1185 _public_ void sd_bus_close(sd_bus *bus) {
1186         if (!bus)
1187                 return;
1188         if (bus->state == BUS_CLOSED)
1189                 return;
1190         if (bus_pid_changed(bus))
1191                 return;
1192
1193         bus->state = BUS_CLOSED;
1194
1195         sd_bus_detach_event(bus);
1196
1197         if (!bus->is_kernel)
1198                 bus_close_fds(bus);
1199
1200         /* We'll leave the fd open in case this is a kernel bus, since
1201          * there might still be memblocks around that reference this
1202          * bus, and they might need to invoke the
1203          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1204          * freed. */
1205 }
1206
1207 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1208         assert_return(bus, NULL);
1209
1210         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1211
1212         return bus;
1213 }
1214
1215 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1216         assert_return(bus, NULL);
1217
1218         if (REFCNT_DEC(bus->n_ref) <= 0)
1219                 bus_free(bus);
1220
1221         return NULL;
1222 }
1223
1224 _public_ int sd_bus_is_open(sd_bus *bus) {
1225
1226         assert_return(bus, -EINVAL);
1227         assert_return(!bus_pid_changed(bus), -ECHILD);
1228
1229         return BUS_IS_OPEN(bus->state);
1230 }
1231
1232 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1233         int r;
1234
1235         assert_return(bus, -EINVAL);
1236         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1237         assert_return(!bus_pid_changed(bus), -ECHILD);
1238
1239         if (type == SD_BUS_TYPE_UNIX_FD) {
1240                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1241                         return 0;
1242
1243                 r = bus_ensure_running(bus);
1244                 if (r < 0)
1245                         return r;
1246
1247                 return bus->can_fds;
1248         }
1249
1250         return bus_type_is_valid(type);
1251 }
1252
1253 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1254         int r;
1255
1256         assert_return(bus, -EINVAL);
1257         assert_return(server_id, -EINVAL);
1258         assert_return(!bus_pid_changed(bus), -ECHILD);
1259
1260         r = bus_ensure_running(bus);
1261         if (r < 0)
1262                 return r;
1263
1264         *server_id = bus->server_id;
1265         return 0;
1266 }
1267
1268 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1269         assert(m);
1270
1271         if (m->header->version > b->message_version)
1272                 return -EPERM;
1273
1274         if (m->sealed) {
1275                 /* If we copy the same message to multiple
1276                  * destinations, avoid using the same serial
1277                  * numbers. */
1278                 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1279                 return 0;
1280         }
1281
1282         return bus_message_seal(m, ++b->serial);
1283 }
1284
1285 static int dispatch_wqueue(sd_bus *bus) {
1286         int r, ret = 0;
1287
1288         assert(bus);
1289         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1290
1291         while (bus->wqueue_size > 0) {
1292
1293                 if (bus->is_kernel)
1294                         r = bus_kernel_write_message(bus, bus->wqueue[0]);
1295                 else
1296                         r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1297
1298                 if (r < 0) {
1299                         sd_bus_close(bus);
1300                         return r;
1301                 } else if (r == 0)
1302                         /* Didn't do anything this time */
1303                         return ret;
1304                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1305                         /* Fully written. Let's drop the entry from
1306                          * the queue.
1307                          *
1308                          * This isn't particularly optimized, but
1309                          * well, this is supposed to be our worst-case
1310                          * buffer only, and the socket buffer is
1311                          * supposed to be our primary buffer, and if
1312                          * it got full, then all bets are off
1313                          * anyway. */
1314
1315                         sd_bus_message_unref(bus->wqueue[0]);
1316                         bus->wqueue_size --;
1317                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1318                         bus->windex = 0;
1319
1320                         ret = 1;
1321                 }
1322         }
1323
1324         return ret;
1325 }
1326
1327 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1328         sd_bus_message *z = NULL;
1329         int r, ret = 0;
1330
1331         assert(bus);
1332         assert(m);
1333         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1334
1335         if (bus->rqueue_size > 0) {
1336                 /* Dispatch a queued message */
1337
1338                 *m = bus->rqueue[0];
1339                 bus->rqueue_size --;
1340                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1341                 return 1;
1342         }
1343
1344         /* Try to read a new message */
1345         do {
1346                 if (bus->is_kernel)
1347                         r = bus_kernel_read_message(bus, &z);
1348                 else
1349                         r = bus_socket_read_message(bus, &z);
1350
1351                 if (r < 0) {
1352                         sd_bus_close(bus);
1353                         return r;
1354                 }
1355                 if (r == 0)
1356                         return ret;
1357
1358                 ret = 1;
1359         } while (!z);
1360
1361         *m = z;
1362         return ret;
1363 }
1364
1365 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1366         int r;
1367
1368         assert_return(bus, -EINVAL);
1369         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1370         assert_return(m, -EINVAL);
1371         assert_return(!bus_pid_changed(bus), -ECHILD);
1372
1373         if (m->n_fds > 0) {
1374                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1375                 if (r < 0)
1376                         return r;
1377                 if (r == 0)
1378                         return -ENOTSUP;
1379         }
1380
1381         /* If the serial number isn't kept, then we know that no reply
1382          * is expected */
1383         if (!serial && !m->sealed)
1384                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1385
1386         r = bus_seal_message(bus, m);
1387         if (r < 0)
1388                 return r;
1389
1390         /* If this is a reply and no reply was requested, then let's
1391          * suppress this, if we can */
1392         if (m->dont_send && !serial)
1393                 return 1;
1394
1395         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1396                 size_t idx = 0;
1397
1398                 if (bus->is_kernel)
1399                         r = bus_kernel_write_message(bus, m);
1400                 else
1401                         r = bus_socket_write_message(bus, m, &idx);
1402
1403                 if (r < 0) {
1404                         sd_bus_close(bus);
1405                         return r;
1406                 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1407                         /* Wasn't fully written. So let's remember how
1408                          * much was written. Note that the first entry
1409                          * of the wqueue array is always allocated so
1410                          * that we always can remember how much was
1411                          * written. */
1412                         bus->wqueue[0] = sd_bus_message_ref(m);
1413                         bus->wqueue_size = 1;
1414                         bus->windex = idx;
1415                 }
1416         } else {
1417                 sd_bus_message **q;
1418
1419                 /* Just append it to the queue. */
1420
1421                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1422                         return -ENOBUFS;
1423
1424                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1425                 if (!q)
1426                         return -ENOMEM;
1427
1428                 bus->wqueue = q;
1429                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1430         }
1431
1432         if (serial)
1433                 *serial = BUS_MESSAGE_SERIAL(m);
1434
1435         return 1;
1436 }
1437
1438 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1439         int r;
1440
1441         assert_return(bus, -EINVAL);
1442         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1443         assert_return(m, -EINVAL);
1444         assert_return(!bus_pid_changed(bus), -ECHILD);
1445
1446         if (!streq_ptr(m->destination, destination)) {
1447
1448                 if (!destination)
1449                         return -EEXIST;
1450
1451                 r = sd_bus_message_set_destination(m, destination);
1452                 if (r < 0)
1453                         return r;
1454         }
1455
1456         return sd_bus_send(bus, m, serial);
1457 }
1458
1459 static usec_t calc_elapse(uint64_t usec) {
1460         if (usec == (uint64_t) -1)
1461                 return 0;
1462
1463         if (usec == 0)
1464                 usec = BUS_DEFAULT_TIMEOUT;
1465
1466         return now(CLOCK_MONOTONIC) + usec;
1467 }
1468
1469 static int timeout_compare(const void *a, const void *b) {
1470         const struct reply_callback *x = a, *y = b;
1471
1472         if (x->timeout != 0 && y->timeout == 0)
1473                 return -1;
1474
1475         if (x->timeout == 0 && y->timeout != 0)
1476                 return 1;
1477
1478         if (x->timeout < y->timeout)
1479                 return -1;
1480
1481         if (x->timeout > y->timeout)
1482                 return 1;
1483
1484         return 0;
1485 }
1486
1487 _public_ int sd_bus_call_async(
1488                 sd_bus *bus,
1489                 sd_bus_message *m,
1490                 sd_bus_message_handler_t callback,
1491                 void *userdata,
1492                 uint64_t usec,
1493                 uint64_t *serial) {
1494
1495         struct reply_callback *c;
1496         int r;
1497
1498         assert_return(bus, -EINVAL);
1499         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1500         assert_return(m, -EINVAL);
1501         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1502         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1503         assert_return(callback, -EINVAL);
1504         assert_return(!bus_pid_changed(bus), -ECHILD);
1505
1506         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1507         if (r < 0)
1508                 return r;
1509
1510         if (usec != (uint64_t) -1) {
1511                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1512                 if (r < 0)
1513                         return r;
1514         }
1515
1516         r = bus_seal_message(bus, m);
1517         if (r < 0)
1518                 return r;
1519
1520         c = new0(struct reply_callback, 1);
1521         if (!c)
1522                 return -ENOMEM;
1523
1524         c->callback = callback;
1525         c->userdata = userdata;
1526         c->serial = BUS_MESSAGE_SERIAL(m);
1527         c->timeout = calc_elapse(usec);
1528
1529         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1530         if (r < 0) {
1531                 free(c);
1532                 return r;
1533         }
1534
1535         if (c->timeout != 0) {
1536                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1537                 if (r < 0) {
1538                         c->timeout = 0;
1539                         sd_bus_call_async_cancel(bus, c->serial);
1540                         return r;
1541                 }
1542         }
1543
1544         r = sd_bus_send(bus, m, serial);
1545         if (r < 0) {
1546                 sd_bus_call_async_cancel(bus, c->serial);
1547                 return r;
1548         }
1549
1550         return r;
1551 }
1552
1553 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1554         struct reply_callback *c;
1555
1556         assert_return(bus, -EINVAL);
1557         assert_return(serial != 0, -EINVAL);
1558         assert_return(!bus_pid_changed(bus), -ECHILD);
1559
1560         c = hashmap_remove(bus->reply_callbacks, &serial);
1561         if (!c)
1562                 return 0;
1563
1564         if (c->timeout != 0)
1565                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1566
1567         free(c);
1568         return 1;
1569 }
1570
1571 int bus_ensure_running(sd_bus *bus) {
1572         int r;
1573
1574         assert(bus);
1575
1576         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1577                 return -ENOTCONN;
1578         if (bus->state == BUS_RUNNING)
1579                 return 1;
1580
1581         for (;;) {
1582                 r = sd_bus_process(bus, NULL);
1583                 if (r < 0)
1584                         return r;
1585                 if (bus->state == BUS_RUNNING)
1586                         return 1;
1587                 if (r > 0)
1588                         continue;
1589
1590                 r = sd_bus_wait(bus, (uint64_t) -1);
1591                 if (r < 0)
1592                         return r;
1593         }
1594 }
1595
1596 _public_ int sd_bus_call(
1597                 sd_bus *bus,
1598                 sd_bus_message *m,
1599                 uint64_t usec,
1600                 sd_bus_error *error,
1601                 sd_bus_message **reply) {
1602
1603         int r;
1604         usec_t timeout;
1605         uint64_t serial;
1606         bool room = false;
1607
1608         assert_return(bus, -EINVAL);
1609         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1610         assert_return(m, -EINVAL);
1611         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1612         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1613         assert_return(!bus_error_is_dirty(error), -EINVAL);
1614         assert_return(!bus_pid_changed(bus), -ECHILD);
1615
1616         r = bus_ensure_running(bus);
1617         if (r < 0)
1618                 return r;
1619
1620         r = sd_bus_send(bus, m, &serial);
1621         if (r < 0)
1622                 return r;
1623
1624         timeout = calc_elapse(usec);
1625
1626         for (;;) {
1627                 usec_t left;
1628                 sd_bus_message *incoming = NULL;
1629
1630                 if (!room) {
1631                         sd_bus_message **q;
1632
1633                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1634                                 return -ENOBUFS;
1635
1636                         /* Make sure there's room for queuing this
1637                          * locally, before we read the message */
1638
1639                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1640                         if (!q)
1641                                 return -ENOMEM;
1642
1643                         bus->rqueue = q;
1644                         room = true;
1645                 }
1646
1647                 if (bus->is_kernel)
1648                         r = bus_kernel_read_message(bus, &incoming);
1649                 else
1650                         r = bus_socket_read_message(bus, &incoming);
1651                 if (r < 0)
1652                         return r;
1653                 if (incoming) {
1654
1655                         if (incoming->reply_serial == serial) {
1656                                 /* Found a match! */
1657
1658                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1659
1660                                         if (reply)
1661                                                 *reply = incoming;
1662                                         else
1663                                                 sd_bus_message_unref(incoming);
1664
1665                                         return 1;
1666                                 }
1667
1668                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1669                                         int k;
1670
1671                                         r = sd_bus_error_copy(error, &incoming->error);
1672                                         if (r < 0) {
1673                                                 sd_bus_message_unref(incoming);
1674                                                 return r;
1675                                         }
1676
1677                                         k = sd_bus_error_get_errno(&incoming->error);
1678                                         sd_bus_message_unref(incoming);
1679                                         return -k;
1680                                 }
1681
1682                                 sd_bus_message_unref(incoming);
1683                                 return -EIO;
1684
1685                         } else if (incoming->header->serial == serial &&
1686                                    bus->unique_name &&
1687                                    incoming->sender &&
1688                                    streq(bus->unique_name, incoming->sender)) {
1689
1690                                 /* Our own message? Somebody is trying
1691                                  * to send its own client a message,
1692                                  * let's not dead-lock, let's fail
1693                                  * immediately. */
1694
1695                                 sd_bus_message_unref(incoming);
1696                                 return -ELOOP;
1697                         }
1698
1699                         /* There's already guaranteed to be room for
1700                          * this, so need to resize things here */
1701                         bus->rqueue[bus->rqueue_size ++] = incoming;
1702                         room = false;
1703
1704                         /* Try to read more, right-away */
1705                         continue;
1706                 }
1707                 if (r != 0)
1708                         continue;
1709
1710                 if (timeout > 0) {
1711                         usec_t n;
1712
1713                         n = now(CLOCK_MONOTONIC);
1714                         if (n >= timeout)
1715                                 return -ETIMEDOUT;
1716
1717                         left = timeout - n;
1718                 } else
1719                         left = (uint64_t) -1;
1720
1721                 r = bus_poll(bus, true, left);
1722                 if (r < 0)
1723                         return r;
1724
1725                 r = dispatch_wqueue(bus);
1726                 if (r < 0)
1727                         return r;
1728         }
1729 }
1730
1731 _public_ int sd_bus_get_fd(sd_bus *bus) {
1732
1733         assert_return(bus, -EINVAL);
1734         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1735         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1736         assert_return(!bus_pid_changed(bus), -ECHILD);
1737
1738         return bus->input_fd;
1739 }
1740
1741 _public_ int sd_bus_get_events(sd_bus *bus) {
1742         int flags = 0;
1743
1744         assert_return(bus, -EINVAL);
1745         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1746         assert_return(!bus_pid_changed(bus), -ECHILD);
1747
1748         if (bus->state == BUS_OPENING)
1749                 flags |= POLLOUT;
1750         else if (bus->state == BUS_AUTHENTICATING) {
1751
1752                 if (bus_socket_auth_needs_write(bus))
1753                         flags |= POLLOUT;
1754
1755                 flags |= POLLIN;
1756
1757         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1758                 if (bus->rqueue_size <= 0)
1759                         flags |= POLLIN;
1760                 if (bus->wqueue_size > 0)
1761                         flags |= POLLOUT;
1762         }
1763
1764         return flags;
1765 }
1766
1767 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1768         struct reply_callback *c;
1769
1770         assert_return(bus, -EINVAL);
1771         assert_return(timeout_usec, -EINVAL);
1772         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1773         assert_return(!bus_pid_changed(bus), -ECHILD);
1774
1775         if (bus->state == BUS_AUTHENTICATING) {
1776                 *timeout_usec = bus->auth_timeout;
1777                 return 1;
1778         }
1779
1780         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1781                 *timeout_usec = (uint64_t) -1;
1782                 return 0;
1783         }
1784
1785         if (bus->rqueue_size > 0) {
1786                 *timeout_usec = 0;
1787                 return 1;
1788         }
1789
1790         c = prioq_peek(bus->reply_callbacks_prioq);
1791         if (!c) {
1792                 *timeout_usec = (uint64_t) -1;
1793                 return 0;
1794         }
1795
1796         *timeout_usec = c->timeout;
1797         return 1;
1798 }
1799
1800 static int process_timeout(sd_bus *bus) {
1801         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1802         struct reply_callback *c;
1803         usec_t n;
1804         int r;
1805
1806         assert(bus);
1807
1808         c = prioq_peek(bus->reply_callbacks_prioq);
1809         if (!c)
1810                 return 0;
1811
1812         n = now(CLOCK_MONOTONIC);
1813         if (c->timeout > n)
1814                 return 0;
1815
1816         r = bus_message_new_synthetic_error(
1817                         bus,
1818                         c->serial,
1819                         &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1820                         &m);
1821         if (r < 0)
1822                 return r;
1823
1824         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1825         hashmap_remove(bus->reply_callbacks, &c->serial);
1826
1827         r = c->callback(bus, m, c->userdata);
1828         free(c);
1829
1830         return r < 0 ? r : 1;
1831 }
1832
1833 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1834         assert(bus);
1835         assert(m);
1836
1837         if (bus->state != BUS_HELLO)
1838                 return 0;
1839
1840         /* Let's make sure the first message on the bus is the HELLO
1841          * reply. But note that we don't actually parse the message
1842          * here (we leave that to the usual handling), we just verify
1843          * we don't let any earlier msg through. */
1844
1845         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1846             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1847                 return -EIO;
1848
1849         if (m->reply_serial != bus->hello_serial)
1850                 return -EIO;
1851
1852         return 0;
1853 }
1854
1855 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1856         struct reply_callback *c;
1857         int r;
1858
1859         assert(bus);
1860         assert(m);
1861
1862         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1863             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1864                 return 0;
1865
1866         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1867         if (!c)
1868                 return 0;
1869
1870         if (c->timeout != 0)
1871                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1872
1873         r = sd_bus_message_rewind(m, true);
1874         if (r < 0)
1875                 return r;
1876
1877         r = c->callback(bus, m, c->userdata);
1878         free(c);
1879
1880         return r;
1881 }
1882
1883 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1884         struct filter_callback *l;
1885         int r;
1886
1887         assert(bus);
1888         assert(m);
1889
1890         do {
1891                 bus->filter_callbacks_modified = false;
1892
1893                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1894
1895                         if (bus->filter_callbacks_modified)
1896                                 break;
1897
1898                         /* Don't run this more than once per iteration */
1899                         if (l->last_iteration == bus->iteration_counter)
1900                                 continue;
1901
1902                         l->last_iteration = bus->iteration_counter;
1903
1904                         r = sd_bus_message_rewind(m, true);
1905                         if (r < 0)
1906                                 return r;
1907
1908                         r = l->callback(bus, m, l->userdata);
1909                         if (r != 0)
1910                                 return r;
1911
1912                 }
1913
1914         } while (bus->filter_callbacks_modified);
1915
1916         return 0;
1917 }
1918
1919 static int process_match(sd_bus *bus, sd_bus_message *m) {
1920         int r;
1921
1922         assert(bus);
1923         assert(m);
1924
1925         do {
1926                 bus->match_callbacks_modified = false;
1927
1928                 r = bus_match_run(bus, &bus->match_callbacks, m);
1929                 if (r != 0)
1930                         return r;
1931
1932         } while (bus->match_callbacks_modified);
1933
1934         return 0;
1935 }
1936
1937 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1938         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1939         int r;
1940
1941         assert(bus);
1942         assert(m);
1943
1944         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1945                 return 0;
1946
1947         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1948                 return 0;
1949
1950         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1951                 return 1;
1952
1953         if (streq_ptr(m->member, "Ping"))
1954                 r = sd_bus_message_new_method_return(bus, m, &reply);
1955         else if (streq_ptr(m->member, "GetMachineId")) {
1956                 sd_id128_t id;
1957                 char sid[33];
1958
1959                 r = sd_id128_get_machine(&id);
1960                 if (r < 0)
1961                         return r;
1962
1963                 r = sd_bus_message_new_method_return(bus, m, &reply);
1964                 if (r < 0)
1965                         return r;
1966
1967                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1968         } else {
1969                 r = sd_bus_message_new_method_errorf(
1970                                 bus, m, &reply,
1971                                 SD_BUS_ERROR_UNKNOWN_METHOD,
1972                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1973         }
1974
1975         if (r < 0)
1976                 return r;
1977
1978         r = sd_bus_send(bus, reply, NULL);
1979         if (r < 0)
1980                 return r;
1981
1982         return 1;
1983 }
1984
1985 static int process_message(sd_bus *bus, sd_bus_message *m) {
1986         int r;
1987
1988         assert(bus);
1989         assert(m);
1990
1991         bus->current = m;
1992         bus->iteration_counter++;
1993
1994         log_debug("Got message sender=%s object=%s interface=%s member=%s",
1995                   strna(sd_bus_message_get_sender(m)),
1996                   strna(sd_bus_message_get_path(m)),
1997                   strna(sd_bus_message_get_interface(m)),
1998                   strna(sd_bus_message_get_member(m)));
1999
2000         r = process_hello(bus, m);
2001         if (r != 0)
2002                 goto finish;
2003
2004         r = process_reply(bus, m);
2005         if (r != 0)
2006                 goto finish;
2007
2008         r = process_filter(bus, m);
2009         if (r != 0)
2010                 goto finish;
2011
2012         r = process_match(bus, m);
2013         if (r != 0)
2014                 goto finish;
2015
2016         r = process_builtin(bus, m);
2017         if (r != 0)
2018                 goto finish;
2019
2020         r = bus_process_object(bus, m);
2021
2022 finish:
2023         bus->current = NULL;
2024         return r;
2025 }
2026
2027 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2028         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2029         int r;
2030
2031         assert(bus);
2032         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2033
2034         r = process_timeout(bus);
2035         if (r != 0)
2036                 goto null_message;
2037
2038         r = dispatch_wqueue(bus);
2039         if (r != 0)
2040                 goto null_message;
2041
2042         r = dispatch_rqueue(bus, &m);
2043         if (r < 0)
2044                 return r;
2045         if (!m)
2046                 goto null_message;
2047
2048         r = process_message(bus, m);
2049         if (r != 0)
2050                 goto null_message;
2051
2052         if (ret) {
2053                 r = sd_bus_message_rewind(m, true);
2054                 if (r < 0)
2055                         return r;
2056
2057                 *ret = m;
2058                 m = NULL;
2059                 return 1;
2060         }
2061
2062         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2063
2064                 r = sd_bus_reply_method_errorf(
2065                                 bus, m,
2066                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
2067                                 "Unknown object '%s'.", m->path);
2068                 if (r < 0)
2069                         return r;
2070         }
2071
2072         return 1;
2073
2074 null_message:
2075         if (r >= 0 && ret)
2076                 *ret = NULL;
2077
2078         return r;
2079 }
2080
2081 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2082         BUS_DONT_DESTROY(bus);
2083         int r;
2084
2085         /* Returns 0 when we didn't do anything. This should cause the
2086          * caller to invoke sd_bus_wait() before returning the next
2087          * time. Returns > 0 when we did something, which possibly
2088          * means *ret is filled in with an unprocessed message. */
2089
2090         assert_return(bus, -EINVAL);
2091         assert_return(!bus_pid_changed(bus), -ECHILD);
2092
2093         /* We don't allow recursively invoking sd_bus_process(). */
2094         assert_return(!bus->processing, -EBUSY);
2095
2096         switch (bus->state) {
2097
2098         case BUS_UNSET:
2099         case BUS_CLOSED:
2100                 return -ENOTCONN;
2101
2102         case BUS_OPENING:
2103                 r = bus_socket_process_opening(bus);
2104                 if (r < 0)
2105                         return r;
2106                 if (ret)
2107                         *ret = NULL;
2108                 return r;
2109
2110         case BUS_AUTHENTICATING:
2111
2112                 r = bus_socket_process_authenticating(bus);
2113                 if (r < 0)
2114                         return r;
2115                 if (ret)
2116                         *ret = NULL;
2117                 return r;
2118
2119         case BUS_RUNNING:
2120         case BUS_HELLO:
2121
2122                 bus->processing = true;
2123                 r = process_running(bus, ret);
2124                 bus->processing = false;
2125
2126                 return r;
2127         }
2128
2129         assert_not_reached("Unknown state");
2130 }
2131
2132 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2133         struct pollfd p[2] = {};
2134         int r, e, n;
2135         struct timespec ts;
2136         usec_t m = (usec_t) -1;
2137
2138         assert(bus);
2139         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2140
2141         e = sd_bus_get_events(bus);
2142         if (e < 0)
2143                 return e;
2144
2145         if (need_more)
2146                 /* The caller really needs some more data, he doesn't
2147                  * care about what's already read, or any timeouts
2148                  * except its own.*/
2149                 e |= POLLIN;
2150         else {
2151                 usec_t until;
2152                 /* The caller wants to process if there's something to
2153                  * process, but doesn't care otherwise */
2154
2155                 r = sd_bus_get_timeout(bus, &until);
2156                 if (r < 0)
2157                         return r;
2158                 if (r > 0) {
2159                         usec_t nw;
2160                         nw = now(CLOCK_MONOTONIC);
2161                         m = until > nw ? until - nw : 0;
2162                 }
2163         }
2164
2165         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2166                 m = timeout_usec;
2167
2168         p[0].fd = bus->input_fd;
2169         if (bus->output_fd == bus->input_fd) {
2170                 p[0].events = e;
2171                 n = 1;
2172         } else {
2173                 p[0].events = e & POLLIN;
2174                 p[1].fd = bus->output_fd;
2175                 p[1].events = e & POLLOUT;
2176                 n = 2;
2177         }
2178
2179         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2180         if (r < 0)
2181                 return -errno;
2182
2183         return r > 0 ? 1 : 0;
2184 }
2185
2186 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2187
2188         assert_return(bus, -EINVAL);
2189         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2190         assert_return(!bus_pid_changed(bus), -ECHILD);
2191
2192         if (bus->rqueue_size > 0)
2193                 return 0;
2194
2195         return bus_poll(bus, false, timeout_usec);
2196 }
2197
2198 _public_ int sd_bus_flush(sd_bus *bus) {
2199         int r;
2200
2201         assert_return(bus, -EINVAL);
2202         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2203         assert_return(!bus_pid_changed(bus), -ECHILD);
2204
2205         r = bus_ensure_running(bus);
2206         if (r < 0)
2207                 return r;
2208
2209         if (bus->wqueue_size <= 0)
2210                 return 0;
2211
2212         for (;;) {
2213                 r = dispatch_wqueue(bus);
2214                 if (r < 0)
2215                         return r;
2216
2217                 if (bus->wqueue_size <= 0)
2218                         return 0;
2219
2220                 r = bus_poll(bus, false, (uint64_t) -1);
2221                 if (r < 0)
2222                         return r;
2223         }
2224 }
2225
2226 _public_ int sd_bus_add_filter(sd_bus *bus,
2227                                sd_bus_message_handler_t callback,
2228                                void *userdata) {
2229
2230         struct filter_callback *f;
2231
2232         assert_return(bus, -EINVAL);
2233         assert_return(callback, -EINVAL);
2234         assert_return(!bus_pid_changed(bus), -ECHILD);
2235
2236         f = new0(struct filter_callback, 1);
2237         if (!f)
2238                 return -ENOMEM;
2239         f->callback = callback;
2240         f->userdata = userdata;
2241
2242         bus->filter_callbacks_modified = true;
2243         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2244         return 0;
2245 }
2246
2247 _public_ int sd_bus_remove_filter(sd_bus *bus,
2248                                   sd_bus_message_handler_t callback,
2249                                   void *userdata) {
2250
2251         struct filter_callback *f;
2252
2253         assert_return(bus, -EINVAL);
2254         assert_return(callback, -EINVAL);
2255         assert_return(!bus_pid_changed(bus), -ECHILD);
2256
2257         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2258                 if (f->callback == callback && f->userdata == userdata) {
2259                         bus->filter_callbacks_modified = true;
2260                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2261                         free(f);
2262                         return 1;
2263                 }
2264         }
2265
2266         return 0;
2267 }
2268
2269 _public_ int sd_bus_add_match(sd_bus *bus,
2270                               const char *match,
2271                               sd_bus_message_handler_t callback,
2272                               void *userdata) {
2273
2274         struct bus_match_component *components = NULL;
2275         unsigned n_components = 0;
2276         uint64_t cookie = 0;
2277         int r = 0;
2278
2279         assert_return(bus, -EINVAL);
2280         assert_return(match, -EINVAL);
2281         assert_return(!bus_pid_changed(bus), -ECHILD);
2282
2283         r = bus_match_parse(match, &components, &n_components);
2284         if (r < 0)
2285                 goto finish;
2286
2287         if (bus->bus_client) {
2288                 cookie = ++bus->match_cookie;
2289
2290                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2291                 if (r < 0)
2292                         goto finish;
2293         }
2294
2295         bus->match_callbacks_modified = true;
2296         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2297         if (r < 0) {
2298                 if (bus->bus_client)
2299                         bus_remove_match_internal(bus, match, cookie);
2300         }
2301
2302 finish:
2303         bus_match_parse_free(components, n_components);
2304         return r;
2305 }
2306
2307 _public_ int sd_bus_remove_match(sd_bus *bus,
2308                                  const char *match,
2309                                  sd_bus_message_handler_t callback,
2310                                  void *userdata) {
2311
2312         struct bus_match_component *components = NULL;
2313         unsigned n_components = 0;
2314         int r = 0, q = 0;
2315         uint64_t cookie = 0;
2316
2317         assert_return(bus, -EINVAL);
2318         assert_return(match, -EINVAL);
2319         assert_return(!bus_pid_changed(bus), -ECHILD);
2320
2321         r = bus_match_parse(match, &components, &n_components);
2322         if (r < 0)
2323                 return r;
2324
2325         bus->match_callbacks_modified = true;
2326         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2327
2328         if (bus->bus_client)
2329                 q = bus_remove_match_internal(bus, match, cookie);
2330
2331         bus_match_parse_free(components, n_components);
2332
2333         return r < 0 ? r : q;
2334 }
2335
2336 bool bus_pid_changed(sd_bus *bus) {
2337         assert(bus);
2338
2339         /* We don't support people creating a bus connection and
2340          * keeping it around over a fork(). Let's complain. */
2341
2342         return bus->original_pid != getpid();
2343 }
2344
2345 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2346         sd_bus *bus = userdata;
2347         int r;
2348
2349         assert(bus);
2350
2351         r = sd_bus_process(bus, NULL);
2352         if (r < 0)
2353                 return r;
2354
2355         return 1;
2356 }
2357
2358 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2359         sd_bus *bus = userdata;
2360         int r;
2361
2362         assert(bus);
2363
2364         r = sd_bus_process(bus, NULL);
2365         if (r < 0)
2366                 return r;
2367
2368         return 1;
2369 }
2370
2371 static int prepare_callback(sd_event_source *s, void *userdata) {
2372         sd_bus *bus = userdata;
2373         int r, e;
2374         usec_t until;
2375
2376         assert(s);
2377         assert(bus);
2378
2379         e = sd_bus_get_events(bus);
2380         if (e < 0)
2381                 return e;
2382
2383         if (bus->output_fd != bus->input_fd) {
2384
2385                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2386                 if (r < 0)
2387                         return r;
2388
2389                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2390                 if (r < 0)
2391                         return r;
2392         } else {
2393                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2394                 if (r < 0)
2395                         return r;
2396         }
2397
2398         r = sd_bus_get_timeout(bus, &until);
2399         if (r < 0)
2400                 return r;
2401         if (r > 0) {
2402                 int j;
2403
2404                 j = sd_event_source_set_time(bus->time_event_source, until);
2405                 if (j < 0)
2406                         return j;
2407         }
2408
2409         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2410         if (r < 0)
2411                 return r;
2412
2413         return 1;
2414 }
2415
2416 static int quit_callback(sd_event_source *event, void *userdata) {
2417         sd_bus *bus = userdata;
2418
2419         assert(event);
2420
2421         sd_bus_flush(bus);
2422
2423         return 1;
2424 }
2425
2426 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2427         int r;
2428
2429         assert_return(bus, -EINVAL);
2430         assert_return(!bus->event, -EBUSY);
2431
2432         assert(!bus->input_io_event_source);
2433         assert(!bus->output_io_event_source);
2434         assert(!bus->time_event_source);
2435
2436         if (event)
2437                 bus->event = sd_event_ref(event);
2438         else  {
2439                 r = sd_event_default(&bus->event);
2440                 if (r < 0)
2441                         return r;
2442         }
2443
2444         r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2445         if (r < 0)
2446                 goto fail;
2447
2448         r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2449         if (r < 0)
2450                 goto fail;
2451
2452         if (bus->output_fd != bus->input_fd) {
2453                 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2454                 if (r < 0)
2455                         goto fail;
2456
2457                 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2458                 if (r < 0)
2459                         goto fail;
2460         }
2461
2462         r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2463         if (r < 0)
2464                 goto fail;
2465
2466         r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2467         if (r < 0)
2468                 goto fail;
2469
2470         r = sd_event_source_set_priority(bus->time_event_source, priority);
2471         if (r < 0)
2472                 goto fail;
2473
2474         r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2475         if (r < 0)
2476                 goto fail;
2477
2478         return 0;
2479
2480 fail:
2481         sd_bus_detach_event(bus);
2482         return r;
2483 }
2484
2485 _public_ int sd_bus_detach_event(sd_bus *bus) {
2486         assert_return(bus, -EINVAL);
2487         assert_return(bus->event, -ENXIO);
2488
2489         if (bus->input_io_event_source)
2490                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2491
2492         if (bus->output_io_event_source)
2493                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2494
2495         if (bus->time_event_source)
2496                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2497
2498         if (bus->quit_event_source)
2499                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2500
2501         if (bus->event)
2502                 bus->event = sd_event_unref(bus->event);
2503
2504         return 0;
2505 }
2506
2507 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2508         assert_return(bus, NULL);
2509
2510         return bus->current;
2511 }
2512
2513 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2514         sd_bus *b = NULL;
2515         int r;
2516
2517         assert(bus_open);
2518         assert(default_bus);
2519
2520         if (!ret)
2521                 return !!*default_bus;
2522
2523         if (*default_bus) {
2524                 *ret = sd_bus_ref(*default_bus);
2525                 return 0;
2526         }
2527
2528         r = bus_open(&b);
2529         if (r < 0)
2530                 return r;
2531
2532         b->default_bus_ptr = default_bus;
2533         b->tid = gettid();
2534         *default_bus = b;
2535
2536         *ret = b;
2537         return 1;
2538 }
2539
2540 _public_ int sd_bus_default_system(sd_bus **ret) {
2541         static __thread sd_bus *default_system_bus = NULL;
2542
2543         return bus_default(sd_bus_open_system, &default_system_bus, ret);
2544 }
2545
2546 _public_ int sd_bus_default_user(sd_bus **ret) {
2547         static __thread sd_bus *default_user_bus = NULL;
2548
2549         return bus_default(sd_bus_open_user, &default_user_bus, ret);
2550 }
2551
2552 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2553         assert_return(b, -EINVAL);
2554         assert_return(tid, -EINVAL);
2555         assert_return(!bus_pid_changed(b), -ECHILD);
2556
2557         if (b->tid != 0) {
2558                 *tid = b->tid;
2559                 return 0;
2560         }
2561
2562         if (b->event)
2563                 return sd_event_get_tid(b->event, tid);
2564
2565         return -ENXIO;
2566 }