chiark / gitweb /
bus: properly handle if we get disconnected during HELLO phase
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48 #include "bus-util.h"
49 #include "bus-container.h"
50
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
52
53 static void bus_close_fds(sd_bus *b) {
54         assert(b);
55
56         if (b->input_fd >= 0)
57                 close_nointr_nofail(b->input_fd);
58
59         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60                 close_nointr_nofail(b->output_fd);
61
62         b->input_fd = b->output_fd = -1;
63 }
64
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66         struct node_callback *c;
67         struct node_vtable *v;
68         struct node_enumerator *e;
69
70         assert(b);
71
72         if (!n)
73                 return;
74
75         while (n->child)
76                 bus_node_destroy(b, n->child);
77
78         while ((c = n->callbacks)) {
79                 LIST_REMOVE(callbacks, n->callbacks, c);
80                 free(c);
81         }
82
83         while ((v = n->vtables)) {
84                 LIST_REMOVE(vtables, n->vtables, v);
85                 free(v->interface);
86                 free(v);
87         }
88
89         while ((e = n->enumerators)) {
90                 LIST_REMOVE(enumerators, n->enumerators, e);
91                 free(e);
92         }
93
94         if (n->parent)
95                 LIST_REMOVE(siblings, n->parent->child, n);
96
97         assert_se(hashmap_remove(b->nodes, n->path) == n);
98         free(n->path);
99         free(n);
100 }
101
102 static void bus_reset_queues(sd_bus *b) {
103         unsigned i;
104
105         assert(b);
106
107         for (i = 0; i < b->rqueue_size; i++)
108                 sd_bus_message_unref(b->rqueue[i]);
109         free(b->rqueue);
110
111         for (i = 0; i < b->wqueue_size; i++)
112                 sd_bus_message_unref(b->wqueue[i]);
113         free(b->wqueue);
114
115         b->rqueue = b->wqueue = NULL;
116         b->rqueue_size = b->wqueue_size = 0;
117 }
118
119 static void bus_free(sd_bus *b) {
120         struct filter_callback *f;
121         struct node *n;
122
123         assert(b);
124
125         sd_bus_detach_event(b);
126
127         bus_close_fds(b);
128
129         if (b->kdbus_buffer)
130                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
131
132         free(b->rbuffer);
133         free(b->unique_name);
134         free(b->auth_buffer);
135         free(b->address);
136         free(b->kernel);
137         free(b->machine);
138
139         free(b->exec_path);
140         strv_free(b->exec_argv);
141
142         close_many(b->fds, b->n_fds);
143         free(b->fds);
144
145         bus_reset_queues(b);
146
147         hashmap_free_free(b->reply_callbacks);
148         prioq_free(b->reply_callbacks_prioq);
149
150         while ((f = b->filter_callbacks)) {
151                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
152                 free(f);
153         }
154
155         bus_match_free(&b->match_callbacks);
156
157         hashmap_free_free(b->vtable_methods);
158         hashmap_free_free(b->vtable_properties);
159
160         while ((n = hashmap_first(b->nodes)))
161                 bus_node_destroy(b, n);
162
163         hashmap_free(b->nodes);
164
165         bus_kernel_flush_memfd(b);
166
167         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
168
169         free(b);
170 }
171
172 _public_ int sd_bus_new(sd_bus **ret) {
173         sd_bus *r;
174
175         assert_return(ret, -EINVAL);
176
177         r = new0(sd_bus, 1);
178         if (!r)
179                 return -ENOMEM;
180
181         r->n_ref = REFCNT_INIT;
182         r->input_fd = r->output_fd = -1;
183         r->message_version = 1;
184         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
185         r->original_pid = getpid();
186
187         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
188
189         /* We guarantee that wqueue always has space for at least one
190          * entry */
191         r->wqueue = new(sd_bus_message*, 1);
192         if (!r->wqueue) {
193                 free(r);
194                 return -ENOMEM;
195         }
196
197         *ret = r;
198         return 0;
199 }
200
201 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
202         char *a;
203
204         assert_return(bus, -EINVAL);
205         assert_return(bus->state == BUS_UNSET, -EPERM);
206         assert_return(address, -EINVAL);
207         assert_return(!bus_pid_changed(bus), -ECHILD);
208
209         a = strdup(address);
210         if (!a)
211                 return -ENOMEM;
212
213         free(bus->address);
214         bus->address = a;
215
216         return 0;
217 }
218
219 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
220         assert_return(bus, -EINVAL);
221         assert_return(bus->state == BUS_UNSET, -EPERM);
222         assert_return(input_fd >= 0, -EINVAL);
223         assert_return(output_fd >= 0, -EINVAL);
224         assert_return(!bus_pid_changed(bus), -ECHILD);
225
226         bus->input_fd = input_fd;
227         bus->output_fd = output_fd;
228         return 0;
229 }
230
231 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
232         char *p, **a;
233
234         assert_return(bus, -EINVAL);
235         assert_return(bus->state == BUS_UNSET, -EPERM);
236         assert_return(path, -EINVAL);
237         assert_return(!strv_isempty(argv), -EINVAL);
238         assert_return(!bus_pid_changed(bus), -ECHILD);
239
240         p = strdup(path);
241         if (!p)
242                 return -ENOMEM;
243
244         a = strv_copy(argv);
245         if (!a) {
246                 free(p);
247                 return -ENOMEM;
248         }
249
250         free(bus->exec_path);
251         strv_free(bus->exec_argv);
252
253         bus->exec_path = p;
254         bus->exec_argv = a;
255
256         return 0;
257 }
258
259 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
260         assert_return(bus, -EINVAL);
261         assert_return(bus->state == BUS_UNSET, -EPERM);
262         assert_return(!bus_pid_changed(bus), -ECHILD);
263
264         bus->bus_client = !!b;
265         return 0;
266 }
267
268 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
269         assert_return(bus, -EINVAL);
270         assert_return(bus->state == BUS_UNSET, -EPERM);
271         assert_return(!bus_pid_changed(bus), -ECHILD);
272
273         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
274         return 0;
275 }
276
277 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
278         assert_return(bus, -EINVAL);
279         assert_return(bus->state == BUS_UNSET, -EPERM);
280         assert_return(!bus_pid_changed(bus), -ECHILD);
281
282         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_TIMESTAMP, b);
283         return 0;
284 }
285
286 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, int b) {
287         assert_return(bus, -EINVAL);
288         assert_return(bus->state == BUS_UNSET, -EPERM);
289         assert_return(!bus_pid_changed(bus), -ECHILD);
290
291         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CREDS, b);
292         return 0;
293 }
294
295 _public_ int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
296         assert_return(bus, -EINVAL);
297         assert_return(bus->state == BUS_UNSET, -EPERM);
298         assert_return(!bus_pid_changed(bus), -ECHILD);
299
300         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
301         return 0;
302 }
303
304 _public_ int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
305         assert_return(bus, -EINVAL);
306         assert_return(bus->state == BUS_UNSET, -EPERM);
307         assert_return(!bus_pid_changed(bus), -ECHILD);
308
309         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
310         return 0;
311 }
312
313 _public_ int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
314         assert_return(bus, -EINVAL);
315         assert_return(bus->state == BUS_UNSET, -EPERM);
316         assert_return(!bus_pid_changed(bus), -ECHILD);
317
318         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
319         return 0;
320 }
321
322 _public_ int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
323         assert_return(bus, -EINVAL);
324         assert_return(bus->state == BUS_UNSET, -EPERM);
325         assert_return(!bus_pid_changed(bus), -ECHILD);
326
327         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
328         return 0;
329 }
330
331 _public_ int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
332         assert_return(bus, -EINVAL);
333         assert_return(bus->state == BUS_UNSET, -EPERM);
334         assert_return(!bus_pid_changed(bus), -ECHILD);
335
336         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
337         return 0;
338 }
339
340 _public_ int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
341         assert_return(bus, -EINVAL);
342         assert_return(bus->state == BUS_UNSET, -EPERM);
343         assert_return(!bus_pid_changed(bus), -ECHILD);
344
345         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
346         return 0;
347 }
348
349 _public_ int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
350         assert_return(bus, -EINVAL);
351         assert_return(bus->state == BUS_UNSET, -EPERM);
352         assert_return(!bus_pid_changed(bus), -ECHILD);
353
354         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
355         return 0;
356 }
357
358 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
359         assert_return(bus, -EINVAL);
360         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
361         assert_return(bus->state == BUS_UNSET, -EPERM);
362         assert_return(!bus_pid_changed(bus), -ECHILD);
363
364         bus->is_server = !!b;
365         bus->server_id = server_id;
366         return 0;
367 }
368
369 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
370         assert_return(bus, -EINVAL);
371         assert_return(bus->state == BUS_UNSET, -EPERM);
372         assert_return(!bus_pid_changed(bus), -ECHILD);
373
374         bus->anonymous_auth = !!b;
375         return 0;
376 }
377
378 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
379         const char *s;
380         int r;
381
382         assert(bus);
383         assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
384         assert(reply);
385
386         r = sd_bus_message_get_errno(reply);
387         if (r < 0)
388                 return r;
389         if (r > 0)
390                 return -r;
391
392         r = sd_bus_message_read(reply, "s", &s);
393         if (r < 0)
394                 return r;
395
396         if (!service_name_is_valid(s) || s[0] != ':')
397                 return -EBADMSG;
398
399         bus->unique_name = strdup(s);
400         if (!bus->unique_name)
401                 return -ENOMEM;
402
403         if (bus->state == BUS_HELLO)
404                 bus->state = BUS_RUNNING;
405
406         return 1;
407 }
408
409 static int bus_send_hello(sd_bus *bus) {
410         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
411         int r;
412
413         assert(bus);
414
415         if (!bus->bus_client || bus->is_kernel)
416                 return 0;
417
418         r = sd_bus_message_new_method_call(
419                         bus,
420                         "org.freedesktop.DBus",
421                         "/",
422                         "org.freedesktop.DBus",
423                         "Hello",
424                         &m);
425         if (r < 0)
426                 return r;
427
428         return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
429 }
430
431 int bus_start_running(sd_bus *bus) {
432         assert(bus);
433
434         if (bus->bus_client && !bus->is_kernel) {
435                 bus->state = BUS_HELLO;
436                 return 1;
437         }
438
439         bus->state = BUS_RUNNING;
440         return 1;
441 }
442
443 static int parse_address_key(const char **p, const char *key, char **value) {
444         size_t l, n = 0;
445         const char *a;
446         char *r = NULL;
447
448         assert(p);
449         assert(*p);
450         assert(value);
451
452         if (key) {
453                 l = strlen(key);
454                 if (strncmp(*p, key, l) != 0)
455                         return 0;
456
457                 if ((*p)[l] != '=')
458                         return 0;
459
460                 if (*value)
461                         return -EINVAL;
462
463                 a = *p + l + 1;
464         } else
465                 a = *p;
466
467         while (*a != ';' && *a != ',' && *a != 0) {
468                 char c, *t;
469
470                 if (*a == '%') {
471                         int x, y;
472
473                         x = unhexchar(a[1]);
474                         if (x < 0) {
475                                 free(r);
476                                 return x;
477                         }
478
479                         y = unhexchar(a[2]);
480                         if (y < 0) {
481                                 free(r);
482                                 return y;
483                         }
484
485                         c = (char) ((x << 4) | y);
486                         a += 3;
487                 } else {
488                         c = *a;
489                         a++;
490                 }
491
492                 t = realloc(r, n + 2);
493                 if (!t) {
494                         free(r);
495                         return -ENOMEM;
496                 }
497
498                 r = t;
499                 r[n++] = c;
500         }
501
502         if (!r) {
503                 r = strdup("");
504                 if (!r)
505                         return -ENOMEM;
506         } else
507                 r[n] = 0;
508
509         if (*a == ',')
510                 a++;
511
512         *p = a;
513
514         free(*value);
515         *value = r;
516
517         return 1;
518 }
519
520 static void skip_address_key(const char **p) {
521         assert(p);
522         assert(*p);
523
524         *p += strcspn(*p, ",");
525
526         if (**p == ',')
527                 (*p) ++;
528 }
529
530 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
531         _cleanup_free_ char *path = NULL, *abstract = NULL;
532         size_t l;
533         int r;
534
535         assert(b);
536         assert(p);
537         assert(*p);
538         assert(guid);
539
540         while (**p != 0 && **p != ';') {
541                 r = parse_address_key(p, "guid", guid);
542                 if (r < 0)
543                         return r;
544                 else if (r > 0)
545                         continue;
546
547                 r = parse_address_key(p, "path", &path);
548                 if (r < 0)
549                         return r;
550                 else if (r > 0)
551                         continue;
552
553                 r = parse_address_key(p, "abstract", &abstract);
554                 if (r < 0)
555                         return r;
556                 else if (r > 0)
557                         continue;
558
559                 skip_address_key(p);
560         }
561
562         if (!path && !abstract)
563                 return -EINVAL;
564
565         if (path && abstract)
566                 return -EINVAL;
567
568         if (path) {
569                 l = strlen(path);
570                 if (l > sizeof(b->sockaddr.un.sun_path))
571                         return -E2BIG;
572
573                 b->sockaddr.un.sun_family = AF_UNIX;
574                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
575                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
576         } else if (abstract) {
577                 l = strlen(abstract);
578                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
579                         return -E2BIG;
580
581                 b->sockaddr.un.sun_family = AF_UNIX;
582                 b->sockaddr.un.sun_path[0] = 0;
583                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
584                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
585         }
586
587         return 0;
588 }
589
590 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
591         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
592         int r;
593         struct addrinfo *result, hints = {
594                 .ai_socktype = SOCK_STREAM,
595                 .ai_flags = AI_ADDRCONFIG,
596         };
597
598         assert(b);
599         assert(p);
600         assert(*p);
601         assert(guid);
602
603         while (**p != 0 && **p != ';') {
604                 r = parse_address_key(p, "guid", guid);
605                 if (r < 0)
606                         return r;
607                 else if (r > 0)
608                         continue;
609
610                 r = parse_address_key(p, "host", &host);
611                 if (r < 0)
612                         return r;
613                 else if (r > 0)
614                         continue;
615
616                 r = parse_address_key(p, "port", &port);
617                 if (r < 0)
618                         return r;
619                 else if (r > 0)
620                         continue;
621
622                 r = parse_address_key(p, "family", &family);
623                 if (r < 0)
624                         return r;
625                 else if (r > 0)
626                         continue;
627
628                 skip_address_key(p);
629         }
630
631         if (!host || !port)
632                 return -EINVAL;
633
634         if (family) {
635                 if (streq(family, "ipv4"))
636                         hints.ai_family = AF_INET;
637                 else if (streq(family, "ipv6"))
638                         hints.ai_family = AF_INET6;
639                 else
640                         return -EINVAL;
641         }
642
643         r = getaddrinfo(host, port, &hints, &result);
644         if (r == EAI_SYSTEM)
645                 return -errno;
646         else if (r != 0)
647                 return -EADDRNOTAVAIL;
648
649         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
650         b->sockaddr_size = result->ai_addrlen;
651
652         freeaddrinfo(result);
653
654         return 0;
655 }
656
657 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
658         char *path = NULL;
659         unsigned n_argv = 0, j;
660         char **argv = NULL;
661         int r;
662
663         assert(b);
664         assert(p);
665         assert(*p);
666         assert(guid);
667
668         while (**p != 0 && **p != ';') {
669                 r = parse_address_key(p, "guid", guid);
670                 if (r < 0)
671                         goto fail;
672                 else if (r > 0)
673                         continue;
674
675                 r = parse_address_key(p, "path", &path);
676                 if (r < 0)
677                         goto fail;
678                 else if (r > 0)
679                         continue;
680
681                 if (startswith(*p, "argv")) {
682                         unsigned ul;
683
684                         errno = 0;
685                         ul = strtoul(*p + 4, (char**) p, 10);
686                         if (errno > 0 || **p != '=' || ul > 256) {
687                                 r = -EINVAL;
688                                 goto fail;
689                         }
690
691                         (*p) ++;
692
693                         if (ul >= n_argv) {
694                                 char **x;
695
696                                 x = realloc(argv, sizeof(char*) * (ul + 2));
697                                 if (!x) {
698                                         r = -ENOMEM;
699                                         goto fail;
700                                 }
701
702                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
703
704                                 argv = x;
705                                 n_argv = ul + 1;
706                         }
707
708                         r = parse_address_key(p, NULL, argv + ul);
709                         if (r < 0)
710                                 goto fail;
711
712                         continue;
713                 }
714
715                 skip_address_key(p);
716         }
717
718         if (!path) {
719                 r = -EINVAL;
720                 goto fail;
721         }
722
723         /* Make sure there are no holes in the array, with the
724          * exception of argv[0] */
725         for (j = 1; j < n_argv; j++)
726                 if (!argv[j]) {
727                         r = -EINVAL;
728                         goto fail;
729                 }
730
731         if (argv && argv[0] == NULL) {
732                 argv[0] = strdup(path);
733                 if (!argv[0]) {
734                         r = -ENOMEM;
735                         goto fail;
736                 }
737         }
738
739         b->exec_path = path;
740         b->exec_argv = argv;
741         return 0;
742
743 fail:
744         for (j = 0; j < n_argv; j++)
745                 free(argv[j]);
746
747         free(argv);
748         free(path);
749         return r;
750 }
751
752 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
753         _cleanup_free_ char *path = NULL;
754         int r;
755
756         assert(b);
757         assert(p);
758         assert(*p);
759         assert(guid);
760
761         while (**p != 0 && **p != ';') {
762                 r = parse_address_key(p, "guid", guid);
763                 if (r < 0)
764                         return r;
765                 else if (r > 0)
766                         continue;
767
768                 r = parse_address_key(p, "path", &path);
769                 if (r < 0)
770                         return r;
771                 else if (r > 0)
772                         continue;
773
774                 skip_address_key(p);
775         }
776
777         if (!path)
778                 return -EINVAL;
779
780         free(b->kernel);
781         b->kernel = path;
782         path = NULL;
783
784         return 0;
785 }
786
787 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
788         _cleanup_free_ char *machine = NULL;
789         int r;
790
791         assert(b);
792         assert(p);
793         assert(*p);
794         assert(guid);
795
796         while (**p != 0 && **p != ';') {
797                 r = parse_address_key(p, "guid", guid);
798                 if (r < 0)
799                         return r;
800                 else if (r > 0)
801                         continue;
802
803                 r = parse_address_key(p, "machine", &machine);
804                 if (r < 0)
805                         return r;
806                 else if (r > 0)
807                         continue;
808
809                 skip_address_key(p);
810         }
811
812         if (!machine)
813                 return -EINVAL;
814
815         free(b->machine);
816         b->machine = machine;
817         machine = NULL;
818
819         b->sockaddr.un.sun_family = AF_UNIX;
820         strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
821         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
822
823         return 0;
824 }
825
826 static void bus_reset_parsed_address(sd_bus *b) {
827         assert(b);
828
829         zero(b->sockaddr);
830         b->sockaddr_size = 0;
831         strv_free(b->exec_argv);
832         free(b->exec_path);
833         b->exec_path = NULL;
834         b->exec_argv = NULL;
835         b->server_id = SD_ID128_NULL;
836         free(b->kernel);
837         b->kernel = NULL;
838         free(b->machine);
839         b->machine = NULL;
840 }
841
842 static int bus_parse_next_address(sd_bus *b) {
843         _cleanup_free_ char *guid = NULL;
844         const char *a;
845         int r;
846
847         assert(b);
848
849         if (!b->address)
850                 return 0;
851         if (b->address[b->address_index] == 0)
852                 return 0;
853
854         bus_reset_parsed_address(b);
855
856         a = b->address + b->address_index;
857
858         while (*a != 0) {
859
860                 if (*a == ';') {
861                         a++;
862                         continue;
863                 }
864
865                 if (startswith(a, "unix:")) {
866                         a += 5;
867
868                         r = parse_unix_address(b, &a, &guid);
869                         if (r < 0)
870                                 return r;
871                         break;
872
873                 } else if (startswith(a, "tcp:")) {
874
875                         a += 4;
876                         r = parse_tcp_address(b, &a, &guid);
877                         if (r < 0)
878                                 return r;
879
880                         break;
881
882                 } else if (startswith(a, "unixexec:")) {
883
884                         a += 9;
885                         r = parse_exec_address(b, &a, &guid);
886                         if (r < 0)
887                                 return r;
888
889                         break;
890
891                 } else if (startswith(a, "kernel:")) {
892
893                         a += 7;
894                         r = parse_kernel_address(b, &a, &guid);
895                         if (r < 0)
896                                 return r;
897
898                         break;
899                 } else if (startswith(a, "x-container:")) {
900
901                         a += 12;
902                         r = parse_container_address(b, &a, &guid);
903                         if (r < 0)
904                                 return r;
905
906                         break;
907                 }
908
909                 a = strchr(a, ';');
910                 if (!a)
911                         return 0;
912         }
913
914         if (guid) {
915                 r = sd_id128_from_string(guid, &b->server_id);
916                 if (r < 0)
917                         return r;
918         }
919
920         b->address_index = a - b->address;
921         return 1;
922 }
923
924 static int bus_start_address(sd_bus *b) {
925         int r;
926
927         assert(b);
928
929         for (;;) {
930                 sd_bus_close(b);
931
932                 if (b->exec_path) {
933
934                         r = bus_socket_exec(b);
935                         if (r >= 0)
936                                 return r;
937
938                         b->last_connect_error = -r;
939                 } else if (b->kernel) {
940
941                         r = bus_kernel_connect(b);
942                         if (r >= 0)
943                                 return r;
944
945                         b->last_connect_error = -r;
946
947                 } else if (b->machine) {
948
949                         r = bus_container_connect(b);
950                         if (r >= 0)
951                                 return r;
952
953                         b->last_connect_error = -r;
954
955                 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
956
957                         r = bus_socket_connect(b);
958                         if (r >= 0)
959                                 return r;
960
961                         b->last_connect_error = -r;
962                 }
963
964                 r = bus_parse_next_address(b);
965                 if (r < 0)
966                         return r;
967                 if (r == 0)
968                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
969         }
970 }
971
972 int bus_next_address(sd_bus *b) {
973         assert(b);
974
975         bus_reset_parsed_address(b);
976         return bus_start_address(b);
977 }
978
979 static int bus_start_fd(sd_bus *b) {
980         struct stat st;
981         int r;
982
983         assert(b);
984         assert(b->input_fd >= 0);
985         assert(b->output_fd >= 0);
986
987         r = fd_nonblock(b->input_fd, true);
988         if (r < 0)
989                 return r;
990
991         r = fd_cloexec(b->input_fd, true);
992         if (r < 0)
993                 return r;
994
995         if (b->input_fd != b->output_fd) {
996                 r = fd_nonblock(b->output_fd, true);
997                 if (r < 0)
998                         return r;
999
1000                 r = fd_cloexec(b->output_fd, true);
1001                 if (r < 0)
1002                         return r;
1003         }
1004
1005         if (fstat(b->input_fd, &st) < 0)
1006                 return -errno;
1007
1008         if (S_ISCHR(b->input_fd))
1009                 return bus_kernel_take_fd(b);
1010         else
1011                 return bus_socket_take_fd(b);
1012 }
1013
1014 _public_ int sd_bus_start(sd_bus *bus) {
1015         int r;
1016
1017         assert_return(bus, -EINVAL);
1018         assert_return(bus->state == BUS_UNSET, -EPERM);
1019         assert_return(!bus_pid_changed(bus), -ECHILD);
1020
1021         bus->state = BUS_OPENING;
1022
1023         if (bus->is_server && bus->bus_client)
1024                 return -EINVAL;
1025
1026         if (bus->input_fd >= 0)
1027                 r = bus_start_fd(bus);
1028         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1029                 r = bus_start_address(bus);
1030         else
1031                 return -EINVAL;
1032
1033         if (r < 0)
1034                 return r;
1035
1036         return bus_send_hello(bus);
1037 }
1038
1039 _public_ int sd_bus_open_system(sd_bus **ret) {
1040         const char *e;
1041         sd_bus *b;
1042         int r;
1043
1044         assert_return(ret, -EINVAL);
1045
1046         r = sd_bus_new(&b);
1047         if (r < 0)
1048                 return r;
1049
1050         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1051         if (e) {
1052                 r = sd_bus_set_address(b, e);
1053                 if (r < 0)
1054                         goto fail;
1055         } else {
1056                 b->sockaddr.un.sun_family = AF_UNIX;
1057                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1058                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1059         }
1060
1061         b->bus_client = true;
1062
1063         r = sd_bus_start(b);
1064         if (r < 0)
1065                 goto fail;
1066
1067         *ret = b;
1068         return 0;
1069
1070 fail:
1071         bus_free(b);
1072         return r;
1073 }
1074
1075 _public_ int sd_bus_open_user(sd_bus **ret) {
1076         const char *e;
1077         sd_bus *b;
1078         size_t l;
1079         int r;
1080
1081         assert_return(ret, -EINVAL);
1082
1083         r = sd_bus_new(&b);
1084         if (r < 0)
1085                 return r;
1086
1087         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1088         if (e) {
1089                 r = sd_bus_set_address(b, e);
1090                 if (r < 0)
1091                         goto fail;
1092         } else {
1093                 e = secure_getenv("XDG_RUNTIME_DIR");
1094                 if (!e) {
1095                         r = -ENOENT;
1096                         goto fail;
1097                 }
1098
1099                 l = strlen(e);
1100                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1101                         r = -E2BIG;
1102                         goto fail;
1103                 }
1104
1105                 b->sockaddr.un.sun_family = AF_UNIX;
1106                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1107                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1108         }
1109
1110         b->bus_client = true;
1111
1112         r = sd_bus_start(b);
1113         if (r < 0)
1114                 goto fail;
1115
1116         *ret = b;
1117         return 0;
1118
1119 fail:
1120         bus_free(b);
1121         return r;
1122 }
1123
1124 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1125         _cleanup_free_ char *e = NULL;
1126         char *p = NULL;
1127         sd_bus *bus;
1128         int r;
1129
1130         assert_return(host, -EINVAL);
1131         assert_return(ret, -EINVAL);
1132
1133         e = bus_address_escape(host);
1134         if (!e)
1135                 return -ENOMEM;
1136
1137         p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1138         if (!p)
1139                 return -ENOMEM;
1140
1141         r = sd_bus_new(&bus);
1142         if (r < 0) {
1143                 free(p);
1144                 return r;
1145         }
1146
1147         bus->address = p;
1148         bus->bus_client = true;
1149
1150         r = sd_bus_start(bus);
1151         if (r < 0) {
1152                 bus_free(bus);
1153                 return r;
1154         }
1155
1156         *ret = bus;
1157         return 0;
1158 }
1159
1160 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1161         _cleanup_free_ char *e = NULL;
1162         sd_bus *bus;
1163         char *p;
1164         int r;
1165
1166         assert_return(machine, -EINVAL);
1167         assert_return(ret, -EINVAL);
1168
1169         e = bus_address_escape(machine);
1170         if (!e)
1171                 return -ENOMEM;
1172
1173         p = strjoin("x-container:machine=", e, NULL);
1174         if (!p)
1175                 return -ENOMEM;
1176
1177         r = sd_bus_new(&bus);
1178         if (r < 0) {
1179                 free(p);
1180                 return r;
1181         }
1182
1183         bus->address = p;
1184         bus->bus_client = true;
1185
1186         r = sd_bus_start(bus);
1187         if (r < 0) {
1188                 bus_free(bus);
1189                 return r;
1190         }
1191
1192         *ret = bus;
1193         return 0;
1194 }
1195
1196 _public_ void sd_bus_close(sd_bus *bus) {
1197
1198         if (!bus)
1199                 return;
1200         if (bus->state == BUS_CLOSED)
1201                 return;
1202         if (bus_pid_changed(bus))
1203                 return;
1204
1205         bus->state = BUS_CLOSED;
1206
1207         sd_bus_detach_event(bus);
1208
1209         /* Drop all queued messages so that they drop references to
1210          * the bus object and the bus may be freed */
1211         bus_reset_queues(bus);
1212
1213         if (!bus->is_kernel)
1214                 bus_close_fds(bus);
1215
1216         /* We'll leave the fd open in case this is a kernel bus, since
1217          * there might still be memblocks around that reference this
1218          * bus, and they might need to invoke the
1219          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1220          * freed. */
1221 }
1222
1223 static void bus_enter_closing(sd_bus *bus) {
1224         assert(bus);
1225
1226         if (bus->state != BUS_OPENING &&
1227             bus->state != BUS_AUTHENTICATING &&
1228             bus->state != BUS_HELLO &&
1229             bus->state != BUS_RUNNING)
1230                 return;
1231
1232         bus->state = BUS_CLOSING;
1233 }
1234
1235 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1236         assert_return(bus, NULL);
1237
1238         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1239
1240         return bus;
1241 }
1242
1243 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1244         assert_return(bus, NULL);
1245
1246         if (REFCNT_DEC(bus->n_ref) <= 0)
1247                 bus_free(bus);
1248
1249         return NULL;
1250 }
1251
1252 _public_ int sd_bus_is_open(sd_bus *bus) {
1253
1254         assert_return(bus, -EINVAL);
1255         assert_return(!bus_pid_changed(bus), -ECHILD);
1256
1257         return BUS_IS_OPEN(bus->state);
1258 }
1259
1260 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1261         int r;
1262
1263         assert_return(bus, -EINVAL);
1264         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1265         assert_return(!bus_pid_changed(bus), -ECHILD);
1266
1267         if (type == SD_BUS_TYPE_UNIX_FD) {
1268                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1269                         return 0;
1270
1271                 r = bus_ensure_running(bus);
1272                 if (r < 0)
1273                         return r;
1274
1275                 return bus->can_fds;
1276         }
1277
1278         return bus_type_is_valid(type);
1279 }
1280
1281 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1282         int r;
1283
1284         assert_return(bus, -EINVAL);
1285         assert_return(server_id, -EINVAL);
1286         assert_return(!bus_pid_changed(bus), -ECHILD);
1287
1288         r = bus_ensure_running(bus);
1289         if (r < 0)
1290                 return r;
1291
1292         *server_id = bus->server_id;
1293         return 0;
1294 }
1295
1296 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1297         assert(m);
1298
1299         if (m->header->version > b->message_version)
1300                 return -EPERM;
1301
1302         if (m->sealed) {
1303                 /* If we copy the same message to multiple
1304                  * destinations, avoid using the same serial
1305                  * numbers. */
1306                 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1307                 return 0;
1308         }
1309
1310         return bus_message_seal(m, ++b->serial);
1311 }
1312
1313 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1314         int r;
1315
1316         assert(bus);
1317         assert(message);
1318
1319         if (bus->is_kernel)
1320                 r = bus_kernel_write_message(bus, message);
1321         else
1322                 r = bus_socket_write_message(bus, message, idx);
1323
1324         return r;
1325 }
1326
1327 static int dispatch_wqueue(sd_bus *bus) {
1328         int r, ret = 0;
1329
1330         assert(bus);
1331         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1332
1333         while (bus->wqueue_size > 0) {
1334
1335                 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1336                 if (r < 0)
1337                         return r;
1338                 else if (r == 0)
1339                         /* Didn't do anything this time */
1340                         return ret;
1341                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1342                         /* Fully written. Let's drop the entry from
1343                          * the queue.
1344                          *
1345                          * This isn't particularly optimized, but
1346                          * well, this is supposed to be our worst-case
1347                          * buffer only, and the socket buffer is
1348                          * supposed to be our primary buffer, and if
1349                          * it got full, then all bets are off
1350                          * anyway. */
1351
1352                         sd_bus_message_unref(bus->wqueue[0]);
1353                         bus->wqueue_size --;
1354                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1355                         bus->windex = 0;
1356
1357                         ret = 1;
1358                 }
1359         }
1360
1361         return ret;
1362 }
1363
1364 static int bus_read_message(sd_bus *bus, sd_bus_message **m) {
1365         int r;
1366
1367         assert(bus);
1368         assert(m);
1369
1370         if (bus->is_kernel)
1371                 r = bus_kernel_read_message(bus, m);
1372         else
1373                 r = bus_socket_read_message(bus, m);
1374
1375         return r;
1376 }
1377
1378 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1379         sd_bus_message *z = NULL;
1380         int r, ret = 0;
1381
1382         assert(bus);
1383         assert(m);
1384         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1385
1386         if (bus->rqueue_size > 0) {
1387                 /* Dispatch a queued message */
1388
1389                 *m = bus->rqueue[0];
1390                 bus->rqueue_size --;
1391                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1392                 return 1;
1393         }
1394
1395         /* Try to read a new message */
1396         do {
1397                 r = bus_read_message(bus, &z);
1398                 if (r < 0)
1399                         return r;
1400                 if (r == 0)
1401                         return ret;
1402
1403                 ret = 1;
1404         } while (!z);
1405
1406         *m = z;
1407         return ret;
1408 }
1409
1410 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1411         int r;
1412
1413         assert_return(bus, -EINVAL);
1414         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1415         assert_return(m, -EINVAL);
1416         assert_return(!bus_pid_changed(bus), -ECHILD);
1417
1418         if (m->n_fds > 0) {
1419                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1420                 if (r < 0)
1421                         return r;
1422                 if (r == 0)
1423                         return -ENOTSUP;
1424         }
1425
1426         /* If the serial number isn't kept, then we know that no reply
1427          * is expected */
1428         if (!serial && !m->sealed)
1429                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1430
1431         r = bus_seal_message(bus, m);
1432         if (r < 0)
1433                 return r;
1434
1435         /* If this is a reply and no reply was requested, then let's
1436          * suppress this, if we can */
1437         if (m->dont_send && !serial)
1438                 return 1;
1439
1440         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1441                 size_t idx = 0;
1442
1443                 r = bus_write_message(bus, m, &idx);
1444                 if (r < 0)
1445                         return r;
1446                 else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1447                         /* Wasn't fully written. So let's remember how
1448                          * much was written. Note that the first entry
1449                          * of the wqueue array is always allocated so
1450                          * that we always can remember how much was
1451                          * written. */
1452                         bus->wqueue[0] = sd_bus_message_ref(m);
1453                         bus->wqueue_size = 1;
1454                         bus->windex = idx;
1455                 }
1456         } else {
1457                 sd_bus_message **q;
1458
1459                 /* Just append it to the queue. */
1460
1461                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1462                         return -ENOBUFS;
1463
1464                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1465                 if (!q)
1466                         return -ENOMEM;
1467
1468                 bus->wqueue = q;
1469                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1470         }
1471
1472         if (serial)
1473                 *serial = BUS_MESSAGE_SERIAL(m);
1474
1475         return 1;
1476 }
1477
1478 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1479         int r;
1480
1481         assert_return(bus, -EINVAL);
1482         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1483         assert_return(m, -EINVAL);
1484         assert_return(!bus_pid_changed(bus), -ECHILD);
1485
1486         if (!streq_ptr(m->destination, destination)) {
1487
1488                 if (!destination)
1489                         return -EEXIST;
1490
1491                 r = sd_bus_message_set_destination(m, destination);
1492                 if (r < 0)
1493                         return r;
1494         }
1495
1496         return sd_bus_send(bus, m, serial);
1497 }
1498
1499 static usec_t calc_elapse(uint64_t usec) {
1500         if (usec == (uint64_t) -1)
1501                 return 0;
1502
1503         if (usec == 0)
1504                 usec = BUS_DEFAULT_TIMEOUT;
1505
1506         return now(CLOCK_MONOTONIC) + usec;
1507 }
1508
1509 static int timeout_compare(const void *a, const void *b) {
1510         const struct reply_callback *x = a, *y = b;
1511
1512         if (x->timeout != 0 && y->timeout == 0)
1513                 return -1;
1514
1515         if (x->timeout == 0 && y->timeout != 0)
1516                 return 1;
1517
1518         if (x->timeout < y->timeout)
1519                 return -1;
1520
1521         if (x->timeout > y->timeout)
1522                 return 1;
1523
1524         return 0;
1525 }
1526
1527 _public_ int sd_bus_call_async(
1528                 sd_bus *bus,
1529                 sd_bus_message *m,
1530                 sd_bus_message_handler_t callback,
1531                 void *userdata,
1532                 uint64_t usec,
1533                 uint64_t *serial) {
1534
1535         struct reply_callback *c;
1536         int r;
1537
1538         assert_return(bus, -EINVAL);
1539         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1540         assert_return(m, -EINVAL);
1541         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1542         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1543         assert_return(callback, -EINVAL);
1544         assert_return(!bus_pid_changed(bus), -ECHILD);
1545
1546         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1547         if (r < 0)
1548                 return r;
1549
1550         if (usec != (uint64_t) -1) {
1551                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1552                 if (r < 0)
1553                         return r;
1554         }
1555
1556         r = bus_seal_message(bus, m);
1557         if (r < 0)
1558                 return r;
1559
1560         c = new0(struct reply_callback, 1);
1561         if (!c)
1562                 return -ENOMEM;
1563
1564         c->callback = callback;
1565         c->userdata = userdata;
1566         c->serial = BUS_MESSAGE_SERIAL(m);
1567         c->timeout = calc_elapse(usec);
1568
1569         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1570         if (r < 0) {
1571                 free(c);
1572                 return r;
1573         }
1574
1575         if (c->timeout != 0) {
1576                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1577                 if (r < 0) {
1578                         c->timeout = 0;
1579                         sd_bus_call_async_cancel(bus, c->serial);
1580                         return r;
1581                 }
1582         }
1583
1584         r = sd_bus_send(bus, m, serial);
1585         if (r < 0) {
1586                 sd_bus_call_async_cancel(bus, c->serial);
1587                 return r;
1588         }
1589
1590         return r;
1591 }
1592
1593 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1594         struct reply_callback *c;
1595
1596         assert_return(bus, -EINVAL);
1597         assert_return(serial != 0, -EINVAL);
1598         assert_return(!bus_pid_changed(bus), -ECHILD);
1599
1600         c = hashmap_remove(bus->reply_callbacks, &serial);
1601         if (!c)
1602                 return 0;
1603
1604         if (c->timeout != 0)
1605                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1606
1607         free(c);
1608         return 1;
1609 }
1610
1611 int bus_ensure_running(sd_bus *bus) {
1612         int r;
1613
1614         assert(bus);
1615
1616         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1617                 return -ENOTCONN;
1618         if (bus->state == BUS_RUNNING)
1619                 return 1;
1620
1621         for (;;) {
1622                 r = sd_bus_process(bus, NULL);
1623                 if (r < 0)
1624                         return r;
1625                 if (bus->state == BUS_RUNNING)
1626                         return 1;
1627                 if (r > 0)
1628                         continue;
1629
1630                 r = sd_bus_wait(bus, (uint64_t) -1);
1631                 if (r < 0)
1632                         return r;
1633         }
1634 }
1635
1636 _public_ int sd_bus_call(
1637                 sd_bus *bus,
1638                 sd_bus_message *m,
1639                 uint64_t usec,
1640                 sd_bus_error *error,
1641                 sd_bus_message **reply) {
1642
1643         int r;
1644         usec_t timeout;
1645         uint64_t serial;
1646         bool room = false;
1647
1648         assert_return(bus, -EINVAL);
1649         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1650         assert_return(m, -EINVAL);
1651         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1652         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1653         assert_return(!bus_error_is_dirty(error), -EINVAL);
1654         assert_return(!bus_pid_changed(bus), -ECHILD);
1655
1656         r = bus_ensure_running(bus);
1657         if (r < 0)
1658                 return r;
1659
1660         r = sd_bus_send(bus, m, &serial);
1661         if (r < 0)
1662                 return r;
1663
1664         timeout = calc_elapse(usec);
1665
1666         for (;;) {
1667                 usec_t left;
1668                 sd_bus_message *incoming = NULL;
1669
1670                 if (!room) {
1671                         sd_bus_message **q;
1672
1673                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1674                                 return -ENOBUFS;
1675
1676                         /* Make sure there's room for queuing this
1677                          * locally, before we read the message */
1678
1679                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1680                         if (!q)
1681                                 return -ENOMEM;
1682
1683                         bus->rqueue = q;
1684                         room = true;
1685                 }
1686
1687                 r = bus_read_message(bus, &incoming);
1688                 if (r < 0)
1689                         return r;
1690
1691                 if (incoming) {
1692
1693                         if (incoming->reply_serial == serial) {
1694                                 /* Found a match! */
1695
1696                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1697
1698                                         if (reply)
1699                                                 *reply = incoming;
1700                                         else
1701                                                 sd_bus_message_unref(incoming);
1702
1703                                         return 1;
1704                                 }
1705
1706                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1707                                         int k;
1708
1709                                         r = sd_bus_error_copy(error, &incoming->error);
1710                                         if (r < 0) {
1711                                                 sd_bus_message_unref(incoming);
1712                                                 return r;
1713                                         }
1714
1715                                         k = sd_bus_error_get_errno(&incoming->error);
1716                                         sd_bus_message_unref(incoming);
1717                                         return -k;
1718                                 }
1719
1720                                 sd_bus_message_unref(incoming);
1721                                 return -EIO;
1722
1723                         } else if (incoming->header->serial == serial &&
1724                                    bus->unique_name &&
1725                                    incoming->sender &&
1726                                    streq(bus->unique_name, incoming->sender)) {
1727
1728                                 /* Our own message? Somebody is trying
1729                                  * to send its own client a message,
1730                                  * let's not dead-lock, let's fail
1731                                  * immediately. */
1732
1733                                 sd_bus_message_unref(incoming);
1734                                 return -ELOOP;
1735                         }
1736
1737                         /* There's already guaranteed to be room for
1738                          * this, so need to resize things here */
1739                         bus->rqueue[bus->rqueue_size ++] = incoming;
1740                         room = false;
1741
1742                         /* Try to read more, right-away */
1743                         continue;
1744                 }
1745                 if (r != 0)
1746                         continue;
1747
1748                 if (timeout > 0) {
1749                         usec_t n;
1750
1751                         n = now(CLOCK_MONOTONIC);
1752                         if (n >= timeout)
1753                                 return -ETIMEDOUT;
1754
1755                         left = timeout - n;
1756                 } else
1757                         left = (uint64_t) -1;
1758
1759                 r = bus_poll(bus, true, left);
1760                 if (r < 0)
1761                         return r;
1762
1763                 r = dispatch_wqueue(bus);
1764                 if (r < 0)
1765                         return r;
1766         }
1767 }
1768
1769 _public_ int sd_bus_get_fd(sd_bus *bus) {
1770
1771         assert_return(bus, -EINVAL);
1772         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1773         assert_return(!bus_pid_changed(bus), -ECHILD);
1774
1775         return bus->input_fd;
1776 }
1777
1778 _public_ int sd_bus_get_events(sd_bus *bus) {
1779         int flags = 0;
1780
1781         assert_return(bus, -EINVAL);
1782         assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1783         assert_return(!bus_pid_changed(bus), -ECHILD);
1784
1785         if (bus->state == BUS_OPENING)
1786                 flags |= POLLOUT;
1787         else if (bus->state == BUS_AUTHENTICATING) {
1788
1789                 if (bus_socket_auth_needs_write(bus))
1790                         flags |= POLLOUT;
1791
1792                 flags |= POLLIN;
1793
1794         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1795                 if (bus->rqueue_size <= 0)
1796                         flags |= POLLIN;
1797                 if (bus->wqueue_size > 0)
1798                         flags |= POLLOUT;
1799         }
1800
1801         return flags;
1802 }
1803
1804 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1805         struct reply_callback *c;
1806
1807         assert_return(bus, -EINVAL);
1808         assert_return(timeout_usec, -EINVAL);
1809         assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1810         assert_return(!bus_pid_changed(bus), -ECHILD);
1811
1812         if (bus->state == BUS_CLOSING) {
1813                 *timeout_usec = 0;
1814                 return 1;
1815         }
1816
1817         if (bus->state == BUS_AUTHENTICATING) {
1818                 *timeout_usec = bus->auth_timeout;
1819                 return 1;
1820         }
1821
1822         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1823                 *timeout_usec = (uint64_t) -1;
1824                 return 0;
1825         }
1826
1827         if (bus->rqueue_size > 0) {
1828                 *timeout_usec = 0;
1829                 return 1;
1830         }
1831
1832         c = prioq_peek(bus->reply_callbacks_prioq);
1833         if (!c) {
1834                 *timeout_usec = (uint64_t) -1;
1835                 return 0;
1836         }
1837
1838         *timeout_usec = c->timeout;
1839         return 1;
1840 }
1841
1842 static int process_timeout(sd_bus *bus) {
1843         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1844         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1845         struct reply_callback *c;
1846         usec_t n;
1847         int r;
1848
1849         assert(bus);
1850
1851         c = prioq_peek(bus->reply_callbacks_prioq);
1852         if (!c)
1853                 return 0;
1854
1855         n = now(CLOCK_MONOTONIC);
1856         if (c->timeout > n)
1857                 return 0;
1858
1859         r = bus_message_new_synthetic_error(
1860                         bus,
1861                         c->serial,
1862                         &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1863                         &m);
1864         if (r < 0)
1865                 return r;
1866
1867         r = bus_seal_message(bus, m);
1868         if (r < 0)
1869                 return r;
1870
1871         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1872         hashmap_remove(bus->reply_callbacks, &c->serial);
1873
1874         bus->current = m;
1875         bus->iteration_counter ++;
1876
1877         r = c->callback(bus, m, c->userdata, &error_buffer);
1878         r = bus_maybe_reply_error(m, r, &error_buffer);
1879         free(c);
1880
1881         bus->current = NULL;
1882
1883         return r;
1884 }
1885
1886 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1887         assert(bus);
1888         assert(m);
1889
1890         if (bus->state != BUS_HELLO)
1891                 return 0;
1892
1893         /* Let's make sure the first message on the bus is the HELLO
1894          * reply. But note that we don't actually parse the message
1895          * here (we leave that to the usual handling), we just verify
1896          * we don't let any earlier msg through. */
1897
1898         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1899             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1900                 return -EIO;
1901
1902         if (m->reply_serial != bus->hello_serial)
1903                 return -EIO;
1904
1905         return 0;
1906 }
1907
1908 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1909         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1910         struct reply_callback *c;
1911         int r;
1912
1913         assert(bus);
1914         assert(m);
1915
1916         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1917             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1918                 return 0;
1919
1920         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1921         if (!c)
1922                 return 0;
1923
1924         if (c->timeout != 0)
1925                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1926
1927         r = sd_bus_message_rewind(m, true);
1928         if (r < 0)
1929                 return r;
1930
1931         r = c->callback(bus, m, c->userdata, &error_buffer);
1932         r = bus_maybe_reply_error(m, r, &error_buffer);
1933         free(c);
1934
1935         return r;
1936 }
1937
1938 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1939         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1940         struct filter_callback *l;
1941         int r;
1942
1943         assert(bus);
1944         assert(m);
1945
1946         do {
1947                 bus->filter_callbacks_modified = false;
1948
1949                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1950
1951                         if (bus->filter_callbacks_modified)
1952                                 break;
1953
1954                         /* Don't run this more than once per iteration */
1955                         if (l->last_iteration == bus->iteration_counter)
1956                                 continue;
1957
1958                         l->last_iteration = bus->iteration_counter;
1959
1960                         r = sd_bus_message_rewind(m, true);
1961                         if (r < 0)
1962                                 return r;
1963
1964                         r = l->callback(bus, m, l->userdata, &error_buffer);
1965                         r = bus_maybe_reply_error(m, r, &error_buffer);
1966                         if (r != 0)
1967                                 return r;
1968
1969                 }
1970
1971         } while (bus->filter_callbacks_modified);
1972
1973         return 0;
1974 }
1975
1976 static int process_match(sd_bus *bus, sd_bus_message *m) {
1977         int r;
1978
1979         assert(bus);
1980         assert(m);
1981
1982         do {
1983                 bus->match_callbacks_modified = false;
1984
1985                 r = bus_match_run(bus, &bus->match_callbacks, m);
1986                 if (r != 0)
1987                         return r;
1988
1989         } while (bus->match_callbacks_modified);
1990
1991         return 0;
1992 }
1993
1994 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1995         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1996         int r;
1997
1998         assert(bus);
1999         assert(m);
2000
2001         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2002                 return 0;
2003
2004         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2005                 return 0;
2006
2007         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
2008                 return 1;
2009
2010         if (streq_ptr(m->member, "Ping"))
2011                 r = sd_bus_message_new_method_return(m, &reply);
2012         else if (streq_ptr(m->member, "GetMachineId")) {
2013                 sd_id128_t id;
2014                 char sid[33];
2015
2016                 r = sd_id128_get_machine(&id);
2017                 if (r < 0)
2018                         return r;
2019
2020                 r = sd_bus_message_new_method_return(m, &reply);
2021                 if (r < 0)
2022                         return r;
2023
2024                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2025         } else {
2026                 r = sd_bus_message_new_method_errorf(
2027                                 m, &reply,
2028                                 SD_BUS_ERROR_UNKNOWN_METHOD,
2029                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2030         }
2031
2032         if (r < 0)
2033                 return r;
2034
2035         r = sd_bus_send(bus, reply, NULL);
2036         if (r < 0)
2037                 return r;
2038
2039         return 1;
2040 }
2041
2042 static int process_message(sd_bus *bus, sd_bus_message *m) {
2043         int r;
2044
2045         assert(bus);
2046         assert(m);
2047
2048         bus->current = m;
2049         bus->iteration_counter++;
2050
2051         log_debug("Got message sender=%s object=%s interface=%s member=%s",
2052                   strna(sd_bus_message_get_sender(m)),
2053                   strna(sd_bus_message_get_path(m)),
2054                   strna(sd_bus_message_get_interface(m)),
2055                   strna(sd_bus_message_get_member(m)));
2056
2057         r = process_hello(bus, m);
2058         if (r != 0)
2059                 goto finish;
2060
2061         r = process_reply(bus, m);
2062         if (r != 0)
2063                 goto finish;
2064
2065         r = process_filter(bus, m);
2066         if (r != 0)
2067                 goto finish;
2068
2069         r = process_match(bus, m);
2070         if (r != 0)
2071                 goto finish;
2072
2073         r = process_builtin(bus, m);
2074         if (r != 0)
2075                 goto finish;
2076
2077         r = bus_process_object(bus, m);
2078
2079 finish:
2080         bus->current = NULL;
2081         return r;
2082 }
2083
2084 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2085         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2086         int r;
2087
2088         assert(bus);
2089         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2090
2091         r = process_timeout(bus);
2092         if (r != 0)
2093                 goto null_message;
2094
2095         r = dispatch_wqueue(bus);
2096         if (r != 0)
2097                 goto null_message;
2098
2099         r = dispatch_rqueue(bus, &m);
2100         if (r < 0)
2101                 return r;
2102         if (!m)
2103                 goto null_message;
2104
2105         r = process_message(bus, m);
2106         if (r != 0)
2107                 goto null_message;
2108
2109         if (ret) {
2110                 r = sd_bus_message_rewind(m, true);
2111                 if (r < 0)
2112                         return r;
2113
2114                 *ret = m;
2115                 m = NULL;
2116                 return 1;
2117         }
2118
2119         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2120
2121                 r = sd_bus_reply_method_errorf(
2122                                 m,
2123                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
2124                                 "Unknown object '%s'.", m->path);
2125                 if (r < 0)
2126                         return r;
2127         }
2128
2129         return 1;
2130
2131 null_message:
2132         if (r >= 0 && ret)
2133                 *ret = NULL;
2134
2135         return r;
2136 }
2137
2138 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2139         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2140         struct reply_callback *c;
2141         int r;
2142
2143         assert(bus);
2144         assert(bus->state == BUS_CLOSING);
2145
2146         c = hashmap_first(bus->reply_callbacks);
2147         if (c) {
2148                 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2149
2150                 /* First, fail all outstanding method calls */
2151                 r = bus_message_new_synthetic_error(
2152                                 bus,
2153                                 c->serial,
2154                                 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2155                                 &m);
2156                 if (r < 0)
2157                         return r;
2158
2159                 r = bus_seal_message(bus, m);
2160                 if (r < 0)
2161                         return r;
2162
2163                 if (c->timeout != 0)
2164                         prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2165
2166                 hashmap_remove(bus->reply_callbacks, &c->serial);
2167
2168                 bus->current = m;
2169                 bus->iteration_counter++;
2170
2171                 r = c->callback(bus, m, c->userdata, &error_buffer);
2172                 r = bus_maybe_reply_error(m, r, &error_buffer);
2173                 free(c);
2174
2175                 goto finish;
2176         }
2177
2178         /* Then, synthesize a Disconnected message */
2179         r = sd_bus_message_new_signal(
2180                         bus,
2181                         "/org/freedesktop/DBus/Local",
2182                         "org.freedesktop.DBus.Local",
2183                         "Disconnected",
2184                         &m);
2185         if (r < 0)
2186                 return r;
2187
2188         r = bus_seal_message(bus, m);
2189         if (r < 0)
2190                 return r;
2191
2192         sd_bus_close(bus);
2193
2194         bus->current = m;
2195         bus->iteration_counter++;
2196
2197         r = process_filter(bus, m);
2198         if (r != 0)
2199                 goto finish;
2200
2201         r = process_match(bus, m);
2202         if (r != 0)
2203                 goto finish;
2204
2205         if (ret) {
2206                 *ret = m;
2207                 m = NULL;
2208         }
2209
2210         r = 1;
2211
2212 finish:
2213         bus->current = NULL;
2214         return r;
2215 }
2216
2217 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2218         BUS_DONT_DESTROY(bus);
2219         int r;
2220
2221         /* Returns 0 when we didn't do anything. This should cause the
2222          * caller to invoke sd_bus_wait() before returning the next
2223          * time. Returns > 0 when we did something, which possibly
2224          * means *ret is filled in with an unprocessed message. */
2225
2226         assert_return(bus, -EINVAL);
2227         assert_return(!bus_pid_changed(bus), -ECHILD);
2228
2229         /* We don't allow recursively invoking sd_bus_process(). */
2230         assert_return(!bus->current, -EBUSY);
2231
2232         switch (bus->state) {
2233
2234         case BUS_UNSET:
2235         case BUS_CLOSED:
2236                 return -ENOTCONN;
2237
2238         case BUS_OPENING:
2239                 r = bus_socket_process_opening(bus);
2240                 if (r == -ECONNRESET || r == -EPIPE) {
2241                         bus_enter_closing(bus);
2242                         r = 1;
2243                 } else if (r < 0)
2244                         return r;
2245                 if (ret)
2246                         *ret = NULL;
2247                 return r;
2248
2249         case BUS_AUTHENTICATING:
2250                 r = bus_socket_process_authenticating(bus);
2251                 if (r == -ECONNRESET || r == -EPIPE) {
2252                         bus_enter_closing(bus);
2253                         r = 1;
2254                 } else if (r < 0)
2255                         return r;
2256
2257                 if (ret)
2258                         *ret = NULL;
2259
2260                 return r;
2261
2262         case BUS_RUNNING:
2263         case BUS_HELLO:
2264                 r = process_running(bus, ret);
2265                 if (r == -ECONNRESET || r == -EPIPE) {
2266                         bus_enter_closing(bus);
2267                         r = 1;
2268
2269                         if (ret)
2270                                 *ret = NULL;
2271                 }
2272
2273                 return r;
2274
2275         case BUS_CLOSING:
2276                 return process_closing(bus, ret);
2277         }
2278
2279         assert_not_reached("Unknown state");
2280 }
2281
2282 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2283         struct pollfd p[2] = {};
2284         int r, e, n;
2285         struct timespec ts;
2286         usec_t m = (usec_t) -1;
2287
2288         assert(bus);
2289
2290         if (bus->state == BUS_CLOSING)
2291                 return 1;
2292
2293         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2294
2295         e = sd_bus_get_events(bus);
2296         if (e < 0)
2297                 return e;
2298
2299         if (need_more)
2300                 /* The caller really needs some more data, he doesn't
2301                  * care about what's already read, or any timeouts
2302                  * except its own.*/
2303                 e |= POLLIN;
2304         else {
2305                 usec_t until;
2306                 /* The caller wants to process if there's something to
2307                  * process, but doesn't care otherwise */
2308
2309                 r = sd_bus_get_timeout(bus, &until);
2310                 if (r < 0)
2311                         return r;
2312                 if (r > 0) {
2313                         usec_t nw;
2314                         nw = now(CLOCK_MONOTONIC);
2315                         m = until > nw ? until - nw : 0;
2316                 }
2317         }
2318
2319         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2320                 m = timeout_usec;
2321
2322         p[0].fd = bus->input_fd;
2323         if (bus->output_fd == bus->input_fd) {
2324                 p[0].events = e;
2325                 n = 1;
2326         } else {
2327                 p[0].events = e & POLLIN;
2328                 p[1].fd = bus->output_fd;
2329                 p[1].events = e & POLLOUT;
2330                 n = 2;
2331         }
2332
2333         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2334         if (r < 0)
2335                 return -errno;
2336
2337         return r > 0 ? 1 : 0;
2338 }
2339
2340 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2341
2342         assert_return(bus, -EINVAL);
2343         assert_return(!bus_pid_changed(bus), -ECHILD);
2344
2345         if (bus->state == BUS_CLOSING)
2346                 return 0;
2347
2348         assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
2349
2350         if (bus->rqueue_size > 0)
2351                 return 0;
2352
2353         return bus_poll(bus, false, timeout_usec);
2354 }
2355
2356 _public_ int sd_bus_flush(sd_bus *bus) {
2357         int r;
2358
2359         assert_return(bus, -EINVAL);
2360         assert_return(!bus_pid_changed(bus), -ECHILD);
2361
2362         if (bus->state == BUS_CLOSING)
2363                 return 0;
2364
2365         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2366
2367         r = bus_ensure_running(bus);
2368         if (r < 0)
2369                 return r;
2370
2371         if (bus->wqueue_size <= 0)
2372                 return 0;
2373
2374         for (;;) {
2375                 r = dispatch_wqueue(bus);
2376                 if (r < 0)
2377                         return r;
2378
2379                 if (bus->wqueue_size <= 0)
2380                         return 0;
2381
2382                 r = bus_poll(bus, false, (uint64_t) -1);
2383                 if (r < 0)
2384                         return r;
2385         }
2386 }
2387
2388 _public_ int sd_bus_add_filter(sd_bus *bus,
2389                                sd_bus_message_handler_t callback,
2390                                void *userdata) {
2391
2392         struct filter_callback *f;
2393
2394         assert_return(bus, -EINVAL);
2395         assert_return(callback, -EINVAL);
2396         assert_return(!bus_pid_changed(bus), -ECHILD);
2397
2398         f = new0(struct filter_callback, 1);
2399         if (!f)
2400                 return -ENOMEM;
2401         f->callback = callback;
2402         f->userdata = userdata;
2403
2404         bus->filter_callbacks_modified = true;
2405         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2406         return 0;
2407 }
2408
2409 _public_ int sd_bus_remove_filter(sd_bus *bus,
2410                                   sd_bus_message_handler_t callback,
2411                                   void *userdata) {
2412
2413         struct filter_callback *f;
2414
2415         assert_return(bus, -EINVAL);
2416         assert_return(callback, -EINVAL);
2417         assert_return(!bus_pid_changed(bus), -ECHILD);
2418
2419         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2420                 if (f->callback == callback && f->userdata == userdata) {
2421                         bus->filter_callbacks_modified = true;
2422                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2423                         free(f);
2424                         return 1;
2425                 }
2426         }
2427
2428         return 0;
2429 }
2430
2431 _public_ int sd_bus_add_match(sd_bus *bus,
2432                               const char *match,
2433                               sd_bus_message_handler_t callback,
2434                               void *userdata) {
2435
2436         struct bus_match_component *components = NULL;
2437         unsigned n_components = 0;
2438         uint64_t cookie = 0;
2439         int r = 0;
2440
2441         assert_return(bus, -EINVAL);
2442         assert_return(match, -EINVAL);
2443         assert_return(!bus_pid_changed(bus), -ECHILD);
2444
2445         r = bus_match_parse(match, &components, &n_components);
2446         if (r < 0)
2447                 goto finish;
2448
2449         if (bus->bus_client) {
2450                 cookie = ++bus->match_cookie;
2451
2452                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2453                 if (r < 0)
2454                         goto finish;
2455         }
2456
2457         bus->match_callbacks_modified = true;
2458         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2459         if (r < 0) {
2460                 if (bus->bus_client)
2461                         bus_remove_match_internal(bus, match, cookie);
2462         }
2463
2464 finish:
2465         bus_match_parse_free(components, n_components);
2466         return r;
2467 }
2468
2469 _public_ int sd_bus_remove_match(sd_bus *bus,
2470                                  const char *match,
2471                                  sd_bus_message_handler_t callback,
2472                                  void *userdata) {
2473
2474         struct bus_match_component *components = NULL;
2475         unsigned n_components = 0;
2476         int r = 0, q = 0;
2477         uint64_t cookie = 0;
2478
2479         assert_return(bus, -EINVAL);
2480         assert_return(match, -EINVAL);
2481         assert_return(!bus_pid_changed(bus), -ECHILD);
2482
2483         r = bus_match_parse(match, &components, &n_components);
2484         if (r < 0)
2485                 return r;
2486
2487         bus->match_callbacks_modified = true;
2488         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2489
2490         if (bus->bus_client)
2491                 q = bus_remove_match_internal(bus, match, cookie);
2492
2493         bus_match_parse_free(components, n_components);
2494
2495         return r < 0 ? r : q;
2496 }
2497
2498 bool bus_pid_changed(sd_bus *bus) {
2499         assert(bus);
2500
2501         /* We don't support people creating a bus connection and
2502          * keeping it around over a fork(). Let's complain. */
2503
2504         return bus->original_pid != getpid();
2505 }
2506
2507 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2508         sd_bus *bus = userdata;
2509         int r;
2510
2511         assert(bus);
2512
2513         r = sd_bus_process(bus, NULL);
2514         if (r < 0)
2515                 return r;
2516
2517         return 1;
2518 }
2519
2520 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2521         sd_bus *bus = userdata;
2522         int r;
2523
2524         assert(bus);
2525
2526         r = sd_bus_process(bus, NULL);
2527         if (r < 0)
2528                 return r;
2529
2530         return 1;
2531 }
2532
2533 static int prepare_callback(sd_event_source *s, void *userdata) {
2534         sd_bus *bus = userdata;
2535         int r, e;
2536         usec_t until;
2537
2538         assert(s);
2539         assert(bus);
2540
2541         e = sd_bus_get_events(bus);
2542         if (e < 0)
2543                 return e;
2544
2545         if (bus->output_fd != bus->input_fd) {
2546
2547                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2548                 if (r < 0)
2549                         return r;
2550
2551                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2552                 if (r < 0)
2553                         return r;
2554         } else {
2555                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2556                 if (r < 0)
2557                         return r;
2558         }
2559
2560         r = sd_bus_get_timeout(bus, &until);
2561         if (r < 0)
2562                 return r;
2563         if (r > 0) {
2564                 int j;
2565
2566                 j = sd_event_source_set_time(bus->time_event_source, until);
2567                 if (j < 0)
2568                         return j;
2569         }
2570
2571         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2572         if (r < 0)
2573                 return r;
2574
2575         return 1;
2576 }
2577
2578 static int quit_callback(sd_event_source *event, void *userdata) {
2579         sd_bus *bus = userdata;
2580
2581         assert(event);
2582
2583         sd_bus_flush(bus);
2584
2585         return 1;
2586 }
2587
2588 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2589         int r;
2590
2591         assert_return(bus, -EINVAL);
2592         assert_return(!bus->event, -EBUSY);
2593
2594         assert(!bus->input_io_event_source);
2595         assert(!bus->output_io_event_source);
2596         assert(!bus->time_event_source);
2597
2598         if (event)
2599                 bus->event = sd_event_ref(event);
2600         else  {
2601                 r = sd_event_default(&bus->event);
2602                 if (r < 0)
2603                         return r;
2604         }
2605
2606         r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2607         if (r < 0)
2608                 goto fail;
2609
2610         r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2611         if (r < 0)
2612                 goto fail;
2613
2614         if (bus->output_fd != bus->input_fd) {
2615                 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2616                 if (r < 0)
2617                         goto fail;
2618
2619                 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2620                 if (r < 0)
2621                         goto fail;
2622         }
2623
2624         r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2625         if (r < 0)
2626                 goto fail;
2627
2628         r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2629         if (r < 0)
2630                 goto fail;
2631
2632         r = sd_event_source_set_priority(bus->time_event_source, priority);
2633         if (r < 0)
2634                 goto fail;
2635
2636         r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2637         if (r < 0)
2638                 goto fail;
2639
2640         return 0;
2641
2642 fail:
2643         sd_bus_detach_event(bus);
2644         return r;
2645 }
2646
2647 _public_ int sd_bus_detach_event(sd_bus *bus) {
2648         assert_return(bus, -EINVAL);
2649         assert_return(bus->event, -ENXIO);
2650
2651         if (bus->input_io_event_source) {
2652                 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2653                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2654         }
2655
2656         if (bus->output_io_event_source) {
2657                 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2658                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2659         }
2660
2661         if (bus->time_event_source) {
2662                 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2663                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2664         }
2665
2666         if (bus->quit_event_source) {
2667                 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2668                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2669         }
2670
2671         if (bus->event)
2672                 bus->event = sd_event_unref(bus->event);
2673
2674         return 0;
2675 }
2676
2677 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2678         assert_return(bus, NULL);
2679
2680         return bus->event;
2681 }
2682
2683 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2684         assert_return(bus, NULL);
2685
2686         return bus->current;
2687 }
2688
2689 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2690         sd_bus *b = NULL;
2691         int r;
2692
2693         assert(bus_open);
2694         assert(default_bus);
2695
2696         if (!ret)
2697                 return !!*default_bus;
2698
2699         if (*default_bus) {
2700                 *ret = sd_bus_ref(*default_bus);
2701                 return 0;
2702         }
2703
2704         r = bus_open(&b);
2705         if (r < 0)
2706                 return r;
2707
2708         b->default_bus_ptr = default_bus;
2709         b->tid = gettid();
2710         *default_bus = b;
2711
2712         *ret = b;
2713         return 1;
2714 }
2715
2716 _public_ int sd_bus_default_system(sd_bus **ret) {
2717         static __thread sd_bus *default_system_bus = NULL;
2718
2719         return bus_default(sd_bus_open_system, &default_system_bus, ret);
2720 }
2721
2722 _public_ int sd_bus_default_user(sd_bus **ret) {
2723         static __thread sd_bus *default_user_bus = NULL;
2724
2725         return bus_default(sd_bus_open_user, &default_user_bus, ret);
2726 }
2727
2728 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2729         assert_return(b, -EINVAL);
2730         assert_return(tid, -EINVAL);
2731         assert_return(!bus_pid_changed(b), -ECHILD);
2732
2733         if (b->tid != 0) {
2734                 *tid = b->tid;
2735                 return 0;
2736         }
2737
2738         if (b->event)
2739                 return sd_event_get_tid(b->event, tid);
2740
2741         return -ENXIO;
2742 }
2743
2744 _public_ char *sd_bus_label_escape(const char *s) {
2745         char *r, *t;
2746         const char *f;
2747
2748         assert_return(s, NULL);
2749
2750         /* Escapes all chars that D-Bus' object path cannot deal
2751          * with. Can be reversed with bus_path_unescape(). We special
2752          * case the empty string. */
2753
2754         if (*s == 0)
2755                 return strdup("_");
2756
2757         r = new(char, strlen(s)*3 + 1);
2758         if (!r)
2759                 return NULL;
2760
2761         for (f = s, t = r; *f; f++) {
2762
2763                 /* Escape everything that is not a-zA-Z0-9. We also
2764                  * escape 0-9 if it's the first character */
2765
2766                 if (!(*f >= 'A' && *f <= 'Z') &&
2767                     !(*f >= 'a' && *f <= 'z') &&
2768                     !(f > s && *f >= '0' && *f <= '9')) {
2769                         *(t++) = '_';
2770                         *(t++) = hexchar(*f >> 4);
2771                         *(t++) = hexchar(*f);
2772                 } else
2773                         *(t++) = *f;
2774         }
2775
2776         *t = 0;
2777
2778         return r;
2779 }
2780
2781 _public_ char *sd_bus_label_unescape(const char *f) {
2782         char *r, *t;
2783
2784         assert_return(f, NULL);
2785
2786         /* Special case for the empty string */
2787         if (streq(f, "_"))
2788                 return strdup("");
2789
2790         r = new(char, strlen(f) + 1);
2791         if (!r)
2792                 return NULL;
2793
2794         for (t = r; *f; f++) {
2795
2796                 if (*f == '_') {
2797                         int a, b;
2798
2799                         if ((a = unhexchar(f[1])) < 0 ||
2800                             (b = unhexchar(f[2])) < 0) {
2801                                 /* Invalid escape code, let's take it literal then */
2802                                 *(t++) = '_';
2803                         } else {
2804                                 *(t++) = (char) ((a << 4) | b);
2805                                 f += 2;
2806                         }
2807                 } else
2808                         *(t++) = *f;
2809         }
2810
2811         *t = 0;
2812
2813         return r;
2814 }