chiark / gitweb /
bus: suppress creating empty parts in messages
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48 #include "bus-util.h"
49 #include "bus-container.h"
50 #include "bus-protocol.h"
51
52 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
53
54 static void bus_close_fds(sd_bus *b) {
55         assert(b);
56
57         if (b->input_fd >= 0)
58                 close_nointr_nofail(b->input_fd);
59
60         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
61                 close_nointr_nofail(b->output_fd);
62
63         b->input_fd = b->output_fd = -1;
64 }
65
66 static void bus_node_destroy(sd_bus *b, struct node *n) {
67         struct node_callback *c;
68         struct node_vtable *v;
69         struct node_enumerator *e;
70
71         assert(b);
72
73         if (!n)
74                 return;
75
76         while (n->child)
77                 bus_node_destroy(b, n->child);
78
79         while ((c = n->callbacks)) {
80                 LIST_REMOVE(callbacks, n->callbacks, c);
81                 free(c);
82         }
83
84         while ((v = n->vtables)) {
85                 LIST_REMOVE(vtables, n->vtables, v);
86                 free(v->interface);
87                 free(v);
88         }
89
90         while ((e = n->enumerators)) {
91                 LIST_REMOVE(enumerators, n->enumerators, e);
92                 free(e);
93         }
94
95         if (n->parent)
96                 LIST_REMOVE(siblings, n->parent->child, n);
97
98         assert_se(hashmap_remove(b->nodes, n->path) == n);
99         free(n->path);
100         free(n);
101 }
102
103 static void bus_reset_queues(sd_bus *b) {
104         unsigned i;
105
106         assert(b);
107
108         for (i = 0; i < b->rqueue_size; i++)
109                 sd_bus_message_unref(b->rqueue[i]);
110         free(b->rqueue);
111
112         for (i = 0; i < b->wqueue_size; i++)
113                 sd_bus_message_unref(b->wqueue[i]);
114         free(b->wqueue);
115
116         b->rqueue = b->wqueue = NULL;
117         b->rqueue_size = b->wqueue_size = 0;
118 }
119
120 static void bus_free(sd_bus *b) {
121         struct filter_callback *f;
122         struct node *n;
123
124         assert(b);
125
126         sd_bus_detach_event(b);
127
128         bus_close_fds(b);
129
130         if (b->kdbus_buffer)
131                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
132
133         free(b->rbuffer);
134         free(b->unique_name);
135         free(b->auth_buffer);
136         free(b->address);
137         free(b->kernel);
138         free(b->machine);
139
140         free(b->exec_path);
141         strv_free(b->exec_argv);
142
143         close_many(b->fds, b->n_fds);
144         free(b->fds);
145
146         bus_reset_queues(b);
147
148         hashmap_free_free(b->reply_callbacks);
149         prioq_free(b->reply_callbacks_prioq);
150
151         while ((f = b->filter_callbacks)) {
152                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
153                 free(f);
154         }
155
156         bus_match_free(&b->match_callbacks);
157
158         hashmap_free_free(b->vtable_methods);
159         hashmap_free_free(b->vtable_properties);
160
161         while ((n = hashmap_first(b->nodes)))
162                 bus_node_destroy(b, n);
163
164         hashmap_free(b->nodes);
165
166         bus_kernel_flush_memfd(b);
167
168         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
169
170         free(b);
171 }
172
173 _public_ int sd_bus_new(sd_bus **ret) {
174         sd_bus *r;
175
176         assert_return(ret, -EINVAL);
177
178         r = new0(sd_bus, 1);
179         if (!r)
180                 return -ENOMEM;
181
182         r->n_ref = REFCNT_INIT;
183         r->input_fd = r->output_fd = -1;
184         r->message_version = 1;
185         r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
186         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
187         r->attach_flags |= KDBUS_ATTACH_NAMES;
188         r->original_pid = getpid();
189
190         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
191
192         /* We guarantee that wqueue always has space for at least one
193          * entry */
194         r->wqueue = new(sd_bus_message*, 1);
195         if (!r->wqueue) {
196                 free(r);
197                 return -ENOMEM;
198         }
199
200         *ret = r;
201         return 0;
202 }
203
204 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
205         char *a;
206
207         assert_return(bus, -EINVAL);
208         assert_return(bus->state == BUS_UNSET, -EPERM);
209         assert_return(address, -EINVAL);
210         assert_return(!bus_pid_changed(bus), -ECHILD);
211
212         a = strdup(address);
213         if (!a)
214                 return -ENOMEM;
215
216         free(bus->address);
217         bus->address = a;
218
219         return 0;
220 }
221
222 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
223         assert_return(bus, -EINVAL);
224         assert_return(bus->state == BUS_UNSET, -EPERM);
225         assert_return(input_fd >= 0, -EINVAL);
226         assert_return(output_fd >= 0, -EINVAL);
227         assert_return(!bus_pid_changed(bus), -ECHILD);
228
229         bus->input_fd = input_fd;
230         bus->output_fd = output_fd;
231         return 0;
232 }
233
234 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
235         char *p, **a;
236
237         assert_return(bus, -EINVAL);
238         assert_return(bus->state == BUS_UNSET, -EPERM);
239         assert_return(path, -EINVAL);
240         assert_return(!strv_isempty(argv), -EINVAL);
241         assert_return(!bus_pid_changed(bus), -ECHILD);
242
243         p = strdup(path);
244         if (!p)
245                 return -ENOMEM;
246
247         a = strv_copy(argv);
248         if (!a) {
249                 free(p);
250                 return -ENOMEM;
251         }
252
253         free(bus->exec_path);
254         strv_free(bus->exec_argv);
255
256         bus->exec_path = p;
257         bus->exec_argv = a;
258
259         return 0;
260 }
261
262 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
263         assert_return(bus, -EINVAL);
264         assert_return(bus->state == BUS_UNSET, -EPERM);
265         assert_return(!bus_pid_changed(bus), -ECHILD);
266
267         bus->bus_client = !!b;
268         return 0;
269 }
270
271 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
272         assert_return(bus, -EINVAL);
273         assert_return(bus->state == BUS_UNSET, -EPERM);
274         assert_return(!bus_pid_changed(bus), -ECHILD);
275
276         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
277         return 0;
278 }
279
280 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
281         assert_return(bus, -EINVAL);
282         assert_return(bus->state == BUS_UNSET, -EPERM);
283         assert_return(!bus_pid_changed(bus), -ECHILD);
284
285         SET_FLAG(bus->attach_flags, KDBUS_ATTACH_TIMESTAMP, b);
286         return 0;
287 }
288
289 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, uint64_t mask) {
290         assert_return(bus, -EINVAL);
291         assert_return(mask <= _SD_BUS_CREDS_ALL, -EINVAL);
292         assert_return(bus->state == BUS_UNSET, -EPERM);
293         assert_return(!bus_pid_changed(bus), -ECHILD);
294
295         /* The well knowns we need unconditionally, so that matches can work */
296         bus->creds_mask = mask | SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
297
298         return kdbus_translate_attach_flags(bus->creds_mask, &bus->creds_mask);
299 }
300
301 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
302         assert_return(bus, -EINVAL);
303         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
304         assert_return(bus->state == BUS_UNSET, -EPERM);
305         assert_return(!bus_pid_changed(bus), -ECHILD);
306
307         bus->is_server = !!b;
308         bus->server_id = server_id;
309         return 0;
310 }
311
312 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
313         assert_return(bus, -EINVAL);
314         assert_return(bus->state == BUS_UNSET, -EPERM);
315         assert_return(!bus_pid_changed(bus), -ECHILD);
316
317         bus->anonymous_auth = !!b;
318         return 0;
319 }
320
321 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
322         const char *s;
323         int r;
324
325         assert(bus);
326         assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
327         assert(reply);
328
329         r = sd_bus_message_get_errno(reply);
330         if (r < 0)
331                 return r;
332         if (r > 0)
333                 return -r;
334
335         r = sd_bus_message_read(reply, "s", &s);
336         if (r < 0)
337                 return r;
338
339         if (!service_name_is_valid(s) || s[0] != ':')
340                 return -EBADMSG;
341
342         bus->unique_name = strdup(s);
343         if (!bus->unique_name)
344                 return -ENOMEM;
345
346         if (bus->state == BUS_HELLO)
347                 bus->state = BUS_RUNNING;
348
349         return 1;
350 }
351
352 static int bus_send_hello(sd_bus *bus) {
353         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
354         int r;
355
356         assert(bus);
357
358         if (!bus->bus_client || bus->is_kernel)
359                 return 0;
360
361         r = sd_bus_message_new_method_call(
362                         bus,
363                         "org.freedesktop.DBus",
364                         "/",
365                         "org.freedesktop.DBus",
366                         "Hello",
367                         &m);
368         if (r < 0)
369                 return r;
370
371         return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
372 }
373
374 int bus_start_running(sd_bus *bus) {
375         assert(bus);
376
377         if (bus->bus_client && !bus->is_kernel) {
378                 bus->state = BUS_HELLO;
379                 return 1;
380         }
381
382         bus->state = BUS_RUNNING;
383         return 1;
384 }
385
386 static int parse_address_key(const char **p, const char *key, char **value) {
387         size_t l, n = 0;
388         const char *a;
389         char *r = NULL;
390
391         assert(p);
392         assert(*p);
393         assert(value);
394
395         if (key) {
396                 l = strlen(key);
397                 if (strncmp(*p, key, l) != 0)
398                         return 0;
399
400                 if ((*p)[l] != '=')
401                         return 0;
402
403                 if (*value)
404                         return -EINVAL;
405
406                 a = *p + l + 1;
407         } else
408                 a = *p;
409
410         while (*a != ';' && *a != ',' && *a != 0) {
411                 char c, *t;
412
413                 if (*a == '%') {
414                         int x, y;
415
416                         x = unhexchar(a[1]);
417                         if (x < 0) {
418                                 free(r);
419                                 return x;
420                         }
421
422                         y = unhexchar(a[2]);
423                         if (y < 0) {
424                                 free(r);
425                                 return y;
426                         }
427
428                         c = (char) ((x << 4) | y);
429                         a += 3;
430                 } else {
431                         c = *a;
432                         a++;
433                 }
434
435                 t = realloc(r, n + 2);
436                 if (!t) {
437                         free(r);
438                         return -ENOMEM;
439                 }
440
441                 r = t;
442                 r[n++] = c;
443         }
444
445         if (!r) {
446                 r = strdup("");
447                 if (!r)
448                         return -ENOMEM;
449         } else
450                 r[n] = 0;
451
452         if (*a == ',')
453                 a++;
454
455         *p = a;
456
457         free(*value);
458         *value = r;
459
460         return 1;
461 }
462
463 static void skip_address_key(const char **p) {
464         assert(p);
465         assert(*p);
466
467         *p += strcspn(*p, ",");
468
469         if (**p == ',')
470                 (*p) ++;
471 }
472
473 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
474         _cleanup_free_ char *path = NULL, *abstract = NULL;
475         size_t l;
476         int r;
477
478         assert(b);
479         assert(p);
480         assert(*p);
481         assert(guid);
482
483         while (**p != 0 && **p != ';') {
484                 r = parse_address_key(p, "guid", guid);
485                 if (r < 0)
486                         return r;
487                 else if (r > 0)
488                         continue;
489
490                 r = parse_address_key(p, "path", &path);
491                 if (r < 0)
492                         return r;
493                 else if (r > 0)
494                         continue;
495
496                 r = parse_address_key(p, "abstract", &abstract);
497                 if (r < 0)
498                         return r;
499                 else if (r > 0)
500                         continue;
501
502                 skip_address_key(p);
503         }
504
505         if (!path && !abstract)
506                 return -EINVAL;
507
508         if (path && abstract)
509                 return -EINVAL;
510
511         if (path) {
512                 l = strlen(path);
513                 if (l > sizeof(b->sockaddr.un.sun_path))
514                         return -E2BIG;
515
516                 b->sockaddr.un.sun_family = AF_UNIX;
517                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
518                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
519         } else if (abstract) {
520                 l = strlen(abstract);
521                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
522                         return -E2BIG;
523
524                 b->sockaddr.un.sun_family = AF_UNIX;
525                 b->sockaddr.un.sun_path[0] = 0;
526                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
527                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
528         }
529
530         return 0;
531 }
532
533 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
534         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
535         int r;
536         struct addrinfo *result, hints = {
537                 .ai_socktype = SOCK_STREAM,
538                 .ai_flags = AI_ADDRCONFIG,
539         };
540
541         assert(b);
542         assert(p);
543         assert(*p);
544         assert(guid);
545
546         while (**p != 0 && **p != ';') {
547                 r = parse_address_key(p, "guid", guid);
548                 if (r < 0)
549                         return r;
550                 else if (r > 0)
551                         continue;
552
553                 r = parse_address_key(p, "host", &host);
554                 if (r < 0)
555                         return r;
556                 else if (r > 0)
557                         continue;
558
559                 r = parse_address_key(p, "port", &port);
560                 if (r < 0)
561                         return r;
562                 else if (r > 0)
563                         continue;
564
565                 r = parse_address_key(p, "family", &family);
566                 if (r < 0)
567                         return r;
568                 else if (r > 0)
569                         continue;
570
571                 skip_address_key(p);
572         }
573
574         if (!host || !port)
575                 return -EINVAL;
576
577         if (family) {
578                 if (streq(family, "ipv4"))
579                         hints.ai_family = AF_INET;
580                 else if (streq(family, "ipv6"))
581                         hints.ai_family = AF_INET6;
582                 else
583                         return -EINVAL;
584         }
585
586         r = getaddrinfo(host, port, &hints, &result);
587         if (r == EAI_SYSTEM)
588                 return -errno;
589         else if (r != 0)
590                 return -EADDRNOTAVAIL;
591
592         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
593         b->sockaddr_size = result->ai_addrlen;
594
595         freeaddrinfo(result);
596
597         return 0;
598 }
599
600 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
601         char *path = NULL;
602         unsigned n_argv = 0, j;
603         char **argv = NULL;
604         int r;
605
606         assert(b);
607         assert(p);
608         assert(*p);
609         assert(guid);
610
611         while (**p != 0 && **p != ';') {
612                 r = parse_address_key(p, "guid", guid);
613                 if (r < 0)
614                         goto fail;
615                 else if (r > 0)
616                         continue;
617
618                 r = parse_address_key(p, "path", &path);
619                 if (r < 0)
620                         goto fail;
621                 else if (r > 0)
622                         continue;
623
624                 if (startswith(*p, "argv")) {
625                         unsigned ul;
626
627                         errno = 0;
628                         ul = strtoul(*p + 4, (char**) p, 10);
629                         if (errno > 0 || **p != '=' || ul > 256) {
630                                 r = -EINVAL;
631                                 goto fail;
632                         }
633
634                         (*p) ++;
635
636                         if (ul >= n_argv) {
637                                 char **x;
638
639                                 x = realloc(argv, sizeof(char*) * (ul + 2));
640                                 if (!x) {
641                                         r = -ENOMEM;
642                                         goto fail;
643                                 }
644
645                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
646
647                                 argv = x;
648                                 n_argv = ul + 1;
649                         }
650
651                         r = parse_address_key(p, NULL, argv + ul);
652                         if (r < 0)
653                                 goto fail;
654
655                         continue;
656                 }
657
658                 skip_address_key(p);
659         }
660
661         if (!path) {
662                 r = -EINVAL;
663                 goto fail;
664         }
665
666         /* Make sure there are no holes in the array, with the
667          * exception of argv[0] */
668         for (j = 1; j < n_argv; j++)
669                 if (!argv[j]) {
670                         r = -EINVAL;
671                         goto fail;
672                 }
673
674         if (argv && argv[0] == NULL) {
675                 argv[0] = strdup(path);
676                 if (!argv[0]) {
677                         r = -ENOMEM;
678                         goto fail;
679                 }
680         }
681
682         b->exec_path = path;
683         b->exec_argv = argv;
684         return 0;
685
686 fail:
687         for (j = 0; j < n_argv; j++)
688                 free(argv[j]);
689
690         free(argv);
691         free(path);
692         return r;
693 }
694
695 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
696         _cleanup_free_ char *path = NULL;
697         int r;
698
699         assert(b);
700         assert(p);
701         assert(*p);
702         assert(guid);
703
704         while (**p != 0 && **p != ';') {
705                 r = parse_address_key(p, "guid", guid);
706                 if (r < 0)
707                         return r;
708                 else if (r > 0)
709                         continue;
710
711                 r = parse_address_key(p, "path", &path);
712                 if (r < 0)
713                         return r;
714                 else if (r > 0)
715                         continue;
716
717                 skip_address_key(p);
718         }
719
720         if (!path)
721                 return -EINVAL;
722
723         free(b->kernel);
724         b->kernel = path;
725         path = NULL;
726
727         return 0;
728 }
729
730 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
731         _cleanup_free_ char *machine = NULL;
732         int r;
733
734         assert(b);
735         assert(p);
736         assert(*p);
737         assert(guid);
738
739         while (**p != 0 && **p != ';') {
740                 r = parse_address_key(p, "guid", guid);
741                 if (r < 0)
742                         return r;
743                 else if (r > 0)
744                         continue;
745
746                 r = parse_address_key(p, "machine", &machine);
747                 if (r < 0)
748                         return r;
749                 else if (r > 0)
750                         continue;
751
752                 skip_address_key(p);
753         }
754
755         if (!machine)
756                 return -EINVAL;
757
758         free(b->machine);
759         b->machine = machine;
760         machine = NULL;
761
762         b->sockaddr.un.sun_family = AF_UNIX;
763         strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
764         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
765
766         return 0;
767 }
768
769 static void bus_reset_parsed_address(sd_bus *b) {
770         assert(b);
771
772         zero(b->sockaddr);
773         b->sockaddr_size = 0;
774         strv_free(b->exec_argv);
775         free(b->exec_path);
776         b->exec_path = NULL;
777         b->exec_argv = NULL;
778         b->server_id = SD_ID128_NULL;
779         free(b->kernel);
780         b->kernel = NULL;
781         free(b->machine);
782         b->machine = NULL;
783 }
784
785 static int bus_parse_next_address(sd_bus *b) {
786         _cleanup_free_ char *guid = NULL;
787         const char *a;
788         int r;
789
790         assert(b);
791
792         if (!b->address)
793                 return 0;
794         if (b->address[b->address_index] == 0)
795                 return 0;
796
797         bus_reset_parsed_address(b);
798
799         a = b->address + b->address_index;
800
801         while (*a != 0) {
802
803                 if (*a == ';') {
804                         a++;
805                         continue;
806                 }
807
808                 if (startswith(a, "unix:")) {
809                         a += 5;
810
811                         r = parse_unix_address(b, &a, &guid);
812                         if (r < 0)
813                                 return r;
814                         break;
815
816                 } else if (startswith(a, "tcp:")) {
817
818                         a += 4;
819                         r = parse_tcp_address(b, &a, &guid);
820                         if (r < 0)
821                                 return r;
822
823                         break;
824
825                 } else if (startswith(a, "unixexec:")) {
826
827                         a += 9;
828                         r = parse_exec_address(b, &a, &guid);
829                         if (r < 0)
830                                 return r;
831
832                         break;
833
834                 } else if (startswith(a, "kernel:")) {
835
836                         a += 7;
837                         r = parse_kernel_address(b, &a, &guid);
838                         if (r < 0)
839                                 return r;
840
841                         break;
842                 } else if (startswith(a, "x-container:")) {
843
844                         a += 12;
845                         r = parse_container_address(b, &a, &guid);
846                         if (r < 0)
847                                 return r;
848
849                         break;
850                 }
851
852                 a = strchr(a, ';');
853                 if (!a)
854                         return 0;
855         }
856
857         if (guid) {
858                 r = sd_id128_from_string(guid, &b->server_id);
859                 if (r < 0)
860                         return r;
861         }
862
863         b->address_index = a - b->address;
864         return 1;
865 }
866
867 static int bus_start_address(sd_bus *b) {
868         int r;
869
870         assert(b);
871
872         for (;;) {
873                 sd_bus_close(b);
874
875                 if (b->exec_path) {
876
877                         r = bus_socket_exec(b);
878                         if (r >= 0)
879                                 return r;
880
881                         b->last_connect_error = -r;
882                 } else if (b->kernel) {
883
884                         r = bus_kernel_connect(b);
885                         if (r >= 0)
886                                 return r;
887
888                         b->last_connect_error = -r;
889
890                 } else if (b->machine) {
891
892                         r = bus_container_connect(b);
893                         if (r >= 0)
894                                 return r;
895
896                         b->last_connect_error = -r;
897
898                 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
899
900                         r = bus_socket_connect(b);
901                         if (r >= 0)
902                                 return r;
903
904                         b->last_connect_error = -r;
905                 }
906
907                 r = bus_parse_next_address(b);
908                 if (r < 0)
909                         return r;
910                 if (r == 0)
911                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
912         }
913 }
914
915 int bus_next_address(sd_bus *b) {
916         assert(b);
917
918         bus_reset_parsed_address(b);
919         return bus_start_address(b);
920 }
921
922 static int bus_start_fd(sd_bus *b) {
923         struct stat st;
924         int r;
925
926         assert(b);
927         assert(b->input_fd >= 0);
928         assert(b->output_fd >= 0);
929
930         r = fd_nonblock(b->input_fd, true);
931         if (r < 0)
932                 return r;
933
934         r = fd_cloexec(b->input_fd, true);
935         if (r < 0)
936                 return r;
937
938         if (b->input_fd != b->output_fd) {
939                 r = fd_nonblock(b->output_fd, true);
940                 if (r < 0)
941                         return r;
942
943                 r = fd_cloexec(b->output_fd, true);
944                 if (r < 0)
945                         return r;
946         }
947
948         if (fstat(b->input_fd, &st) < 0)
949                 return -errno;
950
951         if (S_ISCHR(b->input_fd))
952                 return bus_kernel_take_fd(b);
953         else
954                 return bus_socket_take_fd(b);
955 }
956
957 _public_ int sd_bus_start(sd_bus *bus) {
958         int r;
959
960         assert_return(bus, -EINVAL);
961         assert_return(bus->state == BUS_UNSET, -EPERM);
962         assert_return(!bus_pid_changed(bus), -ECHILD);
963
964         bus->state = BUS_OPENING;
965
966         if (bus->is_server && bus->bus_client)
967                 return -EINVAL;
968
969         if (bus->input_fd >= 0)
970                 r = bus_start_fd(bus);
971         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
972                 r = bus_start_address(bus);
973         else
974                 return -EINVAL;
975
976         if (r < 0)
977                 return r;
978
979         return bus_send_hello(bus);
980 }
981
982 _public_ int sd_bus_open_system(sd_bus **ret) {
983         const char *e;
984         sd_bus *b;
985         int r;
986
987         assert_return(ret, -EINVAL);
988
989         r = sd_bus_new(&b);
990         if (r < 0)
991                 return r;
992
993         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
994         if (e)
995                 r = sd_bus_set_address(b, e);
996         else
997 #ifdef ENABLE_KDBUS
998                 r = sd_bus_set_address(b, "kernel:path=/dev/kdbus/0-system/bus;unix:path=/run/dbus/system_bus_socket");
999 #else
1000                 r = sd_bus_set_address(b, "unix:path=/run/dbus/system_bus_socket");
1001 #endif
1002
1003         if (r < 0)
1004                 goto fail;
1005
1006         b->bus_client = true;
1007
1008         r = sd_bus_start(b);
1009         if (r < 0)
1010                 goto fail;
1011
1012         *ret = b;
1013         return 0;
1014
1015 fail:
1016         bus_free(b);
1017         return r;
1018 }
1019
1020 _public_ int sd_bus_open_user(sd_bus **ret) {
1021         const char *e;
1022         sd_bus *b;
1023         int r;
1024
1025         assert_return(ret, -EINVAL);
1026
1027         r = sd_bus_new(&b);
1028         if (r < 0)
1029                 return r;
1030
1031         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1032         if (e) {
1033                 r = sd_bus_set_address(b, e);
1034                 if (r < 0)
1035                         goto fail;
1036         } else {
1037                 e = secure_getenv("XDG_RUNTIME_DIR");
1038                 if (e) {
1039                         _cleanup_free_ char *ee = NULL;
1040
1041                         ee = bus_address_escape(e);
1042                         if (!ee) {
1043                                 r = -ENOMEM;
1044                                 goto fail;
1045                         }
1046
1047 #ifdef ENABLE_KDBUS
1048                         asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus;unix:path=%s/bus", (unsigned long) getuid(), ee);
1049 #else
1050                         b->address = strjoin("unix:path=", ee, "/bus", NULL);
1051 #endif
1052                 } else {
1053 #ifdef ENABLE_KDBUS
1054                         asprintf(&b->address, "kernel:path=/dev/kdbus/%lu-user/bus", (unsigned long) getuid());
1055 #else
1056                         return -ECONNREFUSED;
1057 #endif
1058                 }
1059
1060                 if (!b->address) {
1061                         r = -ENOMEM;
1062                         goto fail;
1063                 }
1064         }
1065
1066         b->bus_client = true;
1067
1068         r = sd_bus_start(b);
1069         if (r < 0)
1070                 goto fail;
1071
1072         *ret = b;
1073         return 0;
1074
1075 fail:
1076         bus_free(b);
1077         return r;
1078 }
1079
1080 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1081         _cleanup_free_ char *e = NULL;
1082         char *p = NULL;
1083         sd_bus *bus;
1084         int r;
1085
1086         assert_return(host, -EINVAL);
1087         assert_return(ret, -EINVAL);
1088
1089         e = bus_address_escape(host);
1090         if (!e)
1091                 return -ENOMEM;
1092
1093         p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1094         if (!p)
1095                 return -ENOMEM;
1096
1097         r = sd_bus_new(&bus);
1098         if (r < 0) {
1099                 free(p);
1100                 return r;
1101         }
1102
1103         bus->address = p;
1104         bus->bus_client = true;
1105
1106         r = sd_bus_start(bus);
1107         if (r < 0) {
1108                 bus_free(bus);
1109                 return r;
1110         }
1111
1112         *ret = bus;
1113         return 0;
1114 }
1115
1116 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1117         _cleanup_free_ char *e = NULL;
1118         sd_bus *bus;
1119         char *p;
1120         int r;
1121
1122         assert_return(machine, -EINVAL);
1123         assert_return(ret, -EINVAL);
1124
1125         e = bus_address_escape(machine);
1126         if (!e)
1127                 return -ENOMEM;
1128
1129         p = strjoin("x-container:machine=", e, NULL);
1130         if (!p)
1131                 return -ENOMEM;
1132
1133         r = sd_bus_new(&bus);
1134         if (r < 0) {
1135                 free(p);
1136                 return r;
1137         }
1138
1139         bus->address = p;
1140         bus->bus_client = true;
1141
1142         r = sd_bus_start(bus);
1143         if (r < 0) {
1144                 bus_free(bus);
1145                 return r;
1146         }
1147
1148         *ret = bus;
1149         return 0;
1150 }
1151
1152 _public_ void sd_bus_close(sd_bus *bus) {
1153
1154         if (!bus)
1155                 return;
1156         if (bus->state == BUS_CLOSED)
1157                 return;
1158         if (bus_pid_changed(bus))
1159                 return;
1160
1161         bus->state = BUS_CLOSED;
1162
1163         sd_bus_detach_event(bus);
1164
1165         /* Drop all queued messages so that they drop references to
1166          * the bus object and the bus may be freed */
1167         bus_reset_queues(bus);
1168
1169         if (!bus->is_kernel)
1170                 bus_close_fds(bus);
1171
1172         /* We'll leave the fd open in case this is a kernel bus, since
1173          * there might still be memblocks around that reference this
1174          * bus, and they might need to invoke the * KDBUS_CMD_FREE
1175          * ioctl on the fd when they are freed. */
1176 }
1177
1178 static void bus_enter_closing(sd_bus *bus) {
1179         assert(bus);
1180
1181         if (bus->state != BUS_OPENING &&
1182             bus->state != BUS_AUTHENTICATING &&
1183             bus->state != BUS_HELLO &&
1184             bus->state != BUS_RUNNING)
1185                 return;
1186
1187         bus->state = BUS_CLOSING;
1188 }
1189
1190 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1191         assert_return(bus, NULL);
1192
1193         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1194
1195         return bus;
1196 }
1197
1198 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1199         assert_return(bus, NULL);
1200
1201         if (REFCNT_DEC(bus->n_ref) <= 0)
1202                 bus_free(bus);
1203
1204         return NULL;
1205 }
1206
1207 _public_ int sd_bus_is_open(sd_bus *bus) {
1208
1209         assert_return(bus, -EINVAL);
1210         assert_return(!bus_pid_changed(bus), -ECHILD);
1211
1212         return BUS_IS_OPEN(bus->state);
1213 }
1214
1215 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1216         int r;
1217
1218         assert_return(bus, -EINVAL);
1219         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1220         assert_return(!bus_pid_changed(bus), -ECHILD);
1221
1222         if (type == SD_BUS_TYPE_UNIX_FD) {
1223                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1224                         return 0;
1225
1226                 r = bus_ensure_running(bus);
1227                 if (r < 0)
1228                         return r;
1229
1230                 return bus->can_fds;
1231         }
1232
1233         return bus_type_is_valid(type);
1234 }
1235
1236 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1237         int r;
1238
1239         assert_return(bus, -EINVAL);
1240         assert_return(server_id, -EINVAL);
1241         assert_return(!bus_pid_changed(bus), -ECHILD);
1242
1243         r = bus_ensure_running(bus);
1244         if (r < 0)
1245                 return r;
1246
1247         *server_id = bus->server_id;
1248         return 0;
1249 }
1250
1251 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1252         assert(b);
1253         assert(m);
1254
1255         if (m->header->version > b->message_version)
1256                 return -EPERM;
1257
1258         if (m->sealed) {
1259                 /* If we copy the same message to multiple
1260                  * destinations, avoid using the same serial
1261                  * numbers. */
1262                 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1263                 return 0;
1264         }
1265
1266         return bus_message_seal(m, ++b->serial);
1267 }
1268
1269 int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
1270         assert(b);
1271         assert(m);
1272
1273         if (m->header->version > b->message_version)
1274                 return -EPERM;
1275
1276         /* The bus specification says the serial number cannot be 0,
1277          * hence let's fill something in for synthetic messages. Since
1278          * synthetic messages might have a fake sender and we don't
1279          * want to interfere with the real sender's serial numbers we
1280          * pick a fixed, artifical one. We use (uint32_t) -1 rather
1281          * than (uint64_t) -1 since dbus1 only had 32bit identifiers,
1282          * even though kdbus can do 64bit. */
1283
1284         return bus_message_seal(m, 0xFFFFFFFFULL);
1285 }
1286
1287 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1288         assert(bus);
1289         assert(message);
1290
1291         if (bus->is_kernel)
1292                 return bus_kernel_write_message(bus, message);
1293         else
1294                 return bus_socket_write_message(bus, message, idx);
1295 }
1296
1297 static int dispatch_wqueue(sd_bus *bus) {
1298         int r, ret = 0;
1299
1300         assert(bus);
1301         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1302
1303         while (bus->wqueue_size > 0) {
1304
1305                 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1306                 if (r < 0)
1307                         return r;
1308                 else if (r == 0)
1309                         /* Didn't do anything this time */
1310                         return ret;
1311                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1312                         /* Fully written. Let's drop the entry from
1313                          * the queue.
1314                          *
1315                          * This isn't particularly optimized, but
1316                          * well, this is supposed to be our worst-case
1317                          * buffer only, and the socket buffer is
1318                          * supposed to be our primary buffer, and if
1319                          * it got full, then all bets are off
1320                          * anyway. */
1321
1322                         sd_bus_message_unref(bus->wqueue[0]);
1323                         bus->wqueue_size --;
1324                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1325                         bus->windex = 0;
1326
1327                         ret = 1;
1328                 }
1329         }
1330
1331         return ret;
1332 }
1333
1334 static int bus_read_message(sd_bus *bus) {
1335         assert(bus);
1336
1337         if (bus->is_kernel)
1338                 return bus_kernel_read_message(bus);
1339         else
1340                 return bus_socket_read_message(bus);
1341 }
1342
1343 int bus_rqueue_make_room(sd_bus *bus) {
1344         sd_bus_message **q;
1345         unsigned x;
1346
1347         x = bus->rqueue_size + 1;
1348
1349         if (bus->rqueue_allocated >= x)
1350                 return 0;
1351
1352         if (x > BUS_RQUEUE_MAX)
1353                 return -ENOBUFS;
1354
1355         q = realloc(bus->rqueue, x * sizeof(sd_bus_message*));
1356         if (!q)
1357                 return -ENOMEM;
1358
1359         bus->rqueue = q;
1360         bus->rqueue_allocated = x;
1361
1362         return 0;
1363 }
1364
1365 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1366         int r, ret = 0;
1367
1368         assert(bus);
1369         assert(m);
1370         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1371
1372         for (;;) {
1373                 if (bus->rqueue_size > 0) {
1374                         /* Dispatch a queued message */
1375
1376                         *m = bus->rqueue[0];
1377                         bus->rqueue_size --;
1378                         memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1379                         return 1;
1380                 }
1381
1382                 /* Try to read a new message */
1383                 r = bus_read_message(bus);
1384                 if (r < 0)
1385                         return r;
1386                 if (r == 0)
1387                         return ret;
1388
1389                 ret = 1;
1390         }
1391 }
1392
1393 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1394         int r;
1395
1396         assert_return(bus, -EINVAL);
1397         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1398         assert_return(m, -EINVAL);
1399         assert_return(!bus_pid_changed(bus), -ECHILD);
1400
1401         if (m->n_fds > 0) {
1402                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1403                 if (r < 0)
1404                         return r;
1405                 if (r == 0)
1406                         return -ENOTSUP;
1407         }
1408
1409         /* If the serial number isn't kept, then we know that no reply
1410          * is expected */
1411         if (!serial && !m->sealed)
1412                 m->header->flags |= BUS_MESSAGE_NO_REPLY_EXPECTED;
1413
1414         r = bus_seal_message(bus, m);
1415         if (r < 0)
1416                 return r;
1417
1418         /* If this is a reply and no reply was requested, then let's
1419          * suppress this, if we can */
1420         if (m->dont_send && !serial)
1421                 return 1;
1422
1423         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1424                 size_t idx = 0;
1425
1426                 r = bus_write_message(bus, m, &idx);
1427                 if (r < 0) {
1428                         if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1429                                 bus_enter_closing(bus);
1430
1431                         return r;
1432                 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1433                         /* Wasn't fully written. So let's remember how
1434                          * much was written. Note that the first entry
1435                          * of the wqueue array is always allocated so
1436                          * that we always can remember how much was
1437                          * written. */
1438                         bus->wqueue[0] = sd_bus_message_ref(m);
1439                         bus->wqueue_size = 1;
1440                         bus->windex = idx;
1441                 }
1442         } else {
1443                 sd_bus_message **q;
1444
1445                 /* Just append it to the queue. */
1446
1447                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1448                         return -ENOBUFS;
1449
1450                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1451                 if (!q)
1452                         return -ENOMEM;
1453
1454                 bus->wqueue = q;
1455                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1456         }
1457
1458         if (serial)
1459                 *serial = BUS_MESSAGE_SERIAL(m);
1460
1461         return 1;
1462 }
1463
1464 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1465         int r;
1466
1467         assert_return(bus, -EINVAL);
1468         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1469         assert_return(m, -EINVAL);
1470         assert_return(!bus_pid_changed(bus), -ECHILD);
1471
1472         if (!streq_ptr(m->destination, destination)) {
1473
1474                 if (!destination)
1475                         return -EEXIST;
1476
1477                 r = sd_bus_message_set_destination(m, destination);
1478                 if (r < 0)
1479                         return r;
1480         }
1481
1482         return sd_bus_send(bus, m, serial);
1483 }
1484
1485 static usec_t calc_elapse(uint64_t usec) {
1486         if (usec == (uint64_t) -1)
1487                 return 0;
1488
1489         if (usec == 0)
1490                 usec = BUS_DEFAULT_TIMEOUT;
1491
1492         return now(CLOCK_MONOTONIC) + usec;
1493 }
1494
1495 static int timeout_compare(const void *a, const void *b) {
1496         const struct reply_callback *x = a, *y = b;
1497
1498         if (x->timeout != 0 && y->timeout == 0)
1499                 return -1;
1500
1501         if (x->timeout == 0 && y->timeout != 0)
1502                 return 1;
1503
1504         if (x->timeout < y->timeout)
1505                 return -1;
1506
1507         if (x->timeout > y->timeout)
1508                 return 1;
1509
1510         return 0;
1511 }
1512
1513 _public_ int sd_bus_call_async(
1514                 sd_bus *bus,
1515                 sd_bus_message *m,
1516                 sd_bus_message_handler_t callback,
1517                 void *userdata,
1518                 uint64_t usec,
1519                 uint64_t *serial) {
1520
1521         struct reply_callback *c;
1522         int r;
1523
1524         assert_return(bus, -EINVAL);
1525         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1526         assert_return(m, -EINVAL);
1527         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1528         assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1529         assert_return(callback, -EINVAL);
1530         assert_return(!bus_pid_changed(bus), -ECHILD);
1531
1532         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1533         if (r < 0)
1534                 return r;
1535
1536         if (usec != (uint64_t) -1) {
1537                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1538                 if (r < 0)
1539                         return r;
1540         }
1541
1542         r = bus_seal_message(bus, m);
1543         if (r < 0)
1544                 return r;
1545
1546         c = new0(struct reply_callback, 1);
1547         if (!c)
1548                 return -ENOMEM;
1549
1550         c->callback = callback;
1551         c->userdata = userdata;
1552         c->serial = BUS_MESSAGE_SERIAL(m);
1553         c->timeout = calc_elapse(usec);
1554
1555         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1556         if (r < 0) {
1557                 free(c);
1558                 return r;
1559         }
1560
1561         if (c->timeout != 0) {
1562                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1563                 if (r < 0) {
1564                         c->timeout = 0;
1565                         sd_bus_call_async_cancel(bus, c->serial);
1566                         return r;
1567                 }
1568         }
1569
1570         r = sd_bus_send(bus, m, serial);
1571         if (r < 0) {
1572                 sd_bus_call_async_cancel(bus, c->serial);
1573                 return r;
1574         }
1575
1576         return r;
1577 }
1578
1579 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1580         struct reply_callback *c;
1581
1582         assert_return(bus, -EINVAL);
1583         assert_return(serial != 0, -EINVAL);
1584         assert_return(!bus_pid_changed(bus), -ECHILD);
1585
1586         c = hashmap_remove(bus->reply_callbacks, &serial);
1587         if (!c)
1588                 return 0;
1589
1590         if (c->timeout != 0)
1591                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1592
1593         free(c);
1594         return 1;
1595 }
1596
1597 int bus_ensure_running(sd_bus *bus) {
1598         int r;
1599
1600         assert(bus);
1601
1602         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1603                 return -ENOTCONN;
1604         if (bus->state == BUS_RUNNING)
1605                 return 1;
1606
1607         for (;;) {
1608                 r = sd_bus_process(bus, NULL);
1609                 if (r < 0)
1610                         return r;
1611                 if (bus->state == BUS_RUNNING)
1612                         return 1;
1613                 if (r > 0)
1614                         continue;
1615
1616                 r = sd_bus_wait(bus, (uint64_t) -1);
1617                 if (r < 0)
1618                         return r;
1619         }
1620 }
1621
1622 _public_ int sd_bus_call(
1623                 sd_bus *bus,
1624                 sd_bus_message *m,
1625                 uint64_t usec,
1626                 sd_bus_error *error,
1627                 sd_bus_message **reply) {
1628
1629         usec_t timeout;
1630         uint64_t serial;
1631         unsigned i;
1632         int r;
1633
1634         assert_return(bus, -EINVAL);
1635         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1636         assert_return(m, -EINVAL);
1637         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1638         assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1639         assert_return(!bus_error_is_dirty(error), -EINVAL);
1640         assert_return(!bus_pid_changed(bus), -ECHILD);
1641
1642         r = bus_ensure_running(bus);
1643         if (r < 0)
1644                 return r;
1645
1646         i = bus->rqueue_size;
1647
1648         r = sd_bus_send(bus, m, &serial);
1649         if (r < 0)
1650                 return r;
1651
1652         timeout = calc_elapse(usec);
1653
1654         for (;;) {
1655                 usec_t left;
1656
1657                 while (i < bus->rqueue_size) {
1658                         sd_bus_message *incoming = NULL;
1659
1660                         incoming = bus->rqueue[i];
1661
1662                         if (incoming->reply_serial == serial) {
1663                                 /* Found a match! */
1664
1665                                 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1666                                 bus->rqueue_size--;
1667
1668                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1669
1670                                         if (reply)
1671                                                 *reply = incoming;
1672                                         else
1673                                                 sd_bus_message_unref(incoming);
1674
1675                                         return 1;
1676                                 } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
1677                                         r = sd_bus_error_copy(error, &incoming->error);
1678                                 else
1679                                         r = -EIO;
1680
1681                                 sd_bus_message_unref(incoming);
1682                                 return r;
1683
1684                         } else if (incoming->header->serial == serial &&
1685                                    bus->unique_name &&
1686                                    incoming->sender &&
1687                                    streq(bus->unique_name, incoming->sender)) {
1688
1689                                 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1690                                 bus->rqueue_size--;
1691
1692                                 /* Our own message? Somebody is trying
1693                                  * to send its own client a message,
1694                                  * let's not dead-lock, let's fail
1695                                  * immediately. */
1696
1697                                 sd_bus_message_unref(incoming);
1698                                 return -ELOOP;
1699                         }
1700
1701                         /* Try to read more, right-away */
1702                         i++;
1703                 }
1704
1705                 r = bus_read_message(bus);
1706                 if (r < 0) {
1707                         if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1708                                 bus_enter_closing(bus);
1709
1710                         return r;
1711                 }
1712                 if (r > 0)
1713                         continue;
1714
1715                 if (timeout > 0) {
1716                         usec_t n;
1717
1718                         n = now(CLOCK_MONOTONIC);
1719                         if (n >= timeout)
1720                                 return -ETIMEDOUT;
1721
1722                         left = timeout - n;
1723                 } else
1724                         left = (uint64_t) -1;
1725
1726                 r = bus_poll(bus, true, left);
1727                 if (r < 0)
1728                         return r;
1729
1730                 r = dispatch_wqueue(bus);
1731                 if (r < 0) {
1732                         if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
1733                                 bus_enter_closing(bus);
1734
1735                         return r;
1736                 }
1737         }
1738 }
1739
1740 _public_ int sd_bus_get_fd(sd_bus *bus) {
1741
1742         assert_return(bus, -EINVAL);
1743         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1744         assert_return(!bus_pid_changed(bus), -ECHILD);
1745
1746         return bus->input_fd;
1747 }
1748
1749 _public_ int sd_bus_get_events(sd_bus *bus) {
1750         int flags = 0;
1751
1752         assert_return(bus, -EINVAL);
1753         assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1754         assert_return(!bus_pid_changed(bus), -ECHILD);
1755
1756         if (bus->state == BUS_OPENING)
1757                 flags |= POLLOUT;
1758         else if (bus->state == BUS_AUTHENTICATING) {
1759
1760                 if (bus_socket_auth_needs_write(bus))
1761                         flags |= POLLOUT;
1762
1763                 flags |= POLLIN;
1764
1765         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1766                 if (bus->rqueue_size <= 0)
1767                         flags |= POLLIN;
1768                 if (bus->wqueue_size > 0)
1769                         flags |= POLLOUT;
1770         }
1771
1772         return flags;
1773 }
1774
1775 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1776         struct reply_callback *c;
1777
1778         assert_return(bus, -EINVAL);
1779         assert_return(timeout_usec, -EINVAL);
1780         assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1781         assert_return(!bus_pid_changed(bus), -ECHILD);
1782
1783         if (bus->state == BUS_CLOSING) {
1784                 *timeout_usec = 0;
1785                 return 1;
1786         }
1787
1788         if (bus->state == BUS_AUTHENTICATING) {
1789                 *timeout_usec = bus->auth_timeout;
1790                 return 1;
1791         }
1792
1793         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1794                 *timeout_usec = (uint64_t) -1;
1795                 return 0;
1796         }
1797
1798         if (bus->rqueue_size > 0) {
1799                 *timeout_usec = 0;
1800                 return 1;
1801         }
1802
1803         c = prioq_peek(bus->reply_callbacks_prioq);
1804         if (!c) {
1805                 *timeout_usec = (uint64_t) -1;
1806                 return 0;
1807         }
1808
1809         *timeout_usec = c->timeout;
1810         return 1;
1811 }
1812
1813 static int process_timeout(sd_bus *bus) {
1814         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1815         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1816         struct reply_callback *c;
1817         usec_t n;
1818         int r;
1819
1820         assert(bus);
1821
1822         c = prioq_peek(bus->reply_callbacks_prioq);
1823         if (!c)
1824                 return 0;
1825
1826         n = now(CLOCK_MONOTONIC);
1827         if (c->timeout > n)
1828                 return 0;
1829
1830         r = bus_message_new_synthetic_error(
1831                         bus,
1832                         c->serial,
1833                         &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1834                         &m);
1835         if (r < 0)
1836                 return r;
1837
1838         m->sender = "org.freedesktop.DBus";
1839
1840         r = bus_seal_synthetic_message(bus, m);
1841         if (r < 0)
1842                 return r;
1843
1844         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1845         hashmap_remove(bus->reply_callbacks, &c->serial);
1846
1847         bus->current = m;
1848         bus->iteration_counter ++;
1849
1850         r = c->callback(bus, m, c->userdata, &error_buffer);
1851         r = bus_maybe_reply_error(m, r, &error_buffer);
1852         free(c);
1853
1854         bus->current = NULL;
1855
1856         return r;
1857 }
1858
1859 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1860         assert(bus);
1861         assert(m);
1862
1863         if (bus->state != BUS_HELLO)
1864                 return 0;
1865
1866         /* Let's make sure the first message on the bus is the HELLO
1867          * reply. But note that we don't actually parse the message
1868          * here (we leave that to the usual handling), we just verify
1869          * we don't let any earlier msg through. */
1870
1871         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1872             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1873                 return -EIO;
1874
1875         if (m->reply_serial != bus->hello_serial)
1876                 return -EIO;
1877
1878         return 0;
1879 }
1880
1881 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1882         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1883         struct reply_callback *c;
1884         int r;
1885
1886         assert(bus);
1887         assert(m);
1888
1889         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1890             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1891                 return 0;
1892
1893         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1894         if (!c)
1895                 return 0;
1896
1897         if (c->timeout != 0)
1898                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1899
1900         r = sd_bus_message_rewind(m, true);
1901         if (r < 0)
1902                 return r;
1903
1904         r = c->callback(bus, m, c->userdata, &error_buffer);
1905         r = bus_maybe_reply_error(m, r, &error_buffer);
1906         free(c);
1907
1908         return r;
1909 }
1910
1911 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1912         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1913         struct filter_callback *l;
1914         int r;
1915
1916         assert(bus);
1917         assert(m);
1918
1919         do {
1920                 bus->filter_callbacks_modified = false;
1921
1922                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1923
1924                         if (bus->filter_callbacks_modified)
1925                                 break;
1926
1927                         /* Don't run this more than once per iteration */
1928                         if (l->last_iteration == bus->iteration_counter)
1929                                 continue;
1930
1931                         l->last_iteration = bus->iteration_counter;
1932
1933                         r = sd_bus_message_rewind(m, true);
1934                         if (r < 0)
1935                                 return r;
1936
1937                         r = l->callback(bus, m, l->userdata, &error_buffer);
1938                         r = bus_maybe_reply_error(m, r, &error_buffer);
1939                         if (r != 0)
1940                                 return r;
1941
1942                 }
1943
1944         } while (bus->filter_callbacks_modified);
1945
1946         return 0;
1947 }
1948
1949 static int process_match(sd_bus *bus, sd_bus_message *m) {
1950         int r;
1951
1952         assert(bus);
1953         assert(m);
1954
1955         do {
1956                 bus->match_callbacks_modified = false;
1957
1958                 r = bus_match_run(bus, &bus->match_callbacks, m);
1959                 if (r != 0)
1960                         return r;
1961
1962         } while (bus->match_callbacks_modified);
1963
1964         return 0;
1965 }
1966
1967 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1968         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1969         int r;
1970
1971         assert(bus);
1972         assert(m);
1973
1974         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1975                 return 0;
1976
1977         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1978                 return 0;
1979
1980         if (m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED)
1981                 return 1;
1982
1983         if (streq_ptr(m->member, "Ping"))
1984                 r = sd_bus_message_new_method_return(m, &reply);
1985         else if (streq_ptr(m->member, "GetMachineId")) {
1986                 sd_id128_t id;
1987                 char sid[33];
1988
1989                 r = sd_id128_get_machine(&id);
1990                 if (r < 0)
1991                         return r;
1992
1993                 r = sd_bus_message_new_method_return(m, &reply);
1994                 if (r < 0)
1995                         return r;
1996
1997                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1998         } else {
1999                 r = sd_bus_message_new_method_errorf(
2000                                 m, &reply,
2001                                 SD_BUS_ERROR_UNKNOWN_METHOD,
2002                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2003         }
2004
2005         if (r < 0)
2006                 return r;
2007
2008         r = sd_bus_send(bus, reply, NULL);
2009         if (r < 0)
2010                 return r;
2011
2012         return 1;
2013 }
2014
2015 static int process_message(sd_bus *bus, sd_bus_message *m) {
2016         int r;
2017
2018         assert(bus);
2019         assert(m);
2020
2021         bus->current = m;
2022         bus->iteration_counter++;
2023
2024         log_debug("Got message sender=%s object=%s interface=%s member=%s",
2025                   strna(sd_bus_message_get_sender(m)),
2026                   strna(sd_bus_message_get_path(m)),
2027                   strna(sd_bus_message_get_interface(m)),
2028                   strna(sd_bus_message_get_member(m)));
2029
2030         r = process_hello(bus, m);
2031         if (r != 0)
2032                 goto finish;
2033
2034         r = process_reply(bus, m);
2035         if (r != 0)
2036                 goto finish;
2037
2038         r = process_filter(bus, m);
2039         if (r != 0)
2040                 goto finish;
2041
2042         r = process_match(bus, m);
2043         if (r != 0)
2044                 goto finish;
2045
2046         r = process_builtin(bus, m);
2047         if (r != 0)
2048                 goto finish;
2049
2050         r = bus_process_object(bus, m);
2051
2052 finish:
2053         bus->current = NULL;
2054         return r;
2055 }
2056
2057 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2058         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2059         int r;
2060
2061         assert(bus);
2062         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2063
2064         r = process_timeout(bus);
2065         if (r != 0)
2066                 goto null_message;
2067
2068         r = dispatch_wqueue(bus);
2069         if (r != 0)
2070                 goto null_message;
2071
2072         r = dispatch_rqueue(bus, &m);
2073         if (r < 0)
2074                 return r;
2075         if (!m)
2076                 goto null_message;
2077
2078         r = process_message(bus, m);
2079         if (r != 0)
2080                 goto null_message;
2081
2082         if (ret) {
2083                 r = sd_bus_message_rewind(m, true);
2084                 if (r < 0)
2085                         return r;
2086
2087                 *ret = m;
2088                 m = NULL;
2089                 return 1;
2090         }
2091
2092         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2093
2094                 r = sd_bus_reply_method_errorf(
2095                                 m,
2096                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
2097                                 "Unknown object '%s'.", m->path);
2098                 if (r < 0)
2099                         return r;
2100         }
2101
2102         return 1;
2103
2104 null_message:
2105         if (r >= 0 && ret)
2106                 *ret = NULL;
2107
2108         return r;
2109 }
2110
2111 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2112         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2113         struct reply_callback *c;
2114         int r;
2115
2116         assert(bus);
2117         assert(bus->state == BUS_CLOSING);
2118
2119         c = hashmap_first(bus->reply_callbacks);
2120         if (c) {
2121                 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2122
2123                 /* First, fail all outstanding method calls */
2124                 r = bus_message_new_synthetic_error(
2125                                 bus,
2126                                 c->serial,
2127                                 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2128                                 &m);
2129                 if (r < 0)
2130                         return r;
2131
2132                 r = bus_seal_synthetic_message(bus, m);
2133                 if (r < 0)
2134                         return r;
2135
2136                 if (c->timeout != 0)
2137                         prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2138
2139                 hashmap_remove(bus->reply_callbacks, &c->serial);
2140
2141                 bus->current = m;
2142                 bus->iteration_counter++;
2143
2144                 r = c->callback(bus, m, c->userdata, &error_buffer);
2145                 r = bus_maybe_reply_error(m, r, &error_buffer);
2146                 free(c);
2147
2148                 goto finish;
2149         }
2150
2151         /* Then, synthesize a Disconnected message */
2152         r = sd_bus_message_new_signal(
2153                         bus,
2154                         "/org/freedesktop/DBus/Local",
2155                         "org.freedesktop.DBus.Local",
2156                         "Disconnected",
2157                         &m);
2158         if (r < 0)
2159                 return r;
2160
2161         m->sender = "org.freedesktop.DBus.Local";
2162
2163         r = bus_seal_synthetic_message(bus, m);
2164         if (r < 0)
2165                 return r;
2166
2167         sd_bus_close(bus);
2168
2169         bus->current = m;
2170         bus->iteration_counter++;
2171
2172         r = process_filter(bus, m);
2173         if (r != 0)
2174                 goto finish;
2175
2176         r = process_match(bus, m);
2177         if (r != 0)
2178                 goto finish;
2179
2180         if (ret) {
2181                 *ret = m;
2182                 m = NULL;
2183         }
2184
2185         r = 1;
2186
2187 finish:
2188         bus->current = NULL;
2189         return r;
2190 }
2191
2192 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2193         BUS_DONT_DESTROY(bus);
2194         int r;
2195
2196         /* Returns 0 when we didn't do anything. This should cause the
2197          * caller to invoke sd_bus_wait() before returning the next
2198          * time. Returns > 0 when we did something, which possibly
2199          * means *ret is filled in with an unprocessed message. */
2200
2201         assert_return(bus, -EINVAL);
2202         assert_return(!bus_pid_changed(bus), -ECHILD);
2203
2204         /* We don't allow recursively invoking sd_bus_process(). */
2205         assert_return(!bus->current, -EBUSY);
2206
2207         switch (bus->state) {
2208
2209         case BUS_UNSET:
2210         case BUS_CLOSED:
2211                 return -ENOTCONN;
2212
2213         case BUS_OPENING:
2214                 r = bus_socket_process_opening(bus);
2215                 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2216                         bus_enter_closing(bus);
2217                         r = 1;
2218                 } else if (r < 0)
2219                         return r;
2220                 if (ret)
2221                         *ret = NULL;
2222                 return r;
2223
2224         case BUS_AUTHENTICATING:
2225                 r = bus_socket_process_authenticating(bus);
2226                 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2227                         bus_enter_closing(bus);
2228                         r = 1;
2229                 } else if (r < 0)
2230                         return r;
2231
2232                 if (ret)
2233                         *ret = NULL;
2234
2235                 return r;
2236
2237         case BUS_RUNNING:
2238         case BUS_HELLO:
2239                 r = process_running(bus, ret);
2240                 if (r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2241                         bus_enter_closing(bus);
2242                         r = 1;
2243
2244                         if (ret)
2245                                 *ret = NULL;
2246                 }
2247
2248                 return r;
2249
2250         case BUS_CLOSING:
2251                 return process_closing(bus, ret);
2252         }
2253
2254         assert_not_reached("Unknown state");
2255 }
2256
2257 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2258         struct pollfd p[2] = {};
2259         int r, e, n;
2260         struct timespec ts;
2261         usec_t m = (usec_t) -1;
2262
2263         assert(bus);
2264
2265         if (bus->state == BUS_CLOSING)
2266                 return 1;
2267
2268         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2269
2270         e = sd_bus_get_events(bus);
2271         if (e < 0)
2272                 return e;
2273
2274         if (need_more)
2275                 /* The caller really needs some more data, he doesn't
2276                  * care about what's already read, or any timeouts
2277                  * except its own.*/
2278                 e |= POLLIN;
2279         else {
2280                 usec_t until;
2281                 /* The caller wants to process if there's something to
2282                  * process, but doesn't care otherwise */
2283
2284                 r = sd_bus_get_timeout(bus, &until);
2285                 if (r < 0)
2286                         return r;
2287                 if (r > 0) {
2288                         usec_t nw;
2289                         nw = now(CLOCK_MONOTONIC);
2290                         m = until > nw ? until - nw : 0;
2291                 }
2292         }
2293
2294         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2295                 m = timeout_usec;
2296
2297         p[0].fd = bus->input_fd;
2298         if (bus->output_fd == bus->input_fd) {
2299                 p[0].events = e;
2300                 n = 1;
2301         } else {
2302                 p[0].events = e & POLLIN;
2303                 p[1].fd = bus->output_fd;
2304                 p[1].events = e & POLLOUT;
2305                 n = 2;
2306         }
2307
2308         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2309         if (r < 0)
2310                 return -errno;
2311
2312         return r > 0 ? 1 : 0;
2313 }
2314
2315 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2316
2317         assert_return(bus, -EINVAL);
2318         assert_return(!bus_pid_changed(bus), -ECHILD);
2319
2320         if (bus->state == BUS_CLOSING)
2321                 return 0;
2322
2323         assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
2324
2325         if (bus->rqueue_size > 0)
2326                 return 0;
2327
2328         return bus_poll(bus, false, timeout_usec);
2329 }
2330
2331 _public_ int sd_bus_flush(sd_bus *bus) {
2332         int r;
2333
2334         assert_return(bus, -EINVAL);
2335         assert_return(!bus_pid_changed(bus), -ECHILD);
2336
2337         if (bus->state == BUS_CLOSING)
2338                 return 0;
2339
2340         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2341
2342         r = bus_ensure_running(bus);
2343         if (r < 0)
2344                 return r;
2345
2346         if (bus->wqueue_size <= 0)
2347                 return 0;
2348
2349         for (;;) {
2350                 r = dispatch_wqueue(bus);
2351                 if (r < 0) {
2352                         if (r == -EPIPE || r == -ENOTCONN || r == -ESHUTDOWN)
2353                                 bus_enter_closing(bus);
2354
2355                         return r;
2356                 }
2357
2358                 if (bus->wqueue_size <= 0)
2359                         return 0;
2360
2361                 r = bus_poll(bus, false, (uint64_t) -1);
2362                 if (r < 0)
2363                         return r;
2364         }
2365 }
2366
2367 _public_ int sd_bus_add_filter(sd_bus *bus,
2368                                sd_bus_message_handler_t callback,
2369                                void *userdata) {
2370
2371         struct filter_callback *f;
2372
2373         assert_return(bus, -EINVAL);
2374         assert_return(callback, -EINVAL);
2375         assert_return(!bus_pid_changed(bus), -ECHILD);
2376
2377         f = new0(struct filter_callback, 1);
2378         if (!f)
2379                 return -ENOMEM;
2380         f->callback = callback;
2381         f->userdata = userdata;
2382
2383         bus->filter_callbacks_modified = true;
2384         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2385         return 0;
2386 }
2387
2388 _public_ int sd_bus_remove_filter(sd_bus *bus,
2389                                   sd_bus_message_handler_t callback,
2390                                   void *userdata) {
2391
2392         struct filter_callback *f;
2393
2394         assert_return(bus, -EINVAL);
2395         assert_return(callback, -EINVAL);
2396         assert_return(!bus_pid_changed(bus), -ECHILD);
2397
2398         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2399                 if (f->callback == callback && f->userdata == userdata) {
2400                         bus->filter_callbacks_modified = true;
2401                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2402                         free(f);
2403                         return 1;
2404                 }
2405         }
2406
2407         return 0;
2408 }
2409
2410 _public_ int sd_bus_add_match(sd_bus *bus,
2411                               const char *match,
2412                               sd_bus_message_handler_t callback,
2413                               void *userdata) {
2414
2415         struct bus_match_component *components = NULL;
2416         unsigned n_components = 0;
2417         uint64_t cookie = 0;
2418         int r = 0;
2419
2420         assert_return(bus, -EINVAL);
2421         assert_return(match, -EINVAL);
2422         assert_return(!bus_pid_changed(bus), -ECHILD);
2423
2424         r = bus_match_parse(match, &components, &n_components);
2425         if (r < 0)
2426                 goto finish;
2427
2428         if (bus->bus_client) {
2429                 cookie = ++bus->match_cookie;
2430
2431                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2432                 if (r < 0)
2433                         goto finish;
2434         }
2435
2436         bus->match_callbacks_modified = true;
2437         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2438         if (r < 0) {
2439                 if (bus->bus_client)
2440                         bus_remove_match_internal(bus, match, cookie);
2441         }
2442
2443 finish:
2444         bus_match_parse_free(components, n_components);
2445         return r;
2446 }
2447
2448 _public_ int sd_bus_remove_match(sd_bus *bus,
2449                                  const char *match,
2450                                  sd_bus_message_handler_t callback,
2451                                  void *userdata) {
2452
2453         struct bus_match_component *components = NULL;
2454         unsigned n_components = 0;
2455         int r = 0, q = 0;
2456         uint64_t cookie = 0;
2457
2458         assert_return(bus, -EINVAL);
2459         assert_return(match, -EINVAL);
2460         assert_return(!bus_pid_changed(bus), -ECHILD);
2461
2462         r = bus_match_parse(match, &components, &n_components);
2463         if (r < 0)
2464                 return r;
2465
2466         bus->match_callbacks_modified = true;
2467         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2468
2469         if (bus->bus_client)
2470                 q = bus_remove_match_internal(bus, match, cookie);
2471
2472         bus_match_parse_free(components, n_components);
2473
2474         return r < 0 ? r : q;
2475 }
2476
2477 bool bus_pid_changed(sd_bus *bus) {
2478         assert(bus);
2479
2480         /* We don't support people creating a bus connection and
2481          * keeping it around over a fork(). Let's complain. */
2482
2483         return bus->original_pid != getpid();
2484 }
2485
2486 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2487         sd_bus *bus = userdata;
2488         int r;
2489
2490         assert(bus);
2491
2492         r = sd_bus_process(bus, NULL);
2493         if (r < 0)
2494                 return r;
2495
2496         return 1;
2497 }
2498
2499 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2500         sd_bus *bus = userdata;
2501         int r;
2502
2503         assert(bus);
2504
2505         r = sd_bus_process(bus, NULL);
2506         if (r < 0)
2507                 return r;
2508
2509         return 1;
2510 }
2511
2512 static int prepare_callback(sd_event_source *s, void *userdata) {
2513         sd_bus *bus = userdata;
2514         int r, e;
2515         usec_t until;
2516
2517         assert(s);
2518         assert(bus);
2519
2520         e = sd_bus_get_events(bus);
2521         if (e < 0)
2522                 return e;
2523
2524         if (bus->output_fd != bus->input_fd) {
2525
2526                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2527                 if (r < 0)
2528                         return r;
2529
2530                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2531                 if (r < 0)
2532                         return r;
2533         } else {
2534                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2535                 if (r < 0)
2536                         return r;
2537         }
2538
2539         r = sd_bus_get_timeout(bus, &until);
2540         if (r < 0)
2541                 return r;
2542         if (r > 0) {
2543                 int j;
2544
2545                 j = sd_event_source_set_time(bus->time_event_source, until);
2546                 if (j < 0)
2547                         return j;
2548         }
2549
2550         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2551         if (r < 0)
2552                 return r;
2553
2554         return 1;
2555 }
2556
2557 static int quit_callback(sd_event_source *event, void *userdata) {
2558         sd_bus *bus = userdata;
2559
2560         assert(event);
2561
2562         sd_bus_flush(bus);
2563
2564         return 1;
2565 }
2566
2567 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2568         int r;
2569
2570         assert_return(bus, -EINVAL);
2571         assert_return(!bus->event, -EBUSY);
2572
2573         assert(!bus->input_io_event_source);
2574         assert(!bus->output_io_event_source);
2575         assert(!bus->time_event_source);
2576
2577         if (event)
2578                 bus->event = sd_event_ref(event);
2579         else  {
2580                 r = sd_event_default(&bus->event);
2581                 if (r < 0)
2582                         return r;
2583         }
2584
2585         r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2586         if (r < 0)
2587                 goto fail;
2588
2589         r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2590         if (r < 0)
2591                 goto fail;
2592
2593         if (bus->output_fd != bus->input_fd) {
2594                 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2595                 if (r < 0)
2596                         goto fail;
2597
2598                 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2599                 if (r < 0)
2600                         goto fail;
2601         }
2602
2603         r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2604         if (r < 0)
2605                 goto fail;
2606
2607         r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2608         if (r < 0)
2609                 goto fail;
2610
2611         r = sd_event_source_set_priority(bus->time_event_source, priority);
2612         if (r < 0)
2613                 goto fail;
2614
2615         r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2616         if (r < 0)
2617                 goto fail;
2618
2619         return 0;
2620
2621 fail:
2622         sd_bus_detach_event(bus);
2623         return r;
2624 }
2625
2626 _public_ int sd_bus_detach_event(sd_bus *bus) {
2627         assert_return(bus, -EINVAL);
2628         assert_return(bus->event, -ENXIO);
2629
2630         if (bus->input_io_event_source) {
2631                 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2632                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2633         }
2634
2635         if (bus->output_io_event_source) {
2636                 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2637                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2638         }
2639
2640         if (bus->time_event_source) {
2641                 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2642                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2643         }
2644
2645         if (bus->quit_event_source) {
2646                 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2647                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2648         }
2649
2650         if (bus->event)
2651                 bus->event = sd_event_unref(bus->event);
2652
2653         return 0;
2654 }
2655
2656 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2657         assert_return(bus, NULL);
2658
2659         return bus->event;
2660 }
2661
2662 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2663         assert_return(bus, NULL);
2664
2665         return bus->current;
2666 }
2667
2668 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2669         sd_bus *b = NULL;
2670         int r;
2671
2672         assert(bus_open);
2673         assert(default_bus);
2674
2675         if (!ret)
2676                 return !!*default_bus;
2677
2678         if (*default_bus) {
2679                 *ret = sd_bus_ref(*default_bus);
2680                 return 0;
2681         }
2682
2683         r = bus_open(&b);
2684         if (r < 0)
2685                 return r;
2686
2687         b->default_bus_ptr = default_bus;
2688         b->tid = gettid();
2689         *default_bus = b;
2690
2691         *ret = b;
2692         return 1;
2693 }
2694
2695 _public_ int sd_bus_default_system(sd_bus **ret) {
2696         static __thread sd_bus *default_system_bus = NULL;
2697
2698         return bus_default(sd_bus_open_system, &default_system_bus, ret);
2699 }
2700
2701 _public_ int sd_bus_default_user(sd_bus **ret) {
2702         static __thread sd_bus *default_user_bus = NULL;
2703
2704         return bus_default(sd_bus_open_user, &default_user_bus, ret);
2705 }
2706
2707 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2708         assert_return(b, -EINVAL);
2709         assert_return(tid, -EINVAL);
2710         assert_return(!bus_pid_changed(b), -ECHILD);
2711
2712         if (b->tid != 0) {
2713                 *tid = b->tid;
2714                 return 0;
2715         }
2716
2717         if (b->event)
2718                 return sd_event_get_tid(b->event, tid);
2719
2720         return -ENXIO;
2721 }
2722
2723 _public_ char *sd_bus_label_escape(const char *s) {
2724         char *r, *t;
2725         const char *f;
2726
2727         assert_return(s, NULL);
2728
2729         /* Escapes all chars that D-Bus' object path cannot deal
2730          * with. Can be reversed with bus_path_unescape(). We special
2731          * case the empty string. */
2732
2733         if (*s == 0)
2734                 return strdup("_");
2735
2736         r = new(char, strlen(s)*3 + 1);
2737         if (!r)
2738                 return NULL;
2739
2740         for (f = s, t = r; *f; f++) {
2741
2742                 /* Escape everything that is not a-zA-Z0-9. We also
2743                  * escape 0-9 if it's the first character */
2744
2745                 if (!(*f >= 'A' && *f <= 'Z') &&
2746                     !(*f >= 'a' && *f <= 'z') &&
2747                     !(f > s && *f >= '0' && *f <= '9')) {
2748                         *(t++) = '_';
2749                         *(t++) = hexchar(*f >> 4);
2750                         *(t++) = hexchar(*f);
2751                 } else
2752                         *(t++) = *f;
2753         }
2754
2755         *t = 0;
2756
2757         return r;
2758 }
2759
2760 _public_ char *sd_bus_label_unescape(const char *f) {
2761         char *r, *t;
2762
2763         assert_return(f, NULL);
2764
2765         /* Special case for the empty string */
2766         if (streq(f, "_"))
2767                 return strdup("");
2768
2769         r = new(char, strlen(f) + 1);
2770         if (!r)
2771                 return NULL;
2772
2773         for (t = r; *f; f++) {
2774
2775                 if (*f == '_') {
2776                         int a, b;
2777
2778                         if ((a = unhexchar(f[1])) < 0 ||
2779                             (b = unhexchar(f[2])) < 0) {
2780                                 /* Invalid escape code, let's take it literal then */
2781                                 *(t++) = '_';
2782                         } else {
2783                                 *(t++) = (char) ((a << 4) | b);
2784                                 f += 2;
2785                         }
2786                 } else
2787                         *(t++) = *f;
2788         }
2789
2790         *t = 0;
2791
2792         return r;
2793 }
2794
2795 _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **ret) {
2796         sd_bus_creds *c;
2797         pid_t pid = 0;
2798         int r;
2799
2800         assert_return(bus, -EINVAL);
2801         assert_return(mask <= _SD_BUS_CREDS_ALL, -ENOTSUP);
2802         assert_return(ret, -EINVAL);
2803         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2804         assert_return(!bus_pid_changed(bus), -ECHILD);
2805         assert_return(!bus->is_kernel, -ENOTSUP);
2806
2807         if (!bus->ucred_valid && !isempty(bus->label))
2808                 return -ENODATA;
2809
2810         c = bus_creds_new();
2811         if (!c)
2812                 return -ENOMEM;
2813
2814         if (bus->ucred_valid) {
2815                 pid = c->pid = bus->ucred.pid;
2816                 c->uid = bus->ucred.uid;
2817                 c->gid = bus->ucred.gid;
2818
2819                 c->mask |= (SD_BUS_CREDS_UID | SD_BUS_CREDS_PID | SD_BUS_CREDS_GID) & mask;
2820         }
2821
2822         if (!isempty(bus->label) && (mask & SD_BUS_CREDS_SELINUX_CONTEXT)) {
2823                 c->label = strdup(bus->label);
2824                 if (!c->label) {
2825                         sd_bus_creds_unref(c);
2826                         return -ENOMEM;
2827                 }
2828
2829                 c->mask |= SD_BUS_CREDS_SELINUX_CONTEXT;
2830         }
2831
2832         r = bus_creds_add_more(c, mask, pid, 0);
2833         if (r < 0)
2834                 return r;
2835
2836         *ret = c;
2837         return 0;
2838 }