chiark / gitweb /
1207d5ad2e1ea8394a2a9a7b7e4d753231bf5c0b
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48 #include "bus-util.h"
49 #include "bus-container.h"
50
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
52
53 static void bus_close_fds(sd_bus *b) {
54         assert(b);
55
56         if (b->input_fd >= 0)
57                 close_nointr_nofail(b->input_fd);
58
59         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60                 close_nointr_nofail(b->output_fd);
61
62         b->input_fd = b->output_fd = -1;
63 }
64
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66         struct node_callback *c;
67         struct node_vtable *v;
68         struct node_enumerator *e;
69
70         assert(b);
71
72         if (!n)
73                 return;
74
75         while (n->child)
76                 bus_node_destroy(b, n->child);
77
78         while ((c = n->callbacks)) {
79                 LIST_REMOVE(callbacks, n->callbacks, c);
80                 free(c);
81         }
82
83         while ((v = n->vtables)) {
84                 LIST_REMOVE(vtables, n->vtables, v);
85                 free(v->interface);
86                 free(v);
87         }
88
89         while ((e = n->enumerators)) {
90                 LIST_REMOVE(enumerators, n->enumerators, e);
91                 free(e);
92         }
93
94         if (n->parent)
95                 LIST_REMOVE(siblings, n->parent->child, n);
96
97         assert_se(hashmap_remove(b->nodes, n->path) == n);
98         free(n->path);
99         free(n);
100 }
101
102 static void bus_reset_queues(sd_bus *b) {
103         unsigned i;
104
105         assert(b);
106
107         for (i = 0; i < b->rqueue_size; i++)
108                 sd_bus_message_unref(b->rqueue[i]);
109         free(b->rqueue);
110
111         for (i = 0; i < b->wqueue_size; i++)
112                 sd_bus_message_unref(b->wqueue[i]);
113         free(b->wqueue);
114
115         b->rqueue = b->wqueue = NULL;
116         b->rqueue_size = b->wqueue_size = 0;
117 }
118
119 static void bus_free(sd_bus *b) {
120         struct filter_callback *f;
121         struct node *n;
122
123         assert(b);
124
125         sd_bus_detach_event(b);
126
127         bus_close_fds(b);
128
129         if (b->kdbus_buffer)
130                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
131
132         free(b->rbuffer);
133         free(b->unique_name);
134         free(b->auth_buffer);
135         free(b->address);
136         free(b->kernel);
137         free(b->machine);
138
139         free(b->exec_path);
140         strv_free(b->exec_argv);
141
142         close_many(b->fds, b->n_fds);
143         free(b->fds);
144
145         bus_reset_queues(b);
146
147         hashmap_free_free(b->reply_callbacks);
148         prioq_free(b->reply_callbacks_prioq);
149
150         while ((f = b->filter_callbacks)) {
151                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
152                 free(f);
153         }
154
155         bus_match_free(&b->match_callbacks);
156
157         hashmap_free_free(b->vtable_methods);
158         hashmap_free_free(b->vtable_properties);
159
160         while ((n = hashmap_first(b->nodes)))
161                 bus_node_destroy(b, n);
162
163         hashmap_free(b->nodes);
164
165         bus_kernel_flush_memfd(b);
166
167         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
168
169         free(b);
170 }
171
172 _public_ int sd_bus_new(sd_bus **ret) {
173         sd_bus *r;
174
175         assert_return(ret, -EINVAL);
176
177         r = new0(sd_bus, 1);
178         if (!r)
179                 return -ENOMEM;
180
181         r->n_ref = REFCNT_INIT;
182         r->input_fd = r->output_fd = -1;
183         r->message_version = 1;
184         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
185         r->original_pid = getpid();
186
187         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
188
189         /* We guarantee that wqueue always has space for at least one
190          * entry */
191         r->wqueue = new(sd_bus_message*, 1);
192         if (!r->wqueue) {
193                 free(r);
194                 return -ENOMEM;
195         }
196
197         *ret = r;
198         return 0;
199 }
200
201 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
202         char *a;
203
204         assert_return(bus, -EINVAL);
205         assert_return(bus->state == BUS_UNSET, -EPERM);
206         assert_return(address, -EINVAL);
207         assert_return(!bus_pid_changed(bus), -ECHILD);
208
209         a = strdup(address);
210         if (!a)
211                 return -ENOMEM;
212
213         free(bus->address);
214         bus->address = a;
215
216         return 0;
217 }
218
219 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
220         assert_return(bus, -EINVAL);
221         assert_return(bus->state == BUS_UNSET, -EPERM);
222         assert_return(input_fd >= 0, -EINVAL);
223         assert_return(output_fd >= 0, -EINVAL);
224         assert_return(!bus_pid_changed(bus), -ECHILD);
225
226         bus->input_fd = input_fd;
227         bus->output_fd = output_fd;
228         return 0;
229 }
230
231 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
232         char *p, **a;
233
234         assert_return(bus, -EINVAL);
235         assert_return(bus->state == BUS_UNSET, -EPERM);
236         assert_return(path, -EINVAL);
237         assert_return(!strv_isempty(argv), -EINVAL);
238         assert_return(!bus_pid_changed(bus), -ECHILD);
239
240         p = strdup(path);
241         if (!p)
242                 return -ENOMEM;
243
244         a = strv_copy(argv);
245         if (!a) {
246                 free(p);
247                 return -ENOMEM;
248         }
249
250         free(bus->exec_path);
251         strv_free(bus->exec_argv);
252
253         bus->exec_path = p;
254         bus->exec_argv = a;
255
256         return 0;
257 }
258
259 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
260         assert_return(bus, -EINVAL);
261         assert_return(bus->state == BUS_UNSET, -EPERM);
262         assert_return(!bus_pid_changed(bus), -ECHILD);
263
264         bus->bus_client = !!b;
265         return 0;
266 }
267
268 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
269         assert_return(bus, -EINVAL);
270         assert_return(bus->state == BUS_UNSET, -EPERM);
271         assert_return(!bus_pid_changed(bus), -ECHILD);
272
273         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
274         return 0;
275 }
276
277 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
278         assert_return(bus, -EINVAL);
279         assert_return(bus->state == BUS_UNSET, -EPERM);
280         assert_return(!bus_pid_changed(bus), -ECHILD);
281
282         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_TIMESTAMP, b);
283         return 0;
284 }
285
286 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, int b) {
287         assert_return(bus, -EINVAL);
288         assert_return(bus->state == BUS_UNSET, -EPERM);
289         assert_return(!bus_pid_changed(bus), -ECHILD);
290
291         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CREDS, b);
292         return 0;
293 }
294
295 _public_ int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
296         assert_return(bus, -EINVAL);
297         assert_return(bus->state == BUS_UNSET, -EPERM);
298         assert_return(!bus_pid_changed(bus), -ECHILD);
299
300         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
301         return 0;
302 }
303
304 _public_ int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
305         assert_return(bus, -EINVAL);
306         assert_return(bus->state == BUS_UNSET, -EPERM);
307         assert_return(!bus_pid_changed(bus), -ECHILD);
308
309         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
310         return 0;
311 }
312
313 _public_ int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
314         assert_return(bus, -EINVAL);
315         assert_return(bus->state == BUS_UNSET, -EPERM);
316         assert_return(!bus_pid_changed(bus), -ECHILD);
317
318         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
319         return 0;
320 }
321
322 _public_ int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
323         assert_return(bus, -EINVAL);
324         assert_return(bus->state == BUS_UNSET, -EPERM);
325         assert_return(!bus_pid_changed(bus), -ECHILD);
326
327         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
328         return 0;
329 }
330
331 _public_ int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
332         assert_return(bus, -EINVAL);
333         assert_return(bus->state == BUS_UNSET, -EPERM);
334         assert_return(!bus_pid_changed(bus), -ECHILD);
335
336         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
337         return 0;
338 }
339
340 _public_ int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
341         assert_return(bus, -EINVAL);
342         assert_return(bus->state == BUS_UNSET, -EPERM);
343         assert_return(!bus_pid_changed(bus), -ECHILD);
344
345         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
346         return 0;
347 }
348
349 _public_ int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
350         assert_return(bus, -EINVAL);
351         assert_return(bus->state == BUS_UNSET, -EPERM);
352         assert_return(!bus_pid_changed(bus), -ECHILD);
353
354         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
355         return 0;
356 }
357
358 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
359         assert_return(bus, -EINVAL);
360         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
361         assert_return(bus->state == BUS_UNSET, -EPERM);
362         assert_return(!bus_pid_changed(bus), -ECHILD);
363
364         bus->is_server = !!b;
365         bus->server_id = server_id;
366         return 0;
367 }
368
369 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
370         assert_return(bus, -EINVAL);
371         assert_return(bus->state == BUS_UNSET, -EPERM);
372         assert_return(!bus_pid_changed(bus), -ECHILD);
373
374         bus->anonymous_auth = !!b;
375         return 0;
376 }
377
378 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
379         const char *s;
380         int r;
381
382         assert(bus);
383         assert(bus->state == BUS_HELLO);
384         assert(reply);
385
386         r = sd_bus_message_get_errno(reply);
387         if (r < 0)
388                 return r;
389         if (r > 0)
390                 return -r;
391
392         r = sd_bus_message_read(reply, "s", &s);
393         if (r < 0)
394                 return r;
395
396         if (!service_name_is_valid(s) || s[0] != ':')
397                 return -EBADMSG;
398
399         bus->unique_name = strdup(s);
400         if (!bus->unique_name)
401                 return -ENOMEM;
402
403         bus->state = BUS_RUNNING;
404
405         return 1;
406 }
407
408 static int bus_send_hello(sd_bus *bus) {
409         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
410         int r;
411
412         assert(bus);
413
414         if (!bus->bus_client || bus->is_kernel)
415                 return 0;
416
417         r = sd_bus_message_new_method_call(
418                         bus,
419                         "org.freedesktop.DBus",
420                         "/",
421                         "org.freedesktop.DBus",
422                         "Hello",
423                         &m);
424         if (r < 0)
425                 return r;
426
427         return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
428 }
429
430 int bus_start_running(sd_bus *bus) {
431         assert(bus);
432
433         if (bus->bus_client && !bus->is_kernel) {
434                 bus->state = BUS_HELLO;
435                 return 1;
436         }
437
438         bus->state = BUS_RUNNING;
439         return 1;
440 }
441
442 static int parse_address_key(const char **p, const char *key, char **value) {
443         size_t l, n = 0;
444         const char *a;
445         char *r = NULL;
446
447         assert(p);
448         assert(*p);
449         assert(value);
450
451         if (key) {
452                 l = strlen(key);
453                 if (strncmp(*p, key, l) != 0)
454                         return 0;
455
456                 if ((*p)[l] != '=')
457                         return 0;
458
459                 if (*value)
460                         return -EINVAL;
461
462                 a = *p + l + 1;
463         } else
464                 a = *p;
465
466         while (*a != ';' && *a != ',' && *a != 0) {
467                 char c, *t;
468
469                 if (*a == '%') {
470                         int x, y;
471
472                         x = unhexchar(a[1]);
473                         if (x < 0) {
474                                 free(r);
475                                 return x;
476                         }
477
478                         y = unhexchar(a[2]);
479                         if (y < 0) {
480                                 free(r);
481                                 return y;
482                         }
483
484                         c = (char) ((x << 4) | y);
485                         a += 3;
486                 } else {
487                         c = *a;
488                         a++;
489                 }
490
491                 t = realloc(r, n + 2);
492                 if (!t) {
493                         free(r);
494                         return -ENOMEM;
495                 }
496
497                 r = t;
498                 r[n++] = c;
499         }
500
501         if (!r) {
502                 r = strdup("");
503                 if (!r)
504                         return -ENOMEM;
505         } else
506                 r[n] = 0;
507
508         if (*a == ',')
509                 a++;
510
511         *p = a;
512
513         free(*value);
514         *value = r;
515
516         return 1;
517 }
518
519 static void skip_address_key(const char **p) {
520         assert(p);
521         assert(*p);
522
523         *p += strcspn(*p, ",");
524
525         if (**p == ',')
526                 (*p) ++;
527 }
528
529 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
530         _cleanup_free_ char *path = NULL, *abstract = NULL;
531         size_t l;
532         int r;
533
534         assert(b);
535         assert(p);
536         assert(*p);
537         assert(guid);
538
539         while (**p != 0 && **p != ';') {
540                 r = parse_address_key(p, "guid", guid);
541                 if (r < 0)
542                         return r;
543                 else if (r > 0)
544                         continue;
545
546                 r = parse_address_key(p, "path", &path);
547                 if (r < 0)
548                         return r;
549                 else if (r > 0)
550                         continue;
551
552                 r = parse_address_key(p, "abstract", &abstract);
553                 if (r < 0)
554                         return r;
555                 else if (r > 0)
556                         continue;
557
558                 skip_address_key(p);
559         }
560
561         if (!path && !abstract)
562                 return -EINVAL;
563
564         if (path && abstract)
565                 return -EINVAL;
566
567         if (path) {
568                 l = strlen(path);
569                 if (l > sizeof(b->sockaddr.un.sun_path))
570                         return -E2BIG;
571
572                 b->sockaddr.un.sun_family = AF_UNIX;
573                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
574                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
575         } else if (abstract) {
576                 l = strlen(abstract);
577                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
578                         return -E2BIG;
579
580                 b->sockaddr.un.sun_family = AF_UNIX;
581                 b->sockaddr.un.sun_path[0] = 0;
582                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
583                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
584         }
585
586         return 0;
587 }
588
589 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
590         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
591         int r;
592         struct addrinfo *result, hints = {
593                 .ai_socktype = SOCK_STREAM,
594                 .ai_flags = AI_ADDRCONFIG,
595         };
596
597         assert(b);
598         assert(p);
599         assert(*p);
600         assert(guid);
601
602         while (**p != 0 && **p != ';') {
603                 r = parse_address_key(p, "guid", guid);
604                 if (r < 0)
605                         return r;
606                 else if (r > 0)
607                         continue;
608
609                 r = parse_address_key(p, "host", &host);
610                 if (r < 0)
611                         return r;
612                 else if (r > 0)
613                         continue;
614
615                 r = parse_address_key(p, "port", &port);
616                 if (r < 0)
617                         return r;
618                 else if (r > 0)
619                         continue;
620
621                 r = parse_address_key(p, "family", &family);
622                 if (r < 0)
623                         return r;
624                 else if (r > 0)
625                         continue;
626
627                 skip_address_key(p);
628         }
629
630         if (!host || !port)
631                 return -EINVAL;
632
633         if (family) {
634                 if (streq(family, "ipv4"))
635                         hints.ai_family = AF_INET;
636                 else if (streq(family, "ipv6"))
637                         hints.ai_family = AF_INET6;
638                 else
639                         return -EINVAL;
640         }
641
642         r = getaddrinfo(host, port, &hints, &result);
643         if (r == EAI_SYSTEM)
644                 return -errno;
645         else if (r != 0)
646                 return -EADDRNOTAVAIL;
647
648         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
649         b->sockaddr_size = result->ai_addrlen;
650
651         freeaddrinfo(result);
652
653         return 0;
654 }
655
656 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
657         char *path = NULL;
658         unsigned n_argv = 0, j;
659         char **argv = NULL;
660         int r;
661
662         assert(b);
663         assert(p);
664         assert(*p);
665         assert(guid);
666
667         while (**p != 0 && **p != ';') {
668                 r = parse_address_key(p, "guid", guid);
669                 if (r < 0)
670                         goto fail;
671                 else if (r > 0)
672                         continue;
673
674                 r = parse_address_key(p, "path", &path);
675                 if (r < 0)
676                         goto fail;
677                 else if (r > 0)
678                         continue;
679
680                 if (startswith(*p, "argv")) {
681                         unsigned ul;
682
683                         errno = 0;
684                         ul = strtoul(*p + 4, (char**) p, 10);
685                         if (errno > 0 || **p != '=' || ul > 256) {
686                                 r = -EINVAL;
687                                 goto fail;
688                         }
689
690                         (*p) ++;
691
692                         if (ul >= n_argv) {
693                                 char **x;
694
695                                 x = realloc(argv, sizeof(char*) * (ul + 2));
696                                 if (!x) {
697                                         r = -ENOMEM;
698                                         goto fail;
699                                 }
700
701                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
702
703                                 argv = x;
704                                 n_argv = ul + 1;
705                         }
706
707                         r = parse_address_key(p, NULL, argv + ul);
708                         if (r < 0)
709                                 goto fail;
710
711                         continue;
712                 }
713
714                 skip_address_key(p);
715         }
716
717         if (!path) {
718                 r = -EINVAL;
719                 goto fail;
720         }
721
722         /* Make sure there are no holes in the array, with the
723          * exception of argv[0] */
724         for (j = 1; j < n_argv; j++)
725                 if (!argv[j]) {
726                         r = -EINVAL;
727                         goto fail;
728                 }
729
730         if (argv && argv[0] == NULL) {
731                 argv[0] = strdup(path);
732                 if (!argv[0]) {
733                         r = -ENOMEM;
734                         goto fail;
735                 }
736         }
737
738         b->exec_path = path;
739         b->exec_argv = argv;
740         return 0;
741
742 fail:
743         for (j = 0; j < n_argv; j++)
744                 free(argv[j]);
745
746         free(argv);
747         free(path);
748         return r;
749 }
750
751 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
752         _cleanup_free_ char *path = NULL;
753         int r;
754
755         assert(b);
756         assert(p);
757         assert(*p);
758         assert(guid);
759
760         while (**p != 0 && **p != ';') {
761                 r = parse_address_key(p, "guid", guid);
762                 if (r < 0)
763                         return r;
764                 else if (r > 0)
765                         continue;
766
767                 r = parse_address_key(p, "path", &path);
768                 if (r < 0)
769                         return r;
770                 else if (r > 0)
771                         continue;
772
773                 skip_address_key(p);
774         }
775
776         if (!path)
777                 return -EINVAL;
778
779         free(b->kernel);
780         b->kernel = path;
781         path = NULL;
782
783         return 0;
784 }
785
786 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
787         _cleanup_free_ char *machine = NULL;
788         int r;
789
790         assert(b);
791         assert(p);
792         assert(*p);
793         assert(guid);
794
795         while (**p != 0 && **p != ';') {
796                 r = parse_address_key(p, "guid", guid);
797                 if (r < 0)
798                         return r;
799                 else if (r > 0)
800                         continue;
801
802                 r = parse_address_key(p, "machine", &machine);
803                 if (r < 0)
804                         return r;
805                 else if (r > 0)
806                         continue;
807
808                 skip_address_key(p);
809         }
810
811         if (!machine)
812                 return -EINVAL;
813
814         free(b->machine);
815         b->machine = machine;
816         machine = NULL;
817
818         b->sockaddr.un.sun_family = AF_UNIX;
819         strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
820         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
821
822         return 0;
823 }
824
825 static void bus_reset_parsed_address(sd_bus *b) {
826         assert(b);
827
828         zero(b->sockaddr);
829         b->sockaddr_size = 0;
830         strv_free(b->exec_argv);
831         free(b->exec_path);
832         b->exec_path = NULL;
833         b->exec_argv = NULL;
834         b->server_id = SD_ID128_NULL;
835         free(b->kernel);
836         b->kernel = NULL;
837         free(b->machine);
838         b->machine = NULL;
839 }
840
841 static int bus_parse_next_address(sd_bus *b) {
842         _cleanup_free_ char *guid = NULL;
843         const char *a;
844         int r;
845
846         assert(b);
847
848         if (!b->address)
849                 return 0;
850         if (b->address[b->address_index] == 0)
851                 return 0;
852
853         bus_reset_parsed_address(b);
854
855         a = b->address + b->address_index;
856
857         while (*a != 0) {
858
859                 if (*a == ';') {
860                         a++;
861                         continue;
862                 }
863
864                 if (startswith(a, "unix:")) {
865                         a += 5;
866
867                         r = parse_unix_address(b, &a, &guid);
868                         if (r < 0)
869                                 return r;
870                         break;
871
872                 } else if (startswith(a, "tcp:")) {
873
874                         a += 4;
875                         r = parse_tcp_address(b, &a, &guid);
876                         if (r < 0)
877                                 return r;
878
879                         break;
880
881                 } else if (startswith(a, "unixexec:")) {
882
883                         a += 9;
884                         r = parse_exec_address(b, &a, &guid);
885                         if (r < 0)
886                                 return r;
887
888                         break;
889
890                 } else if (startswith(a, "kernel:")) {
891
892                         a += 7;
893                         r = parse_kernel_address(b, &a, &guid);
894                         if (r < 0)
895                                 return r;
896
897                         break;
898                 } else if (startswith(a, "x-container:")) {
899
900                         a += 12;
901                         r = parse_container_address(b, &a, &guid);
902                         if (r < 0)
903                                 return r;
904
905                         break;
906                 }
907
908                 a = strchr(a, ';');
909                 if (!a)
910                         return 0;
911         }
912
913         if (guid) {
914                 r = sd_id128_from_string(guid, &b->server_id);
915                 if (r < 0)
916                         return r;
917         }
918
919         b->address_index = a - b->address;
920         return 1;
921 }
922
923 static int bus_start_address(sd_bus *b) {
924         int r;
925
926         assert(b);
927
928         for (;;) {
929                 sd_bus_close(b);
930
931                 if (b->exec_path) {
932
933                         r = bus_socket_exec(b);
934                         if (r >= 0)
935                                 return r;
936
937                         b->last_connect_error = -r;
938                 } else if (b->kernel) {
939
940                         r = bus_kernel_connect(b);
941                         if (r >= 0)
942                                 return r;
943
944                         b->last_connect_error = -r;
945
946                 } else if (b->machine) {
947
948                         r = bus_container_connect(b);
949                         if (r >= 0)
950                                 return r;
951
952                         b->last_connect_error = -r;
953
954                 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
955
956                         r = bus_socket_connect(b);
957                         if (r >= 0)
958                                 return r;
959
960                         b->last_connect_error = -r;
961                 }
962
963                 r = bus_parse_next_address(b);
964                 if (r < 0)
965                         return r;
966                 if (r == 0)
967                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
968         }
969 }
970
971 int bus_next_address(sd_bus *b) {
972         assert(b);
973
974         bus_reset_parsed_address(b);
975         return bus_start_address(b);
976 }
977
978 static int bus_start_fd(sd_bus *b) {
979         struct stat st;
980         int r;
981
982         assert(b);
983         assert(b->input_fd >= 0);
984         assert(b->output_fd >= 0);
985
986         r = fd_nonblock(b->input_fd, true);
987         if (r < 0)
988                 return r;
989
990         r = fd_cloexec(b->input_fd, true);
991         if (r < 0)
992                 return r;
993
994         if (b->input_fd != b->output_fd) {
995                 r = fd_nonblock(b->output_fd, true);
996                 if (r < 0)
997                         return r;
998
999                 r = fd_cloexec(b->output_fd, true);
1000                 if (r < 0)
1001                         return r;
1002         }
1003
1004         if (fstat(b->input_fd, &st) < 0)
1005                 return -errno;
1006
1007         if (S_ISCHR(b->input_fd))
1008                 return bus_kernel_take_fd(b);
1009         else
1010                 return bus_socket_take_fd(b);
1011 }
1012
1013 _public_ int sd_bus_start(sd_bus *bus) {
1014         int r;
1015
1016         assert_return(bus, -EINVAL);
1017         assert_return(bus->state == BUS_UNSET, -EPERM);
1018         assert_return(!bus_pid_changed(bus), -ECHILD);
1019
1020         bus->state = BUS_OPENING;
1021
1022         if (bus->is_server && bus->bus_client)
1023                 return -EINVAL;
1024
1025         if (bus->input_fd >= 0)
1026                 r = bus_start_fd(bus);
1027         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1028                 r = bus_start_address(bus);
1029         else
1030                 return -EINVAL;
1031
1032         if (r < 0)
1033                 return r;
1034
1035         return bus_send_hello(bus);
1036 }
1037
1038 _public_ int sd_bus_open_system(sd_bus **ret) {
1039         const char *e;
1040         sd_bus *b;
1041         int r;
1042
1043         assert_return(ret, -EINVAL);
1044
1045         r = sd_bus_new(&b);
1046         if (r < 0)
1047                 return r;
1048
1049         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1050         if (e) {
1051                 r = sd_bus_set_address(b, e);
1052                 if (r < 0)
1053                         goto fail;
1054         } else {
1055                 b->sockaddr.un.sun_family = AF_UNIX;
1056                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1057                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1058         }
1059
1060         b->bus_client = true;
1061
1062         r = sd_bus_start(b);
1063         if (r < 0)
1064                 goto fail;
1065
1066         *ret = b;
1067         return 0;
1068
1069 fail:
1070         bus_free(b);
1071         return r;
1072 }
1073
1074 _public_ int sd_bus_open_user(sd_bus **ret) {
1075         const char *e;
1076         sd_bus *b;
1077         size_t l;
1078         int r;
1079
1080         assert_return(ret, -EINVAL);
1081
1082         r = sd_bus_new(&b);
1083         if (r < 0)
1084                 return r;
1085
1086         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1087         if (e) {
1088                 r = sd_bus_set_address(b, e);
1089                 if (r < 0)
1090                         goto fail;
1091         } else {
1092                 e = secure_getenv("XDG_RUNTIME_DIR");
1093                 if (!e) {
1094                         r = -ENOENT;
1095                         goto fail;
1096                 }
1097
1098                 l = strlen(e);
1099                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1100                         r = -E2BIG;
1101                         goto fail;
1102                 }
1103
1104                 b->sockaddr.un.sun_family = AF_UNIX;
1105                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1106                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1107         }
1108
1109         b->bus_client = true;
1110
1111         r = sd_bus_start(b);
1112         if (r < 0)
1113                 goto fail;
1114
1115         *ret = b;
1116         return 0;
1117
1118 fail:
1119         bus_free(b);
1120         return r;
1121 }
1122
1123 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1124         _cleanup_free_ char *e = NULL;
1125         char *p = NULL;
1126         sd_bus *bus;
1127         int r;
1128
1129         assert_return(host, -EINVAL);
1130         assert_return(ret, -EINVAL);
1131
1132         e = bus_address_escape(host);
1133         if (!e)
1134                 return -ENOMEM;
1135
1136         p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1137         if (!p)
1138                 return -ENOMEM;
1139
1140         r = sd_bus_new(&bus);
1141         if (r < 0) {
1142                 free(p);
1143                 return r;
1144         }
1145
1146         bus->address = p;
1147         bus->bus_client = true;
1148
1149         r = sd_bus_start(bus);
1150         if (r < 0) {
1151                 bus_free(bus);
1152                 return r;
1153         }
1154
1155         *ret = bus;
1156         return 0;
1157 }
1158
1159 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1160         _cleanup_free_ char *e = NULL;
1161         sd_bus *bus;
1162         char *p;
1163         int r;
1164
1165         assert_return(machine, -EINVAL);
1166         assert_return(ret, -EINVAL);
1167
1168         e = bus_address_escape(machine);
1169         if (!e)
1170                 return -ENOMEM;
1171
1172         p = strjoin("x-container:machine=", e, NULL);
1173         if (!p)
1174                 return -ENOMEM;
1175
1176         r = sd_bus_new(&bus);
1177         if (r < 0) {
1178                 free(p);
1179                 return r;
1180         }
1181
1182         bus->address = p;
1183         bus->bus_client = true;
1184
1185         r = sd_bus_start(bus);
1186         if (r < 0) {
1187                 bus_free(bus);
1188                 return r;
1189         }
1190
1191         *ret = bus;
1192         return 0;
1193 }
1194
1195 _public_ void sd_bus_close(sd_bus *bus) {
1196
1197         if (!bus)
1198                 return;
1199         if (bus->state == BUS_CLOSED)
1200                 return;
1201         if (bus_pid_changed(bus))
1202                 return;
1203
1204         bus->state = BUS_CLOSED;
1205
1206         sd_bus_detach_event(bus);
1207
1208         /* Drop all queued messages so that they drop references to
1209          * the bus object and the bus may be freed */
1210         bus_reset_queues(bus);
1211
1212         if (!bus->is_kernel)
1213                 bus_close_fds(bus);
1214
1215         /* We'll leave the fd open in case this is a kernel bus, since
1216          * there might still be memblocks around that reference this
1217          * bus, and they might need to invoke the
1218          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1219          * freed. */
1220 }
1221
1222 static void bus_enter_closing(sd_bus *bus) {
1223         assert(bus);
1224
1225         if (bus->state != BUS_OPENING &&
1226             bus->state != BUS_AUTHENTICATING &&
1227             bus->state != BUS_HELLO &&
1228             bus->state != BUS_RUNNING)
1229                 return;
1230
1231         bus->state = BUS_CLOSING;
1232 }
1233
1234 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1235         assert_return(bus, NULL);
1236
1237         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1238
1239         return bus;
1240 }
1241
1242 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1243         assert_return(bus, NULL);
1244
1245         if (REFCNT_DEC(bus->n_ref) <= 0)
1246                 bus_free(bus);
1247
1248         return NULL;
1249 }
1250
1251 _public_ int sd_bus_is_open(sd_bus *bus) {
1252
1253         assert_return(bus, -EINVAL);
1254         assert_return(!bus_pid_changed(bus), -ECHILD);
1255
1256         return BUS_IS_OPEN(bus->state);
1257 }
1258
1259 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1260         int r;
1261
1262         assert_return(bus, -EINVAL);
1263         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1264         assert_return(!bus_pid_changed(bus), -ECHILD);
1265
1266         if (type == SD_BUS_TYPE_UNIX_FD) {
1267                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1268                         return 0;
1269
1270                 r = bus_ensure_running(bus);
1271                 if (r < 0)
1272                         return r;
1273
1274                 return bus->can_fds;
1275         }
1276
1277         return bus_type_is_valid(type);
1278 }
1279
1280 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1281         int r;
1282
1283         assert_return(bus, -EINVAL);
1284         assert_return(server_id, -EINVAL);
1285         assert_return(!bus_pid_changed(bus), -ECHILD);
1286
1287         r = bus_ensure_running(bus);
1288         if (r < 0)
1289                 return r;
1290
1291         *server_id = bus->server_id;
1292         return 0;
1293 }
1294
1295 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1296         assert(m);
1297
1298         if (m->header->version > b->message_version)
1299                 return -EPERM;
1300
1301         if (m->sealed) {
1302                 /* If we copy the same message to multiple
1303                  * destinations, avoid using the same serial
1304                  * numbers. */
1305                 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1306                 return 0;
1307         }
1308
1309         return bus_message_seal(m, ++b->serial);
1310 }
1311
1312 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1313         int r;
1314
1315         assert(bus);
1316         assert(message);
1317
1318         if (bus->is_kernel)
1319                 r = bus_kernel_write_message(bus, message);
1320         else
1321                 r = bus_socket_write_message(bus, message, idx);
1322
1323         return r;
1324 }
1325
1326 static int dispatch_wqueue(sd_bus *bus) {
1327         int r, ret = 0;
1328
1329         assert(bus);
1330         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1331
1332         while (bus->wqueue_size > 0) {
1333
1334                 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1335                 if (r < 0)
1336                         return r;
1337                 else if (r == 0)
1338                         /* Didn't do anything this time */
1339                         return ret;
1340                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1341                         /* Fully written. Let's drop the entry from
1342                          * the queue.
1343                          *
1344                          * This isn't particularly optimized, but
1345                          * well, this is supposed to be our worst-case
1346                          * buffer only, and the socket buffer is
1347                          * supposed to be our primary buffer, and if
1348                          * it got full, then all bets are off
1349                          * anyway. */
1350
1351                         sd_bus_message_unref(bus->wqueue[0]);
1352                         bus->wqueue_size --;
1353                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1354                         bus->windex = 0;
1355
1356                         ret = 1;
1357                 }
1358         }
1359
1360         return ret;
1361 }
1362
1363 static int bus_read_message(sd_bus *bus, sd_bus_message **m) {
1364         int r;
1365
1366         assert(bus);
1367         assert(m);
1368
1369         if (bus->is_kernel)
1370                 r = bus_kernel_read_message(bus, m);
1371         else
1372                 r = bus_socket_read_message(bus, m);
1373
1374         return r;
1375 }
1376
1377 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1378         sd_bus_message *z = NULL;
1379         int r, ret = 0;
1380
1381         assert(bus);
1382         assert(m);
1383         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1384
1385         if (bus->rqueue_size > 0) {
1386                 /* Dispatch a queued message */
1387
1388                 *m = bus->rqueue[0];
1389                 bus->rqueue_size --;
1390                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1391                 return 1;
1392         }
1393
1394         /* Try to read a new message */
1395         do {
1396                 r = bus_read_message(bus, &z);
1397                 if (r < 0)
1398                         return r;
1399                 if (r == 0)
1400                         return ret;
1401
1402                 ret = 1;
1403         } while (!z);
1404
1405         *m = z;
1406         return ret;
1407 }
1408
1409 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1410         int r;
1411
1412         assert_return(bus, -EINVAL);
1413         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1414         assert_return(m, -EINVAL);
1415         assert_return(!bus_pid_changed(bus), -ECHILD);
1416
1417         if (m->n_fds > 0) {
1418                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1419                 if (r < 0)
1420                         return r;
1421                 if (r == 0)
1422                         return -ENOTSUP;
1423         }
1424
1425         /* If the serial number isn't kept, then we know that no reply
1426          * is expected */
1427         if (!serial && !m->sealed)
1428                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1429
1430         r = bus_seal_message(bus, m);
1431         if (r < 0)
1432                 return r;
1433
1434         /* If this is a reply and no reply was requested, then let's
1435          * suppress this, if we can */
1436         if (m->dont_send && !serial)
1437                 return 1;
1438
1439         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1440                 size_t idx = 0;
1441
1442                 r = bus_write_message(bus, m, &idx);
1443                 if (r < 0)
1444                         return r;
1445                 else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1446                         /* Wasn't fully written. So let's remember how
1447                          * much was written. Note that the first entry
1448                          * of the wqueue array is always allocated so
1449                          * that we always can remember how much was
1450                          * written. */
1451                         bus->wqueue[0] = sd_bus_message_ref(m);
1452                         bus->wqueue_size = 1;
1453                         bus->windex = idx;
1454                 }
1455         } else {
1456                 sd_bus_message **q;
1457
1458                 /* Just append it to the queue. */
1459
1460                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1461                         return -ENOBUFS;
1462
1463                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1464                 if (!q)
1465                         return -ENOMEM;
1466
1467                 bus->wqueue = q;
1468                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1469         }
1470
1471         if (serial)
1472                 *serial = BUS_MESSAGE_SERIAL(m);
1473
1474         return 1;
1475 }
1476
1477 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1478         int r;
1479
1480         assert_return(bus, -EINVAL);
1481         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1482         assert_return(m, -EINVAL);
1483         assert_return(!bus_pid_changed(bus), -ECHILD);
1484
1485         if (!streq_ptr(m->destination, destination)) {
1486
1487                 if (!destination)
1488                         return -EEXIST;
1489
1490                 r = sd_bus_message_set_destination(m, destination);
1491                 if (r < 0)
1492                         return r;
1493         }
1494
1495         return sd_bus_send(bus, m, serial);
1496 }
1497
1498 static usec_t calc_elapse(uint64_t usec) {
1499         if (usec == (uint64_t) -1)
1500                 return 0;
1501
1502         if (usec == 0)
1503                 usec = BUS_DEFAULT_TIMEOUT;
1504
1505         return now(CLOCK_MONOTONIC) + usec;
1506 }
1507
1508 static int timeout_compare(const void *a, const void *b) {
1509         const struct reply_callback *x = a, *y = b;
1510
1511         if (x->timeout != 0 && y->timeout == 0)
1512                 return -1;
1513
1514         if (x->timeout == 0 && y->timeout != 0)
1515                 return 1;
1516
1517         if (x->timeout < y->timeout)
1518                 return -1;
1519
1520         if (x->timeout > y->timeout)
1521                 return 1;
1522
1523         return 0;
1524 }
1525
1526 _public_ int sd_bus_call_async(
1527                 sd_bus *bus,
1528                 sd_bus_message *m,
1529                 sd_bus_message_handler_t callback,
1530                 void *userdata,
1531                 uint64_t usec,
1532                 uint64_t *serial) {
1533
1534         struct reply_callback *c;
1535         int r;
1536
1537         assert_return(bus, -EINVAL);
1538         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1539         assert_return(m, -EINVAL);
1540         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1541         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1542         assert_return(callback, -EINVAL);
1543         assert_return(!bus_pid_changed(bus), -ECHILD);
1544
1545         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1546         if (r < 0)
1547                 return r;
1548
1549         if (usec != (uint64_t) -1) {
1550                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1551                 if (r < 0)
1552                         return r;
1553         }
1554
1555         r = bus_seal_message(bus, m);
1556         if (r < 0)
1557                 return r;
1558
1559         c = new0(struct reply_callback, 1);
1560         if (!c)
1561                 return -ENOMEM;
1562
1563         c->callback = callback;
1564         c->userdata = userdata;
1565         c->serial = BUS_MESSAGE_SERIAL(m);
1566         c->timeout = calc_elapse(usec);
1567
1568         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1569         if (r < 0) {
1570                 free(c);
1571                 return r;
1572         }
1573
1574         if (c->timeout != 0) {
1575                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1576                 if (r < 0) {
1577                         c->timeout = 0;
1578                         sd_bus_call_async_cancel(bus, c->serial);
1579                         return r;
1580                 }
1581         }
1582
1583         r = sd_bus_send(bus, m, serial);
1584         if (r < 0) {
1585                 sd_bus_call_async_cancel(bus, c->serial);
1586                 return r;
1587         }
1588
1589         return r;
1590 }
1591
1592 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1593         struct reply_callback *c;
1594
1595         assert_return(bus, -EINVAL);
1596         assert_return(serial != 0, -EINVAL);
1597         assert_return(!bus_pid_changed(bus), -ECHILD);
1598
1599         c = hashmap_remove(bus->reply_callbacks, &serial);
1600         if (!c)
1601                 return 0;
1602
1603         if (c->timeout != 0)
1604                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1605
1606         free(c);
1607         return 1;
1608 }
1609
1610 int bus_ensure_running(sd_bus *bus) {
1611         int r;
1612
1613         assert(bus);
1614
1615         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1616                 return -ENOTCONN;
1617         if (bus->state == BUS_RUNNING)
1618                 return 1;
1619
1620         for (;;) {
1621                 r = sd_bus_process(bus, NULL);
1622                 if (r < 0)
1623                         return r;
1624                 if (bus->state == BUS_RUNNING)
1625                         return 1;
1626                 if (r > 0)
1627                         continue;
1628
1629                 r = sd_bus_wait(bus, (uint64_t) -1);
1630                 if (r < 0)
1631                         return r;
1632         }
1633 }
1634
1635 _public_ int sd_bus_call(
1636                 sd_bus *bus,
1637                 sd_bus_message *m,
1638                 uint64_t usec,
1639                 sd_bus_error *error,
1640                 sd_bus_message **reply) {
1641
1642         int r;
1643         usec_t timeout;
1644         uint64_t serial;
1645         bool room = false;
1646
1647         assert_return(bus, -EINVAL);
1648         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1649         assert_return(m, -EINVAL);
1650         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1651         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1652         assert_return(!bus_error_is_dirty(error), -EINVAL);
1653         assert_return(!bus_pid_changed(bus), -ECHILD);
1654
1655         r = bus_ensure_running(bus);
1656         if (r < 0)
1657                 return r;
1658
1659         r = sd_bus_send(bus, m, &serial);
1660         if (r < 0)
1661                 return r;
1662
1663         timeout = calc_elapse(usec);
1664
1665         for (;;) {
1666                 usec_t left;
1667                 sd_bus_message *incoming = NULL;
1668
1669                 if (!room) {
1670                         sd_bus_message **q;
1671
1672                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1673                                 return -ENOBUFS;
1674
1675                         /* Make sure there's room for queuing this
1676                          * locally, before we read the message */
1677
1678                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1679                         if (!q)
1680                                 return -ENOMEM;
1681
1682                         bus->rqueue = q;
1683                         room = true;
1684                 }
1685
1686                 r = bus_read_message(bus, &incoming);
1687                 if (r < 0)
1688                         return r;
1689
1690                 if (incoming) {
1691
1692                         if (incoming->reply_serial == serial) {
1693                                 /* Found a match! */
1694
1695                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1696
1697                                         if (reply)
1698                                                 *reply = incoming;
1699                                         else
1700                                                 sd_bus_message_unref(incoming);
1701
1702                                         return 1;
1703                                 }
1704
1705                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1706                                         int k;
1707
1708                                         r = sd_bus_error_copy(error, &incoming->error);
1709                                         if (r < 0) {
1710                                                 sd_bus_message_unref(incoming);
1711                                                 return r;
1712                                         }
1713
1714                                         k = sd_bus_error_get_errno(&incoming->error);
1715                                         sd_bus_message_unref(incoming);
1716                                         return -k;
1717                                 }
1718
1719                                 sd_bus_message_unref(incoming);
1720                                 return -EIO;
1721
1722                         } else if (incoming->header->serial == serial &&
1723                                    bus->unique_name &&
1724                                    incoming->sender &&
1725                                    streq(bus->unique_name, incoming->sender)) {
1726
1727                                 /* Our own message? Somebody is trying
1728                                  * to send its own client a message,
1729                                  * let's not dead-lock, let's fail
1730                                  * immediately. */
1731
1732                                 sd_bus_message_unref(incoming);
1733                                 return -ELOOP;
1734                         }
1735
1736                         /* There's already guaranteed to be room for
1737                          * this, so need to resize things here */
1738                         bus->rqueue[bus->rqueue_size ++] = incoming;
1739                         room = false;
1740
1741                         /* Try to read more, right-away */
1742                         continue;
1743                 }
1744                 if (r != 0)
1745                         continue;
1746
1747                 if (timeout > 0) {
1748                         usec_t n;
1749
1750                         n = now(CLOCK_MONOTONIC);
1751                         if (n >= timeout)
1752                                 return -ETIMEDOUT;
1753
1754                         left = timeout - n;
1755                 } else
1756                         left = (uint64_t) -1;
1757
1758                 r = bus_poll(bus, true, left);
1759                 if (r < 0)
1760                         return r;
1761
1762                 r = dispatch_wqueue(bus);
1763                 if (r < 0)
1764                         return r;
1765         }
1766 }
1767
1768 _public_ int sd_bus_get_fd(sd_bus *bus) {
1769
1770         assert_return(bus, -EINVAL);
1771         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1772         assert_return(!bus_pid_changed(bus), -ECHILD);
1773
1774         return bus->input_fd;
1775 }
1776
1777 _public_ int sd_bus_get_events(sd_bus *bus) {
1778         int flags = 0;
1779
1780         assert_return(bus, -EINVAL);
1781         assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1782         assert_return(!bus_pid_changed(bus), -ECHILD);
1783
1784         if (bus->state == BUS_OPENING)
1785                 flags |= POLLOUT;
1786         else if (bus->state == BUS_AUTHENTICATING) {
1787
1788                 if (bus_socket_auth_needs_write(bus))
1789                         flags |= POLLOUT;
1790
1791                 flags |= POLLIN;
1792
1793         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1794                 if (bus->rqueue_size <= 0)
1795                         flags |= POLLIN;
1796                 if (bus->wqueue_size > 0)
1797                         flags |= POLLOUT;
1798         }
1799
1800         return flags;
1801 }
1802
1803 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1804         struct reply_callback *c;
1805
1806         assert_return(bus, -EINVAL);
1807         assert_return(timeout_usec, -EINVAL);
1808         assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1809         assert_return(!bus_pid_changed(bus), -ECHILD);
1810
1811         if (bus->state == BUS_CLOSING) {
1812                 *timeout_usec = 0;
1813                 return 1;
1814         }
1815
1816         if (bus->state == BUS_AUTHENTICATING) {
1817                 *timeout_usec = bus->auth_timeout;
1818                 return 1;
1819         }
1820
1821         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1822                 *timeout_usec = (uint64_t) -1;
1823                 return 0;
1824         }
1825
1826         if (bus->rqueue_size > 0) {
1827                 *timeout_usec = 0;
1828                 return 1;
1829         }
1830
1831         c = prioq_peek(bus->reply_callbacks_prioq);
1832         if (!c) {
1833                 *timeout_usec = (uint64_t) -1;
1834                 return 0;
1835         }
1836
1837         *timeout_usec = c->timeout;
1838         return 1;
1839 }
1840
1841 static int process_timeout(sd_bus *bus) {
1842         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1843         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1844         struct reply_callback *c;
1845         usec_t n;
1846         int r;
1847
1848         assert(bus);
1849
1850         c = prioq_peek(bus->reply_callbacks_prioq);
1851         if (!c)
1852                 return 0;
1853
1854         n = now(CLOCK_MONOTONIC);
1855         if (c->timeout > n)
1856                 return 0;
1857
1858         r = bus_message_new_synthetic_error(
1859                         bus,
1860                         c->serial,
1861                         &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1862                         &m);
1863         if (r < 0)
1864                 return r;
1865
1866         r = bus_seal_message(bus, m);
1867         if (r < 0)
1868                 return r;
1869
1870         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1871         hashmap_remove(bus->reply_callbacks, &c->serial);
1872
1873         bus->current = m;
1874         bus->iteration_counter ++;
1875
1876         r = c->callback(bus, m, c->userdata, &error_buffer);
1877         r = bus_maybe_reply_error(m, r, &error_buffer);
1878         free(c);
1879
1880         bus->current = NULL;
1881
1882         return r;
1883 }
1884
1885 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1886         assert(bus);
1887         assert(m);
1888
1889         if (bus->state != BUS_HELLO)
1890                 return 0;
1891
1892         /* Let's make sure the first message on the bus is the HELLO
1893          * reply. But note that we don't actually parse the message
1894          * here (we leave that to the usual handling), we just verify
1895          * we don't let any earlier msg through. */
1896
1897         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1898             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1899                 return -EIO;
1900
1901         if (m->reply_serial != bus->hello_serial)
1902                 return -EIO;
1903
1904         return 0;
1905 }
1906
1907 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1908         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1909         struct reply_callback *c;
1910         int r;
1911
1912         assert(bus);
1913         assert(m);
1914
1915         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1916             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1917                 return 0;
1918
1919         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1920         if (!c)
1921                 return 0;
1922
1923         if (c->timeout != 0)
1924                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1925
1926         r = sd_bus_message_rewind(m, true);
1927         if (r < 0)
1928                 return r;
1929
1930         r = c->callback(bus, m, c->userdata, &error_buffer);
1931         r = bus_maybe_reply_error(m, r, &error_buffer);
1932         free(c);
1933
1934         return r;
1935 }
1936
1937 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1938         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1939         struct filter_callback *l;
1940         int r;
1941
1942         assert(bus);
1943         assert(m);
1944
1945         do {
1946                 bus->filter_callbacks_modified = false;
1947
1948                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1949
1950                         if (bus->filter_callbacks_modified)
1951                                 break;
1952
1953                         /* Don't run this more than once per iteration */
1954                         if (l->last_iteration == bus->iteration_counter)
1955                                 continue;
1956
1957                         l->last_iteration = bus->iteration_counter;
1958
1959                         r = sd_bus_message_rewind(m, true);
1960                         if (r < 0)
1961                                 return r;
1962
1963                         r = l->callback(bus, m, l->userdata, &error_buffer);
1964                         r = bus_maybe_reply_error(m, r, &error_buffer);
1965                         if (r != 0)
1966                                 return r;
1967
1968                 }
1969
1970         } while (bus->filter_callbacks_modified);
1971
1972         return 0;
1973 }
1974
1975 static int process_match(sd_bus *bus, sd_bus_message *m) {
1976         int r;
1977
1978         assert(bus);
1979         assert(m);
1980
1981         do {
1982                 bus->match_callbacks_modified = false;
1983
1984                 r = bus_match_run(bus, &bus->match_callbacks, m);
1985                 if (r != 0)
1986                         return r;
1987
1988         } while (bus->match_callbacks_modified);
1989
1990         return 0;
1991 }
1992
1993 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1994         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1995         int r;
1996
1997         assert(bus);
1998         assert(m);
1999
2000         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2001                 return 0;
2002
2003         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2004                 return 0;
2005
2006         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2007                 return 1;
2008
2009         if (streq_ptr(m->member, "Ping"))
2010                 r = sd_bus_message_new_method_return(m, &reply);
2011         else if (streq_ptr(m->member, "GetMachineId")) {
2012                 sd_id128_t id;
2013                 char sid[33];
2014
2015                 r = sd_id128_get_machine(&id);
2016                 if (r < 0)
2017                         return r;
2018
2019                 r = sd_bus_message_new_method_return(m, &reply);
2020                 if (r < 0)
2021                         return r;
2022
2023                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2024         } else {
2025                 r = sd_bus_message_new_method_errorf(
2026                                 m, &reply,
2027                                 SD_BUS_ERROR_UNKNOWN_METHOD,
2028                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2029         }
2030
2031         if (r < 0)
2032                 return r;
2033
2034         r = sd_bus_send(bus, reply, NULL);
2035         if (r < 0)
2036                 return r;
2037
2038         return 1;
2039 }
2040
2041 static int process_message(sd_bus *bus, sd_bus_message *m) {
2042         int r;
2043
2044         assert(bus);
2045         assert(m);
2046
2047         bus->current = m;
2048         bus->iteration_counter++;
2049
2050         log_debug("Got message sender=%s object=%s interface=%s member=%s",
2051                   strna(sd_bus_message_get_sender(m)),
2052                   strna(sd_bus_message_get_path(m)),
2053                   strna(sd_bus_message_get_interface(m)),
2054                   strna(sd_bus_message_get_member(m)));
2055
2056         r = process_hello(bus, m);
2057         if (r != 0)
2058                 goto finish;
2059
2060         r = process_reply(bus, m);
2061         if (r != 0)
2062                 goto finish;
2063
2064         r = process_filter(bus, m);
2065         if (r != 0)
2066                 goto finish;
2067
2068         r = process_match(bus, m);
2069         if (r != 0)
2070                 goto finish;
2071
2072         r = process_builtin(bus, m);
2073         if (r != 0)
2074                 goto finish;
2075
2076         r = bus_process_object(bus, m);
2077
2078 finish:
2079         bus->current = NULL;
2080         return r;
2081 }
2082
2083 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2084         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2085         int r;
2086
2087         assert(bus);
2088         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2089
2090         r = process_timeout(bus);
2091         if (r != 0)
2092                 goto null_message;
2093
2094         r = dispatch_wqueue(bus);
2095         if (r != 0)
2096                 goto null_message;
2097
2098         r = dispatch_rqueue(bus, &m);
2099         if (r < 0)
2100                 return r;
2101         if (!m)
2102                 goto null_message;
2103
2104         r = process_message(bus, m);
2105         if (r != 0)
2106                 goto null_message;
2107
2108         if (ret) {
2109                 r = sd_bus_message_rewind(m, true);
2110                 if (r < 0)
2111                         return r;
2112
2113                 *ret = m;
2114                 m = NULL;
2115                 return 1;
2116         }
2117
2118         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2119
2120                 r = sd_bus_reply_method_errorf(
2121                                 m,
2122                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
2123                                 "Unknown object '%s'.", m->path);
2124                 if (r < 0)
2125                         return r;
2126         }
2127
2128         return 1;
2129
2130 null_message:
2131         if (r >= 0 && ret)
2132                 *ret = NULL;
2133
2134         return r;
2135 }
2136
2137 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2138         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2139         struct reply_callback *c;
2140         int r;
2141
2142         assert(bus);
2143         assert(bus->state == BUS_CLOSING);
2144
2145         c = hashmap_first(bus->reply_callbacks);
2146         if (c) {
2147                 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2148
2149                 /* First, fail all outstanding method calls */
2150                 r = bus_message_new_synthetic_error(
2151                                 bus,
2152                                 c->serial,
2153                                 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2154                                 &m);
2155                 if (r < 0)
2156                         return r;
2157
2158                 r = bus_seal_message(bus, m);
2159                 if (r < 0)
2160                         return r;
2161
2162                 if (c->timeout != 0)
2163                         prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2164
2165                 hashmap_remove(bus->reply_callbacks, &c->serial);
2166
2167                 bus->current = m;
2168                 bus->iteration_counter++;
2169
2170                 r = c->callback(bus, m, c->userdata, &error_buffer);
2171                 r = bus_maybe_reply_error(m, r, &error_buffer);
2172                 free(c);
2173
2174                 goto finish;
2175         }
2176
2177         /* Then, synthesize a Disconnected message */
2178         r = sd_bus_message_new_signal(
2179                         bus,
2180                         "/org/freedesktop/DBus/Local",
2181                         "org.freedesktop.DBus.Local",
2182                         "Disconnected",
2183                         &m);
2184         if (r < 0)
2185                 return r;
2186
2187         r = bus_seal_message(bus, m);
2188         if (r < 0)
2189                 return r;
2190
2191         sd_bus_close(bus);
2192
2193         bus->current = m;
2194         bus->iteration_counter++;
2195
2196         r = process_filter(bus, m);
2197         if (r != 0)
2198                 goto finish;
2199
2200         r = process_match(bus, m);
2201         if (r != 0)
2202                 goto finish;
2203
2204         if (ret) {
2205                 *ret = m;
2206                 m = NULL;
2207         }
2208
2209         r = 1;
2210
2211 finish:
2212         bus->current = NULL;
2213         return r;
2214 }
2215
2216 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2217         BUS_DONT_DESTROY(bus);
2218         int r;
2219
2220         /* Returns 0 when we didn't do anything. This should cause the
2221          * caller to invoke sd_bus_wait() before returning the next
2222          * time. Returns > 0 when we did something, which possibly
2223          * means *ret is filled in with an unprocessed message. */
2224
2225         assert_return(bus, -EINVAL);
2226         assert_return(!bus_pid_changed(bus), -ECHILD);
2227
2228         /* We don't allow recursively invoking sd_bus_process(). */
2229         assert_return(!bus->current, -EBUSY);
2230
2231         switch (bus->state) {
2232
2233         case BUS_UNSET:
2234         case BUS_CLOSED:
2235                 return -ENOTCONN;
2236
2237         case BUS_OPENING:
2238                 r = bus_socket_process_opening(bus);
2239                 if (r == -ECONNRESET || r == -EPIPE) {
2240                         bus_enter_closing(bus);
2241                         r = 1;
2242                 } else if (r < 0)
2243                         return r;
2244                 if (ret)
2245                         *ret = NULL;
2246                 return r;
2247
2248         case BUS_AUTHENTICATING:
2249                 r = bus_socket_process_authenticating(bus);
2250                 if (r == -ECONNRESET || r == -EPIPE) {
2251                         bus_enter_closing(bus);
2252                         r = 1;
2253                 } else if (r < 0)
2254                         return r;
2255
2256                 if (ret)
2257                         *ret = NULL;
2258
2259                 return r;
2260
2261         case BUS_RUNNING:
2262         case BUS_HELLO:
2263                 r = process_running(bus, ret);
2264                 if (r == -ECONNRESET || r == -EPIPE) {
2265                         bus_enter_closing(bus);
2266                         r = 1;
2267
2268                         if (ret)
2269                                 *ret = NULL;
2270                 }
2271
2272                 return r;
2273
2274         case BUS_CLOSING:
2275                 return process_closing(bus, ret);
2276         }
2277
2278         assert_not_reached("Unknown state");
2279 }
2280
2281 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2282         struct pollfd p[2] = {};
2283         int r, e, n;
2284         struct timespec ts;
2285         usec_t m = (usec_t) -1;
2286
2287         assert(bus);
2288
2289         if (bus->state == BUS_CLOSING)
2290                 return 1;
2291
2292         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2293
2294         e = sd_bus_get_events(bus);
2295         if (e < 0)
2296                 return e;
2297
2298         if (need_more)
2299                 /* The caller really needs some more data, he doesn't
2300                  * care about what's already read, or any timeouts
2301                  * except its own.*/
2302                 e |= POLLIN;
2303         else {
2304                 usec_t until;
2305                 /* The caller wants to process if there's something to
2306                  * process, but doesn't care otherwise */
2307
2308                 r = sd_bus_get_timeout(bus, &until);
2309                 if (r < 0)
2310                         return r;
2311                 if (r > 0) {
2312                         usec_t nw;
2313                         nw = now(CLOCK_MONOTONIC);
2314                         m = until > nw ? until - nw : 0;
2315                 }
2316         }
2317
2318         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2319                 m = timeout_usec;
2320
2321         p[0].fd = bus->input_fd;
2322         if (bus->output_fd == bus->input_fd) {
2323                 p[0].events = e;
2324                 n = 1;
2325         } else {
2326                 p[0].events = e & POLLIN;
2327                 p[1].fd = bus->output_fd;
2328                 p[1].events = e & POLLOUT;
2329                 n = 2;
2330         }
2331
2332         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2333         if (r < 0)
2334                 return -errno;
2335
2336         return r > 0 ? 1 : 0;
2337 }
2338
2339 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2340
2341         assert_return(bus, -EINVAL);
2342         assert_return(!bus_pid_changed(bus), -ECHILD);
2343
2344         if (bus->state == BUS_CLOSING)
2345                 return 0;
2346
2347         assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
2348
2349         if (bus->rqueue_size > 0)
2350                 return 0;
2351
2352         return bus_poll(bus, false, timeout_usec);
2353 }
2354
2355 _public_ int sd_bus_flush(sd_bus *bus) {
2356         int r;
2357
2358         assert_return(bus, -EINVAL);
2359         assert_return(!bus_pid_changed(bus), -ECHILD);
2360
2361         if (bus->state == BUS_CLOSING)
2362                 return 0;
2363
2364         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2365
2366         r = bus_ensure_running(bus);
2367         if (r < 0)
2368                 return r;
2369
2370         if (bus->wqueue_size <= 0)
2371                 return 0;
2372
2373         for (;;) {
2374                 r = dispatch_wqueue(bus);
2375                 if (r < 0)
2376                         return r;
2377
2378                 if (bus->wqueue_size <= 0)
2379                         return 0;
2380
2381                 r = bus_poll(bus, false, (uint64_t) -1);
2382                 if (r < 0)
2383                         return r;
2384         }
2385 }
2386
2387 _public_ int sd_bus_add_filter(sd_bus *bus,
2388                                sd_bus_message_handler_t callback,
2389                                void *userdata) {
2390
2391         struct filter_callback *f;
2392
2393         assert_return(bus, -EINVAL);
2394         assert_return(callback, -EINVAL);
2395         assert_return(!bus_pid_changed(bus), -ECHILD);
2396
2397         f = new0(struct filter_callback, 1);
2398         if (!f)
2399                 return -ENOMEM;
2400         f->callback = callback;
2401         f->userdata = userdata;
2402
2403         bus->filter_callbacks_modified = true;
2404         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2405         return 0;
2406 }
2407
2408 _public_ int sd_bus_remove_filter(sd_bus *bus,
2409                                   sd_bus_message_handler_t callback,
2410                                   void *userdata) {
2411
2412         struct filter_callback *f;
2413
2414         assert_return(bus, -EINVAL);
2415         assert_return(callback, -EINVAL);
2416         assert_return(!bus_pid_changed(bus), -ECHILD);
2417
2418         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2419                 if (f->callback == callback && f->userdata == userdata) {
2420                         bus->filter_callbacks_modified = true;
2421                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2422                         free(f);
2423                         return 1;
2424                 }
2425         }
2426
2427         return 0;
2428 }
2429
2430 _public_ int sd_bus_add_match(sd_bus *bus,
2431                               const char *match,
2432                               sd_bus_message_handler_t callback,
2433                               void *userdata) {
2434
2435         struct bus_match_component *components = NULL;
2436         unsigned n_components = 0;
2437         uint64_t cookie = 0;
2438         int r = 0;
2439
2440         assert_return(bus, -EINVAL);
2441         assert_return(match, -EINVAL);
2442         assert_return(!bus_pid_changed(bus), -ECHILD);
2443
2444         r = bus_match_parse(match, &components, &n_components);
2445         if (r < 0)
2446                 goto finish;
2447
2448         if (bus->bus_client) {
2449                 cookie = ++bus->match_cookie;
2450
2451                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2452                 if (r < 0)
2453                         goto finish;
2454         }
2455
2456         bus->match_callbacks_modified = true;
2457         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2458         if (r < 0) {
2459                 if (bus->bus_client)
2460                         bus_remove_match_internal(bus, match, cookie);
2461         }
2462
2463 finish:
2464         bus_match_parse_free(components, n_components);
2465         return r;
2466 }
2467
2468 _public_ int sd_bus_remove_match(sd_bus *bus,
2469                                  const char *match,
2470                                  sd_bus_message_handler_t callback,
2471                                  void *userdata) {
2472
2473         struct bus_match_component *components = NULL;
2474         unsigned n_components = 0;
2475         int r = 0, q = 0;
2476         uint64_t cookie = 0;
2477
2478         assert_return(bus, -EINVAL);
2479         assert_return(match, -EINVAL);
2480         assert_return(!bus_pid_changed(bus), -ECHILD);
2481
2482         r = bus_match_parse(match, &components, &n_components);
2483         if (r < 0)
2484                 return r;
2485
2486         bus->match_callbacks_modified = true;
2487         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2488
2489         if (bus->bus_client)
2490                 q = bus_remove_match_internal(bus, match, cookie);
2491
2492         bus_match_parse_free(components, n_components);
2493
2494         return r < 0 ? r : q;
2495 }
2496
2497 bool bus_pid_changed(sd_bus *bus) {
2498         assert(bus);
2499
2500         /* We don't support people creating a bus connection and
2501          * keeping it around over a fork(). Let's complain. */
2502
2503         return bus->original_pid != getpid();
2504 }
2505
2506 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2507         sd_bus *bus = userdata;
2508         int r;
2509
2510         assert(bus);
2511
2512         r = sd_bus_process(bus, NULL);
2513         if (r < 0)
2514                 return r;
2515
2516         return 1;
2517 }
2518
2519 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2520         sd_bus *bus = userdata;
2521         int r;
2522
2523         assert(bus);
2524
2525         r = sd_bus_process(bus, NULL);
2526         if (r < 0)
2527                 return r;
2528
2529         return 1;
2530 }
2531
2532 static int prepare_callback(sd_event_source *s, void *userdata) {
2533         sd_bus *bus = userdata;
2534         int r, e;
2535         usec_t until;
2536
2537         assert(s);
2538         assert(bus);
2539
2540         e = sd_bus_get_events(bus);
2541         if (e < 0)
2542                 return e;
2543
2544         if (bus->output_fd != bus->input_fd) {
2545
2546                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2547                 if (r < 0)
2548                         return r;
2549
2550                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2551                 if (r < 0)
2552                         return r;
2553         } else {
2554                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2555                 if (r < 0)
2556                         return r;
2557         }
2558
2559         r = sd_bus_get_timeout(bus, &until);
2560         if (r < 0)
2561                 return r;
2562         if (r > 0) {
2563                 int j;
2564
2565                 j = sd_event_source_set_time(bus->time_event_source, until);
2566                 if (j < 0)
2567                         return j;
2568         }
2569
2570         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2571         if (r < 0)
2572                 return r;
2573
2574         return 1;
2575 }
2576
2577 static int quit_callback(sd_event_source *event, void *userdata) {
2578         sd_bus *bus = userdata;
2579
2580         assert(event);
2581
2582         sd_bus_flush(bus);
2583
2584         return 1;
2585 }
2586
2587 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2588         int r;
2589
2590         assert_return(bus, -EINVAL);
2591         assert_return(!bus->event, -EBUSY);
2592
2593         assert(!bus->input_io_event_source);
2594         assert(!bus->output_io_event_source);
2595         assert(!bus->time_event_source);
2596
2597         if (event)
2598                 bus->event = sd_event_ref(event);
2599         else  {
2600                 r = sd_event_default(&bus->event);
2601                 if (r < 0)
2602                         return r;
2603         }
2604
2605         r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2606         if (r < 0)
2607                 goto fail;
2608
2609         r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2610         if (r < 0)
2611                 goto fail;
2612
2613         if (bus->output_fd != bus->input_fd) {
2614                 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2615                 if (r < 0)
2616                         goto fail;
2617
2618                 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2619                 if (r < 0)
2620                         goto fail;
2621         }
2622
2623         r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2624         if (r < 0)
2625                 goto fail;
2626
2627         r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2628         if (r < 0)
2629                 goto fail;
2630
2631         r = sd_event_source_set_priority(bus->time_event_source, priority);
2632         if (r < 0)
2633                 goto fail;
2634
2635         r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2636         if (r < 0)
2637                 goto fail;
2638
2639         return 0;
2640
2641 fail:
2642         sd_bus_detach_event(bus);
2643         return r;
2644 }
2645
2646 _public_ int sd_bus_detach_event(sd_bus *bus) {
2647         assert_return(bus, -EINVAL);
2648         assert_return(bus->event, -ENXIO);
2649
2650         if (bus->input_io_event_source) {
2651                 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2652                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2653         }
2654
2655         if (bus->output_io_event_source) {
2656                 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2657                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2658         }
2659
2660         if (bus->time_event_source) {
2661                 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2662                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2663         }
2664
2665         if (bus->quit_event_source) {
2666                 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2667                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2668         }
2669
2670         if (bus->event)
2671                 bus->event = sd_event_unref(bus->event);
2672
2673         return 0;
2674 }
2675
2676 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2677         assert_return(bus, NULL);
2678
2679         return bus->event;
2680 }
2681
2682 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2683         assert_return(bus, NULL);
2684
2685         return bus->current;
2686 }
2687
2688 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2689         sd_bus *b = NULL;
2690         int r;
2691
2692         assert(bus_open);
2693         assert(default_bus);
2694
2695         if (!ret)
2696                 return !!*default_bus;
2697
2698         if (*default_bus) {
2699                 *ret = sd_bus_ref(*default_bus);
2700                 return 0;
2701         }
2702
2703         r = bus_open(&b);
2704         if (r < 0)
2705                 return r;
2706
2707         b->default_bus_ptr = default_bus;
2708         b->tid = gettid();
2709         *default_bus = b;
2710
2711         *ret = b;
2712         return 1;
2713 }
2714
2715 _public_ int sd_bus_default_system(sd_bus **ret) {
2716         static __thread sd_bus *default_system_bus = NULL;
2717
2718         return bus_default(sd_bus_open_system, &default_system_bus, ret);
2719 }
2720
2721 _public_ int sd_bus_default_user(sd_bus **ret) {
2722         static __thread sd_bus *default_user_bus = NULL;
2723
2724         return bus_default(sd_bus_open_user, &default_user_bus, ret);
2725 }
2726
2727 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2728         assert_return(b, -EINVAL);
2729         assert_return(tid, -EINVAL);
2730         assert_return(!bus_pid_changed(b), -ECHILD);
2731
2732         if (b->tid != 0) {
2733                 *tid = b->tid;
2734                 return 0;
2735         }
2736
2737         if (b->event)
2738                 return sd_event_get_tid(b->event, tid);
2739
2740         return -ENXIO;
2741 }
2742
2743 _public_ char *sd_bus_label_escape(const char *s) {
2744         char *r, *t;
2745         const char *f;
2746
2747         assert_return(s, NULL);
2748
2749         /* Escapes all chars that D-Bus' object path cannot deal
2750          * with. Can be reversed with bus_path_unescape(). We special
2751          * case the empty string. */
2752
2753         if (*s == 0)
2754                 return strdup("_");
2755
2756         r = new(char, strlen(s)*3 + 1);
2757         if (!r)
2758                 return NULL;
2759
2760         for (f = s, t = r; *f; f++) {
2761
2762                 /* Escape everything that is not a-zA-Z0-9. We also
2763                  * escape 0-9 if it's the first character */
2764
2765                 if (!(*f >= 'A' && *f <= 'Z') &&
2766                     !(*f >= 'a' && *f <= 'z') &&
2767                     !(f > s && *f >= '0' && *f <= '9')) {
2768                         *(t++) = '_';
2769                         *(t++) = hexchar(*f >> 4);
2770                         *(t++) = hexchar(*f);
2771                 } else
2772                         *(t++) = *f;
2773         }
2774
2775         *t = 0;
2776
2777         return r;
2778 }
2779
2780 _public_ char *sd_bus_label_unescape(const char *f) {
2781         char *r, *t;
2782
2783         assert_return(f, NULL);
2784
2785         /* Special case for the empty string */
2786         if (streq(f, "_"))
2787                 return strdup("");
2788
2789         r = new(char, strlen(f) + 1);
2790         if (!r)
2791                 return NULL;
2792
2793         for (t = r; *f; f++) {
2794
2795                 if (*f == '_') {
2796                         int a, b;
2797
2798                         if ((a = unhexchar(f[1])) < 0 ||
2799                             (b = unhexchar(f[2])) < 0) {
2800                                 /* Invalid escape code, let's take it literal then */
2801                                 *(t++) = '_';
2802                         } else {
2803                                 *(t++) = (char) ((a << 4) | b);
2804                                 f += 2;
2805                         }
2806                 } else
2807                         *(t++) = *f;
2808         }
2809
2810         *t = 0;
2811
2812         return r;
2813 }